From d8d87e512063bfdcb8b17a0d3e07dedfcbb54061 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 28 Jul 2022 16:07:12 +0100 Subject: [PATCH 01/10] Retry to-device messages This adds a queueToDevice API alongside sendToDevice which is a much higher-level API that adds the messages to a queue, stored in persistent storage, and retries them periodically. Also converts sending of megolm keys to use the new API. Other uses of sendToDevice are nopt converted in this PR, but could be later. Requires https://github.com/matrix-org/matrix-mock-request/pull/17 --- spec/unit/queueToDevice.spec.ts | 339 ++++++++++++++++++++++++++ src/ToDeviceMessageQueue.ts | 122 +++++++++ src/client.ts | 35 ++- src/crypto/algorithms/megolm.ts | 64 +++-- src/models/ToDeviceMessage.ts | 38 +++ src/scheduler.ts | 2 +- src/store/index.ts | 75 ++++-- src/store/indexeddb-backend.ts | 4 + src/store/indexeddb-local-backend.ts | 44 +++- src/store/indexeddb-remote-backend.ts | 13 + src/store/indexeddb-store-worker.ts | 9 + src/store/indexeddb.ts | 13 + src/store/memory.ts | 25 ++ src/store/stub.ts | 13 + 14 files changed, 735 insertions(+), 61 deletions(-) create mode 100644 spec/unit/queueToDevice.spec.ts create mode 100644 src/ToDeviceMessageQueue.ts create mode 100644 src/models/ToDeviceMessage.ts diff --git a/spec/unit/queueToDevice.spec.ts b/spec/unit/queueToDevice.spec.ts new file mode 100644 index 00000000000..db904ff1644 --- /dev/null +++ b/spec/unit/queueToDevice.spec.ts @@ -0,0 +1,339 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import MockHttpBackend from 'matrix-mock-request'; +import { indexedDB as fakeIndexedDB } from 'fake-indexeddb'; + +import { IHttpOpts, IndexedDBStore, MatrixEvent, MemoryStore, Room } from "../../src"; +import { MatrixClient } from "../../src/client"; +import { ToDeviceBatch } from '../../src/models/ToDeviceMessage'; +import { logger } from '../../src/logger'; + +const FAKE_USER = "@alice:example.org"; +const FAKE_DEVICE_ID = "AAAAAAAA"; +const FAKE_PAYLOAD = { + "foo": 42, +}; +const EXPECTED_BODY = { + messages: { + [FAKE_USER]: { + [FAKE_DEVICE_ID]: FAKE_PAYLOAD, + }, + }, +}; + +const FAKE_MSG = { + userId: FAKE_USER, + deviceId: FAKE_DEVICE_ID, + payload: FAKE_PAYLOAD, +}; + +enum StoreType { + Memory = 'Memory', + IndexedDB = 'IndexedDB', +} + +// Jest now uses @sinonjs/fake-timers which exposes tickAsync() and a number of +// other async methods which break the event loop, letting scheduled promise +// callbacks run. Unfortunately, Jest doesn't expose these, so we have to do +// it manually (this is what sinon does under the hood). We do both in a loop +// until the thing we expect happens: hopefully this is the least flakey way +// and avoids assuming anything about the app's behaviour. +const realSetTimeout = setTimeout; +function flushPromises() { + return new Promise(r => { + realSetTimeout(r, 1); + }); +} + +async function flushAndRunTimersUntil(cond: () => boolean) { + while (!cond()) { + await flushPromises(); + if (cond()) break; + jest.advanceTimersToNextTimer(); + } +} + +describe.each([ + [StoreType.Memory], [StoreType.IndexedDB], +])("queueToDevice (%s store)", function(storeType) { + let httpBackend: MockHttpBackend; + let client: MatrixClient; + + beforeEach(function() { + httpBackend = new MockHttpBackend(); + + const store = storeType === StoreType.IndexedDB ? + new IndexedDBStore({ indexedDB: fakeIndexedDB }) : new MemoryStore(); + + client = new MatrixClient({ + baseUrl: "https://my.home.server", + accessToken: "my.access.token", + request: httpBackend.requestFn as IHttpOpts["request"], + store, + }); + }); + + afterEach(function() { + jest.useRealTimers(); + }); + + it("sends a to-device message", async function() { + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).check((request) => { + expect(request.data).toEqual(EXPECTED_BODY); + }).respond(200, {}); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + + await httpBackend.flushAllExpected(); + }); + + it("retries on error", async function() { + jest.useFakeTimers(); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(500); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).check((request) => { + expect(request.data).toEqual(EXPECTED_BODY); + }).respond(200, {}); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + expect(httpBackend.flushSync(null, 1)).toEqual(1); + + await flushAndRunTimersUntil(() => httpBackend.requests.length > 0); + + expect(httpBackend.flushSync(null, 1)).toEqual(1); + }); + + it("stops retrying on 4xx errors", async function() { + jest.useFakeTimers(); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(400); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + expect(httpBackend.flushSync(null, 1)).toEqual(1); + + // Asserting that another request is never made is obviously + // a bit tricky - we just flush the queue what should hopefully + // be plenty of times and assert that nothing comes through. + let tries = 0; + await flushAndRunTimersUntil(() => ++tries === 10); + + expect(httpBackend.requests.length).toEqual(0); + }); + + it("honours ratelimiting", async function() { + jest.useFakeTimers(); + + // pick something obscure enough it's unlikley to clash with a + // retry delay the algorithm uses anyway + const retryDelay = 279 * 1000; + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(429, { + errcode: "M_LIMIT_EXCEEDED", + retry_after_ms: retryDelay, + }); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(200, {}); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + expect(httpBackend.flushSync(null, 1)).toEqual(1); + await flushPromises(); + + logger.info("Advancing clock to just before expected retry time..."); + + jest.advanceTimersByTime(retryDelay - 1000); + await flushPromises(); + + expect(httpBackend.requests.length).toEqual(0); + + logger.info("Advancing clock past expected retry time..."); + + jest.advanceTimersByTime(2000); + await flushPromises(); + + expect(httpBackend.flushSync(null, 1)).toEqual(1); + }); + + it("retries on retryImmediately()", async function() { + httpBackend.when("GET", "/_matrix/client/versions").respond(200, { + versions: ["r0.0.1"], + }); + + await Promise.all([client.startClient(), httpBackend.flush(null, 1, 20)]); + + try { + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(500); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(200, {}); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + expect(await httpBackend.flush(null, 1, 1)).toEqual(1); + await flushPromises(); + + client.retryImmediately(); + + expect(await httpBackend.flush(null, 1, 20)).toEqual(1); + } finally { + client.stopClient(); + } + }); + + it("retries on when client is started", async function() { + httpBackend.when("GET", "/_matrix/client/versions").respond(200, { + versions: ["r0.0.1"], + }); + + await Promise.all([client.startClient(), httpBackend.flush("/_matrix/client/versions", 1, 20)]); + + try { + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(500); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(200, {}); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + expect(await httpBackend.flush(null, 1, 1)).toEqual(1); + await flushPromises(); + + client.stopClient(); + await Promise.all([client.startClient(), httpBackend.flush("/_matrix/client/versions", 1, 20)]); + + expect(await httpBackend.flush(null, 1, 20)).toEqual(1); + } finally { + client.stopClient(); + } + }); + + it("retries when a message is retried", async function() { + httpBackend.when("GET", "/_matrix/client/versions").respond(200, { + versions: ["r0.0.1"], + }); + + await Promise.all([client.startClient(), httpBackend.flush(null, 1, 20)]); + + try { + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(500); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(200, {}); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + + expect(await httpBackend.flush(null, 1, 1)).toEqual(1); + await flushPromises(); + + const dummyEvent = new MatrixEvent({ + event_id: "!fake:example.org", + }); + const mockRoom = { + updatePendingEvent: jest.fn(), + } as unknown as Room; + client.resendEvent(dummyEvent, mockRoom); + + expect(await httpBackend.flush(null, 1, 20)).toEqual(1); + } finally { + client.stopClient(); + } + }); + + it("splits many messages into multiple HTTP requests", async function() { + const batch: ToDeviceBatch = { + eventType: "org.example.foo", + batch: [], + }; + + for (let i = 0; i <= 20; ++i) { + batch.batch.push({ + userId: `@user${i}:example.org`, + deviceId: FAKE_DEVICE_ID, + payload: FAKE_PAYLOAD, + }); + } + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).check((request) => { + expect(Object.keys(request.data.messages).length).toEqual(20); + }).respond(200, {}); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).check((request) => { + expect(Object.keys(request.data.messages).length).toEqual(1); + }).respond(200, {}); + + await client.queueToDevice(batch); + await httpBackend.flushAllExpected(); + }); +}); diff --git a/src/ToDeviceMessageQueue.ts b/src/ToDeviceMessageQueue.ts new file mode 100644 index 00000000000..644cca156d7 --- /dev/null +++ b/src/ToDeviceMessageQueue.ts @@ -0,0 +1,122 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { logger } from "./logger"; +import { MatrixClient } from "./matrix"; +import { IndexedToDeviceBatch, ToDeviceBatch, ToDeviceBatchWithTxnId, ToDevicePayload } from "./models/ToDeviceMessage"; +import { MatrixScheduler } from "./scheduler"; + +const MAX_BATCH_SIZE = 20; + +/** + * Maintains a queue of outgoing to-device messages, sending them + * as soon as the homeserver is reachable. + */ +export class ToDeviceMessageQueue { + private sending = false; + private running = true; + private retryTimeout: number = null; + private retryAttempts = 0; + + constructor(private client: MatrixClient) { + } + + public start() { + this.running = true; + this.sendQueue(); + } + + public stop() { + this.running = false; + if (this.retryTimeout !== null) clearTimeout(this.retryTimeout); + this.retryTimeout = null; + } + + public async queueBatch(batch: ToDeviceBatch) { + const batches: ToDeviceBatchWithTxnId[] = []; + for (let i = 0; i < batch.batch.length; i += MAX_BATCH_SIZE) { + batches.push({ + eventType: batch.eventType, + batch: batch.batch.slice(i, i + 20), + txnId: this.client.makeTxnId(), + }); + } + + await this.client.store.saveToDeviceBatches(batches); + this.sendQueue(); + } + + public sendQueue = async () => { + if (this.retryTimeout !== null) clearTimeout(this.retryTimeout); + this.retryTimeout = null; + + if (this.sending || !this.running) return; + + logger.debug("Attempting to send queued to-device messages"); + + this.sending = true; + let headBatch; + try { + while (this.running) { + headBatch = await this.client.store.getOldestToDeviceBatch(); + if (headBatch === null) break; + await this.sendBatch(headBatch); + await this.client.store.removeToDeviceBatch(headBatch.id); + this.retryAttempts = 0; + } + + logger.debug("All queued to-device messages sent"); + } catch (e) { + ++this.retryAttempts; + // eslint-disable-next-line @typescript-eslint/naming-convention + // eslint-disable-next-line new-cap + const retryDelay = MatrixScheduler.RETRY_BACKOFF_RATELIMIT(null, this.retryAttempts, e); + if (retryDelay === -1) { + // the scheduler function doesn't differentiate between fatal errors and just getting + // bored and giving up for now + if (Math.floor(e.httpStatus / 100) === 4) { + logger.error("Fatal error when sending to-device message - dropping to-device batch!", e); + await this.client.store.removeToDeviceBatch(headBatch.id); + } else { + logger.info("Automatic retry limit reached for to-device messages."); + } + return; + } + + logger.info(`Failed to send batch of to-device messages. Will retry in ${retryDelay}ms`, e); + this.retryTimeout = setTimeout(this.sendQueue, retryDelay); + } finally { + this.sending = false; + } + }; + + /** + * Attempts to send a batch of to-device messages. + */ + private async sendBatch(batch: IndexedToDeviceBatch): Promise { + const contentMap: Record> = {}; + for (const item of batch.batch) { + if (!contentMap[item.userId]) { + contentMap[item.userId] = {}; + } + contentMap[item.userId][item.deviceId] = item.payload; + } + + logger.info(`Sending batch of ${batch.batch.length} to-device messages with ID ${batch.id}`); + + await this.client.sendToDevice(batch.eventType, contentMap, batch.txnId); + } +} diff --git a/src/client.ts b/src/client.ts index 73ca5144ef0..69c8ffdc051 100644 --- a/src/client.ts +++ b/src/client.ts @@ -194,6 +194,8 @@ import { MSC3575SlidingSyncRequest, MSC3575SlidingSyncResponse, SlidingSync } fr import { SlidingSyncSdk } from "./sliding-sync-sdk"; import { Thread, THREAD_RELATION_TYPE } from "./models/thread"; import { MBeaconInfoEventContent, M_BEACON_INFO } from "./@types/beacon"; +import { ToDeviceMessageQueue } from "./ToDeviceMessageQueue"; +import { ToDeviceBatch } from "./models/ToDeviceMessage"; export type Store = IStore; @@ -939,6 +941,8 @@ export class MatrixClient extends TypedEventEmitter>(); + private toDeviceMessageQueue: ToDeviceMessageQueue; + constructor(opts: IMatrixClientCreateOpts) { super(); @@ -1033,6 +1037,8 @@ export class MatrixClient extends TypedEventEmitterexplicitly attempts to retry their lost connection. + * Will also retry any outbound to-device messages currently in the queue to be sent + * (retries of regular outgoing events are handled separately, per-event). * @return {boolean} True if this resulted in a request being retried. */ public retryImmediately(): boolean { + // don't await for this promise: we just want to kick it off + this.toDeviceMessageQueue.sendQueue(); return this.syncApi.retryImmediately(); } @@ -3500,7 +3514,7 @@ export class MatrixClient extends TypedEventEmitter { + // also kick the to-device queue to retry + this.toDeviceMessageQueue.sendQueue(); + this.updatePendingEventStatus(room, event, EventStatus.SENDING); return this.encryptAndSendEvent(room, event); } @@ -8694,7 +8711,10 @@ export class MatrixClient extends TypedEventEmitter>} contentMap @@ -8726,6 +8746,17 @@ export class MatrixClient extends TypedEventEmitter { + return this.toDeviceMessageQueue.queueBatch(batch); + } + /** * Get the third party protocols that can be reached using * this HS diff --git a/src/crypto/algorithms/megolm.ts b/src/crypto/algorithms/megolm.ts index c16187fd622..9233c4debb9 100644 --- a/src/crypto/algorithms/megolm.ts +++ b/src/crypto/algorithms/megolm.ts @@ -22,6 +22,7 @@ limitations under the License. import { logger } from '../../logger'; import * as olmlib from "../olmlib"; +import { EventType } from '../../@types/event'; import { DecryptionAlgorithm, DecryptionError, @@ -37,6 +38,7 @@ import { IOlmSessionResult } from "../olmlib"; import { DeviceInfoMap } from "../DeviceList"; import { MatrixEvent } from "../.."; import { IEventDecryptionResult, IMegolmSessionData, IncomingRoomKeyRequest } from "../index"; +import { ToDeviceBatch, ToDeviceMessage } from '../../models/ToDeviceMessage'; // determine whether the key can be shared with invitees export function isRoomSharedHistory(room: Room): boolean { @@ -609,7 +611,11 @@ class MegolmEncryption extends EncryptionAlgorithm { userDeviceMap: IOlmDevice[], payload: IPayload, ): Promise { - const contentMap: Record> = {}; + const toDeviceBatch: ToDeviceBatch = { + eventType: EventType.RoomMessageEncrypted, + batch: [], + }; + // Map from userId to a map of deviceId to deviceInfo const deviceInfoByUserIdAndDeviceId = new Map>(); @@ -637,10 +643,11 @@ class MegolmEncryption extends EncryptionAlgorithm { // We hold by reference, this updates deviceInfoByUserIdAndDeviceId[userId] userIdDeviceInfo.set(deviceId, deviceInfo); - if (!contentMap[userId]) { - contentMap[userId] = {}; - } - contentMap[userId][deviceId] = encryptedContent; + toDeviceBatch.batch.push({ + userId, + deviceId, + payload: encryptedContent, + }); promises.push( olmlib.encryptMessageForDevice( @@ -660,40 +667,29 @@ class MegolmEncryption extends EncryptionAlgorithm { // in which case it will have just not added anything to the ciphertext object. // There's no point sending messages to devices if we couldn't encrypt to them, // since that's effectively a blank message. - for (const userId of Object.keys(contentMap)) { - for (const deviceId of Object.keys(contentMap[userId])) { - if (Object.keys(contentMap[userId][deviceId].ciphertext).length === 0) { - logger.log( - "No ciphertext for device " + - userId + ":" + deviceId + ": pruning", - ); - delete contentMap[userId][deviceId]; - } - } - // No devices left for that user? Strip that too. - if (Object.keys(contentMap[userId]).length === 0) { - logger.log("Pruned all devices for user " + userId); - delete contentMap[userId]; + const prunedBatch: ToDeviceMessage[] = []; + for (const msg of toDeviceBatch.batch) { + if (Object.keys(msg.payload.ciphertext).length > 0) { + prunedBatch.push(msg); + } else { + logger.log( + "No ciphertext for device " + + msg.userId + ":" + msg.deviceId + ": pruning", + ); } } - // Is there anything left? - if (Object.keys(contentMap).length === 0) { - logger.log("No users left to send to: aborting"); - return; - } + toDeviceBatch.batch = prunedBatch; - return this.baseApis.sendToDevice("m.room.encrypted", contentMap).then(() => { + return this.baseApis.queueToDevice(toDeviceBatch).then(() => { // store that we successfully uploaded the keys of the current slice - for (const userId of Object.keys(contentMap)) { - for (const deviceId of Object.keys(contentMap[userId])) { - session.markSharedWithDevice( - userId, - deviceId, - deviceInfoByUserIdAndDeviceId.get(userId).get(deviceId).getIdentityKey(), - chainIndex, - ); - } + for (const msg of toDeviceBatch.batch) { + session.markSharedWithDevice( + msg.userId, + msg.deviceId, + deviceInfoByUserIdAndDeviceId.get(msg.userId).get(msg.deviceId).getIdentityKey(), + chainIndex, + ); } }); }); diff --git a/src/models/ToDeviceMessage.ts b/src/models/ToDeviceMessage.ts new file mode 100644 index 00000000000..8efc3ed4e31 --- /dev/null +++ b/src/models/ToDeviceMessage.ts @@ -0,0 +1,38 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +export type ToDevicePayload = Record; + +export interface ToDeviceMessage { + userId: string; + deviceId: string; + payload: ToDevicePayload; +} + +export interface ToDeviceBatch { + eventType: string; + batch: ToDeviceMessage[]; +} + +// Only used internally +export interface ToDeviceBatchWithTxnId extends ToDeviceBatch { + txnId: string; +} + +// Only used internally +export interface IndexedToDeviceBatch extends ToDeviceBatchWithTxnId { + id: number; +} diff --git a/src/scheduler.ts b/src/scheduler.ts index d0249b6cc02..271982b745a 100644 --- a/src/scheduler.ts +++ b/src/scheduler.ts @@ -57,7 +57,7 @@ export class MatrixScheduler { * failure was due to a rate limited request, the time specified in the error is * waited before being retried. * @param {MatrixEvent} event - * @param {Number} attempts + * @param {Number} attempts Number of attempts that have been made, including the one that just failed (ie. starting at 1) * @param {MatrixError} err * @return {Number} * @see module:scheduler~retryAlgorithm diff --git a/src/store/index.ts b/src/store/index.ts index 3f4a0dadeb7..ee3137cc57d 100644 --- a/src/store/index.ts +++ b/src/store/index.ts @@ -23,6 +23,7 @@ import { RoomSummary } from "../models/room-summary"; import { IMinimalEvent, IRooms, ISyncResponse } from "../sync-accumulator"; import { IStartClientOpts } from "../client"; import { IStateEventWithRoomId } from "../@types/search"; +import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage"; export interface ISavedSync { nextBatch: string; @@ -31,8 +32,7 @@ export interface ISavedSync { } /** - * Construct a stub store. This does no-ops on most store methods. - * @constructor + * A store for most of the data js-sdk needs to store, apart from crypto data */ export interface IStore { readonly accountData: Record; // type : content @@ -57,21 +57,21 @@ export interface IStore { setSyncToken(token: string): void; /** - * No-op. - * @param {Room} room + * Store the given room. + * @param {Room} room The room to be stored. All properties must be stored. */ storeRoom(room: Room): void; /** - * No-op. - * @param {string} roomId - * @return {null} + * Retrieve a room by its' room ID. + * @param {string} roomId The room ID. + * @return {Room} The room or null. */ getRoom(roomId: string): Room | null; /** - * No-op. - * @return {Array} An empty array. + * Retrieve all known rooms. + * @return {Room[]} A list of rooms, which may be empty. */ getRooms(): Room[]; @@ -82,35 +82,36 @@ export interface IStore { removeRoom(roomId: string): void; /** - * No-op. - * @return {Array} An empty array. + * Retrieve a summary of all the rooms. + * @return {RoomSummary[]} A summary of each room. */ getRoomSummaries(): RoomSummary[]; /** - * No-op. - * @param {User} user + * Store a User. + * @param {User} user The user to store. */ storeUser(user: User): void; /** - * No-op. - * @param {string} userId - * @return {null} + * Retrieve a User by its' user ID. + * @param {string} userId The user ID. + * @return {User} The user or null. */ getUser(userId: string): User | null; /** - * No-op. - * @return {User[]} + * Retrieve all known users. + * @return {User[]} A list of users, which may be empty. */ getUsers(): User[]; /** - * No-op. - * @param {Room} room - * @param {number} limit - * @return {Array} + * Retrieve scrollback for this room. + * @param {Room} room The matrix room + * @param {number} limit The max number of old events to retrieve. + * @return {Array} An array of objects which will be at most 'limit' + * length and at least 0. The objects are the raw event JSON. */ scrollback(room: Room, limit: number): MatrixEvent[]; @@ -209,8 +210,23 @@ export interface IStore { */ deleteAllData(): Promise; + /** + * Returns the out-of-band membership events for this room that + * were previously loaded. + * @param {string} roomId + * @returns {event[]} the events, potentially an empty array if OOB loading didn't yield any new members + * @returns {null} in case the members for this room haven't been stored yet + */ getOutOfBandMembers(roomId: string): Promise; + /** + * Stores the out-of-band membership events for this room. Note that + * it still makes sense to store an empty array as the OOB status for the room is + * marked as fetched, and getOutOfBandMembers will return an empty array instead of null + * @param {string} roomId + * @param {event[]} membershipEvents the membership events to store + * @returns {Promise} when all members have been stored + */ setOutOfBandMembers(roomId: string, membershipEvents: IStateEventWithRoomId[]): Promise; clearOutOfBandMembers(roomId: string): Promise; @@ -222,4 +238,19 @@ export interface IStore { getPendingEvents(roomId: string): Promise[]>; setPendingEvents(roomId: string, events: Partial[]): Promise; + + /** + * Stores batches of outgoing to-device messages + */ + saveToDeviceBatches(batch: ToDeviceBatchWithTxnId[]): Promise; + + /** + * Fetches the oldest batch of to-device messages in the queue + */ + getOldestToDeviceBatch(): Promise; + + /** + * Removes a specific batch of to-device messages from the queue + */ + removeToDeviceBatch(id: number): Promise; } diff --git a/src/store/indexeddb-backend.ts b/src/store/indexeddb-backend.ts index 83470d72a19..93d1cb3ab19 100644 --- a/src/store/indexeddb-backend.ts +++ b/src/store/indexeddb-backend.ts @@ -16,6 +16,7 @@ limitations under the License. import { ISavedSync } from "./index"; import { IEvent, IStartClientOpts, IStateEventWithRoomId, ISyncResponse } from ".."; +import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage"; export interface IIndexedDBBackend { connect(): Promise; @@ -31,6 +32,9 @@ export interface IIndexedDBBackend { getUserPresenceEvents(): Promise; getClientOptions(): Promise; storeClientOptions(options: IStartClientOpts): Promise; + saveToDeviceBatches(batches: ToDeviceBatchWithTxnId[]): Promise; + getOldestToDeviceBatch(): Promise; + removeToDeviceBatch(id: number): Promise; } export type UserTuple = [userId: string, presenceEvent: Partial]; diff --git a/src/store/indexeddb-local-backend.ts b/src/store/indexeddb-local-backend.ts index 178931ff857..d16afa5c1fb 100644 --- a/src/store/indexeddb-local-backend.ts +++ b/src/store/indexeddb-local-backend.ts @@ -21,8 +21,9 @@ import { logger } from '../logger'; import { IStartClientOpts, IStateEventWithRoomId } from ".."; import { ISavedSync } from "./index"; import { IIndexedDBBackend, UserTuple } from "./indexeddb-backend"; +import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage"; -const VERSION = 3; +const VERSION = 4; function createDatabase(db: IDBDatabase): void { // Make user store, clobber based on user ID. (userId property of User objects) @@ -49,6 +50,10 @@ function upgradeSchemaV3(db: IDBDatabase): void { { keyPath: ["clobber"] }); } +function upgradeSchemaV4(db: IDBDatabase): void { + db.createObjectStore("to_device_queue", { autoIncrement: true }); +} + /** * Helper method to collect results from a Cursor and promiseify it. * @param {ObjectStore|Index} store The store to perform openCursor on. @@ -112,7 +117,7 @@ function reqAsPromise(req: IDBRequest): Promise { }); } -function reqAsCursorPromise(req: IDBRequest): Promise { +function reqAsCursorPromise(req: IDBRequest): Promise { return reqAsEventPromise(req).then((event) => req.result); } @@ -177,6 +182,9 @@ export class LocalIndexedDBStoreBackend implements IIndexedDBBackend { if (oldVersion < 3) { upgradeSchemaV3(db); } + if (oldVersion < 4) { + upgradeSchemaV4(db); + } // Expand as needed. }; @@ -561,4 +569,36 @@ export class LocalIndexedDBStoreBackend implements IIndexedDBBackend { }); // put == UPSERT await txnAsPromise(txn); } + + public async saveToDeviceBatches(batches: ToDeviceBatchWithTxnId[]): Promise { + const txn = this.db.transaction(["to_device_queue"], "readwrite"); + const store = txn.objectStore("to_device_queue"); + for (const batch of batches) { + store.add(batch); + } + await txnAsPromise(txn); + } + + public async getOldestToDeviceBatch(): Promise { + const txn = this.db.transaction(["to_device_queue"], "readonly"); + const store = txn.objectStore("to_device_queue"); + const cursor = await reqAsCursorPromise(store.openCursor()); + if (!cursor) return null; + + const resultBatch = cursor.value as ToDeviceBatchWithTxnId; + + return { + id: cursor.key as number, + txnId: resultBatch.txnId, + eventType: resultBatch.eventType, + batch: resultBatch.batch, + }; + } + + public async removeToDeviceBatch(id: number): Promise { + const txn = this.db.transaction(["to_device_queue"], "readwrite"); + const store = txn.objectStore("to_device_queue"); + store.delete(id); + await txnAsPromise(txn); + } } diff --git a/src/store/indexeddb-remote-backend.ts b/src/store/indexeddb-remote-backend.ts index 9c06105a1c5..67ab2ccd293 100644 --- a/src/store/indexeddb-remote-backend.ts +++ b/src/store/indexeddb-remote-backend.ts @@ -20,6 +20,7 @@ import { ISavedSync } from "./index"; import { IStartClientOpts } from "../client"; import { IStateEventWithRoomId, ISyncResponse } from ".."; import { IIndexedDBBackend, UserTuple } from "./indexeddb-backend"; +import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage"; export class RemoteIndexedDBStoreBackend implements IIndexedDBBackend { private worker: Worker; @@ -133,6 +134,18 @@ export class RemoteIndexedDBStoreBackend implements IIndexedDBBackend { return this.doCmd('getUserPresenceEvents'); } + public async saveToDeviceBatches(batches: ToDeviceBatchWithTxnId[]): Promise { + return this.doCmd('saveToDeviceBatches', [batches]); + } + + public async getOldestToDeviceBatch(): Promise { + return this.doCmd('getOldestToDeviceBatch'); + } + + public async removeToDeviceBatch(id: number): Promise { + return this.doCmd('removeToDeviceBatch', [id]); + } + private ensureStarted(): Promise { if (this.startPromise === null) { this.worker = this.workerFactory(); diff --git a/src/store/indexeddb-store-worker.ts b/src/store/indexeddb-store-worker.ts index 0d37dbce935..ced77696176 100644 --- a/src/store/indexeddb-store-worker.ts +++ b/src/store/indexeddb-store-worker.ts @@ -103,6 +103,15 @@ export class IndexedDBStoreWorker { case 'storeClientOptions': prom = this.backend.storeClientOptions(msg.args[0]); break; + case 'saveToDeviceBatches': + prom = this.backend.saveToDeviceBatches(msg.args[0]); + break; + case 'getOldestToDeviceBatch': + prom = this.backend.getOldestToDeviceBatch(); + break; + case 'removeToDeviceBatch': + prom = this.backend.removeToDeviceBatch(msg.args[0]); + break; } if (prom === undefined) { diff --git a/src/store/indexeddb.ts b/src/store/indexeddb.ts index 09a85fd1b54..44f684bdf8f 100644 --- a/src/store/indexeddb.ts +++ b/src/store/indexeddb.ts @@ -27,6 +27,7 @@ import { IIndexedDBBackend } from "./indexeddb-backend"; import { ISyncResponse } from "../sync-accumulator"; import { TypedEventEmitter } from "../models/typed-event-emitter"; import { IStateEventWithRoomId } from "../@types/search"; +import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage"; /** * This is an internal module. See {@link IndexedDBStore} for the public class. @@ -351,6 +352,18 @@ export class IndexedDBStore extends MemoryStore { this.localStorage.removeItem(pendingEventsKey(roomId)); } } + + public saveToDeviceBatches(batches: ToDeviceBatchWithTxnId[]): Promise { + return this.backend.saveToDeviceBatches(batches); + } + + public getOldestToDeviceBatch(): Promise { + return this.backend.getOldestToDeviceBatch(); + } + + public removeToDeviceBatch(id: number): Promise { + return this.backend.removeToDeviceBatch(id); + } } /** diff --git a/src/store/memory.ts b/src/store/memory.ts index cb49e425fdb..ae5673063d1 100644 --- a/src/store/memory.ts +++ b/src/store/memory.ts @@ -30,6 +30,7 @@ import { ISavedSync, IStore } from "./index"; import { RoomSummary } from "../models/room-summary"; import { ISyncResponse } from "../sync-accumulator"; import { IStateEventWithRoomId } from "../@types/search"; +import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage"; function isValidFilterId(filterId: string): boolean { const isValidStr = typeof filterId === "string" && @@ -64,6 +65,8 @@ export class MemoryStore implements IStore { private oobMembers: Record = {}; // roomId: [member events] private pendingEvents: { [roomId: string]: Partial[] } = {}; private clientOptions = {}; + private pendingToDeviceBatches: IndexedToDeviceBatch[] = []; + private nextToDeviceBatchId = 0; constructor(opts: IOpts = {}) { this.localStorage = opts.localStorage; @@ -429,4 +432,26 @@ export class MemoryStore implements IStore { public async setPendingEvents(roomId: string, events: Partial[]): Promise { this.pendingEvents[roomId] = events; } + + public saveToDeviceBatches(batches: ToDeviceBatchWithTxnId[]): Promise { + for (const batch of batches) { + this.pendingToDeviceBatches.push({ + id: this.nextToDeviceBatchId++, + eventType: batch.eventType, + txnId: batch.txnId, + batch: batch.batch, + }); + } + return Promise.resolve(); + } + + public async getOldestToDeviceBatch(): Promise { + if (this.pendingToDeviceBatches.length === 0) return null; + return this.pendingToDeviceBatches[0]; + } + + public removeToDeviceBatch(id: number): Promise { + this.pendingToDeviceBatches = this.pendingToDeviceBatches.filter(batch => batch.id !== id); + return Promise.resolve(); + } } diff --git a/src/store/stub.ts b/src/store/stub.ts index c9fc57055fd..59fad76748b 100644 --- a/src/store/stub.ts +++ b/src/store/stub.ts @@ -28,6 +28,7 @@ import { ISavedSync, IStore } from "./index"; import { RoomSummary } from "../models/room-summary"; import { ISyncResponse } from "../sync-accumulator"; import { IStateEventWithRoomId } from "../@types/search"; +import { IndexedToDeviceBatch, ToDeviceBatch } from "../models/ToDeviceMessage"; /** * Construct a stub store. This does no-ops on most store methods. @@ -270,4 +271,16 @@ export class StubStore implements IStore { public setPendingEvents(roomId: string, events: Partial[]): Promise { return Promise.resolve(); } + + public async saveToDeviceBatches(batch: ToDeviceBatch[]): Promise { + return Promise.resolve(); + } + + public getOldestToDeviceBatch(): Promise { + return Promise.resolve(null); + } + + public async removeToDeviceBatch(id: number): Promise { + return Promise.resolve(); + } } From 7fbf238689453fdca6451ffc5624ffd0fd0229bd Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 28 Jul 2022 16:14:09 +0100 Subject: [PATCH 02/10] Bump matrix-mock-request --- package.json | 2 +- yarn.lock | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/package.json b/package.json index fbef0c861b1..aaae9e380f0 100644 --- a/package.json +++ b/package.json @@ -102,7 +102,7 @@ "jest-localstorage-mock": "^2.4.6", "jest-sonar-reporter": "^2.0.0", "jsdoc": "^3.6.6", - "matrix-mock-request": "^2.1.0", + "matrix-mock-request": "^2.1.1", "rimraf": "^3.0.2", "terser": "^5.5.1", "tsify": "^5.0.2", diff --git a/yarn.lock b/yarn.lock index 460b2f2df9a..e276bc717d6 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4801,10 +4801,10 @@ matrix-events-sdk@^0.0.1-beta.7: resolved "https://registry.yarnpkg.com/matrix-events-sdk/-/matrix-events-sdk-0.0.1-beta.7.tgz#5ffe45eba1f67cc8d7c2377736c728b322524934" integrity sha512-9jl4wtWanUFSy2sr2lCjErN/oC8KTAtaeaozJtrgot1JiQcEI4Rda9OLgQ7nLKaqb4Z/QUx/fR3XpDzm5Jy1JA== -matrix-mock-request@^2.1.0: - version "2.1.0" - resolved "https://registry.yarnpkg.com/matrix-mock-request/-/matrix-mock-request-2.1.0.tgz#86f5b0ef846865d0767d3a8e64f5bcd6ca94c178" - integrity sha512-Cjpl3yP6h0yu5GKG89m1XZXZlm69Kg/qHV41N/t6SrQsgcfM3Bfavqx9YrtG0UnuXGy4bBSZIe1QiWVeFPZw1A== +matrix-mock-request@^2.1.1: + version "2.1.1" + resolved "https://registry.yarnpkg.com/matrix-mock-request/-/matrix-mock-request-2.1.1.tgz#a8fc03a2816464bb95445df4cc8885ac36786b23" + integrity sha512-CxdaUPRVB4o8JxTBMASstS2loRe+hlqeJu0Q7yyS1r36LkSSo/KAP4AuomsqxuKqaqYYnEJFJzkG0gOhxV7aqA== dependencies: expect "^28.1.0" From 74eec1ac634245a8e20ebf6ff134eb1285ffb67b Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 28 Jul 2022 16:46:30 +0100 Subject: [PATCH 03/10] Add more waits to make indexeddb tests pass --- spec/unit/queueToDevice.spec.ts | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/spec/unit/queueToDevice.spec.ts b/spec/unit/queueToDevice.spec.ts index db904ff1644..33cde757484 100644 --- a/spec/unit/queueToDevice.spec.ts +++ b/spec/unit/queueToDevice.spec.ts @@ -21,6 +21,7 @@ import { IHttpOpts, IndexedDBStore, MatrixEvent, MemoryStore, Room } from "../.. import { MatrixClient } from "../../src/client"; import { ToDeviceBatch } from '../../src/models/ToDeviceMessage'; import { logger } from '../../src/logger'; +import { IStore } from '../../src/store'; const FAKE_USER = "@alice:example.org"; const FAKE_DEVICE_ID = "AAAAAAAA"; @@ -73,11 +74,17 @@ describe.each([ let httpBackend: MockHttpBackend; let client: MatrixClient; - beforeEach(function() { + beforeEach(async function() { httpBackend = new MockHttpBackend(); - const store = storeType === StoreType.IndexedDB ? - new IndexedDBStore({ indexedDB: fakeIndexedDB }) : new MemoryStore(); + let store: IStore; + if (storeType === StoreType.IndexedDB) { + const idbStore = new IndexedDBStore({ indexedDB: fakeIndexedDB }); + await idbStore.startup(); + store = idbStore; + } else { + store = new MemoryStore(); + } client = new MatrixClient({ baseUrl: "https://my.home.server", @@ -127,6 +134,7 @@ describe.each([ FAKE_MSG, ], }); + await flushAndRunTimersUntil(() => httpBackend.requests.length > 0); expect(httpBackend.flushSync(null, 1)).toEqual(1); await flushAndRunTimersUntil(() => httpBackend.requests.length > 0); @@ -147,6 +155,7 @@ describe.each([ FAKE_MSG, ], }); + await flushAndRunTimersUntil(() => httpBackend.requests.length > 0); expect(httpBackend.flushSync(null, 1)).toEqual(1); // Asserting that another request is never made is obviously @@ -182,6 +191,7 @@ describe.each([ FAKE_MSG, ], }); + await flushAndRunTimersUntil(() => httpBackend.requests.length > 0); expect(httpBackend.flushSync(null, 1)).toEqual(1); await flushPromises(); From 1eafdedf7f4fdb74fcbd43832f10262884938b24 Mon Sep 17 00:00:00 2001 From: David Baker Date: Thu, 28 Jul 2022 17:32:55 +0100 Subject: [PATCH 04/10] Switch some test expectations to queueToDevice --- spec/unit/crypto/algorithms/megolm.spec.js | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/spec/unit/crypto/algorithms/megolm.spec.js b/spec/unit/crypto/algorithms/megolm.spec.js index 9bb401b4d84..8ac449d471f 100644 --- a/spec/unit/crypto/algorithms/megolm.spec.js +++ b/spec/unit/crypto/algorithms/megolm.spec.js @@ -148,6 +148,7 @@ describe("MegolmDecryption", function() { }); mockBaseApis.sendToDevice = jest.fn(); + mockBaseApis.queueToDevice = jest.fn(); // do the share megolmDecryption.shareKeysWithDevice(keyRequest); @@ -292,6 +293,7 @@ describe("MegolmDecryption", function() { }, })); mockBaseApis.sendToDevice = jest.fn().mockResolvedValue(undefined); + mockBaseApis.queueToDevice = jest.fn().mockResolvedValue(undefined); aliceDeviceInfo = { deviceId: 'aliceDevice', @@ -369,7 +371,7 @@ describe("MegolmDecryption", function() { expect(mockCrypto.downloadKeys).toHaveBeenCalledWith( ['@alice:home.server'], false, ); - expect(mockBaseApis.sendToDevice).toHaveBeenCalled(); + expect(mockBaseApis.queueToDevice).toHaveBeenCalled(); expect(mockBaseApis.claimOneTimeKeys).toHaveBeenCalledWith( [['@alice:home.server', 'aliceDevice']], 'signed_curve25519', 2000, ); @@ -412,7 +414,7 @@ describe("MegolmDecryption", function() { 'YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWI', ); - mockBaseApis.sendToDevice.mockClear(); + mockBaseApis.queueToDevice.mockClear(); await megolmEncryption.reshareKeyWithDevice( olmDevice.deviceCurve25519Key, ct1.session_id, @@ -420,7 +422,7 @@ describe("MegolmDecryption", function() { aliceDeviceInfo, ); - expect(mockBaseApis.sendToDevice).not.toHaveBeenCalled(); + expect(mockBaseApis.queueToDevice).not.toHaveBeenCalled(); }); }); }); From e58724e968e83f66e889793aec896bb0ccd5f807 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 29 Jul 2022 09:57:35 +0100 Subject: [PATCH 05/10] Stop straight away if the client has been stopped Hopefully will fix tests being flakey and logging after tests have finished. --- src/ToDeviceMessageQueue.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/ToDeviceMessageQueue.ts b/src/ToDeviceMessageQueue.ts index 644cca156d7..ec34d74d504 100644 --- a/src/ToDeviceMessageQueue.ts +++ b/src/ToDeviceMessageQueue.ts @@ -78,6 +78,9 @@ export class ToDeviceMessageQueue { this.retryAttempts = 0; } + // Make sure we're still running after the async tasks: if not, stop. + if (!this.running) return; + logger.debug("All queued to-device messages sent"); } catch (e) { ++this.retryAttempts; From e1b10019c827f36b7ba38ddc6896e9730e729061 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 29 Jul 2022 15:31:31 +0100 Subject: [PATCH 06/10] Add return types & fix constant usage --- src/ToDeviceMessageQueue.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/ToDeviceMessageQueue.ts b/src/ToDeviceMessageQueue.ts index ec34d74d504..12827d8bbc8 100644 --- a/src/ToDeviceMessageQueue.ts +++ b/src/ToDeviceMessageQueue.ts @@ -34,23 +34,23 @@ export class ToDeviceMessageQueue { constructor(private client: MatrixClient) { } - public start() { + public start(): void { this.running = true; this.sendQueue(); } - public stop() { + public stop(): void { this.running = false; if (this.retryTimeout !== null) clearTimeout(this.retryTimeout); this.retryTimeout = null; } - public async queueBatch(batch: ToDeviceBatch) { + public async queueBatch(batch: ToDeviceBatch): Promise { const batches: ToDeviceBatchWithTxnId[] = []; for (let i = 0; i < batch.batch.length; i += MAX_BATCH_SIZE) { batches.push({ eventType: batch.eventType, - batch: batch.batch.slice(i, i + 20), + batch: batch.batch.slice(i, i + MAX_BATCH_SIZE), txnId: this.client.makeTxnId(), }); } @@ -59,7 +59,7 @@ export class ToDeviceMessageQueue { this.sendQueue(); } - public sendQueue = async () => { + public sendQueue = async (): Promise => { if (this.retryTimeout !== null) clearTimeout(this.retryTimeout); this.retryTimeout = null; From 690f5cb8f8b9f43a9e455131640da7450db060ec Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 29 Jul 2022 15:32:00 +0100 Subject: [PATCH 07/10] Fix return type Co-authored-by: Germain --- src/store/memory.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/store/memory.ts b/src/store/memory.ts index ae5673063d1..0ed43a5b5ac 100644 --- a/src/store/memory.ts +++ b/src/store/memory.ts @@ -445,7 +445,7 @@ export class MemoryStore implements IStore { return Promise.resolve(); } - public async getOldestToDeviceBatch(): Promise { + public async getOldestToDeviceBatch(): Promise { if (this.pendingToDeviceBatches.length === 0) return null; return this.pendingToDeviceBatches[0]; } From 5865f25d358feb508d85af6def9cce2938867a06 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 29 Jul 2022 15:32:11 +0100 Subject: [PATCH 08/10] Fix return type Co-authored-by: Germain --- src/store/stub.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/store/stub.ts b/src/store/stub.ts index 59fad76748b..eb988a9733b 100644 --- a/src/store/stub.ts +++ b/src/store/stub.ts @@ -276,7 +276,7 @@ export class StubStore implements IStore { return Promise.resolve(); } - public getOldestToDeviceBatch(): Promise { + public getOldestToDeviceBatch(): Promise { return Promise.resolve(null); } From cd8b5f30989321bb1dac74bc81bc7d347251f038 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 29 Jul 2022 15:32:21 +0100 Subject: [PATCH 09/10] Fix return type Co-authored-by: Germain --- src/store/indexeddb-local-backend.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/store/indexeddb-local-backend.ts b/src/store/indexeddb-local-backend.ts index d16afa5c1fb..bd646c37202 100644 --- a/src/store/indexeddb-local-backend.ts +++ b/src/store/indexeddb-local-backend.ts @@ -579,7 +579,7 @@ export class LocalIndexedDBStoreBackend implements IIndexedDBBackend { await txnAsPromise(txn); } - public async getOldestToDeviceBatch(): Promise { + public async getOldestToDeviceBatch(): Promise { const txn = this.db.transaction(["to_device_queue"], "readonly"); const store = txn.objectStore("to_device_queue"); const cursor = await reqAsCursorPromise(store.openCursor()); From b62efdbfec63706f290323f2fb3c7ddad653b24b Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 29 Jul 2022 15:41:56 +0100 Subject: [PATCH 10/10] Stop the client in all test cases --- spec/unit/queueToDevice.spec.ts | 125 +++++++++++++++----------------- 1 file changed, 57 insertions(+), 68 deletions(-) diff --git a/spec/unit/queueToDevice.spec.ts b/spec/unit/queueToDevice.spec.ts index 33cde757484..ff22d29d48d 100644 --- a/spec/unit/queueToDevice.spec.ts +++ b/spec/unit/queueToDevice.spec.ts @@ -96,6 +96,7 @@ describe.each([ afterEach(function() { jest.useRealTimers(); + client.stopClient(); }); it("sends a to-device message", async function() { @@ -217,30 +218,26 @@ describe.each([ await Promise.all([client.startClient(), httpBackend.flush(null, 1, 20)]); - try { - httpBackend.when( - "PUT", "/sendToDevice/org.example.foo/", - ).respond(500); + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(500); - httpBackend.when( - "PUT", "/sendToDevice/org.example.foo/", - ).respond(200, {}); + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(200, {}); - await client.queueToDevice({ - eventType: "org.example.foo", - batch: [ - FAKE_MSG, - ], - }); - expect(await httpBackend.flush(null, 1, 1)).toEqual(1); - await flushPromises(); + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + expect(await httpBackend.flush(null, 1, 1)).toEqual(1); + await flushPromises(); - client.retryImmediately(); + client.retryImmediately(); - expect(await httpBackend.flush(null, 1, 20)).toEqual(1); - } finally { - client.stopClient(); - } + expect(await httpBackend.flush(null, 1, 20)).toEqual(1); }); it("retries on when client is started", async function() { @@ -250,31 +247,27 @@ describe.each([ await Promise.all([client.startClient(), httpBackend.flush("/_matrix/client/versions", 1, 20)]); - try { - httpBackend.when( - "PUT", "/sendToDevice/org.example.foo/", - ).respond(500); + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(500); - httpBackend.when( - "PUT", "/sendToDevice/org.example.foo/", - ).respond(200, {}); + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(200, {}); - await client.queueToDevice({ - eventType: "org.example.foo", - batch: [ - FAKE_MSG, - ], - }); - expect(await httpBackend.flush(null, 1, 1)).toEqual(1); - await flushPromises(); + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + expect(await httpBackend.flush(null, 1, 1)).toEqual(1); + await flushPromises(); - client.stopClient(); - await Promise.all([client.startClient(), httpBackend.flush("/_matrix/client/versions", 1, 20)]); + client.stopClient(); + await Promise.all([client.startClient(), httpBackend.flush("/_matrix/client/versions", 1, 20)]); - expect(await httpBackend.flush(null, 1, 20)).toEqual(1); - } finally { - client.stopClient(); - } + expect(await httpBackend.flush(null, 1, 20)).toEqual(1); }); it("retries when a message is retried", async function() { @@ -284,37 +277,33 @@ describe.each([ await Promise.all([client.startClient(), httpBackend.flush(null, 1, 20)]); - try { - httpBackend.when( - "PUT", "/sendToDevice/org.example.foo/", - ).respond(500); + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(500); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(200, {}); - httpBackend.when( - "PUT", "/sendToDevice/org.example.foo/", - ).respond(200, {}); + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); - await client.queueToDevice({ - eventType: "org.example.foo", - batch: [ - FAKE_MSG, - ], - }); + expect(await httpBackend.flush(null, 1, 1)).toEqual(1); + await flushPromises(); - expect(await httpBackend.flush(null, 1, 1)).toEqual(1); - await flushPromises(); + const dummyEvent = new MatrixEvent({ + event_id: "!fake:example.org", + }); + const mockRoom = { + updatePendingEvent: jest.fn(), + } as unknown as Room; + client.resendEvent(dummyEvent, mockRoom); - const dummyEvent = new MatrixEvent({ - event_id: "!fake:example.org", - }); - const mockRoom = { - updatePendingEvent: jest.fn(), - } as unknown as Room; - client.resendEvent(dummyEvent, mockRoom); - - expect(await httpBackend.flush(null, 1, 20)).toEqual(1); - } finally { - client.stopClient(); - } + expect(await httpBackend.flush(null, 1, 20)).toEqual(1); }); it("splits many messages into multiple HTTP requests", async function() {