1 package org.oxerr.okcoin.websocket;
2
3 import java.io.IOException;
4 import java.io.Reader;
5 import java.util.HashMap;
6 import java.util.Map;
7 import java.util.stream.Collectors;
8
9 import javax.json.Json;
10 import javax.json.JsonArray;
11 import javax.json.JsonObject;
12 import javax.json.JsonReader;
13 import javax.json.JsonValue;
14 import javax.websocket.DecodeException;
15 import javax.websocket.Decoder;
16 import javax.websocket.EndpointConfig;
17
18 import org.oxerr.okcoin.websocket.dto.CandlestickChart;
19 import org.oxerr.okcoin.websocket.dto.Depth;
20 import org.oxerr.okcoin.websocket.dto.Info;
21 import org.oxerr.okcoin.websocket.dto.OrderResult;
22 import org.oxerr.okcoin.websocket.dto.Ticker;
23 import org.oxerr.okcoin.websocket.dto.Trade;
24 import org.oxerr.okcoin.websocket.dto.TradeResult;
25 import org.oxerr.okcoin.websocket.dto.TradesV1;
26 import org.slf4j.Logger;
27 import org.slf4j.LoggerFactory;
28
29
30
31
32 public class OKCoinDecoder implements Decoder.TextStream<OKCoinData[]> {
33
34 private static final OKCoinData[] EMPTY_DATA = new OKCoinData[0];
35
36 private final Logger log = LoggerFactory.getLogger(OKCoinDecoder.class);
37
38 private final Map<String, Class<?>> types;
39
40 public OKCoinDecoder() {
41 types = new HashMap<>();
42 for (String symbol : new String[] {"btccny", "ltccny"}) {
43 types.put(String.format("ok_%s_ticker", symbol), Ticker.class);
44 types.put(String.format("ok_%s_depth", symbol), Depth.class);
45 types.put(String.format("ok_%s_depth60", symbol), Depth.class);
46 types.put(String.format("ok_%s_trades_v1", symbol), TradesV1.class);
47 for (String x : new String[] {
48 "1min", "3min", "5min", "15min", "30min",
49 "1hour", "2hour", "4hour", "6hour", "12hour",
50 "day", "3day", "week", }) {
51 types.put(String.format("ok_%s_kline_%s", symbol, x),
52 CandlestickChart.class);
53 }
54 }
55
56 types.put("ok_cny_realtrades", Trade.class);
57 types.put("ok_spotcny_trade", TradeResult.class);
58 types.put("ok_spotcny_cancel_order", TradeResult.class);
59 types.put("ok_spotcny_userinfo", Info.class);
60 types.put("ok_spotcny_order_info", OrderResult.class);
61 }
62
63
64
65
66 @Override
67 public void init(EndpointConfig config) {
68 }
69
70
71
72
73 @Override
74 public void destroy() {
75 }
76
77
78
79
80 @Override
81 public OKCoinData[] decode(Reader reader) throws DecodeException, IOException {
82 JsonReader jsonReader = Json.createReader(reader);
83 JsonArray jsonArray = jsonReader.readArray();
84 return jsonArray.stream().map(v -> {
85
86 if (log.isTraceEnabled()) {
87 log.trace("Decoding: {}", v);
88 }
89
90 JsonObject o = (JsonObject) v;
91 String channel = o.getString("channel");
92 Object data = decodeData(channel, o.get("data"));
93 return new OKCoinData(channel, data);
94 }).collect(Collectors.toList()).toArray(EMPTY_DATA);
95 }
96
97 private Object decodeData(String channel, JsonValue data) {
98 if (data == null) {
99 return null;
100 }
101
102 Object ret = null;
103 Class<?> type = types.get(channel);
104
105 if (type == null) {
106 throw new IllegalArgumentException("Unknown channel: " + channel);
107 }
108
109 try {
110 ret = type.getConstructor(JsonValue.class).newInstance(data);
111 } catch (Exception e) {
112 log.warn(String.format("Decode failed. Channel: %s, data: %s.", channel, data), e);
113 }
114 return ret;
115 }
116
117 }