-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathHypeRate.cs
429 lines (357 loc) · 13.9 KB
/
HypeRate.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
using System.Net.WebSockets;
using System.Text;
using System.Text.Json;
using HypeRate.EventArgs;
using HypeRate.Network.In;
using HypeRate.Network.Out;
namespace HypeRate;
/// <summary>
///     Exposes the public API for interacting with the HypeRate WebSocket API.
///     The class is a singleton (see <see cref="GetInstance" />). It owns one WebSocket client, a keep-alive loop and
///     a receive loop; both loops are started by <see cref="Start" /> and translate incoming packets into the events
///     declared on this class.
/// </summary>
public class HypeRate
{
    /// <summary>
    ///     Size (in bytes) of the buffer used for a single WebSocket receive call.
    ///     Longer messages are assembled from multiple receive calls.
    /// </summary>
    private const int ReceiveBufferSize = 1024;

    /// <summary>
    ///     Pause (in milliseconds) between two keep-alive packets
    /// </summary>
    private const int KeepAliveIntervalMs = 10 * 1000;

    /// <summary>
    ///     Pause (in milliseconds) between two iterations of the receive loop
    /// </summary>
    private const int ReceivePollDelayMs = 1;

    /// <summary>
    ///     Holds the global unique instance of the current class
    /// </summary>
    private static readonly HypeRate Instance = new();

    /// <summary>
    ///     The (not yet started) task which runs the keep-alive loop
    /// </summary>
    private readonly Task _heartbeatTask;

    /// <summary>
    ///     The (not yet started) task which runs the receive loop
    /// </summary>
    private readonly Task _receiveTask;

    /// <summary>
    ///     Serializes outgoing packets: the keep-alive loop and the public API may send at the same time, but the
    ///     WebSocket client only supports one outstanding send operation.
    /// </summary>
    private readonly SemaphoreSlim _sendLock = new(1, 1);

    /// <summary>
    ///     The internal channel manager which is used for managing channels
    /// </summary>
    private volatile ChannelManager _channelManager = new();

    /// <summary>
    ///     Holds an instance of the built-in Websocket client.
    ///     The instance is replaced by <see cref="Connect" /> once it has been closed or aborted.
    /// </summary>
    private volatile ClientWebSocket _websocketClient = new();

    /// <summary>
    ///     The base URL to which the Websocket client should connect to.
    ///     Normally you wouldn't change this value - otherwise the WebSocket client could not establish a connection.
    ///     Only change this if you are asked to by the HypeRate team.
    /// </summary>
    public string BaseUrl = "wss://app.hyperate.io/socket/websocket";

    /// <summary>
    ///     Constructs a new instance of the HypeRate class.
    ///     The background loops are only prepared here; they do not run until <see cref="Start" /> is called.
    /// </summary>
    private HypeRate()
    {
        // Task<Task> (instead of "new Task(async () => ...)") keeps the loops real "async Task" methods. With the
        // plain Task constructor the lambda would be "async void" and every exception would tear down the process.
        _heartbeatTask = new Task<Task>(() => RunHeartbeatLoopAsync());
        _receiveTask = new Task<Task>(() => RunReceiveLoopAsync());
    }

    /// <summary>
    ///     Contains the application specific API token
    /// </summary>
    public string ApiToken { get; private set; } = "";

    /// <summary>
    ///     Returns true when the connection to the HypeRate servers is established
    /// </summary>
    public bool IsConnected => _websocketClient.State == WebSocketState.Open;

    /// <summary>
    ///     Starts the Tasks which are required for receiving packets and sending the keep-alive packet in the required
    ///     interval.
    ///     Must only be called once: a task cannot be started twice, so a second call throws an
    ///     <see cref="InvalidOperationException" />.
    /// </summary>
    public void Start()
    {
        _heartbeatTask.Start();
        _receiveTask.Start();
    }

    /// <summary>
    ///     Sets the new WebSocket API token.
    ///     It automatically trims the input so the leading and trailing spaces get removed.
    /// </summary>
    /// <param name="newApiToken">The new API token</param>
    /// <exception cref="ArgumentNullException">When <paramref name="newApiToken" /> is null</exception>
    public void SetApiToken(string newApiToken)
    {
        ArgumentNullException.ThrowIfNull(newApiToken);
        ApiToken = newApiToken.Trim();
    }

