package org.oxerr.huobi.xchange.service.streaming;

import static com.xeiam.xchange.service.streaming.ExchangeEventType.DEPTH;
import static com.xeiam.xchange.service.streaming.ExchangeEventType.TICKER;
import static org.oxerr.huobi.xchange.service.streaming.HuobiSocketIOAdapters.adaptTicker;

import java.util.concurrent.BlockingQueue;

import org.oxerr.huobi.websocket.HuobiSocketClient;
import org.oxerr.huobi.websocket.dto.Depth;
import org.oxerr.huobi.websocket.dto.Percent;
import org.oxerr.huobi.websocket.dto.response.Response;
import org.oxerr.huobi.websocket.dto.response.historydata.ReqMarketDepthResponse;
import org.oxerr.huobi.websocket.dto.response.historydata.ReqMarketDepthTopResponse;
import org.oxerr.huobi.websocket.dto.response.historydata.ReqTradeDetailTopResponse;
import org.oxerr.huobi.websocket.dto.response.marketdata.MarketDepthDiff;
import org.oxerr.huobi.websocket.dto.response.marketdata.MarketDepthTopDiff;
import org.oxerr.huobi.websocket.dto.response.marketdata.MarketOverview;
import org.oxerr.huobi.websocket.dto.response.marketdata.TradeDetail;
import org.oxerr.huobi.websocket.dto.response.payload.Payload;
import org.oxerr.huobi.websocket.event.ResponseAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.xeiam.xchange.dto.marketdata.OrderBook;
import com.xeiam.xchange.dto.marketdata.Trade;
import com.xeiam.xchange.service.streaming.ExchangeEvent;
import com.xeiam.xchange.service.streaming.ExchangeEventType;

/**
 * Turns Huobi websocket responses into {@link ExchangeEvent}s and puts them
 * on the consumer queue supplied at construction time.
 *
 * <p>
 * Ticker, depth (full and "top") and trade responses are handled; any other
 * response type is only logged at debug level.
 * </p>
 */
public class HuobiExchangeEventListener extends ResponseAdapter {

	private final Logger log = LoggerFactory
			.getLogger(HuobiExchangeEventListener.class);
	private final HuobiSocketClient client;
	private final BlockingQueue<ExchangeEvent> consumerEventQueue;

	/** Last full depth snapshot received; diffs are merged into it. */
	private volatile Depth depth;

	/** Last "top" depth snapshot received; top diffs are merged into it. */
	private volatile Depth depthTop;

	/** Order book adapted for the most recently emitted depth event. */
	private volatile OrderBook orderBook;

	/**
	 * Creates a listener.
	 *
	 * @param client the socket client used to re-request a depth snapshot
	 * when applying a diff fails.
	 * @param consumerEventQueue the queue that receives the adapted events.
	 */
	public HuobiExchangeEventListener(HuobiSocketClient client,
			BlockingQueue<ExchangeEvent> consumerEventQueue) {
		this.client = client;
		this.consumerEventQueue = consumerEventQueue;
	}

