diff --git a/lib/socket.ts b/lib/socket.ts index a8f189f4..c4ee3f94 100644 --- a/lib/socket.ts +++ b/lib/socket.ts @@ -55,9 +55,29 @@ export interface SocketOptions { /** * the authentication payload sent when connecting to the Namespace */ - auth: { [key: string]: any } | ((cb: (data: object) => void) => void); + auth?: { [key: string]: any } | ((cb: (data: object) => void) => void); + /** + * The maximum number of retries. Above the limit, the packet will be discarded. + * + * Using `Infinity` means the delivery guarantee is "at-least-once" (instead of "at-most-once" by default), but a + * smaller value like 10 should be sufficient in practice. + */ + retries?: number; + /** + * The default timeout in milliseconds used when waiting for an acknowledgement. + */ + ackTimeout?: number; } +type QueuedPacket = { + id: number; + args: unknown[]; + flags: Flags; + pending: boolean; + tryCount: number; + ack?: (err?: Error, ...args: unknown[]) => void; +}; + /** * Internal events. * These events can't be emitted by the user. @@ -76,6 +96,7 @@ interface Flags { compress?: boolean; volatile?: boolean; timeout?: number; + withRetry?: boolean; } export type DisconnectDescription = @@ -198,8 +219,16 @@ export class Socket< * Buffer for packets that will be sent once the socket is connected */ public sendBuffer: Array = []; + /** + * The queue of packets to be sent with retry in case of failure. + * + * Packets are sent one by one, each waiting for the server acknowledgement, in order to guarantee the delivery order. + * @private + */ + private _queue: Array = []; private readonly nsp: string; + private readonly _opts: SocketOptions; private ids: number = 0; private acks: object = {}; @@ -218,6 +247,7 @@ export class Socket< if (opts && opts.auth) { this.auth = opts.auth; } + this._opts = Object.assign({}, opts); if (this.io._autoConnect) this.open(); } @@ -350,6 +380,24 @@ export class Socket< } args.unshift(ev); + + if (this._opts.retries && !this.flags.withRetry && !this.flags.volatile) { + let ack; + if (typeof args[args.length - 1] === "function") { + ack = args.pop(); + } + this._queue.push({ + id: this.ids++, + tryCount: 0, + pending: false, + args, + ack, + flags: Object.assign({ withRetry: true }, this.flags), + }); + this._drainQueue(); + return this; + } + const packet: any = { type: PacketType.EVENT, data: args, @@ -393,7 +441,7 @@ export class Socket< * @private */ private _registerAckCallback(id: number, ack: Function) { - const timeout = this.flags.timeout; + const timeout = this.flags.timeout ?? this._opts.ackTimeout; if (timeout === undefined) { this.acks[id] = ack; return; @@ -440,7 +488,8 @@ export class Socket< ...args: AllButLast> ): Promise>>> { // the timeout flag is optional - const withErr = this.flags.timeout !== undefined; + const withErr = + this.flags.timeout !== undefined || this._opts.ackTimeout !== undefined; return new Promise((resolve, reject) => { args.push((arg1, arg2) => { if (withErr) { @@ -453,6 +502,62 @@ export class Socket< }); } + /** + * Send the first packet of the queue, and wait for an acknowledgement from the server. + * @private + */ + private _drainQueue() { + debug("draining queue"); + if (this._queue.length === 0) { + return; + } + const packet = this._queue[0]; + if (packet.pending) { + debug( + "packet [%d] has already been sent and is waiting for an ack", + packet.id + ); + return; + } + packet.pending = true; + packet.tryCount++; + debug("sending packet [%d] (try n°%d)", packet.id, packet.tryCount); + const currentId = this.ids; + this.ids = packet.id; // the same id is reused for consecutive retries, in order to allow deduplication on the server side + this.flags = packet.flags; + packet.args.push((err, ...responseArgs) => { + if (packet !== this._queue[0]) { + // the packet has already been acknowledged + return; + } + const hasError = err !== null; + if (hasError) { + if (packet.tryCount > this._opts.retries) { + debug( + "packet [%d] is discarded after %d tries", + packet.id, + packet.tryCount + ); + this._queue.shift(); + if (packet.ack) { + packet.ack(err); + } + } + } else { + debug("packet [%d] was successfully sent", packet.id); + this._queue.shift(); + if (packet.ack) { + packet.ack(null, ...responseArgs); + } + } + packet.pending = false; + return this._drainQueue(); + }); + + this.emit.apply(this, packet.args); + this.ids = currentId; // restore offset + } + /** * Sends a packet. * diff --git a/test/index.ts b/test/index.ts index 69d0d8d9..355c840c 100644 --- a/test/index.ts +++ b/test/index.ts @@ -3,3 +3,4 @@ import "./connection"; import "./socket"; import "./node"; import "./connection-state-recovery"; +import "./retry"; diff --git a/test/retry.ts b/test/retry.ts new file mode 100644 index 00000000..183e0caa --- /dev/null +++ b/test/retry.ts @@ -0,0 +1,65 @@ +import expect from "expect.js"; +import { io } from ".."; +import { wrap, BASE_URL, success } from "./support/util"; + +describe("retry", () => { + it("should preserve the order of the packets", () => { + return wrap((done) => { + const socket = io(BASE_URL, { + forceNew: true, + retries: 1, + ackTimeout: 50, + }); + + socket.emit("echo", 1, () => { + // @ts-ignore + expect(socket._queue.length).to.eql(2); + }); + + // @ts-ignore + expect(socket._queue.length).to.eql(1); + + socket.emit("echo", 2, () => { + // @ts-ignore + expect(socket._queue.length).to.eql(1); + }); + + // @ts-ignore + expect(socket._queue.length).to.eql(2); + + socket.emit("echo", 3, (err, val) => { + expect(err).to.be(null); + expect(val).to.eql(3); + // @ts-ignore + expect(socket._queue.length).to.eql(0); + + success(done, socket); + }); + + // @ts-ignore + expect(socket._queue.length).to.eql(3); + }); + }); + + it("should fail when the server does not acknowledge the packet", () => { + return wrap((done) => { + const socket = io(BASE_URL, { + forceNew: true, + retries: 3, + ackTimeout: 50, + }); + + let count = 0; + + socket.emit("ack", () => { + expect(count).to.eql(4); + + success(done, socket); + }); + + socket.on("ack", () => { + count++; + }); + }); + }); +}); diff --git a/test/socket.ts b/test/socket.ts index e4c3566e..d427048d 100644 --- a/test/socket.ts +++ b/test/socket.ts @@ -618,5 +618,18 @@ describe("socket", () => { } }); }); + + it("should use the default value", () => { + return wrap((done) => { + const socket = io(BASE_URL + "/", { + ackTimeout: 50, + }); + + socket.emit("unknown", (err) => { + expect(err).to.be.an(Error); + success(done, socket); + }); + }); + }); }); });