    /// <summary>
    ///     Returns the singleton instance of the HypeRate class
    /// </summary>
    /// <returns>The global unique instance</returns>
    public static HypeRate GetInstance()
    {
        return Instance;
    }

    /// <summary>
    ///     Tries to determine the ChannelType based on the given input.
    /// </summary>
    /// <param name="channelName">The name of the channel</param>
    /// <returns>
    ///     Returns ChannelType.Heartbeat when the channel name starts with "hr:".
    ///     Returns ChannelType.Clips when the channel name starts with "clips:".
    ///     Otherwise ChannelType.Unknown will be returned.
    /// </returns>
    /// <exception cref="ArgumentNullException">When <paramref name="channelName" /> is null</exception>
    public static ChannelType DetermineChannelType(string channelName)
    {
        ArgumentNullException.ThrowIfNull(channelName);

        // Channel prefixes are protocol identifiers, so they are compared ordinally (culture independent).
        if (channelName.StartsWith("hr:", StringComparison.Ordinal)) return ChannelType.Heartbeat;
        if (channelName.StartsWith("clips:", StringComparison.Ordinal)) return ChannelType.Clips;

        return ChannelType.Unknown;
    }

    #region Background loops

    /// <summary>
    ///     Sends the keep-alive packet every <see cref="KeepAliveIntervalMs" /> milliseconds while the connection is
    ///     open. The loop never ends on its own.
    /// </summary>
    private async Task RunHeartbeatLoopAsync()
    {
        while (true)
        {
            if (_websocketClient.State == WebSocketState.Open)
            {
                try
                {
                    await SendPacket(GetKeepAlivePacket(), default);
                }
                catch (Exception e) when (e is WebSocketException or InvalidOperationException)
                {
                    // A keep-alive that could not be delivered must not end the loop (ObjectDisposedException is an
                    // InvalidOperationException and therefore covered as well). The socket state reflects the
                    // failure, so the next iteration simply skips sending until a connection is open again.
                }
            }

            await Task.Delay(KeepAliveIntervalMs);
        }
    }

    /// <summary>
    ///     Receives messages while the connection is open and dispatches them to the event handlers.
    ///     The loop never ends on its own. Exceptions thrown by event subscribers are not swallowed; they fault the
    ///     task running this loop.
    /// </summary>
    private async Task RunReceiveLoopAsync()
    {
        var buffer = new byte[ReceiveBufferSize];

        while (true)
        {
            // Read the field once per iteration: Connect may replace the client at any time.
            var socket = _websocketClient;

            if (socket.State == WebSocketState.Open)
            {
                var message = await ReceiveMessageAsync(socket, buffer);

                if (message is { Length: > 0 })
                {
                    try
                    {
                        HandleIncomingMessage(message);
                    }
                    catch (JsonException)
                    {
                        // A packet which is not valid JSON (or does not match the expected shape) is skipped so
                        // that a single bad message cannot stop the receive loop.
                    }
                }
            }

            await Task.Delay(ReceivePollDelayMs);
        }
    }

    /// <summary>
    ///     Reads one complete message (all fragments up to "end of message") from the given socket.
    ///     Raises <see cref="Disconnected" /> when the server sent a close frame or the connection was lost.
    /// </summary>
    /// <param name="socket">The socket to read from</param>
    /// <param name="buffer">Scratch buffer which is reused for every receive call</param>
    /// <returns>The raw bytes of the message or null when no complete message could be received</returns>
    private async Task<byte[]?> ReceiveMessageAsync(ClientWebSocket socket, byte[] buffer)
    {
        var allBytes = new List<byte>();
        WebSocketReceiveResult result;

        do
        {
            try
            {
                result = await socket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
            }
            catch (Exception e)
            {
                // A disposed socket or a transport failure means the connection is gone.
                if (e is ObjectDisposedException or WebSocketException)
                    Disconnected?.Invoke(this, System.EventArgs.Empty);

                // Fragments received so far are incomplete and must not be parsed.
                return null;
            }

            if (socket.State == WebSocketState.CloseReceived)
            {
                Disconnected?.Invoke(this, System.EventArgs.Empty);
                return null;
            }

            allBytes.AddRange(new ArraySegment<byte>(buffer, 0, result.Count));
        } while (!result.EndOfMessage);

        return allBytes.ToArray();
    }

