View Javadoc
1   package org.oxerr.okcoin.websocket;
2   
3   import java.io.IOException;
4   import java.util.Arrays;
5   import java.util.Collections;
6   import java.util.HashMap;
7   import java.util.HashSet;
8   import java.util.Map;
9   import java.util.Set;
10  
11  import javax.websocket.ClientEndpoint;
12  import javax.websocket.CloseReason;
13  import javax.websocket.EncodeException;
14  import javax.websocket.EndpointConfig;
15  import javax.websocket.OnClose;
16  import javax.websocket.OnError;
17  import javax.websocket.OnMessage;
18  import javax.websocket.OnOpen;
19  import javax.websocket.Session;
20  
21  import org.oxerr.okcoin.websocket.dto.Event;
22  import org.oxerr.okcoin.websocket.event.OKCoinDataListener;
23  import org.slf4j.Logger;
24  import org.slf4j.LoggerFactory;
25  
26  /**
27   * OKCoin WebSocket client endpoint.
28   */
29  @ClientEndpoint(encoders = OKCoinEncoder.class, decoders = OKCoinDecoder.class)
30  public final class OKCoinClientEndpoint {
31  
32  	private final Logger log = LoggerFactory
33  			.getLogger(OKCoinClientEndpoint.class);
34  
35  	private final Map<String, Set<OKCoinDataListener>> listeners = new HashMap<>();
36  
37  	@OnOpen
38  	public void onOpen(Session session, EndpointConfig config) {
39  		log.trace("open: {}, config: {}", session, config);
40  	}
41  
42  	@OnMessage
43  	public void onMessage(Session session, OKCoinData[] data)
44  		throws IOException, EncodeException {
45  
46  		if (log.isTraceEnabled()) {
47  			log.trace("data: {}", Arrays.toString(data));
48  		}
49  
50  		Arrays.stream(data).forEach(e -> {
51  			Set<OKCoinDataListener> listeners = this.listeners.get(e.getChannel());
52  			(listeners == null ? Collections.<OKCoinDataListener>emptySet() : listeners).forEach(listener -> {
53  				listener.onMessage(session, e.getData());
54  			});
55  		});
56  	}
57  
58  	@OnClose
59  	public void onClose(Session session, CloseReason reason) {
60  		log.trace("close: {}, reason: {}", session, reason);
61  	}
62  
63  	@OnError
64  	public void onError(Session session, Throwable throwable) {
65  		log.trace("error: {}", session, throwable);
66  	}
67  
68  	public synchronized void addChannelListener(String channel, OKCoinDataListener listener) {
69  		Set<OKCoinDataListener> channelListeners = this.listeners.get(channel);
70  		if (channelListeners == null) {
71  			channelListeners = new HashSet<>();
72  			this.listeners.put(channel, channelListeners);
73  		}
74  		channelListeners.add(listener);
75  	}
76  
77  	public void addChannel(Session session, String channel) {
78  		Event event = new Event("addChannel", channel);
79  		session.getAsyncRemote().sendObject(event);
80  	}
81  
82  	public void addChannel(Session session, String channel,
83  			Map<String, String> parameters) {
84  		Event event = new Event("addChannel", channel, parameters);
85  		session.getAsyncRemote().sendObject(event);
86  	}
87  
88  	public void removeChannel(Session session, String channel) {
89  		Event event = new Event("removeChannel", channel);
90  		session.getAsyncRemote().sendObject(event);
91  	}
92  
93  }