Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migration to the new Poloniex API [Public WS endpoints] #798

Merged
merged 3 commits into from
Mar 4, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
336 changes: 134 additions & 202 deletions src/ExchangeSharp/API/Exchanges/Poloniex/ExchangePoloniexAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ The above copyright notice and this permission notice shall be included in all c
*/

using System.Diagnostics;
using System.Threading;
using Newtonsoft.Json;

namespace ExchangeSharp
{
Expand All @@ -24,7 +26,7 @@ namespace ExchangeSharp
public sealed partial class ExchangePoloniexAPI : ExchangeAPI
{
public override string BaseUrl { get; set; } = "https://api.poloniex.com";
public override string BaseUrlWebSocket { get; set; } = "wss://api2.poloniex.com";
public override string BaseUrlWebSocket { get; set; } = "wss://ws.poloniex.com/ws";

private ExchangePoloniexAPI()
{
Expand Down Expand Up @@ -222,18 +224,25 @@ private static IEnumerable<ExchangeOrderResult> ParseCompletedOrderDetails(JToke

private async Task<ExchangeTicker> ParseTickerWebSocketAsync(string symbol, JToken token)
{
/*
last: args[1],
lowestAsk: args[2],
highestBid: args[3],
percentChange: args[4],
baseVolume: args[5],
quoteVolume: args[6],
isFrozen: args[7],
high24hr: args[8],
low24hr: args[9]
*/
return await this.ParseTickerAsync(token, symbol, 2, 3, 1, 5, 6);
// {
// "symbol": "ETH_USDT",
// "dailyChange": "0.9428",
// "high": "507",
// "amount": "20",
// "quantity": "3",
// "tradeCount": 11,
// "low": "16",
// "closeTime": 1634062351868,
// "startTime": 1633996800000,
// "close": "204",
// "open": "105",
// "ts": 1648052794867,
// "markPrice": "205",
// }

return await this.ParseTickerAsync(token, symbol, askKey: null, bidKey: null, lastKey: "close",
baseVolumeKey: "quantity", quoteVolumeKey: "amount", timestampKey: "ts",
TimestampType.UnixMilliseconds);
}

public override string PeriodSecondsToString(int seconds)
Expand Down Expand Up @@ -449,210 +458,47 @@ protected override async Task<IEnumerable<KeyValuePair<string, ExchangeTicker>>>

protected override async Task<IWebSocket> OnGetTickersWebSocketAsync(
Action<IReadOnlyCollection<KeyValuePair<string, ExchangeTicker>>> callback,
params string[] symbols)
{
Dictionary<string, string> idsToSymbols = new Dictionary<string, string>();
return await ConnectPublicWebSocketAsync(string.Empty, async (_socket, msg) =>
{
JToken token = JToken.Parse(msg.ToStringFromUTF8());
if (token[0].ConvertInvariant<int>() == 1002)
params string[] symbols) =>
await ConnectWebsocketPublicAsync(
async (socket) => { await SubscribeToChannel(socket, "ticker", symbols); },
async (socket, symbol, sArray, token) =>
{
if (token is JArray outerArray && outerArray.Count > 2 && outerArray[2] is JArray array &&
array.Count > 9 &&
idsToSymbols.TryGetValue(array[0].ToStringInvariant(), out string symbol))
var tickers = new List<KeyValuePair<string, ExchangeTicker>>
{
callback.Invoke(new List<KeyValuePair<string, ExchangeTicker>>
{
new KeyValuePair<string, ExchangeTicker>(symbol,
await ParseTickerWebSocketAsync(symbol, array))
});
}
}
}, async (_socket) =>
{
var tickers = await GetTickersAsync();
foreach (var ticker in tickers)
{
idsToSymbols[ticker.Value.Id] = ticker.Key;
}

// subscribe to ticker channel (1002)
await _socket.SendMessageAsync(new { command = "subscribe", channel = 1002 });
});
}
new KeyValuePair<string, ExchangeTicker>(symbol,
await this.ParseTickerWebSocketAsync(symbol, token))
};
callback(tickers);
});

protected override async Task<IWebSocket> OnGetTradesWebSocketAsync(
Func<KeyValuePair<string, ExchangeTrade>, Task> callback,
params string[] marketSymbols)
{
Dictionary<int, string> messageIdToSymbol = new Dictionary<int, string>();
Dictionary<string, int> symbolToMessageId = new Dictionary<string, int>();
var symMeta = await GetMarketSymbolsMetadataAsync();
foreach (var symbol in symMeta)
{
messageIdToSymbol.Add(int.Parse(symbol.MarketId), symbol.MarketSymbol);
symbolToMessageId.Add(symbol.MarketSymbol, int.Parse(symbol.MarketId));
}

return await ConnectPublicWebSocketAsync(string.Empty, async (_socket, msg) =>
{
JToken token = JToken.Parse(msg.ToStringFromUTF8());
if (token.Type == JTokenType.Object && token["error"] != null)
throw new APIException($"Exchange returned error: {token["error"].ToStringInvariant()}");
int msgId = token[0].ConvertInvariant<int>();

if (msgId == 1010 || token.Count() == 2) // "[7,2]"
{
// this is a heartbeat message
return;
}

var seq = token[1].ConvertInvariant<long>();
var dataArray = token[2];
foreach (var data in dataArray)
{
var dataType = data[0].ToStringInvariant();
if (dataType == "i")
{
// can also populate messageIdToSymbol from here
continue;
}
else if (dataType == "t")
{
if (messageIdToSymbol.TryGetValue(msgId, out string symbol))
{
// 0 1 2 3 4 5 6
// ["t", "<trade id>", <1 for buy 0 for sell>, "<price>", "<size>", <timestamp>, "<epoch_ms>"]
ExchangeTrade trade = data.ParseTrade(amountKey: 4, priceKey: 3, typeKey: 2,
timestampKey: 6,
timestampType: TimestampType.UnixMilliseconds, idKey: 1, typeKeyIsBuyValue: "1");
await callback(new KeyValuePair<string, ExchangeTrade>(symbol, trade));
}
}
else if (dataType == "o")
{
continue;
}
else
{
continue;
}
}
}, async (_socket) =>
{
IEnumerable<int> marketIDs = null;
if (marketSymbols == null || marketSymbols.Length == 0)
{
marketIDs = messageIdToSymbol.Keys;
}
else
params string[] marketSymbols) =>
await ConnectWebsocketPublicAsync(
async (socket) => { await SubscribeToChannel(socket, "trades", marketSymbols); },
async (socket, symbol, sArray, token) =>
{
marketIDs = marketSymbols.Select(s => symbolToMessageId[s]);
}

// subscribe to order book and trades channel for each symbol
foreach (var id in marketIDs)
{
await _socket.SendMessageAsync(new { command = "subscribe", channel = id });
}
});
}
var trade = token.ParseTrade(amountKey: "quantity", priceKey: "price", typeKey: "takerSide",
timestampKey: "ts", TimestampType.UnixMilliseconds, idKey: "id");
await callback(new KeyValuePair<string, ExchangeTrade>(symbol, trade));
});

