From 3f2a3808f226119b924789c52ee823944721b281 Mon Sep 17 00:00:00 2001 From: BZ-CO <30245815+BZ-CO@users.noreply.github.com> Date: Sat, 4 Mar 2023 19:44:08 +0200 Subject: [PATCH 1/3] Update GetTickersWebSocketAsync --- .../Exchanges/Poloniex/ExchangePoloniexAPI.cs | 132 ++++++++++++------ 1 file changed, 93 insertions(+), 39 deletions(-) diff --git a/src/ExchangeSharp/API/Exchanges/Poloniex/ExchangePoloniexAPI.cs b/src/ExchangeSharp/API/Exchanges/Poloniex/ExchangePoloniexAPI.cs index 22dc2fb4..9fd0536f 100644 --- a/src/ExchangeSharp/API/Exchanges/Poloniex/ExchangePoloniexAPI.cs +++ b/src/ExchangeSharp/API/Exchanges/Poloniex/ExchangePoloniexAPI.cs @@ -11,6 +11,8 @@ The above copyright notice and this permission notice shall be included in all c */ using System.Diagnostics; +using System.Threading; +using Newtonsoft.Json; namespace ExchangeSharp { @@ -24,7 +26,7 @@ namespace ExchangeSharp public sealed partial class ExchangePoloniexAPI : ExchangeAPI { public override string BaseUrl { get; set; } = "https://api.poloniex.com"; - public override string BaseUrlWebSocket { get; set; } = "wss://api2.poloniex.com"; + public override string BaseUrlWebSocket { get; set; } = "wss://ws.poloniex.com/ws"; private ExchangePoloniexAPI() { @@ -222,18 +224,25 @@ private static IEnumerable ParseCompletedOrderDetails(JToke private async Task ParseTickerWebSocketAsync(string symbol, JToken token) { - /* - last: args[1], - lowestAsk: args[2], - highestBid: args[3], - percentChange: args[4], - baseVolume: args[5], - quoteVolume: args[6], - isFrozen: args[7], - high24hr: args[8], - low24hr: args[9] - */ - return await this.ParseTickerAsync(token, symbol, 2, 3, 1, 5, 6); + // { + // "symbol": "ETH_USDT", + // "dailyChange": "0.9428", + // "high": "507", + // "amount": "20", + // "quantity": "3", + // "tradeCount": 11, + // "low": "16", + // "closeTime": 1634062351868, + // "startTime": 1633996800000, + // "close": "204", + // "open": "105", + // "ts": 1648052794867, + // "markPrice": "205", + // } + + return await this.ParseTickerAsync(token, symbol, askKey: null, bidKey: null, lastKey: "close", + baseVolumeKey: "quantity", quoteVolumeKey: "amount", timestampKey: "ts", + TimestampType.UnixMilliseconds); } public override string PeriodSecondsToString(int seconds) @@ -451,34 +460,17 @@ protected override async Task OnGetTickersWebSocketAsync( Action>> callback, params string[] symbols) { - Dictionary idsToSymbols = new Dictionary(); - return await ConnectPublicWebSocketAsync(string.Empty, async (_socket, msg) => - { - JToken token = JToken.Parse(msg.ToStringFromUTF8()); - if (token[0].ConvertInvariant() == 1002) + return await ConnectWebsocketPublicAsync( + async (socket) => { await SubscribeToChannel(socket, "ticker", symbols); }, + async (socket, symbol, sArray, token) => { - if (token is JArray outerArray && outerArray.Count > 2 && outerArray[2] is JArray array && - array.Count > 9 && - idsToSymbols.TryGetValue(array[0].ToStringInvariant(), out string symbol)) + var tickers = new List> { - callback.Invoke(new List> - { - new KeyValuePair(symbol, - await ParseTickerWebSocketAsync(symbol, array)) - }); - } - } - }, async (_socket) => - { - var tickers = await GetTickersAsync(); - foreach (var ticker in tickers) - { - idsToSymbols[ticker.Value.Id] = ticker.Key; - } - - // subscribe to ticker channel (1002) - await _socket.SendMessageAsync(new { command = "subscribe", channel = 1002 }); - }); + new KeyValuePair(symbol, + await this.ParseTickerWebSocketAsync(symbol, token)) + }; + callback(tickers); + }); } protected override async Task OnGetTradesWebSocketAsync( @@ -1112,6 +1104,68 @@ private async Task CreateDepositAddress( return details; } + + private Task ConnectWebsocketPublicAsync( + Func connected, + Func callback) + { + Timer pingTimer = null; + return ConnectPublicWebSocketAsync( + url: "/public", + messageCallback: async (socket, msg) => + { + var token = JToken.Parse(msg.ToStringFromUTF8()); + var eventType = token["event"]?.ToStringInvariant(); + if (eventType != null) + { + if (eventType != "error") return; + Logger.Info("Websocket unable to connect: " + token["msg"]?.ToStringInvariant()); + return; + } + + if (token["data"] == null) return; + + foreach (var d in token["data"]) + { + await callback(socket, d["symbol"]?.ToStringInvariant(), null, d); + } + }, + connectCallback: async (socket) => + { + await connected(socket); + pingTimer ??= new Timer( + callback: async s => + await socket.SendMessageAsync( + JsonConvert.SerializeObject(new { Event = "ping" }, SerializerSettings)), + null, 0, 15000); + }, + disconnectCallback: socket => + { + pingTimer?.Dispose(); + pingTimer = null; + return Task.CompletedTask; + }); + } + + private static async Task SubscribeToChannel( + IWebSocket socket, + string channel, + string[] marketSymbols) + { + if (marketSymbols.Length == 0) + { + marketSymbols = new[] { "all" }; + } + + var payload = JsonConvert.SerializeObject(new + { + Event = "subscribe", + Channel = new[] { channel }, + Symbols = marketSymbols + }, SerializerSettings); + + await socket.SendMessageAsync(payload); + } } public partial class ExchangeName From fe727cf972fb17d1fba517e97241fdff715070f4 Mon Sep 17 00:00:00 2001 From: BZ-CO <30245815+BZ-CO@users.noreply.github.com> Date: Sat, 4 Mar 2023 21:04:09 +0200 Subject: [PATCH 2/3] Update GetTradesWebSocketAsync --- .../Exchanges/Poloniex/ExchangePoloniexAPI.cs | 81 ++----------------- 1 file changed, 8 insertions(+), 73 deletions(-) diff --git a/src/ExchangeSharp/API/Exchanges/Poloniex/ExchangePoloniexAPI.cs b/src/ExchangeSharp/API/Exchanges/Poloniex/ExchangePoloniexAPI.cs index 9fd0536f..3d149163 100644 --- a/src/ExchangeSharp/API/Exchanges/Poloniex/ExchangePoloniexAPI.cs +++ b/src/ExchangeSharp/API/Exchanges/Poloniex/ExchangePoloniexAPI.cs @@ -475,80 +475,15 @@ await this.ParseTickerWebSocketAsync(symbol, token)) protected override async Task OnGetTradesWebSocketAsync( Func, Task> callback, - params string[] marketSymbols) - { - Dictionary messageIdToSymbol = new Dictionary(); - Dictionary symbolToMessageId = new Dictionary(); - var symMeta = await GetMarketSymbolsMetadataAsync(); - foreach (var symbol in symMeta) - { - messageIdToSymbol.Add(int.Parse(symbol.MarketId), symbol.MarketSymbol); - symbolToMessageId.Add(symbol.MarketSymbol, int.Parse(symbol.MarketId)); - } - - return await ConnectPublicWebSocketAsync(string.Empty, async (_socket, msg) => - { - JToken token = JToken.Parse(msg.ToStringFromUTF8()); - if (token.Type == JTokenType.Object && token["error"] != null) - throw new APIException($"Exchange returned error: {token["error"].ToStringInvariant()}"); - int msgId = token[0].ConvertInvariant(); - - if (msgId == 1010 || token.Count() == 2) // "[7,2]" - { - // this is a heartbeat message - return; - } - - var seq = token[1].ConvertInvariant(); - var dataArray = token[2]; - foreach (var data in dataArray) - { - var dataType = data[0].ToStringInvariant(); - if (dataType == "i") - { - // can also populate messageIdToSymbol from here - continue; - } - else if (dataType == "t") - { - if (messageIdToSymbol.TryGetValue(msgId, out string symbol)) - { - // 0 1 2 3 4 5 6 - // ["t", "", <1 for buy 0 for sell>, "", "", , ""] - ExchangeTrade trade = data.ParseTrade(amountKey: 4, priceKey: 3, typeKey: 2, - timestampKey: 6, - timestampType: TimestampType.UnixMilliseconds, idKey: 1, typeKeyIsBuyValue: "1"); - await callback(new KeyValuePair(symbol, trade)); - } - } - else if (dataType == "o") - { - continue; - } - else - { - continue; - } - } - }, async (_socket) => - { - IEnumerable marketIDs = null; - if (marketSymbols == null || marketSymbols.Length == 0) - { - marketIDs = messageIdToSymbol.Keys; - } - else - { - marketIDs = marketSymbols.Select(s => symbolToMessageId[s]); - } - - // subscribe to order book and trades channel for each symbol - foreach (var id in marketIDs) + params string[] marketSymbols) => + await ConnectWebsocketPublicAsync( + async (socket) => { await SubscribeToChannel(socket, "trades", marketSymbols); }, + async (socket, symbol, sArray, token) => { - await _socket.SendMessageAsync(new { command = "subscribe", channel = id }); - } - }); - } + var trade = token.ParseTrade(amountKey: "quantity", priceKey: "price", typeKey: "takerSide", + timestampKey: "ts", TimestampType.UnixMilliseconds, idKey: "id"); + await callback(new KeyValuePair(symbol, trade)); + }); protected override async Task OnGetDeltaOrderBookWebSocketAsync( Action callback, From 36353f1957ee96f512ec46688f4c93065614e746 Mon Sep 17 00:00:00 2001 From: BZ-CO <30245815+BZ-CO@users.noreply.github.com> Date: Sat, 4 Mar 2023 22:58:03 +0200 Subject: [PATCH 3/3] Update GetOrderBookDepthAsync --- .../Exchanges/Poloniex/ExchangePoloniexAPI.cs | 125 +++++------------- 1 file changed, 34 insertions(+), 91 deletions(-) diff --git a/src/ExchangeSharp/API/Exchanges/Poloniex/ExchangePoloniexAPI.cs b/src/ExchangeSharp/API/Exchanges/Poloniex/ExchangePoloniexAPI.cs index 3d149163..4766c671 100644 --- a/src/ExchangeSharp/API/Exchanges/Poloniex/ExchangePoloniexAPI.cs +++ b/src/ExchangeSharp/API/Exchanges/Poloniex/ExchangePoloniexAPI.cs @@ -458,9 +458,8 @@ protected override async Task>> protected override async Task OnGetTickersWebSocketAsync( Action>> callback, - params string[] symbols) - { - return await ConnectWebsocketPublicAsync( + params string[] symbols) => + await ConnectWebsocketPublicAsync( async (socket) => { await SubscribeToChannel(socket, "ticker", symbols); }, async (socket, symbol, sArray, token) => { @@ -471,7 +470,6 @@ await this.ParseTickerWebSocketAsync(symbol, token)) }; callback(tickers); }); - } protected override async Task OnGetTradesWebSocketAsync( Func, Task> callback, @@ -490,96 +488,17 @@ protected override async Task OnGetDeltaOrderBookWebSocketAsync( int maxCount = 20, params string[] marketSymbols) { - Dictionary> messageIdToSymbol = new Dictionary>(); - return await ConnectPublicWebSocketAsync(string.Empty, (_socket, msg) => - { - JToken token = JToken.Parse(msg.ToStringFromUTF8()); - int msgId = token[0].ConvertInvariant(); - - //return if this is a heartbeat message - if (msgId == 1010) - { - return Task.CompletedTask; - } - - var seq = token[1].ConvertInvariant(); - var dataArray = token[2]; - ExchangeOrderBook book = new ExchangeOrderBook(); - foreach (var data in dataArray) + return await ConnectWebsocketPublicAsync( + async (socket) => { - var dataType = data[0].ToStringInvariant(); - if (dataType == "i") - { - var marketInfo = data[1]; - var market = marketInfo["currencyPair"].ToStringInvariant(); - messageIdToSymbol[msgId] = new Tuple(market, 0); - - // we are only returning the deltas, this would create a full order book which we don't want, but keeping it - // here for historical reference - /* - foreach (JProperty jprop in marketInfo["orderBook"][0].Cast()) - { - var depth = new ExchangeOrderPrice - { - Price = jprop.Name.ConvertInvariant(), - Amount = jprop.Value.ConvertInvariant() - }; - book.Asks[depth.Price] = depth; - } - foreach (JProperty jprop in marketInfo["orderBook"][1].Cast()) - { - var depth = new ExchangeOrderPrice - { - Price = jprop.Name.ConvertInvariant(), - Amount = jprop.Value.ConvertInvariant() - }; - book.Bids[depth.Price] = depth; - } - */ - } - else if (dataType == "o") - { - //removes or modifies an existing item on the order books - if (messageIdToSymbol.TryGetValue(msgId, out Tuple symbol)) - { - int type = data[1].ConvertInvariant(); - var depth = new ExchangeOrderPrice - { - Price = data[2].ConvertInvariant(), - Amount = data[3].ConvertInvariant() - }; - var list = (type == 1 ? book.Bids : book.Asks); - list[depth.Price] = depth; - book.MarketSymbol = symbol.Item1; - book.SequenceId = symbol.Item2 + 1; - messageIdToSymbol[msgId] = new Tuple(book.MarketSymbol, book.SequenceId); - } - } - else - { - continue; - } - } - - if (book != null && (book.Asks.Count != 0 || book.Bids.Count != 0)) + await SubscribeToOrderBookDepthChannel(socket, marketSymbols, maxCount); + }, (socket, symbol, sArray, token) => { + var book = token.ParseOrderBookFromJTokenArrays(); + book.MarketSymbol = symbol; callback(book); - } - - return Task.CompletedTask; - }, async (_socket) => - { - if (marketSymbols == null || marketSymbols.Length == 0) - { - marketSymbols = (await GetMarketSymbolsAsync()).ToArray(); - } - - // subscribe to order book and trades channel for each symbol - foreach (var sym in marketSymbols) - { - await _socket.SendMessageAsync(new { command = "subscribe", channel = NormalizeMarketSymbol(sym) }); - } - }); + return Task.CompletedTask; + }); } protected override async Task OnGetOrderBookAsync(string marketSymbol, int maxCount = 100) @@ -1101,6 +1020,30 @@ private static async Task SubscribeToChannel( await socket.SendMessageAsync(payload); } + + private async Task SubscribeToOrderBookDepthChannel( + IWebSocket socket, + string[] marketSymbols, + int depth = 20) + { + var depthIsValid = depth == 5 || depth == 10 || depth == 20; + if (!depthIsValid) + throw new ArgumentOutOfRangeException(nameof(depth)); + if (marketSymbols.Length == 0) + { + marketSymbols = (await OnGetMarketSymbolsAsync()).ToArray(); + } + + var payload = JsonConvert.SerializeObject(new + { + Event = "subscribe", + Channel = new[] { "book" }, + Symbols = marketSymbols, + Depth = depth + }, SerializerSettings); + + await socket.SendMessageAsync(payload); + } } public partial class ExchangeName