    /// <summary>
    ///     Parses a received message and raises the matching event:
    ///     system packets with a ref resolve pending joins / leaves, heartbeat packets raise
    ///     <see cref="HeartbeatReceived" /> and clips packets raise <see cref="ClipCreated" />.
    /// </summary>
    /// <param name="payload">The raw UTF-8 encoded JSON message</param>
    /// <exception cref="JsonException">When the message could not be deserialized</exception>
    private void HandleIncomingMessage(byte[] payload)
    {
        var incomingPacket = JsonSerializer.Deserialize<BasicIncomingPacket>(payload);

        if (incomingPacket == null) return;

        if (incomingPacket.IsSystemPacket)
        {
            if (incomingPacket.Ref == null) return;

            var @ref = (int)incomingPacket.Ref;
            var refType = _channelManager.DetermineRefTypeByRef(@ref);

            switch (refType)
            {
                case RefType.Join:
                    var joinedChannelName = _channelManager.HandleJoin(@ref);
                    if (joinedChannelName != null) ChannelJoined?.Invoke(this, joinedChannelName);
                    break;
                case RefType.Leave:
                    var leftChannelName = _channelManager.HandleLeave(@ref);
                    if (leftChannelName != null) ChannelLeft?.Invoke(this, leftChannelName);
                    break;
            }
        }
        else if (incomingPacket.IsHeartbeatPacket)
        {
            var heartbeatUpdatePacket = JsonSerializer.Deserialize<IncomingHeartbeatPacket>(payload);

            if (heartbeatUpdatePacket?.Payload?.Hr == null) return;

            // Skips the first 3 characters of the topic (the length of the "hr:" prefix).
            var deviceId = incomingPacket.Topic?[3..];
            var heartbeat = (int)heartbeatUpdatePacket.Payload.Hr;

            HeartbeatReceived?.Invoke(this, new HeartbeatReceivedEventArgs(deviceId!, heartbeat));
        }
        else if (incomingPacket.IsClipsPacket)
        {
            var clipCreatedPacket = JsonSerializer.Deserialize<IncomingClipCreatedPacket>(payload);

            if (clipCreatedPacket?.Payload?.TwitchSlug == null) return;

            // Skips the first 6 characters of the topic (the length of the "clips:" prefix).
            var deviceId = incomingPacket.Topic?[6..];

            ClipCreated?.Invoke(this, new ClipCreatedEventArgs(deviceId!, clipCreatedPacket.Payload.TwitchSlug));
        }
    }

    #endregion

    #region Events

    /// <summary>
    ///     This event gets fired when the WebSocket client has successfully established a connection to the HypeRate
    ///     servers.
    /// </summary>
    public event EventHandler? Connected;

    /// <summary>
    ///     This event gets fired when the WebSocket client has been disconnected.
    ///     This could be in the following scenarios:
    ///     - the user has lost their internet connection
    ///     - the HypeRate servers are getting restarted
    ///     - the client sent data which couldn't be processed by the server
    /// </summary>
    public event EventHandler? Disconnected;

    /// <summary>
    ///     This event gets fired when the server acknowledged the join of a specific channel.
    /// </summary>
    public event EventHandler<string>? ChannelJoined;

    /// <summary>
    ///     This event gets fired when the server acknowledged the leave of a specific channel.
    /// </summary>
    public event EventHandler<string>? ChannelLeft;

    /// <summary>
    ///     This event gets fired when the WebSocket client has received a new "heartbeat" event (the user sent a new
    ///     heartbeat from their device to the servers).
    /// </summary>
    public event EventHandler<HeartbeatReceivedEventArgs>? HeartbeatReceived;

    /// <summary>
    ///     This event gets fired when the user has created a new clip (with the HypeClips integration in HypeRate).
    ///     More information here: https://github.com/HypeRate/DevDocs/blob/main/Clips.md#receiving-data
    /// </summary>
    public event EventHandler<ClipCreatedEventArgs>? ClipCreated;

    #endregion

    #region Network related