protected override async Task<IWebSocket> OnGetDeltaOrderBookWebSocketAsync(
Action<ExchangeOrderBook> callback,
int maxCount = 20,
params string[] marketSymbols)
{
Dictionary<int, Tuple<string, long>> messageIdToSymbol = new Dictionary<int, Tuple<string, long>>();
return await ConnectPublicWebSocketAsync(string.Empty, (_socket, msg) =>
{
JToken token = JToken.Parse(msg.ToStringFromUTF8());
int msgId = token[0].ConvertInvariant<int>();

//return if this is a heartbeat message
if (msgId == 1010)
{
return Task.CompletedTask;
}

var seq = token[1].ConvertInvariant<long>();
var dataArray = token[2];
ExchangeOrderBook book = new ExchangeOrderBook();
foreach (var data in dataArray)
return await ConnectWebsocketPublicAsync(
async (socket) =>
{
var dataType = data[0].ToStringInvariant();
if (dataType == "i")
{
var marketInfo = data[1];
var market = marketInfo["currencyPair"].ToStringInvariant();
messageIdToSymbol[msgId] = new Tuple<string, long>(market, 0);

// we are only returning the deltas, this would create a full order book which we don't want, but keeping it
// here for historical reference
/*
foreach (JProperty jprop in marketInfo["orderBook"][0].Cast<JProperty>())
{
var depth = new ExchangeOrderPrice
{
Price = jprop.Name.ConvertInvariant<decimal>(),
Amount = jprop.Value.ConvertInvariant<decimal>()
};
book.Asks[depth.Price] = depth;
}
foreach (JProperty jprop in marketInfo["orderBook"][1].Cast<JProperty>())
{
var depth = new ExchangeOrderPrice
{
Price = jprop.Name.ConvertInvariant<decimal>(),
Amount = jprop.Value.ConvertInvariant<decimal>()
};
book.Bids[depth.Price] = depth;
}
*/
}
else if (dataType == "o")
{
//removes or modifies an existing item on the order books
if (messageIdToSymbol.TryGetValue(msgId, out Tuple<string, long> symbol))
{
int type = data[1].ConvertInvariant<int>();
var depth = new ExchangeOrderPrice
{
Price = data[2].ConvertInvariant<decimal>(),
Amount = data[3].ConvertInvariant<decimal>()
};
var list = (type == 1 ? book.Bids : book.Asks);
list[depth.Price] = depth;
book.MarketSymbol = symbol.Item1;
book.SequenceId = symbol.Item2 + 1;
messageIdToSymbol[msgId] = new Tuple<string, long>(book.MarketSymbol, book.SequenceId);
}
}
else
{
continue;
}
}

if (book != null && (book.Asks.Count != 0 || book.Bids.Count != 0))
await SubscribeToOrderBookDepthChannel(socket, marketSymbols, maxCount);
}, (socket, symbol, sArray, token) =>
{
var book = token.ParseOrderBookFromJTokenArrays();
book.MarketSymbol = symbol;
callback(book);
}

return Task.CompletedTask;
}, async (_socket) =>
{
if (marketSymbols == null || marketSymbols.Length == 0)
{
marketSymbols = (await GetMarketSymbolsAsync()).ToArray();
}

// subscribe to order book and trades channel for each symbol
foreach (var sym in marketSymbols)
{
await _socket.SendMessageAsync(new { command = "subscribe", channel = NormalizeMarketSymbol(sym) });
}
});
return Task.CompletedTask;
});
}