	/**
	 * Dispatches on the concrete response type and enqueues the
	 * corresponding ticker, depth or trade event(s).
	 *
	 * @param response the response received from the socket client.
	 */
	@Override
	public void onResponse(Response<? extends Payload> response) {
		if (response instanceof MarketOverview) {
			MarketOverview marketOverview = (MarketOverview) response;
			ExchangeEvent event = new HuobiExchangeEvent(TICKER, response,
					adaptTicker(marketOverview.getPayload()));
			putEvent(event);
		} else if (response instanceof ReqMarketDepthResponse) {
			ReqMarketDepthResponse reqMarketDepthResponse = (ReqMarketDepthResponse) response;
			Depth snapshot = reqMarketDepthResponse.getPayload();
			depth = snapshot;
			putDepthEvent(reqMarketDepthResponse, snapshot);
		} else if (response instanceof ReqMarketDepthTopResponse) {
			ReqMarketDepthTopResponse reqMarketDepthTopResponse = (ReqMarketDepthTopResponse) response;
			Depth snapshot = reqMarketDepthTopResponse.getPayload();
			depthTop = snapshot;
			// Adapt the top snapshot itself, not the full-depth field.
			putDepthEvent(reqMarketDepthTopResponse, snapshot);
		} else if (response instanceof MarketDepthDiff) {
			MarketDepthDiff marketDepthDiff = (MarketDepthDiff) response;
			// Read the volatile field once so the null check, the merge and
			// the re-request all operate on the same instance.
			Depth current = depth;
			if (current != null) {
				try {
					current.merge(marketDepthDiff.getPayload());
					putDepthEvent(marketDepthDiff, current);
				} catch (IllegalArgumentException e) {
					// Presumably the diff could not be applied to the current
					// snapshot; ask for a fresh one.
					log.debug("{}", e.getMessage());
					client.reqMarketDepth(current.getSymbolId(), Percent.PERCENT10);
				}
			}
		} else if (response instanceof MarketDepthTopDiff) {
			MarketDepthTopDiff marketDepthTopDiff = (MarketDepthTopDiff) response;
			Depth currentTop = depthTop;
			if (currentTop != null) {
				try {
					currentTop.merge(marketDepthTopDiff.getPayload());
					putDepthEvent(marketDepthTopDiff, currentTop);
				} catch (IllegalArgumentException e) {
					log.debug("{}", e.getMessage());
					client.reqMarketDepthTop(currentTop.getSymbolId());
				}
			}
		} else if (response instanceof ReqTradeDetailTopResponse) {
			ReqTradeDetailTopResponse reqTradeDetailTopResponse = (ReqTradeDetailTopResponse) response;
			putTradeEvent(reqTradeDetailTopResponse, reqTradeDetailTopResponse.getPayload());
		} else if (response instanceof TradeDetail) {
			TradeDetail tradeDetail = (TradeDetail) response;
			putTradeEvent(response, tradeDetail.getPayload());
		} else {
			log.debug("Unadaptable response: {}", response);
		}
	}

	/**
	 * Adapts the given depth into an order book, remembers it, and enqueues
	 * a {@code DEPTH} event carrying it.
	 *
	 * @param response the response that triggered the event.
	 * @param source the depth snapshot to adapt (full or top, whichever the
	 * response updated).
	 */
	private void putDepthEvent(Response<? extends Payload> response,
			Depth source) {
		OrderBook adapted = HuobiSocketIOAdapters.adaptOrderBook(source);
		orderBook = adapted;
		putEvent(new HuobiExchangeEvent(DEPTH, response, adapted));
	}

	/**
	 * Enqueues one {@code TRADE} event per trade adapted from the payload.
	 *
	 * @param response the response that triggered the events.
	 * @param tradeDetail the trade detail payload to adapt.
	 */
	private void putTradeEvent(Response<? extends Payload> response,
			org.oxerr.huobi.websocket.dto.TradeDetail tradeDetail) {
		Trade[] trades = HuobiSocketIOAdapters.adaptTrades(tradeDetail);
		for (Trade trade : trades) {
			putEvent(new HuobiExchangeEvent(ExchangeEventType.TRADE, response, trade));
		}
	}

	/**
	 * Puts the event on the consumer queue, blocking if the queue is full.
	 *
	 * @param event the event to enqueue.
	 * @throws RuntimeException wrapping the {@link InterruptedException} if
	 * the thread is interrupted while waiting; the interrupt flag is
	 * restored first.
	 */
	private void putEvent(ExchangeEvent event) {
		try {
			consumerEventQueue.put(event);
		} catch (InterruptedException e) {
			// Restore the flag so code further up the stack can still see
			// that the thread was interrupted.
			Thread.currentThread().interrupt();
			throw new RuntimeException(e);
		}
	}

}