View Javadoc
1   package org.oxerr.huobi.xchange.service.streaming;
2   
3   import static com.xeiam.xchange.service.streaming.ExchangeEventType.DEPTH;
4   import static com.xeiam.xchange.service.streaming.ExchangeEventType.TICKER;
5   import static org.oxerr.huobi.xchange.service.streaming.HuobiSocketIOAdapters.adaptTicker;
6   
7   import java.util.concurrent.BlockingQueue;
8   
9   import org.oxerr.huobi.websocket.HuobiSocketClient;
10  import org.oxerr.huobi.websocket.dto.Depth;
11  import org.oxerr.huobi.websocket.dto.Percent;
12  import org.oxerr.huobi.websocket.dto.response.Response;
13  import org.oxerr.huobi.websocket.dto.response.historydata.ReqMarketDepthResponse;
14  import org.oxerr.huobi.websocket.dto.response.historydata.ReqMarketDepthTopResponse;
15  import org.oxerr.huobi.websocket.dto.response.historydata.ReqTradeDetailTopResponse;
16  import org.oxerr.huobi.websocket.dto.response.marketdata.MarketDepthDiff;
17  import org.oxerr.huobi.websocket.dto.response.marketdata.MarketDepthTopDiff;
18  import org.oxerr.huobi.websocket.dto.response.marketdata.MarketOverview;
19  import org.oxerr.huobi.websocket.dto.response.marketdata.TradeDetail;
20  import org.oxerr.huobi.websocket.dto.response.payload.Payload;
21  import org.oxerr.huobi.websocket.event.ResponseAdapter;
22  import org.slf4j.Logger;
23  import org.slf4j.LoggerFactory;
24  
25  import com.xeiam.xchange.dto.marketdata.OrderBook;
26  import com.xeiam.xchange.dto.marketdata.Trade;
27  import com.xeiam.xchange.service.streaming.ExchangeEvent;
28  import com.xeiam.xchange.service.streaming.ExchangeEventType;
29  
30  public class HuobiExchangeEventListener extends ResponseAdapter {
31  
32  	private final Logger log = LoggerFactory
33  			.getLogger(HuobiExchangeEventListener.class);
34  	private final HuobiSocketClient client;
35  	private final BlockingQueue<ExchangeEvent> consumerEventQueue;
36  
37  	private volatile Depth depth;
38  	private volatile Depth depthTop;
39  	private volatile OrderBook orderBook;
40  
41  	public HuobiExchangeEventListener(HuobiSocketClient client,
42  			BlockingQueue<ExchangeEvent> consumerEventQueue) {
43  		this.client = client;
44  		this.consumerEventQueue = consumerEventQueue;
45  	}
46  
47  	@Override
48  	public void onResponse(Response<? extends Payload> response) {
49  		if (response instanceof MarketOverview) {
50  			MarketOverview marketOverview = (MarketOverview) response;
51  			ExchangeEvent event = new HuobiExchangeEvent(TICKER, response,
52  					adaptTicker(marketOverview.getPayload()));
53  			putEvent(event);
54  		} else if (response instanceof ReqMarketDepthResponse) {
55  			ReqMarketDepthResponse reqMarketDepthResponse = (ReqMarketDepthResponse) response;
56  			depth = reqMarketDepthResponse.getPayload();
57  			putDepthEvent(reqMarketDepthResponse);
58  		} else if (response instanceof ReqMarketDepthTopResponse) {
59  			ReqMarketDepthTopResponse reqMarketDepthTopResponse = (ReqMarketDepthTopResponse) response;
60  			depthTop = reqMarketDepthTopResponse.getPayload();
61  			putDepthEvent(reqMarketDepthTopResponse);
62  		} else if (response instanceof MarketDepthDiff) {
63  			MarketDepthDiff marketDepthDiff = (MarketDepthDiff) response;
64  			if (depth != null) {
65  				try {
66  					depth.merge(marketDepthDiff.getPayload());
67  					putDepthEvent(marketDepthDiff);
68  				} catch (IllegalArgumentException e) {
69  					log.debug("{}", e.getMessage());
70  					client.reqMarketDepth(depth.getSymbolId(), Percent.PERCENT10);
71  				}
72  			}
73  		} else if (response instanceof MarketDepthTopDiff) {
74  			MarketDepthTopDiff marketDepthTopDiff = (MarketDepthTopDiff) response;
75  			if (depthTop != null) {
76  				try {
77  					depthTop.merge(marketDepthTopDiff.getPayload());
78  					putDepthEvent(marketDepthTopDiff);
79  				} catch (IllegalArgumentException e) {
80  					log.debug("{}", e.getMessage());
81  					client.reqMarketDepthTop(depthTop.getSymbolId());
82  				}
83  			}
84  		} else if (response instanceof ReqTradeDetailTopResponse) {
85  			ReqTradeDetailTopResponse reqTradeDetailTopResponse = (ReqTradeDetailTopResponse) response;
86  			putTradeEvent(reqTradeDetailTopResponse, reqTradeDetailTopResponse.getPayload());
87  		} else if (response instanceof TradeDetail) {
88  			TradeDetail tradeDetail = (TradeDetail) response;
89  			putTradeEvent(response, tradeDetail.getPayload());
90  		} else {
91  			log.debug("Unadaptable response: {}", response);
92  		}
93  	}
94  
95  	private void putDepthEvent(Response<? extends Payload> response) {
96  		orderBook = HuobiSocketIOAdapters.adaptOrderBook(depth);
97  		ExchangeEvent event = new HuobiExchangeEvent(DEPTH, response, orderBook);
98  		putEvent(event);
99  	}
100 
101 	private void putTradeEvent(Response<? extends Payload> response,
102 			org.oxerr.huobi.websocket.dto.TradeDetail tradeDetail) {
103 		Trade[] trades = HuobiSocketIOAdapters.adaptTrades(tradeDetail);
104 		for (Trade trade : trades) {
105 			putEvent(new HuobiExchangeEvent(ExchangeEventType.TRADE, response, trade));
106 		}
107 	}
108 
109 	private void putEvent(ExchangeEvent event) {
110 		try {
111 			consumerEventQueue.put(event);
112 		} catch (InterruptedException e) {
113 			throw new RuntimeException(e);
114 		}
115 	}
116 
117 }