Skip to content

Commit

Permalink
Messages sent out of order after one message fails (#3131)
Browse files Browse the repository at this point in the history
* Instead of skipping, bail out by clearing queue
* Allow additional status transition for events from QUEUED to NOT_SENT
  • Loading branch information
justjanne authored Feb 8, 2023
1 parent 16672b3 commit 5cf0bb4
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 22 deletions.
18 changes: 7 additions & 11 deletions spec/unit/scheduler.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ describe("MatrixScheduler", function () {
expect(procCount).toEqual(2);
});

it("should give up if the retryFn on failure returns -1 and try the next event", async function () {
it("should give up if the retryFn on failure returns -1", async function () {
// Queue A & B.
// Reject A and return -1 on retry.
// Expect B to be tried next and the promise for A to be rejected.
Expand All @@ -139,19 +139,15 @@ describe("MatrixScheduler", function () {
return new Promise<Record<string, boolean>>(() => {});
});

const globalA = scheduler.queueEvent(eventA);
scheduler.queueEvent(eventB);
const queuedA = scheduler.queueEvent(eventA);
const queuedB = scheduler.queueEvent(eventB);
await Promise.resolve();
deferA.reject(new Error("Testerror"));
// as queueing doesn't start processing synchronously anymore (see commit bbdb5ac)
// wait just long enough before it does
await Promise.resolve();
await expect(queuedA).rejects.toThrow("Testerror");
await expect(queuedB).rejects.toThrow("Testerror");
expect(procCount).toEqual(1);
deferA.reject({});
try {
await globalA;
} catch (err) {
await Promise.resolve();
expect(procCount).toEqual(2);
}
});

it("should treat each queue separately", function (done) {
Expand Down
2 changes: 1 addition & 1 deletion src/models/room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3380,7 +3380,7 @@ export class Room extends ReadReceipt<RoomEmittedEvents, RoomEventHandlerMap> {
const ALLOWED_TRANSITIONS: Record<EventStatus, EventStatus[]> = {
[EventStatus.ENCRYPTING]: [EventStatus.SENDING, EventStatus.NOT_SENT, EventStatus.CANCELLED],
[EventStatus.SENDING]: [EventStatus.ENCRYPTING, EventStatus.QUEUED, EventStatus.NOT_SENT, EventStatus.SENT],
[EventStatus.QUEUED]: [EventStatus.SENDING, EventStatus.CANCELLED],
[EventStatus.QUEUED]: [EventStatus.SENDING, EventStatus.NOT_SENT, EventStatus.CANCELLED],
[EventStatus.SENT]: [],
[EventStatus.NOT_SENT]: [EventStatus.SENDING, EventStatus.QUEUED, EventStatus.CANCELLED],
[EventStatus.CANCELLED]: [],
Expand Down
30 changes: 20 additions & 10 deletions src/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,7 @@ export class MatrixScheduler<T = ISendEventResponse> {
// get head of queue
const obj = this.peekNextEvent(queueName);
if (!obj) {
// queue is empty. Mark as inactive and stop recursing.
const index = this.activeQueues.indexOf(queueName);
if (index >= 0) {
this.activeQueues.splice(index, 1);
}
debuglog("Stopping queue '%s' as it is now empty", queueName);
this.disableQueue(queueName);
return;
}
debuglog("Queue '%s' has %s pending events", queueName, this.queues[queueName].length);
Expand Down Expand Up @@ -289,17 +284,32 @@ export class MatrixScheduler<T = ISendEventResponse> {
// give up (you quitter!)
debuglog("Queue '%s' giving up on event %s", queueName, obj.event.getId());
// remove this from the queue
this.removeNextEvent(queueName);
obj.defer.reject(err);
// process next event
this.processQueue(queueName);
this.clearQueue(queueName, err);
} else {
setTimeout(this.processQueue, waitTimeMs, queueName);
}
},
);
};

private disableQueue(queueName: string): void {
// queue is empty. Mark as inactive and stop recursing.
const index = this.activeQueues.indexOf(queueName);
if (index >= 0) {
this.activeQueues.splice(index, 1);
}
debuglog("Stopping queue '%s' as it is now empty", queueName);
}

private clearQueue(queueName: string, err: unknown): void {
debuglog("clearing queue '%s'", queueName);
let obj: IQueueEntry<T> | undefined;
while ((obj = this.removeNextEvent(queueName))) {
obj.defer.reject(err);
}
this.disableQueue(queueName);
}

private peekNextEvent(queueName: string): IQueueEntry<T> | undefined {
const queue = this.queues[queueName];
if (!Array.isArray(queue)) {
Expand Down

0 comments on commit 5cf0bb4

Please sign in to comment.