Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(WebSocketShard): send ratelimit handling #8887

Merged
merged 4 commits into from
Dec 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions packages/ws/src/utils/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { Collection } from '@discordjs/collection';
import { lazy } from '@discordjs/util';
import { APIVersion, GatewayOpcodes } from 'discord-api-types/v10';
import type { OptionalWebSocketManagerOptions, SessionInfo } from '../ws/WebSocketManager.js';
import type { SendRateLimitState } from '../ws/WebSocketShard.js';

/**
* Valid encoding types
Expand Down Expand Up @@ -60,3 +61,10 @@ export const ImportantGatewayOpcodes = new Set([
GatewayOpcodes.Identify,
GatewayOpcodes.Resume,
]);

export function getInitialSendRateLimitState(): SendRateLimitState {
return {
remaining: 120,
resetAt: Date.now() + 60_000,
};
}
76 changes: 55 additions & 21 deletions packages/ws/src/ws/WebSocketShard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ import {
import { WebSocket, type RawData } from 'ws';
import type { Inflate } from 'zlib-sync';
import type { IContextFetchingStrategy } from '../strategies/context/IContextFetchingStrategy';
import { ImportantGatewayOpcodes } from '../utils/constants.js';
import { getInitialSendRateLimitState, ImportantGatewayOpcodes } from '../utils/constants.js';
import type { SessionInfo } from './WebSocketManager.js';

// eslint-disable-next-line promise/prefer-await-to-then
const getZlibSync = lazy(async () => import('zlib-sync').then((mod) => mod.default).catch(() => null));

export enum WebSocketShardEvents {
Closed = 'closed',
Debug = 'debug',
Dispatch = 'dispatch',
Hello = 'hello',
Expand All @@ -51,6 +52,7 @@ export enum WebSocketShardDestroyRecovery {

// eslint-disable-next-line @typescript-eslint/consistent-type-definitions
export type WebSocketShardEventsMap = {
[WebSocketShardEvents.Closed]: [{ code: number }];
[WebSocketShardEvents.Debug]: [payload: { message: string }];
[WebSocketShardEvents.Hello]: [];
[WebSocketShardEvents.Ready]: [payload: { data: GatewayReadyDispatchData }];
Expand All @@ -69,6 +71,11 @@ export enum CloseCodes {
Resuming = 4_200,
}

export interface SendRateLimitState {
remaining: number;
resetAt: number;
}

export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
private connection: WebSocket | null = null;

Expand All @@ -86,10 +93,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {

private isAck = true;

private sendRateLimitState = {
remaining: 120,
resetAt: Date.now(),
};
private sendRateLimitState: SendRateLimitState = getInitialSendRateLimitState();

private heartbeatInterval: NodeJS.Timer | null = null;

Expand Down Expand Up @@ -146,6 +150,8 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {

this.status = WebSocketShardStatus.Connecting;

this.sendRateLimitState = getInitialSendRateLimitState();

await this.waitForEvent(WebSocketShardEvents.Hello, this.strategy.options.helloTimeout);

if (session?.shardCount === this.strategy.options.shardCount) {
Expand Down Expand Up @@ -187,22 +193,32 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
await this.strategy.updateSessionInfo(this.id, null);
}

if (
this.connection &&
(this.connection.readyState === WebSocket.OPEN || this.connection.readyState === WebSocket.CONNECTING)
) {
if (this.connection) {
// No longer need to listen to messages
this.connection.removeAllListeners('message');
// Prevent a reconnection loop by unbinding the main close event
this.connection.removeAllListeners('close');
this.connection.close(options.code, options.reason);

// Actually wait for the connection to close
await once(this.connection, 'close');
const shouldClose =
this.connection.readyState === WebSocket.OPEN || this.connection.readyState === WebSocket.CONNECTING;

this.debug([
'Connection status during destroy',
`Needs closing: ${shouldClose}`,
`Ready state: ${this.connection.readyState}`,
]);

if (shouldClose) {
this.connection.close(options.code, options.reason);
await once(this.connection, 'close');
this.emit(WebSocketShardEvents.Closed, { code: options.code });
}

// Lastly, remove the error event.
// Doing this earlier would cause a hard crash in case an error event fired on our `close` call
this.connection.removeAllListeners('error');
} else {
this.debug(['Destroying a shard that has no connection; please open an issue on GitHub']);
didinele marked this conversation as resolved.
Show resolved Hide resolved
}

this.status = WebSocketShardStatus.Idle;
Expand All @@ -227,26 +243,44 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
}
}

public async send(payload: GatewaySendPayload) {
public async send(payload: GatewaySendPayload): Promise<void> {
if (!this.connection) {
throw new Error("WebSocketShard wasn't connected");
}

if (this.status !== WebSocketShardStatus.Ready && !ImportantGatewayOpcodes.has(payload.op)) {
this.debug(['Tried to send a non-crucial payload before the shard was ready, waiting']);
await once(this, WebSocketShardEvents.Ready);
}

await this.sendQueue.wait();

if (--this.sendRateLimitState.remaining <= 0) {
if (this.sendRateLimitState.resetAt < Date.now()) {
await sleep(Date.now() - this.sendRateLimitState.resetAt);
const now = Date.now();

if (this.sendRateLimitState.resetAt > now) {
const sleepFor = this.sendRateLimitState.resetAt - now;

this.debug([`Was about to hit the send rate limit, sleeping for ${sleepFor}ms`]);
const controller = new AbortController();

// Sleep for the remaining time, but if the connection closes in the meantime, we shouldn't wait the remainder to avoid blocking the new conn
const interrupted = await Promise.race([
sleep(sleepFor).then(() => false),
once(this, WebSocketShardEvents.Closed, { signal: controller.signal }).then(() => true),
]);

if (interrupted) {
this.debug(['Connection closed while waiting for the send rate limit to reset, re-queueing payload']);
this.sendQueue.shift();
return this.send(payload);
}

// This is so the listener from the `once` call is removed
controller.abort();
}

this.sendRateLimitState = {
remaining: 119,
resetAt: Date.now() + 60_000,
};
this.sendRateLimitState = getInitialSendRateLimitState();
}

this.sendQueue.shift();
Expand Down Expand Up @@ -476,9 +510,10 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
}

private async onClose(code: number) {
this.emit(WebSocketShardEvents.Closed, { code });

switch (code) {
case CloseCodes.Normal: {
this.debug([`Disconnected normally from code ${code}`]);
return this.destroy({
code,
reason: 'Got disconnected by Discord',
Expand All @@ -487,7 +522,6 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
}

case CloseCodes.Resuming: {
this.debug([`Disconnected normally from code ${code}`]);
break;
}

Expand Down