View Javadoc
1   package org.oxerr.huobi.xchange.service.streaming;
2   
3   import static com.xeiam.xchange.service.streaming.ExchangeEventType.CONNECT;
4   import static com.xeiam.xchange.service.streaming.ExchangeEventType.DISCONNECT;
5   import static com.xeiam.xchange.service.streaming.ExchangeEventType.ERROR;
6   import static org.java_websocket.WebSocket.READYSTATE.CLOSED;
7   import static org.java_websocket.WebSocket.READYSTATE.NOT_YET_CONNECTED;
8   import static org.java_websocket.WebSocket.READYSTATE.OPEN;
9   import static org.oxerr.huobi.xchange.HuobiExchange.WEBSOCKET_URI_KEY;
10  import static org.oxerr.huobi.xchange.service.streaming.HuobiSocketIOAdapters.adaptSymbol;
11  import io.socket.SocketIOException;
12  
13  import java.net.MalformedURLException;
14  import java.net.URL;
15  import java.util.HashSet;
16  import java.util.Set;
17  import java.util.concurrent.BlockingQueue;
18  import java.util.concurrent.LinkedBlockingQueue;
19  import java.util.concurrent.atomic.AtomicInteger;
20  
21  import org.java_websocket.WebSocket.READYSTATE;
22  import org.oxerr.huobi.websocket.HuobiSocketClient;
23  import org.oxerr.huobi.websocket.dto.Percent;
24  import org.oxerr.huobi.websocket.dto.request.Request;
25  import org.oxerr.huobi.websocket.dto.request.marketdata.Message;
26  import org.oxerr.huobi.websocket.dto.request.marketdata.PushType;
27  import org.oxerr.huobi.websocket.event.HuobiSocketAdapter;
28  import org.slf4j.Logger;
29  import org.slf4j.LoggerFactory;
30  
31  import com.google.gson.Gson;
32  import com.xeiam.xchange.ExchangeSpecification;
33  import com.xeiam.xchange.currency.CurrencyPair;
34  import com.xeiam.xchange.service.streaming.DefaultExchangeEvent;
35  import com.xeiam.xchange.service.streaming.ExchangeEvent;
36  import com.xeiam.xchange.service.streaming.ExchangeEventType;
37  import com.xeiam.xchange.service.streaming.StreamingExchangeService;
38  
39  /**
40   * Huobi streaming service implementation over WebSocket Market API.
41   */
42  public class HuobiSocketIOService implements StreamingExchangeService {
43  
44  	private final Logger log = LoggerFactory.getLogger(HuobiSocketIOService.class);
45  	private final HuobiSocketClient client;
46  	private final Message message;
47  	private final Gson gson = new Gson();
48  	private final BlockingQueue<ExchangeEvent> consumerEventQueue = new LinkedBlockingQueue<ExchangeEvent>();
49  	private final HuobiExchangeEventListener listener;
50  	private volatile READYSTATE webSocketStatus = NOT_YET_CONNECTED;
51  
52  	public HuobiSocketIOService(
53  			final ExchangeSpecification exchangeSpecification,
54  			final HuobiStreamingConfiguration configuration) {
55  		String url = (String) exchangeSpecification.getExchangeSpecificParametersItem(WEBSOCKET_URI_KEY);
56  		try {
57  			client = new HuobiSocketClient(new URL(url));
58  		} catch (MalformedURLException e) {
59  			throw new IllegalArgumentException(e);
60  		}
61  
62  		message = new Message();
63  
64  		final Set<CurrencyPair> currencyPairs = configuration.getCurrencyPairs();
65  		final Set<String> symbols = new HashSet<>(currencyPairs.size());
66  
67  		for (CurrencyPair currencyPair : currencyPairs) {
68  			String symbol = adaptSymbol(currencyPair);
69  			symbols.add(symbol);
70  
71  			// Ticker
72  			message.addMarketOverview(symbol, PushType.PUSH_LONG);
73  
74  			// Depth
75  			message.addMarketDepthDiff(symbol, PushType.PUSH_LONG, Percent.PERCENT10);
76  
77  			// Trade
78  			message.addTradeDetail(symbol, PushType.PUSH_LONG);
79  		}
80  
81  		client.addListener(new HuobiSocketAdapter() {
82  
83  			private final AtomicInteger reconnectAttempts = new AtomicInteger();
84  
85  			@Override
86  			public void onConnect() {
87  				reconnectAttempts.set(0);
88  
89  				webSocketStatus = OPEN;
90  				putEvent(CONNECT);
91  
92  				for (String symbol : symbols) {
93  					// Depth
94  					client.reqMarketDepth(symbol, Percent.PERCENT10);
95  
96  					// Trade
97  					client.reqTradeDetailTop(symbol, 10);
98  				}
99  
100 				client.reqMsgSubscribe(message);
101 			}
102 
103 			@Override
104 			public void onDisconnect() {
105 				webSocketStatus = CLOSED;
106 				putEvent(DISCONNECT);
107 			}
108 
109 
110 			@Override
111 			public void onError(SocketIOException socketIOException) {
112 				putEvent(new DefaultExchangeEvent(ERROR,
113 						socketIOException.getMessage(), socketIOException));
114 
115 				final int attempts = reconnectAttempts.incrementAndGet();
116 
117 				if (configuration.getMaxReconnectAttempts() <= 0
118 						|| attempts <= configuration.getMaxReconnectAttempts()) {
119 					sleepQuietly(configuration.getReconnectWaitTimeInMs());
120 					log.trace("Reconnecting({}/{})...",
121 							attempts,
122 							configuration.getMaxReconnectAttempts());
123 					client.connect();
124 				} else {
125 					log.warn("Reconnect attempts reached the max attempts {}, giving up.",
126 							configuration.getMaxReconnectAttempts());
127 				}
128 			}
129 
130 			private void sleepQuietly(long millis) {
131 				log.trace("Sleeping {} milliseconds...", millis);
132 
133 				try {
134 					Thread.sleep(millis);
135 				} catch (InterruptedException e) {
136 				}
137 			}
138 
139 		});
140 
141 		listener = new HuobiExchangeEventListener(client, consumerEventQueue);
142 		client.addListener(listener);
143 	}
144 
145 	/**
146 	 * {@inheritDoc}
147 	 */
148 	@Override
149 	public void connect() {
150 		client.connect();
151 	}
152 
153 	/**
154 	 * {@inheritDoc}
155 	 */
156 	@Override
157 	public void disconnect() {
158 		client.reqMsgUnsubscribe(message);
159 		client.disconnect();
160 	}
161 
162 	/**
163 	 * {@inheritDoc}
164 	 */
165 	@Override
166 	public ExchangeEvent getNextEvent() throws InterruptedException {
167 		return consumerEventQueue.take();
168 	}
169 
170 	/**
171 	 * {@inheritDoc}
172 	 */
173 	@Override
174 	public void send(String msg) {
175 		client.send(gson.fromJson(msg, Request.class));
176 	}
177 
178 	/**
179 	 * {@inheritDoc}
180 	 */
181 	@Override
182 	public READYSTATE getWebSocketStatus() {
183 		return webSocketStatus;
184 	}
185 
186 	private void putEvent(ExchangeEvent event) {
187 		try {
188 			consumerEventQueue.put(event);
189 		} catch (InterruptedException e) {
190 			throw new RuntimeException(e);
191 		}
192 	}
193 
194 	private void putEvent(ExchangeEventType exchangeEventType) {
195 		putEvent(new DefaultExchangeEvent(exchangeEventType, null));
196 	}
197 
198 }