Skip to content

Commit

Permalink
feat: implement retry mechanism
Browse files Browse the repository at this point in the history
Syntax:

```js
const socket = io({
  retries: 3,
  ackTimeout: 10000
});

// "my-event" will be sent up to 4 times (1 + 3), until the server sends an acknowledgement
socket.emit("my-event", (err) => {});
```

Notes:

- the order of the packets is guaranteed, as we send packets one by one
- the same packet id is reused for consecutive retries, in order to
allow deduplication on the server side
  • Loading branch information
darrachequesne committed Feb 1, 2023
1 parent 9f32925 commit 655dce9
Show file tree
Hide file tree
Showing 4 changed files with 187 additions and 3 deletions.
111 changes: 108 additions & 3 deletions lib/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,29 @@ export interface SocketOptions {
/**
* the authentication payload sent when connecting to the Namespace
*/
auth: { [key: string]: any } | ((cb: (data: object) => void) => void);
auth?: { [key: string]: any } | ((cb: (data: object) => void) => void);
/**
* The maximum number of retries. Above the limit, the packet will be discarded.
*
* Using `Infinity` means the delivery guarantee is "at-least-once" (instead of "at-most-once" by default), but a
* smaller value like 10 should be sufficient in practice.
*/
retries?: number;
/**
* The default timeout in milliseconds used when waiting for an acknowledgement.
*/
ackTimeout?: number;
}

type QueuedPacket = {
id: number;
args: unknown[];
flags: Flags;
pending: boolean;
tryCount: number;
ack?: (err?: Error, ...args: unknown[]) => void;
};

/**
* Internal events.
* These events can't be emitted by the user.
Expand All @@ -76,6 +96,7 @@ interface Flags {
compress?: boolean;
volatile?: boolean;
timeout?: number;
withRetry?: boolean;
}

export type DisconnectDescription =
Expand Down Expand Up @@ -198,8 +219,16 @@ export class Socket<
* Buffer for packets that will be sent once the socket is connected
*/
public sendBuffer: Array<Packet> = [];
/**
* The queue of packets to be sent with retry in case of failure.
*
* Packets are sent one by one, each waiting for the server acknowledgement, in order to guarantee the delivery order.
* @private
*/
private _queue: Array<QueuedPacket> = [];

private readonly nsp: string;
private readonly _opts: SocketOptions;

private ids: number = 0;
private acks: object = {};
Expand All @@ -218,6 +247,7 @@ export class Socket<
if (opts && opts.auth) {
this.auth = opts.auth;
}
this._opts = Object.assign({}, opts);
if (this.io._autoConnect) this.open();
}

Expand Down Expand Up @@ -350,6 +380,24 @@ export class Socket<
}

args.unshift(ev);

if (this._opts.retries && !this.flags.withRetry && !this.flags.volatile) {
let ack;
if (typeof args[args.length - 1] === "function") {
ack = args.pop();
}
this._queue.push({
id: this.ids++,
tryCount: 0,
pending: false,
args,
ack,
flags: Object.assign({ withRetry: true }, this.flags),
});
this._drainQueue();
return this;
}

const packet: any = {
type: PacketType.EVENT,
data: args,
Expand Down Expand Up @@ -393,7 +441,7 @@ export class Socket<
* @private
*/
private _registerAckCallback(id: number, ack: Function) {
const timeout = this.flags.timeout;
const timeout = this.flags.timeout ?? this._opts.ackTimeout;
if (timeout === undefined) {
this.acks[id] = ack;
return;
Expand Down Expand Up @@ -440,7 +488,8 @@ export class Socket<
...args: AllButLast<EventParams<EmitEvents, Ev>>
): Promise<FirstArg<Last<EventParams<EmitEvents, Ev>>>> {
// the timeout flag is optional
const withErr = this.flags.timeout !== undefined;
const withErr =
this.flags.timeout !== undefined || this._opts.ackTimeout !== undefined;
return new Promise((resolve, reject) => {
args.push((arg1, arg2) => {
if (withErr) {
Expand All @@ -453,6 +502,62 @@ export class Socket<
});
}

/**
* Send the first packet of the queue, and wait for an acknowledgement from the server.
* @private
*/
private _drainQueue() {
debug("draining queue");
if (this._queue.length === 0) {
return;
}
const packet = this._queue[0];
if (packet.pending) {
debug(
"packet [%d] has already been sent and is waiting for an ack",
packet.id
);
return;
}
packet.pending = true;
packet.tryCount++;
debug("sending packet [%d] (try n°%d)", packet.id, packet.tryCount);
const currentId = this.ids;
this.ids = packet.id; // the same id is reused for consecutive retries, in order to allow deduplication on the server side
this.flags = packet.flags;
packet.args.push((err, ...responseArgs) => {
if (packet !== this._queue[0]) {
// the packet has already been acknowledged
return;
}
const hasError = err !== null;
if (hasError) {
if (packet.tryCount > this._opts.retries) {
debug(
"packet [%d] is discarded after %d tries",
packet.id,
packet.tryCount
);
this._queue.shift();
if (packet.ack) {
packet.ack(err);
}
}
} else {
debug("packet [%d] was successfully sent", packet.id);
this._queue.shift();
if (packet.ack) {
packet.ack(null, ...responseArgs);
}
}
packet.pending = false;
return this._drainQueue();
});

this.emit.apply(this, packet.args);
this.ids = currentId; // restore offset
}

/**
* Sends a packet.
*
Expand Down
1 change: 1 addition & 0 deletions test/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ import "./connection";
import "./socket";
import "./node";
import "./connection-state-recovery";
import "./retry";
65 changes: 65 additions & 0 deletions test/retry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import expect from "expect.js";
import { io } from "..";
import { wrap, BASE_URL, success } from "./support/util";

describe("retry", () => {
it("should preserve the order of the packets", () => {
return wrap((done) => {
const socket = io(BASE_URL, {
forceNew: true,
retries: 1,
ackTimeout: 50,
});

socket.emit("echo", 1, () => {
// @ts-ignore
expect(socket._queue.length).to.eql(2);
});

// @ts-ignore
expect(socket._queue.length).to.eql(1);

socket.emit("echo", 2, () => {
// @ts-ignore
expect(socket._queue.length).to.eql(1);
});

// @ts-ignore
expect(socket._queue.length).to.eql(2);

socket.emit("echo", 3, (err, val) => {
expect(err).to.be(null);
expect(val).to.eql(3);
// @ts-ignore
expect(socket._queue.length).to.eql(0);

success(done, socket);
});

// @ts-ignore
expect(socket._queue.length).to.eql(3);
});
});

it("should fail when the server does not acknowledge the packet", () => {
return wrap((done) => {
const socket = io(BASE_URL, {
forceNew: true,
retries: 3,
ackTimeout: 50,
});

let count = 0;

socket.emit("ack", () => {
expect(count).to.eql(4);

success(done, socket);
});

socket.on("ack", () => {
count++;
});
});
});
});
13 changes: 13 additions & 0 deletions test/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -618,5 +618,18 @@ describe("socket", () => {
}
});
});

it("should use the default value", () => {
return wrap((done) => {
const socket = io(BASE_URL + "/", {
ackTimeout: 50,
});

socket.emit("unknown", (err) => {
expect(err).to.be.an(Error);
success(done, socket);
});
});
});
});
});

0 comments on commit 655dce9

Please sign in to comment.