From 10994b1178dea6aca89701b1a00e4c4e810d6528 Mon Sep 17 00:00:00 2001 From: Tiberiu Baron <37719049+tibrn@users.noreply.github.com> Date: Sun, 18 Jul 2021 20:05:05 +0300 Subject: [PATCH] binance web-sockets: Add GetOrderBook() (#718) (part of #715) * add GetOrderBook * add more comments * move comment on next line * patch/resolve comments * patch/ fixes * patch/ increase waitTime * add timeout instead of waiting * patch/ remove timeout * patch/ check for len > fetchSize --- plugins/binanceExchange_ws.go | 204 ++++++++++++++++++++++++++--- plugins/binanceExchange_ws_test.go | 41 ++++++ 2 files changed, 225 insertions(+), 20 deletions(-) diff --git a/plugins/binanceExchange_ws.go b/plugins/binanceExchange_ws.go index 2c4566431..e44a68c59 100644 --- a/plugins/binanceExchange_ws.go +++ b/plugins/binanceExchange_ws.go @@ -9,23 +9,27 @@ import ( "time" "github.com/adshao/go-binance/v2" + "github.com/adshao/go-binance/v2/common" "github.com/stellar/kelp/api" "github.com/stellar/kelp/model" ) const ( STREAM_TICKER_FMT = "%s@ticker" + STREAM_BOOK_FMT = "%s@depth" TTLTIME = time.Second * 3 // ttl time in seconds ) var ( - timeToWaitForFirstEvent = time.Second + timeWaitForFirstEvent = time.Second * 2 ) var ( - ErrConversionWsMarketEvent = errConversion{from: "interface", to: "*binance.WsMarketStatEvent"} + ErrConversionWsMarketEvent = errConversion{from: "interface", to: "*binance.WsMarketStatEvent"} + ErrConversionWsPartialDepthEvent = errConversion{from: "interface", to: "*binance.WsPartialDepthEvent"} ) +type Subscriber func(symbol string, state *mapEvents) (*stream, error) type errMissingSymbol struct { symbol string } @@ -135,17 +139,21 @@ func makeMapEvents() *mapEvents { //struct used to keep all cached data type events struct { SymbolStats *mapEvents + BookStats *mapEvents } func createStateEvents() *events { events := &events{ SymbolStats: makeMapEvents(), + BookStats: makeMapEvents(), } return events } -// subscribe for symbol@ticker +// 24hr rolling window ticker statistics for a single symbol. These are NOT the statistics of the UTC day, but a 24hr rolling window for the previous 24hrs. +// Stream Name: @ticker +// Update Speed: 1000ms func subcribeTicker(symbol string, state *mapEvents) (*stream, error) { wsMarketStatHandler := func(ticker *binance.WsMarketStatEvent) { @@ -162,6 +170,50 @@ func subcribeTicker(symbol string, state *mapEvents) (*stream, error) { return nil, err } + keepConnection(doneC, func() { + subcribeTicker(symbol, state) + }) + + return &stream{doneC: doneC, stopC: stopC, cleanup: func() { + state.Del(symbol) + }}, err + +} + +//restart Connection with ws +// Binance close each connection after 24 hours +func keepConnection(doneC chan struct{}, reconnect func()) { + + go func() { + <-doneC + reconnect() + }() +} + +// Top bids and asks, pushed every second. Valid are 5, 10, or 20. +// @depth@100ms +// 100ms +func subcribeBook(symbol string, state *mapEvents) (*stream, error) { + + wsPartialDepthHandler := func(event *binance.WsPartialDepthEvent) { + state.Set(symbol, event) + } + + errHandler := func(err error) { + log.Printf("Error WsPartialDepthServe for symbol %s: %v\n", symbol, err) + } + + //Subscribe to highest level + doneC, stopC, err := binance.WsPartialDepthServe100Ms(symbol, "20", wsPartialDepthHandler, errHandler) + + if err != nil { + return nil, err + } + + keepConnection(doneC, func() { + subcribeBook(symbol, state) + }) + return &stream{doneC: doneC, stopC: stopC, cleanup: func() { state.Del(symbol) }}, err @@ -209,6 +261,35 @@ func getPrecision(floatStr string) int8 { return int8(len(strs[1])) } +//subscribeStream and wait for the first event +func (beWs *binanceExchangeWs) subscribeStream(symbol, format string, subscribe Subscriber, state *mapEvents) (mapData, error) { + + stream, err := subscribe(symbol, state) + + streamName := fmt.Sprintf(format, symbol) + + if err != nil { + return mapData{}, fmt.Errorf("error when subscribing for %s: %s", streamName, err) + } + + //Store stream + beWs.streamLock.Lock() + beWs.streams[streamName] = stream + beWs.streamLock.Unlock() + + //Wait for binance to send events + time.Sleep(timeWaitForFirstEvent) + + data, isStream := state.Get(symbol) + + //We couldn't subscribe for this pair + if !isStream { + return mapData{}, fmt.Errorf("error while subscribing for %s", streamName) + } + + return data, nil +} + // GetTickerPrice impl. func (beWs *binanceExchangeWs) GetTickerPrice(pairs []model.TradingPair) (map[model.TradingPair]api.Ticker, error) { @@ -224,25 +305,10 @@ func (beWs *binanceExchangeWs) GetTickerPrice(pairs []model.TradingPair) (map[mo tickerData, isTicker := beWs.events.SymbolStats.Get(symbol) if !isTicker { - stream, err := subcribeTicker(symbol, beWs.events.SymbolStats) + tickerData, err = beWs.subscribeStream(symbol, STREAM_TICKER_FMT, subcribeTicker, beWs.events.SymbolStats) if err != nil { - return nil, fmt.Errorf("error when subscribing for %s: %s", symbol, err) - } - - //Store stream - beWs.streamLock.Lock() - beWs.streams[fmt.Sprintf(STREAM_TICKER_FMT, symbol)] = stream - beWs.streamLock.Unlock() - - //Wait for binance to send events - time.Sleep(timeToWaitForFirstEvent) - - tickerData, isTicker = beWs.events.SymbolStats.Get(symbol) - - //We couldn't subscribe for this pair - if !isTicker { - return nil, fmt.Errorf("error while fetching ticker price for trading pair %s", symbol) + return nil, err } } @@ -286,6 +352,104 @@ func (beWs *binanceExchangeWs) GetTickerPrice(pairs []model.TradingPair) (map[mo return priceResult, nil } +//GetOrderBook impl +func (beWs *binanceExchangeWs) GetOrderBook(pair *model.TradingPair, maxCount int32) (*model.OrderBook, error) { + + var ( + fetchSize = int(maxCount) + ) + + if fetchSize > 20 { + return nil, fmt.Errorf("Max supported depth level is 20") + } + + symbol, err := pair.ToString(beWs.assetConverter, beWs.delimiter) + if err != nil { + return nil, fmt.Errorf("error converting pair to string: %s", err) + } + + bookData, isBook := beWs.events.BookStats.Get(symbol) + + if !isBook { + + bookData, err = beWs.subscribeStream(symbol, STREAM_BOOK_FMT, subcribeBook, beWs.events.BookStats) + + if err != nil { + return nil, err + } + + } + + //Show how old is the orderbook + log.Printf("OrderBook for %s is %d milliseconds old!\n", symbol, time.Now().Sub(bookData.createdAt).Milliseconds()) + + if isStale(bookData, TTLTIME) { + return nil, fmt.Errorf("ticker for %s symbols is older than %v", symbol, TTLTIME) + } + + bookI := bookData.data + + //Convert to WsMarketStatEvent + book, isOk := bookI.(*binance.WsPartialDepthEvent) + + if !isOk { + return nil, ErrConversionWsPartialDepthEvent + } + + askCcxtOrders := book.Asks + bidCcxtOrders := book.Bids + + if len(askCcxtOrders) > fetchSize { + askCcxtOrders = askCcxtOrders[:fetchSize] + + } + + if len(bidCcxtOrders) > fetchSize { + bidCcxtOrders = bidCcxtOrders[:fetchSize] + } + + asks, err := beWs.readOrders(askCcxtOrders, pair, model.OrderActionSell) + + if err != nil { + return nil, err + } + + bids, err := beWs.readOrders(bidCcxtOrders, pair, model.OrderActionBuy) + + if err != nil { + return nil, err + } + + return model.MakeOrderBook(pair, asks, bids), nil +} + +//readOrders... transform orders from binance to model.Order +func (beWs *binanceExchangeWs) readOrders(orders []common.PriceLevel, pair *model.TradingPair, orderAction model.OrderAction) ([]model.Order, error) { + + pricePrecision := getPrecision(orders[0].Price) + volumePrecision := getPrecision(orders[0].Quantity) + + result := []model.Order{} + for _, o := range orders { + + price, quantity, err := o.Parse() + + if err != nil { + return nil, err + } + + result = append(result, model.Order{ + Pair: pair, + OrderAction: orderAction, + OrderType: model.OrderTypeLimit, + Price: model.NumberFromFloat(price, pricePrecision), + Volume: model.NumberFromFloat(quantity, volumePrecision), + Timestamp: nil, + }) + } + return result, nil +} + //Unsubscribe ... unsubscribe from binance streams func (beWs *binanceExchangeWs) Unsubscribe(stream string) { diff --git a/plugins/binanceExchange_ws_test.go b/plugins/binanceExchange_ws_test.go index fa909629b..4f6133468 100644 --- a/plugins/binanceExchange_ws_test.go +++ b/plugins/binanceExchange_ws_test.go @@ -48,3 +48,44 @@ func Test_binanceExchangeWs_GetTickerPrice(t *testing.T) { return } } + +func Test_binanceExchangeWs_GetOrderBook(t *testing.T) { + + testBinanceExchangeWs, e := makeBinanceWs() + if !assert.NoError(t, e) { + return + } + + for _, obDepth := range []int32{1, 5, 8, 10, 15, 16, 20} { + + pair := model.TradingPair{Base: model.XLM, Quote: model.BTC} + ob, e := testBinanceExchangeWs.GetOrderBook(&pair, obDepth) + if !assert.NoError(t, e) { + return + } + assert.Equal(t, ob.Pair(), &pair) + + if !assert.True(t, len(ob.Asks()) > 0, len(ob.Asks())) { + return + } + if !assert.True(t, len(ob.Bids()) > 0, len(ob.Bids())) { + return + } + + if !assert.True(t, len(ob.Asks()) <= int(obDepth), fmt.Sprintf("asks should be <= %d", obDepth)) { + return + } + if !assert.True(t, len(ob.Bids()) <= int(obDepth), fmt.Sprintf("bids should be <= %d", obDepth)) { + return + } + + assert.True(t, ob.Asks()[0].OrderAction.IsSell()) + assert.True(t, ob.Asks()[0].OrderType.IsLimit()) + assert.True(t, ob.Bids()[0].OrderAction.IsBuy()) + assert.True(t, ob.Bids()[0].OrderType.IsLimit()) + assert.True(t, ob.Asks()[0].Price.AsFloat() > 0) + assert.True(t, ob.Asks()[0].Volume.AsFloat() > 0) + assert.True(t, ob.Bids()[0].Price.AsFloat() > 0) + assert.True(t, ob.Bids()[0].Volume.AsFloat() > 0) + } +}