View Javadoc
1   package org.oxerr.okcoin.xchange.service.fix;
2   
3   import java.math.BigDecimal;
4   import java.util.ArrayList;
5   import java.util.Collections;
6   import java.util.Date;
7   import java.util.HashMap;
8   import java.util.List;
9   import java.util.Map;
10  import java.util.UUID;
11  
12  import org.knowm.xchange.currency.CurrencyPair;
13  import org.knowm.xchange.dto.Order.OrderType;
14  import org.knowm.xchange.dto.account.AccountInfo;
15  import org.knowm.xchange.dto.marketdata.OrderBook;
16  import org.knowm.xchange.dto.marketdata.OrderBookUpdate;
17  import org.knowm.xchange.dto.marketdata.Ticker;
18  import org.knowm.xchange.dto.marketdata.Trade;
19  import org.knowm.xchange.dto.trade.LimitOrder;
20  import org.oxerr.okcoin.fix.OKCoinApplication;
21  import org.oxerr.okcoin.fix.fix44.AccountInfoResponse;
22  import org.slf4j.Logger;
23  import org.slf4j.LoggerFactory;
24  
25  import quickfix.Application;
26  import quickfix.FieldNotFound;
27  import quickfix.Group;
28  import quickfix.IncorrectTagValue;
29  import quickfix.SessionID;
30  import quickfix.UnsupportedMessageType;
31  import quickfix.field.MDEntryPx;
32  import quickfix.field.MDEntrySize;
33  import quickfix.field.MDEntryType;
34  import quickfix.field.MDUpdateAction;
35  import quickfix.field.MDUpdateType;
36  import quickfix.field.NoMDEntries;
37  import quickfix.field.OrigTime;
38  import quickfix.field.SendingTime;
39  import quickfix.field.Side;
40  import quickfix.field.SubscriptionRequestType;
41  import quickfix.field.Symbol;
42  import quickfix.fix44.MarketDataIncrementalRefresh;
43  import quickfix.fix44.MarketDataSnapshotFullRefresh;
44  
45  /**
46   * {@link Application} implementation using XChange DTOs as callback parameters.
47   */
48  public class OKCoinXChangeApplication extends OKCoinApplication {
49  
50  	private final Logger log = LoggerFactory.getLogger(OKCoinXChangeApplication.class);
51  
52  	private final Map<String, String> mdReqIds = new HashMap<>();
53  
54  	private volatile OrderBook orderBook;
55  
56  	public OKCoinXChangeApplication(String apiKey, String secretKey) {
57  		super(apiKey, secretKey);
58  	}
59  
60  	/**
61  	 * Subscribes the order book of the specified symbol.
62  	 * When the order book refreshed,
63  	 * the {@link #onOrderBook(Date, List, List, SessionID)}
64  	 * and {@link #onOrderBook(OrderBook, SessionID)} will be invoked.
65  	 *
66  	 * @param symbol the symbol, such as "BTC/CNY", "LTC/CNY".
67  	 * @param sessionId the FIX session ID.
68  	 */
69  	public synchronized void subscribeOrderBook(
70  			String symbol,
71  			SessionID sessionId) {
72  		String mdReqId = UUID.randomUUID().toString();
73  		log.trace("Subscribing {}...", symbol);
74  		requestOrderBook(
75  			mdReqId,
76  			symbol,
77  			SubscriptionRequestType.SNAPSHOT_PLUS_UPDATES,
78  			0,
79  			MDUpdateType.FULL_REFRESH,
80  			sessionId);
81  		mdReqIds.put(symbol, mdReqId);
82  	}
83  
84  	public void subscribeOrderBook(CurrencyPair currencyPair, SessionID sessionId) {
85  		subscribeOrderBook(OKCoinFIXAdapters.adaptSymbol(currencyPair), sessionId);
86  	}
87  
88  	/**
89  	 * Unsubscribes the order book of the specified symbol.
90  	 *
91  	 * @param symbol the symbol, such as "BTC/CNY", "LTC/CNY".
92  	 * @param sessionId the FIX session ID.
93  	 */
94  	public synchronized void unsubscribeOrderBook(
95  			String symbol,
96  			SessionID sessionId) {
97  		String mdReqId = mdReqIds.get(symbol);
98  		if (mdReqId == null) {
99  			log.trace("{} is not subscribed, skip unsubscribing.", symbol);
100 			return;
101 		}
102 
103 		log.trace("Unsubscribing {}...", symbol);
104 		requestOrderBook(
105 			mdReqId,
106 			symbol,
107 			SubscriptionRequestType.DISABLE_PREVIOUS_SNAPSHOT_PLUS_UPDATE_REQUEST,
108 			0,
109 			MDUpdateType.FULL_REFRESH,
110 			sessionId);
111 		mdReqIds.remove(symbol);
112 	}
113 
114 	public void unsubscribeOrderBook(CurrencyPair currencyPair, SessionID sessionId) {
115 		unsubscribeOrderBook(OKCoinFIXAdapters.adaptSymbol(currencyPair), sessionId);
116 	}
117 
118 	/**
119 	 * {@inheritDoc}
120 	 */
121 	@Override
122 	public void onMessage(final MarketDataSnapshotFullRefresh message,
123 			final SessionID sessionId)
124 			throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue {
125 		Date origTime = message.getField(new OrigTime()).getValue();
126 		String symbol = message.getSymbol().getValue();
127 		String mdReqId = message.isSetMDReqID() ? message.getMDReqID().getValue() : null;
128 		CurrencyPair currencyPair = OKCoinFIXAdapters.adaptCurrencyPair(symbol);
129 
130 		log.debug("OrigTime: {}", origTime);
131 		log.debug("Symbol: {}, currency pair: {}", symbol, currencyPair);
132 		log.debug("MDReqID: {}", mdReqId);
133 
134 		List<LimitOrder> asks = new ArrayList<>();
135 		List<LimitOrder> bids = new ArrayList<>();
136 		List<Trade> trades = new ArrayList<>();
137 		BigDecimal openingPrice = null, closingPrice = null,
138 			highPrice = null, lowPrice = null,
139 			vwapPrice = null, lastPrice = null,
140 			volume = null;
141 
142 		for (int i = 1, l = message.getNoMDEntries().getValue(); i <= l; i++) {
143 			Group group = message.getGroup(i, NoMDEntries.FIELD);
144 			char type = group.getChar(MDEntryType.FIELD);
145 			BigDecimal px = group.getField(new MDEntryPx()).getValue();
146 			BigDecimal size = group.isSetField(MDEntrySize.FIELD) ? group.getField(new MDEntrySize()).getValue() : null;
147 			log.trace("{} {}@{}", type, size, px);
148 
149 			switch (type) {
150 			case MDEntryType.BID:
151 				bids.add(new LimitOrder.Builder(OrderType.BID, currencyPair).limitPrice(px).tradableAmount(size).timestamp(origTime).build());
152 				break;
153 			case MDEntryType.OFFER:
154 				asks.add(new LimitOrder.Builder(OrderType.ASK, currencyPair).limitPrice(px).tradableAmount(size).timestamp(origTime).build());
155 				break;
156 			case MDEntryType.TRADE:
157 				OrderType orderType = group.getField(new Side()).getValue() == Side.BUY ? OrderType.BID : OrderType.ASK;
158 				Trade trade = new Trade.Builder().currencyPair(currencyPair).type(orderType).price(px).tradableAmount(size).build();
159 				trades.add(trade);
160 				break;
161 			case MDEntryType.OPENING_PRICE:
162 				openingPrice = px;
163 				break;
164 			case MDEntryType.CLOSING_PRICE:
165 				closingPrice = px;
166 				break;
167 			case MDEntryType.TRADING_SESSION_HIGH_PRICE:
168 				highPrice = px;
169 				break;
170 			case MDEntryType.TRADING_SESSION_LOW_PRICE:
171 				lowPrice = px;
172 				break;
173 			case MDEntryType.TRADING_SESSION_VWAP_PRICE:
174 				vwapPrice = px;
175 				break;
176 			case MDEntryType.TRADE_VOLUME:
177 				lastPrice = px;
178 				volume = size;
179 				break;
180 			default:
181 				break;
182 			}
183 		}
184 
185 		if (!asks.isEmpty() && !bids.isEmpty()) {
186 			onOrderBook(origTime, asks, bids, sessionId);
187 		}
188 
189 		if (!trades.isEmpty()) {
190 			onTrades(trades, sessionId);
191 		}
192 
193 		if (openingPrice != null && closingPrice != null
194 			&& highPrice != null && lowPrice != null
195 			&& vwapPrice != null && lastPrice != null
196 			&& volume != null) {
197 			Ticker ticker = new Ticker.Builder()
198 				.currencyPair(currencyPair)
199 				.timestamp(origTime)
200 				.high(highPrice)
201 				.low(lowPrice)
202 				.last(lastPrice)
203 				.volume(volume)
204 				.build();
205 			onTicker(ticker);
206 		}
207 	}
208 
209 	/**
210 	 * {@inheritDoc}
211 	 */
212 	@Override
213 	public void onMessage(final MarketDataIncrementalRefresh message,
214 			final SessionID sessionId)
215 			throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue {
216 		final Date date = message.getHeader().getUtcTimeStamp(SendingTime.FIELD);
217 		final String symbol = message.getField(new Symbol()).getValue();
218 		final CurrencyPair currencyPair = OKCoinFIXAdapters.adaptCurrencyPair(symbol);
219 
220 		for (int i = 1, l = message.getNoMDEntries().getValue(); i <= l; i++) {
221 			final Group group = message.getGroup(i, NoMDEntries.FIELD);
222 
223 			final char action = group.getChar(MDUpdateAction.FIELD);
224 			final char type = group.getChar(MDEntryType.FIELD);
225 			final BigDecimal px = group.getDecimal(MDEntryPx.FIELD);
226 			final BigDecimal size = group.getDecimal(MDEntrySize.FIELD);
227 
228 			log.trace("{} {} {}@{}", action, type, size, px);
229 
230 			final LimitOrder limitOrder;
231 
232 			switch (type) {
233 			case MDEntryType.BID:
234 				limitOrder = new LimitOrder.Builder(OrderType.BID, currencyPair).limitPrice(px).tradableAmount(size).timestamp(date).build();
235 				break;
236 			case MDEntryType.OFFER:
237 				limitOrder = new LimitOrder.Builder(OrderType.ASK, currencyPair).limitPrice(px).tradableAmount(size).timestamp(date).build();
238 				break;
239 			default:
240 				log.warn("Unsupported MDEntryType: {}.", type);
241 				limitOrder = null;
242 				break;
243 			}
244 
245 			if (limitOrder != null && this.orderBook != null) {
246 				switch (action) {
247 				case MDUpdateAction.NEW:
248 					this.orderBook.update(limitOrder);
249 					break;
250 				case MDUpdateAction.CHANGE:
251 					this.orderBook.update(limitOrder);
252 					break;
253 				case MDUpdateAction.DELETE:
254 					OrderBookUpdate orderBookUpdate = new OrderBookUpdate(
255 						limitOrder.getType(),
256 						limitOrder.getTradableAmount(),
257 						limitOrder.getCurrencyPair(),
258 						limitOrder.getLimitPrice(),
259 						limitOrder.getTimestamp(), BigDecimal.ZERO);
260 					this.orderBook.update(orderBookUpdate);
261 					break;
262 				default:
263 					log.warn("Unsupported MDUpdateAction: {}.", action);
264 					break;
265 				}
266 			}
267 		}
268 
269 		match(this.orderBook);
270 
271 		onOrderBook(this.orderBook, sessionId);
272 	}
273 
274 	/**
275 	 * {@inheritDoc}
276 	 */
277 	@Override
278 	public void onMessage(AccountInfoResponse message, SessionID sessionId)
279 			throws FieldNotFound, UnsupportedMessageType, IncorrectTagValue {
280 		onAccountInfo(OKCoinFIXAdapters.adaptAccountInfo(message), sessionId);
281 	}
282 
283 	/**
284 	 * Invoked when the order book updated.
285 	 *
286 	 * @param origTime time of message origination.
287 	 * @param asks ask orders.
288 	 * @param bids bid orders.
289 	 * @param sessionId the FIX session ID.
290 	 */
291 	public void onOrderBook(Date origTime, List<LimitOrder> asks, List<LimitOrder> bids, SessionID sessionId) {
292 		LimitOrder lowestAsk = asks.get(0);
293 		LimitOrder highestBid = bids.get(0);
294 
295 		if (lowestAsk.getLimitPrice().compareTo(highestBid.getLimitPrice()) <= 0) {
296 			// OKCoin's bid/ask of SNAPSHOT are reversed?
297 			// Swap the bid/ask orders
298 			List<LimitOrder> tmpAsks = new ArrayList<>(asks);
299 
300 			asks.clear();
301 			for (LimitOrder limitOrder : bids) {
302 				asks.add(LimitOrder.Builder.from(limitOrder).orderType(OrderType.ASK).build());
303 			}
304 
305 			bids.clear();
306 			for (LimitOrder limitOrder : tmpAsks) {
307 				bids.add(LimitOrder.Builder.from(limitOrder).orderType(OrderType.BID).build());
308 			}
309 		}
310 
311 		// bids should be sorted by limit price descending
312 		Collections.sort(bids);
313 
314 		// asks should be sorted by limit price ascending
315 		Collections.sort(asks);
316 
317 		this.orderBook = new OrderBook(origTime, asks, bids);;
318 		onOrderBook(this.orderBook, sessionId);
319 	}
320 
321 	/**
322 	 * Invoked when the order book updated.
323 	 *
324 	 * @param orderBook the full order book.
325 	 * @param sessionId the FIX session ID.
326 	 */
327 	public void onOrderBook(OrderBook orderBook, SessionID sessionId) {
328 	}
329 
330 	public void onTrades(List<Trade> trade, SessionID sessionId) {
331 	}
332 
333 	public void onTicker(Ticker ticker) {
334 	}
335 
336 	public void onAccountInfo(AccountInfo accountInfo, SessionID sessionId) {
337 	}
338 
339 	private void match(final OrderBook orderBook) {
340 		if (orderBook.getBids().size() > 0 && orderBook.getAsks().size() > 0) {
341 			final LimitOrder bid = orderBook.getBids().get(0);
342 			final LimitOrder ask = orderBook.getAsks().get(0);
343 
344 			if (bid.getLimitPrice().compareTo(ask.getLimitPrice()) >= 0) {
345 				final BigDecimal tradeAmount = bid.getTradableAmount().min(ask.getTradableAmount());
346 				log.trace("Trade {} bid@{} ask@{}",
347 					tradeAmount,
348 					bid.getLimitPrice(),
349 					ask.getLimitPrice()
350 				);
351 
352 				final OrderBookUpdate bidUpdate = new OrderBookUpdate(
353 					bid.getType(),
354 					bid.getTradableAmount(),
355 					bid.getCurrencyPair(),
356 					bid.getLimitPrice(),
357 					bid.getTimestamp(),
358 					bid.getTradableAmount().subtract(tradeAmount)
359 				);
360 				final OrderBookUpdate askUpdate = new OrderBookUpdate(
361 					ask.getType(),
362 					ask.getTradableAmount(),
363 					ask.getCurrencyPair(),
364 					ask.getLimitPrice(),
365 					ask.getTimestamp(),
366 					ask.getTradableAmount().subtract(tradeAmount)
367 				);
368 
369 				orderBook.update(bidUpdate);
370 				orderBook.update(askUpdate);
371 
372 				match(orderBook);
373 			}
374 		}
375 	}
376 
377 }