Skip to content

Commit

Permalink
test: add more tests for the retry mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
darrachequesne committed Feb 4, 2023
1 parent 0110e46 commit c54e09d
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 38 deletions.
87 changes: 49 additions & 38 deletions lib/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ type QueuedPacket = {
flags: Flags;
pending: boolean;
tryCount: number;
ack?: (err?: Error, ...args: unknown[]) => void;
};

/**
Expand Down Expand Up @@ -382,19 +381,7 @@ export class Socket<
args.unshift(ev);

if (this._opts.retries && !this.flags.fromQueue && !this.flags.volatile) {
let ack;
if (typeof args[args.length - 1] === "function") {
ack = args.pop();
}
this._queue.push({
id: this.ids++,
tryCount: 0,
pending: false,
args,
ack,
flags: Object.assign({ fromQueue: true }, this.flags),
});
this._drainQueue();
this._addToQueue(args);
return this;
}

Expand Down Expand Up @@ -503,29 +490,25 @@ export class Socket<
}

/**
* Send the first packet of the queue, and wait for an acknowledgement from the server.
* Add the packet to the queue.
* @param args
* @private
*/
private _drainQueue() {
debug("draining queue");
if (this._queue.length === 0) {
return;
private _addToQueue(args: unknown[]) {
let ack;
if (typeof args[args.length - 1] === "function") {
ack = args.pop();
}
const packet = this._queue[0];
if (packet.pending) {
debug(
"packet [%d] has already been sent and is waiting for an ack",
packet.id
);
return;
}
packet.pending = true;
packet.tryCount++;
debug("sending packet [%d] (try n°%d)", packet.id, packet.tryCount);
const currentId = this.ids;
this.ids = packet.id; // the same id is reused for consecutive retries, in order to allow deduplication on the server side
this.flags = packet.flags;
packet.args.push((err, ...responseArgs) => {

const packet = {
id: this.ids++,
tryCount: 0,
pending: false,
args,
flags: Object.assign({ fromQueue: true }, this.flags),
};

args.push((err, ...responseArgs) => {
if (packet !== this._queue[0]) {
// the packet has already been acknowledged
return;
Expand All @@ -539,21 +522,49 @@ export class Socket<
packet.tryCount
);
this._queue.shift();
if (packet.ack) {
packet.ack(err);
if (ack) {
ack(err);
}
}
} else {
debug("packet [%d] was successfully sent", packet.id);
this._queue.shift();
if (packet.ack) {
packet.ack(null, ...responseArgs);
if (ack) {
ack(null, ...responseArgs);
}
}
packet.pending = false;
return this._drainQueue();
});

this._queue.push(packet);
this._drainQueue();
}

/**
* Send the first packet of the queue, and wait for an acknowledgement from the server.
* @private
*/
private _drainQueue() {
debug("draining queue");
if (this._queue.length === 0) {
return;
}
const packet = this._queue[0];
if (packet.pending) {
debug(
"packet [%d] has already been sent and is waiting for an ack",
packet.id
);
return;
}
packet.pending = true;
packet.tryCount++;
debug("sending packet [%d] (try n°%d)", packet.id, packet.tryCount);
const currentId = this.ids;
this.ids = packet.id; // the same id is reused for consecutive retries, in order to allow deduplication on the server side
this.flags = packet.flags;

this.emit.apply(this, packet.args);
this.ids = currentId; // restore offset
}
Expand Down
27 changes: 27 additions & 0 deletions test/retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,19 @@ describe("retry", () => {
ackTimeout: 50,
});

let i = 0;
const expected = [
"0",
'20["echo",1]',
'21["echo",2]',
'22["echo",3]',
"1",
];

socket.io.engine.on("packetCreate", ({ data }) => {
expect(data).to.eql(expected[i++]);
});

socket.emit("echo", 1, () => {
// @ts-ignore
expect(socket._queue.length).to.eql(2);
Expand Down Expand Up @@ -51,6 +64,20 @@ describe("retry", () => {

let count = 0;

let i = 0;
const expected = [
"0",
'20["ack"]',
'20["ack"]',
'20["ack"]',
'20["ack"]',
"1",
];

socket.io.engine.on("packetCreate", ({ data }) => {
expect(data).to.eql(expected[i++]);
});

socket.emit("ack", () => {
expect(count).to.eql(4);

Expand Down

0 comments on commit c54e09d

Please sign in to comment.