1 package org.oxerr.okcoin.websocket;
2
3 import java.io.IOException;
4 import java.util.Arrays;
5 import java.util.Collections;
6 import java.util.HashMap;
7 import java.util.HashSet;
8 import java.util.Map;
9 import java.util.Set;
10
11 import javax.websocket.ClientEndpoint;
12 import javax.websocket.CloseReason;
13 import javax.websocket.EncodeException;
14 import javax.websocket.EndpointConfig;
15 import javax.websocket.OnClose;
16 import javax.websocket.OnError;
17 import javax.websocket.OnMessage;
18 import javax.websocket.OnOpen;
19 import javax.websocket.Session;
20
21 import org.oxerr.okcoin.websocket.dto.Event;
22 import org.oxerr.okcoin.websocket.event.OKCoinDataListener;
23 import org.slf4j.Logger;
24 import org.slf4j.LoggerFactory;
25
26
27
28
29 @ClientEndpoint(encoders = OKCoinEncoder.class, decoders = OKCoinDecoder.class)
30 public final class OKCoinClientEndpoint {
31
32 private final Logger log = LoggerFactory
33 .getLogger(OKCoinClientEndpoint.class);
34
35 private final Map<String, Set<OKCoinDataListener>> listeners = new HashMap<>();
36
37 @OnOpen
38 public void onOpen(Session session, EndpointConfig config) {
39 log.trace("open: {}, config: {}", session, config);
40 }
41
42 @OnMessage
43 public void onMessage(Session session, OKCoinData[] data)
44 throws IOException, EncodeException {
45
46 if (log.isTraceEnabled()) {
47 log.trace("data: {}", Arrays.toString(data));
48 }
49
50 Arrays.stream(data).forEach(e -> {
51 Set<OKCoinDataListener> listeners = this.listeners.get(e.getChannel());
52 (listeners == null ? Collections.<OKCoinDataListener>emptySet() : listeners).forEach(listener -> {
53 listener.onMessage(session, e.getData());
54 });
55 });
56 }
57
58 @OnClose
59 public void onClose(Session session, CloseReason reason) {
60 log.trace("close: {}, reason: {}", session, reason);
61 }
62
63 @OnError
64 public void onError(Session session, Throwable throwable) {
65 log.trace("error: {}", session, throwable);
66 }
67
68 public synchronized void addChannelListener(String channel, OKCoinDataListener listener) {
69 Set<OKCoinDataListener> channelListeners = this.listeners.get(channel);
70 if (channelListeners == null) {
71 channelListeners = new HashSet<>();
72 this.listeners.put(channel, channelListeners);
73 }
74 channelListeners.add(listener);
75 }
76
77 public void addChannel(Session session, String channel) {
78 Event event = new Event("addChannel", channel);
79 session.getAsyncRemote().sendObject(event);
80 }
81
82 public void addChannel(Session session, String channel,
83 Map<String, String> parameters) {
84 Event event = new Event("addChannel", channel, parameters);
85 session.getAsyncRemote().sendObject(event);
86 }
87
88 public void removeChannel(Session session, String channel) {
89 Event event = new Event("removeChannel", channel);
90 session.getAsyncRemote().sendObject(event);
91 }
92
93 }