protected override async Task<ExchangeOrderBook> OnGetOrderBookAsync(string marketSymbol, int maxCount = 100)
Expand Down Expand Up @@ -1112,6 +958,92 @@ private async Task<ExchangeDepositDetails> CreateDepositAddress(

return details;
}

private Task<IWebSocket> ConnectWebsocketPublicAsync(
Func<IWebSocket, Task> connected,
Func<IWebSocket, string, string[], JToken, Task> callback)
{
Timer pingTimer = null;
return ConnectPublicWebSocketAsync(
url: "/public",
messageCallback: async (socket, msg) =>
{
var token = JToken.Parse(msg.ToStringFromUTF8());
var eventType = token["event"]?.ToStringInvariant();
if (eventType != null)
{
if (eventType != "error") return;
Logger.Info("Websocket unable to connect: " + token["msg"]?.ToStringInvariant());
return;
}

if (token["data"] == null) return;

foreach (var d in token["data"])
{
await callback(socket, d["symbol"]?.ToStringInvariant(), null, d);
}
},
connectCallback: async (socket) =>
{
await connected(socket);
pingTimer ??= new Timer(
callback: async s =>
await socket.SendMessageAsync(
JsonConvert.SerializeObject(new { Event = "ping" }, SerializerSettings)),
null, 0, 15000);
},
disconnectCallback: socket =>
{
pingTimer?.Dispose();
pingTimer = null;
return Task.CompletedTask;
});
}

private static async Task SubscribeToChannel(
IWebSocket socket,
string channel,
string[] marketSymbols)
{
if (marketSymbols.Length == 0)
{
marketSymbols = new[] { "all" };
}

var payload = JsonConvert.SerializeObject(new
{
Event = "subscribe",
Channel = new[] { channel },
Symbols = marketSymbols
}, SerializerSettings);

await socket.SendMessageAsync(payload);
}

private async Task SubscribeToOrderBookDepthChannel(
IWebSocket socket,
string[] marketSymbols,
int depth = 20)
{
var depthIsValid = depth == 5 || depth == 10 || depth == 20;
if (!depthIsValid)
throw new ArgumentOutOfRangeException(nameof(depth));
if (marketSymbols.Length == 0)
{
marketSymbols = (await OnGetMarketSymbolsAsync()).ToArray();
}

var payload = JsonConvert.SerializeObject(new
{
Event = "subscribe",
Channel = new[] { "book" },
Symbols = marketSymbols,
Depth = depth
}, SerializerSettings);

await socket.SendMessageAsync(payload);
}
}

public partial class ExchangeName
Expand Down