    /// <summary>
    ///     Tries to establish a connection to the HypeRate servers based on the BaseUrl and ApiToken.
    ///     On success the channel manager is reset and the <see cref="Connected" /> event is raised.
    /// </summary>
    /// <param name="cancellationToken">The cancellation token which is passed to the WebSocket client</param>
    public async Task Connect(CancellationToken cancellationToken = default)
    {
        // The token is escaped so that characters like '+', '&' or '=' cannot corrupt the query string.
        var urlToConnectTo = new Uri($"{BaseUrl}?token={Uri.EscapeDataString(ApiToken)}");

        var socket = _websocketClient;

        // A ClientWebSocket can only be connected once. After it has been closed or aborted a fresh instance is
        // required, otherwise every later connection attempt (including Reconnect) would throw.
        // A client which is currently connecting / connected is left untouched so that ConnectAsync reports the
        // misuse exactly like before.
        if (socket.State is not (WebSocketState.None or WebSocketState.Connecting or WebSocketState.Open))
        {
            socket.Dispose();
            socket = new ClientWebSocket();
            _websocketClient = socket;
        }

        await socket.ConnectAsync(urlToConnectTo, cancellationToken);

        _channelManager = new ChannelManager();
        Connected?.Invoke(this, System.EventArgs.Empty);
    }

    /// <summary>
    ///     Tries to disconnect from the HypeRate servers.
    ///     Raises the <see cref="Disconnected" /> event after the close handshake has completed.
    /// </summary>
    /// <param name="cancellationToken">The cancellation token which is passed to the WebSocket client</param>
    public async Task Disconnect(CancellationToken cancellationToken = default)
    {
        await _websocketClient.CloseAsync(WebSocketCloseStatus.NormalClosure, null, cancellationToken);
        Disconnected?.Invoke(this, System.EventArgs.Empty);
    }

    /// <summary>
    ///     Reconnects the WebSocket client and re-joins all previous joined channels.
    /// </summary>
    /// <param name="cancellationToken">The optional cancellation token</param>
    public async Task Reconnect(CancellationToken cancellationToken = default)
    {
        // The close handshake is only possible on an open connection. After a connection loss (the usual reason
        // for reconnecting) closing would throw, so it is skipped in that case.
        if (IsConnected) await Disconnect(cancellationToken);

        // Must be read before Connect, because Connect replaces the channel manager.
        var channelsToRejoin = _channelManager.GetChannelsToRejoin();

        await Connect(cancellationToken);

        foreach (var channelName in channelsToRejoin) await JoinChannel(channelName, cancellationToken);
    }

    #endregion

    #region HypeRate related

    /// <summary>
    ///     Tries to join the "heartbeat" channel so that the "HeartbeatReceived" event handler will be invoked when new
    ///     data has been received.
    ///     This function is part of the high-level API.
    /// </summary>
    /// <param name="deviceId">The device ID of the user</param>
    /// <param name="cancellationToken">The optional cancellation token</param>
    public async Task JoinHeartbeatChannel(string deviceId, CancellationToken? cancellationToken = null)
    {
        await JoinChannel($"hr:{deviceId}", cancellationToken);
    }

    /// <summary>
    ///     Tries to leave the "heartbeat" channel so that the "HeartbeatReceived" event handler will not emit any new
    ///     events for the given device ID.
    ///     This function is part of the high-level API.
    /// </summary>
    /// <param name="deviceId">The device ID of the user</param>
    /// <param name="cancellationToken">The optional cancellation token</param>
    public async Task LeaveHeartbeatChannel(string deviceId, CancellationToken? cancellationToken = null)
    {
        await LeaveChannel($"hr:{deviceId}", cancellationToken);
    }

    /// <summary>
    ///     Tries to join the "clips" channel so that the "ClipCreated" event handler will be invoked when new data has
    ///     been received.
    ///     This function is part of the high-level API.
    /// </summary>
    /// <param name="deviceId">The device ID of the user</param>
    /// <param name="cancellationToken">The optional cancellation token</param>
    public async Task JoinClipsChannel(string deviceId, CancellationToken? cancellationToken = null)
    {
        await JoinChannel($"clips:{deviceId}", cancellationToken);
    }

