1 package org.oxerr.huobi.xchange.service.streaming;
2
3 import static com.xeiam.xchange.service.streaming.ExchangeEventType.CONNECT;
4 import static com.xeiam.xchange.service.streaming.ExchangeEventType.DISCONNECT;
5 import static com.xeiam.xchange.service.streaming.ExchangeEventType.ERROR;
6 import static org.java_websocket.WebSocket.READYSTATE.CLOSED;
7 import static org.java_websocket.WebSocket.READYSTATE.NOT_YET_CONNECTED;
8 import static org.java_websocket.WebSocket.READYSTATE.OPEN;
9 import static org.oxerr.huobi.xchange.HuobiExchange.WEBSOCKET_URI_KEY;
10 import static org.oxerr.huobi.xchange.service.streaming.HuobiSocketIOAdapters.adaptSymbol;
11 import io.socket.SocketIOException;
12
13 import java.net.MalformedURLException;
14 import java.net.URL;
15 import java.util.HashSet;
16 import java.util.Set;
17 import java.util.concurrent.BlockingQueue;
18 import java.util.concurrent.LinkedBlockingQueue;
19 import java.util.concurrent.atomic.AtomicInteger;
20
21 import org.java_websocket.WebSocket.READYSTATE;
22 import org.oxerr.huobi.websocket.HuobiSocketClient;
23 import org.oxerr.huobi.websocket.dto.Percent;
24 import org.oxerr.huobi.websocket.dto.request.Request;
25 import org.oxerr.huobi.websocket.dto.request.marketdata.Message;
26 import org.oxerr.huobi.websocket.dto.request.marketdata.PushType;
27 import org.oxerr.huobi.websocket.event.HuobiSocketAdapter;
28 import org.slf4j.Logger;
29 import org.slf4j.LoggerFactory;
30
31 import com.google.gson.Gson;
32 import com.xeiam.xchange.ExchangeSpecification;
33 import com.xeiam.xchange.currency.CurrencyPair;
34 import com.xeiam.xchange.service.streaming.DefaultExchangeEvent;
35 import com.xeiam.xchange.service.streaming.ExchangeEvent;
36 import com.xeiam.xchange.service.streaming.ExchangeEventType;
37 import com.xeiam.xchange.service.streaming.StreamingExchangeService;
38
39
40
41
42 public class HuobiSocketIOService implements StreamingExchangeService {
43
44 private final Logger log = LoggerFactory.getLogger(HuobiSocketIOService.class);
45 private final HuobiSocketClient client;
46 private final Message message;
47 private final Gson gson = new Gson();
48 private final BlockingQueue<ExchangeEvent> consumerEventQueue = new LinkedBlockingQueue<ExchangeEvent>();
49 private final HuobiExchangeEventListener listener;
50 private volatile READYSTATE webSocketStatus = NOT_YET_CONNECTED;
51
52 public HuobiSocketIOService(
53 final ExchangeSpecification exchangeSpecification,
54 final HuobiStreamingConfiguration configuration) {
55 String url = (String) exchangeSpecification.getExchangeSpecificParametersItem(WEBSOCKET_URI_KEY);
56 try {
57 client = new HuobiSocketClient(new URL(url));
58 } catch (MalformedURLException e) {
59 throw new IllegalArgumentException(e);
60 }
61
62 message = new Message();
63
64 final Set<CurrencyPair> currencyPairs = configuration.getCurrencyPairs();
65 final Set<String> symbols = new HashSet<>(currencyPairs.size());
66
67 for (CurrencyPair currencyPair : currencyPairs) {
68 String symbol = adaptSymbol(currencyPair);
69 symbols.add(symbol);
70
71
72 message.addMarketOverview(symbol, PushType.PUSH_LONG);
73
74
75 message.addMarketDepthDiff(symbol, PushType.PUSH_LONG, Percent.PERCENT10);
76
77
78 message.addTradeDetail(symbol, PushType.PUSH_LONG);
79 }
80
81 client.addListener(new HuobiSocketAdapter() {
82
83 private final AtomicInteger reconnectAttempts = new AtomicInteger();
84
85 @Override
86 public void onConnect() {
87 reconnectAttempts.set(0);
88
89 webSocketStatus = OPEN;
90 putEvent(CONNECT);
91
92 for (String symbol : symbols) {
93
94 client.reqMarketDepth(symbol, Percent.PERCENT10);
95
96
97 client.reqTradeDetailTop(symbol, 10);
98 }
99
100 client.reqMsgSubscribe(message);
101 }
102
103 @Override
104 public void onDisconnect() {
105 webSocketStatus = CLOSED;
106 putEvent(DISCONNECT);
107 }
108
109
110 @Override
111 public void onError(SocketIOException socketIOException) {
112 putEvent(new DefaultExchangeEvent(ERROR,
113 socketIOException.getMessage(), socketIOException));
114
115 final int attempts = reconnectAttempts.incrementAndGet();
116
117 if (configuration.getMaxReconnectAttempts() <= 0
118 || attempts <= configuration.getMaxReconnectAttempts()) {
119 sleepQuietly(configuration.getReconnectWaitTimeInMs());
120 log.trace("Reconnecting({}/{})...",
121 attempts,
122 configuration.getMaxReconnectAttempts());
123 client.connect();
124 } else {
125 log.warn("Reconnect attempts reached the max attempts {}, giving up.",
126 configuration.getMaxReconnectAttempts());
127 }
128 }
129
130 private void sleepQuietly(long millis) {
131 log.trace("Sleeping {} milliseconds...", millis);
132
133 try {
134 Thread.sleep(millis);
135 } catch (InterruptedException e) {
136 }
137 }
138
139 });
140
141 listener = new HuobiExchangeEventListener(client, consumerEventQueue);
142 client.addListener(listener);
143 }
144
145
146
147
148 @Override
149 public void connect() {
150 client.connect();
151 }
152
153
154
155
156 @Override
157 public void disconnect() {
158 client.reqMsgUnsubscribe(message);
159 client.disconnect();
160 }
161
162
163
164
165 @Override
166 public ExchangeEvent getNextEvent() throws InterruptedException {
167 return consumerEventQueue.take();
168 }
169
170
171
172
173 @Override
174 public void send(String msg) {
175 client.send(gson.fromJson(msg, Request.class));
176 }
177
178
179
180
181 @Override
182 public READYSTATE getWebSocketStatus() {
183 return webSocketStatus;
184 }
185
186 private void putEvent(ExchangeEvent event) {
187 try {
188 consumerEventQueue.put(event);
189 } catch (InterruptedException e) {
190 throw new RuntimeException(e);
191 }
192 }
193
194 private void putEvent(ExchangeEventType exchangeEventType) {
195 putEvent(new DefaultExchangeEvent(exchangeEventType, null));
196 }
197
198 }