    /// <summary>
    ///     Tries to leave the "clips" channel so that the "ClipCreated" event handler will not emit any new events for
    ///     the given device ID.
    ///     This function is part of the high-level API.
    /// </summary>
    /// <param name="deviceId">The device ID of the user</param>
    /// <param name="cancellationToken">The optional cancellation token</param>
    public async Task LeaveClipsChannel(string deviceId, CancellationToken? cancellationToken = null)
    {
        await LeaveChannel($"clips:{deviceId}", cancellationToken);
    }

    /// <summary>
    ///     Tries to join the given channel by its name.
    ///     This function is part of the low-level API (when no specific function is available for joining the required
    ///     channel).
    ///     Nothing is sent when the channel manager answers the join request with the ref -1.
    /// </summary>
    /// <param name="channelName">The name of the channel to join</param>
    /// <param name="cancellationToken">The optional cancellation token</param>
    public async Task JoinChannel(string channelName, CancellationToken? cancellationToken = null)
    {
        var joinRef = _channelManager.JoinChannel(channelName);

        // NOTE(review): -1 presumably means "already joined / join pending" - confirm against ChannelManager.
        if (joinRef == -1) return;

        await SendPacket(GetJoinPacket(channelName, joinRef), cancellationToken ?? default);
    }

    /// <summary>
    ///     Tries to leave the given channel by its name.
    ///     This function is part of the low-level API (when no specific function is available for leaving the required
    ///     channel).
    ///     Nothing is sent when the channel manager answers the leave request with the ref -1.
    /// </summary>
    /// <param name="channelName">The name of the channel to leave</param>
    /// <param name="cancellationToken">The optional cancellation token</param>
    public async Task LeaveChannel(string channelName, CancellationToken? cancellationToken = null)
    {
        var leaveRef = _channelManager.LeaveChannel(channelName);

        // NOTE(review): -1 presumably means "not joined / leave pending" - confirm against ChannelManager.
        if (leaveRef == -1) return;

        await SendPacket(GetLeavePacket(channelName, leaveRef), cancellationToken ?? default);
    }

    #endregion

    #region Packet stuff

    /// <summary>
    ///     Returns the string representation of the channel join packet
    /// </summary>
    /// <param name="topic">The name of the topic which should be joined</param>
    /// <param name="joinRef">The ref (request ID) of the packet</param>
    /// <returns>The JSON representation of the join packet</returns>
    private string GetJoinPacket(string topic, int joinRef)
    {
        return JsonSerializer.Serialize(new JoinChannelPacket(topic, joinRef));
    }

    /// <summary>
    ///     Returns the string representation of the keep-alive packet
    /// </summary>
    /// <returns>The JSON representation of the keep-alive packet</returns>
    private string GetKeepAlivePacket()
    {
        return JsonSerializer.Serialize(new KeepAlivePacket());
    }

    /// <summary>
    ///     Returns the string representation of the channel leave packet
    /// </summary>
    /// <param name="topic">The name of the topic which should be left</param>
    /// <param name="leaveRef">The ref (request ID) of the packet</param>
    /// <returns>The JSON representation of the leave packet</returns>
    private string GetLeavePacket(string topic, int leaveRef)
    {
        return JsonSerializer.Serialize(new LeaveChannelPacket(topic, leaveRef));
    }

    /// <summary>
    ///     Sends the given JSON data to the HypeRate server as a single UTF-8 encoded text message.
    ///     Concurrent callers are queued so that only one send operation is active at a time.
    ///     Every exception is propagated to the caller; when the socket has already been disposed the
    ///     <see cref="Disconnected" /> event is raised first.
    /// </summary>
    /// <param name="data">The data to send</param>
    /// <param name="cancellationToken">The cancellation token of the user</param>
    private async Task SendPacket(string data, CancellationToken cancellationToken)
    {
        var messageBytes = Encoding.UTF8.GetBytes(data);

        try
        {
            await _sendLock.WaitAsync(cancellationToken);

            try
            {
                await _websocketClient.SendAsync(
                    messageBytes,
                    WebSocketMessageType.Text,
                    WebSocketMessageFlags.EndOfMessage,
                    cancellationToken
                );
            }
            finally
            {
                // Released before the Disconnected handlers run, so a handler may send packets itself.
                _sendLock.Release();
            }
        }
        catch (ObjectDisposedException)
        {
            Disconnected?.Invoke(this, System.EventArgs.Empty);
            throw;
        }
    }

    #endregion
}