diff --git a/package.json b/package.json index e7f7441ae72..e1f1ac13e2d 100644 --- a/package.json +++ b/package.json @@ -102,7 +102,7 @@ "jest-localstorage-mock": "^2.4.6", "jest-sonar-reporter": "^2.0.0", "jsdoc": "^3.6.6", - "matrix-mock-request": "^2.1.0", + "matrix-mock-request": "^2.1.1", "rimraf": "^3.0.2", "terser": "^5.5.1", "tsify": "^5.0.2", diff --git a/spec/unit/crypto/algorithms/megolm.spec.ts b/spec/unit/crypto/algorithms/megolm.spec.ts index fec37296f10..3b39dcd869f 100644 --- a/spec/unit/crypto/algorithms/megolm.spec.ts +++ b/spec/unit/crypto/algorithms/megolm.spec.ts @@ -59,6 +59,7 @@ describe("MegolmDecryption", function() { mockBaseApis = { claimOneTimeKeys: jest.fn(), sendToDevice: jest.fn(), + queueToDevice: jest.fn(), } as unknown as MockedObject; const cryptoStore = new MemoryCryptoStore(); @@ -179,6 +180,7 @@ describe("MegolmDecryption", function() { }); mockBaseApis.sendToDevice.mockReset(); + mockBaseApis.queueToDevice.mockReset(); // do the share megolmDecryption.shareKeysWithDevice(keyRequest); @@ -324,6 +326,7 @@ describe("MegolmDecryption", function() { }, }); mockBaseApis.sendToDevice.mockResolvedValue(undefined); + mockBaseApis.queueToDevice.mockResolvedValue(undefined); aliceDeviceInfo = { deviceId: 'aliceDevice', @@ -403,7 +406,7 @@ describe("MegolmDecryption", function() { expect(mockCrypto.downloadKeys).toHaveBeenCalledWith( ['@alice:home.server'], false, ); - expect(mockBaseApis.sendToDevice).toHaveBeenCalled(); + expect(mockBaseApis.queueToDevice).toHaveBeenCalled(); expect(mockBaseApis.claimOneTimeKeys).toHaveBeenCalledWith( [['@alice:home.server', 'aliceDevice']], 'signed_curve25519', 2000, ); @@ -446,7 +449,7 @@ describe("MegolmDecryption", function() { 'YWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWFhYWI', ); - mockBaseApis.sendToDevice.mockClear(); + mockBaseApis.queueToDevice.mockClear(); await megolmEncryption.reshareKeyWithDevice( olmDevice.deviceCurve25519Key, ct1.session_id, @@ -454,7 +457,7 @@ describe("MegolmDecryption", function() { aliceDeviceInfo, ); - expect(mockBaseApis.sendToDevice).not.toHaveBeenCalled(); + expect(mockBaseApis.queueToDevice).not.toHaveBeenCalled(); }); }); }); diff --git a/spec/unit/queueToDevice.spec.ts b/spec/unit/queueToDevice.spec.ts new file mode 100644 index 00000000000..ff22d29d48d --- /dev/null +++ b/spec/unit/queueToDevice.spec.ts @@ -0,0 +1,338 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import MockHttpBackend from 'matrix-mock-request'; +import { indexedDB as fakeIndexedDB } from 'fake-indexeddb'; + +import { IHttpOpts, IndexedDBStore, MatrixEvent, MemoryStore, Room } from "../../src"; +import { MatrixClient } from "../../src/client"; +import { ToDeviceBatch } from '../../src/models/ToDeviceMessage'; +import { logger } from '../../src/logger'; +import { IStore } from '../../src/store'; + +const FAKE_USER = "@alice:example.org"; +const FAKE_DEVICE_ID = "AAAAAAAA"; +const FAKE_PAYLOAD = { + "foo": 42, +}; +const EXPECTED_BODY = { + messages: { + [FAKE_USER]: { + [FAKE_DEVICE_ID]: FAKE_PAYLOAD, + }, + }, +}; + +const FAKE_MSG = { + userId: FAKE_USER, + deviceId: FAKE_DEVICE_ID, + payload: FAKE_PAYLOAD, +}; + +enum StoreType { + Memory = 'Memory', + IndexedDB = 'IndexedDB', +} + +// Jest now uses @sinonjs/fake-timers which exposes tickAsync() and a number of +// other async methods which break the event loop, letting scheduled promise +// callbacks run. Unfortunately, Jest doesn't expose these, so we have to do +// it manually (this is what sinon does under the hood). We do both in a loop +// until the thing we expect happens: hopefully this is the least flakey way +// and avoids assuming anything about the app's behaviour. +const realSetTimeout = setTimeout; +function flushPromises() { + return new Promise(r => { + realSetTimeout(r, 1); + }); +} + +async function flushAndRunTimersUntil(cond: () => boolean) { + while (!cond()) { + await flushPromises(); + if (cond()) break; + jest.advanceTimersToNextTimer(); + } +} + +describe.each([ + [StoreType.Memory], [StoreType.IndexedDB], +])("queueToDevice (%s store)", function(storeType) { + let httpBackend: MockHttpBackend; + let client: MatrixClient; + + beforeEach(async function() { + httpBackend = new MockHttpBackend(); + + let store: IStore; + if (storeType === StoreType.IndexedDB) { + const idbStore = new IndexedDBStore({ indexedDB: fakeIndexedDB }); + await idbStore.startup(); + store = idbStore; + } else { + store = new MemoryStore(); + } + + client = new MatrixClient({ + baseUrl: "https://my.home.server", + accessToken: "my.access.token", + request: httpBackend.requestFn as IHttpOpts["request"], + store, + }); + }); + + afterEach(function() { + jest.useRealTimers(); + client.stopClient(); + }); + + it("sends a to-device message", async function() { + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).check((request) => { + expect(request.data).toEqual(EXPECTED_BODY); + }).respond(200, {}); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + + await httpBackend.flushAllExpected(); + }); + + it("retries on error", async function() { + jest.useFakeTimers(); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(500); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).check((request) => { + expect(request.data).toEqual(EXPECTED_BODY); + }).respond(200, {}); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + await flushAndRunTimersUntil(() => httpBackend.requests.length > 0); + expect(httpBackend.flushSync(null, 1)).toEqual(1); + + await flushAndRunTimersUntil(() => httpBackend.requests.length > 0); + + expect(httpBackend.flushSync(null, 1)).toEqual(1); + }); + + it("stops retrying on 4xx errors", async function() { + jest.useFakeTimers(); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(400); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + await flushAndRunTimersUntil(() => httpBackend.requests.length > 0); + expect(httpBackend.flushSync(null, 1)).toEqual(1); + + // Asserting that another request is never made is obviously + // a bit tricky - we just flush the queue what should hopefully + // be plenty of times and assert that nothing comes through. + let tries = 0; + await flushAndRunTimersUntil(() => ++tries === 10); + + expect(httpBackend.requests.length).toEqual(0); + }); + + it("honours ratelimiting", async function() { + jest.useFakeTimers(); + + // pick something obscure enough it's unlikley to clash with a + // retry delay the algorithm uses anyway + const retryDelay = 279 * 1000; + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(429, { + errcode: "M_LIMIT_EXCEEDED", + retry_after_ms: retryDelay, + }); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(200, {}); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + await flushAndRunTimersUntil(() => httpBackend.requests.length > 0); + expect(httpBackend.flushSync(null, 1)).toEqual(1); + await flushPromises(); + + logger.info("Advancing clock to just before expected retry time..."); + + jest.advanceTimersByTime(retryDelay - 1000); + await flushPromises(); + + expect(httpBackend.requests.length).toEqual(0); + + logger.info("Advancing clock past expected retry time..."); + + jest.advanceTimersByTime(2000); + await flushPromises(); + + expect(httpBackend.flushSync(null, 1)).toEqual(1); + }); + + it("retries on retryImmediately()", async function() { + httpBackend.when("GET", "/_matrix/client/versions").respond(200, { + versions: ["r0.0.1"], + }); + + await Promise.all([client.startClient(), httpBackend.flush(null, 1, 20)]); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(500); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(200, {}); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + expect(await httpBackend.flush(null, 1, 1)).toEqual(1); + await flushPromises(); + + client.retryImmediately(); + + expect(await httpBackend.flush(null, 1, 20)).toEqual(1); + }); + + it("retries on when client is started", async function() { + httpBackend.when("GET", "/_matrix/client/versions").respond(200, { + versions: ["r0.0.1"], + }); + + await Promise.all([client.startClient(), httpBackend.flush("/_matrix/client/versions", 1, 20)]); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(500); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(200, {}); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + expect(await httpBackend.flush(null, 1, 1)).toEqual(1); + await flushPromises(); + + client.stopClient(); + await Promise.all([client.startClient(), httpBackend.flush("/_matrix/client/versions", 1, 20)]); + + expect(await httpBackend.flush(null, 1, 20)).toEqual(1); + }); + + it("retries when a message is retried", async function() { + httpBackend.when("GET", "/_matrix/client/versions").respond(200, { + versions: ["r0.0.1"], + }); + + await Promise.all([client.startClient(), httpBackend.flush(null, 1, 20)]); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(500); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).respond(200, {}); + + await client.queueToDevice({ + eventType: "org.example.foo", + batch: [ + FAKE_MSG, + ], + }); + + expect(await httpBackend.flush(null, 1, 1)).toEqual(1); + await flushPromises(); + + const dummyEvent = new MatrixEvent({ + event_id: "!fake:example.org", + }); + const mockRoom = { + updatePendingEvent: jest.fn(), + } as unknown as Room; + client.resendEvent(dummyEvent, mockRoom); + + expect(await httpBackend.flush(null, 1, 20)).toEqual(1); + }); + + it("splits many messages into multiple HTTP requests", async function() { + const batch: ToDeviceBatch = { + eventType: "org.example.foo", + batch: [], + }; + + for (let i = 0; i <= 20; ++i) { + batch.batch.push({ + userId: `@user${i}:example.org`, + deviceId: FAKE_DEVICE_ID, + payload: FAKE_PAYLOAD, + }); + } + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).check((request) => { + expect(Object.keys(request.data.messages).length).toEqual(20); + }).respond(200, {}); + + httpBackend.when( + "PUT", "/sendToDevice/org.example.foo/", + ).check((request) => { + expect(Object.keys(request.data.messages).length).toEqual(1); + }).respond(200, {}); + + await client.queueToDevice(batch); + await httpBackend.flushAllExpected(); + }); +}); diff --git a/src/ToDeviceMessageQueue.ts b/src/ToDeviceMessageQueue.ts new file mode 100644 index 00000000000..12827d8bbc8 --- /dev/null +++ b/src/ToDeviceMessageQueue.ts @@ -0,0 +1,125 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +import { logger } from "./logger"; +import { MatrixClient } from "./matrix"; +import { IndexedToDeviceBatch, ToDeviceBatch, ToDeviceBatchWithTxnId, ToDevicePayload } from "./models/ToDeviceMessage"; +import { MatrixScheduler } from "./scheduler"; + +const MAX_BATCH_SIZE = 20; + +/** + * Maintains a queue of outgoing to-device messages, sending them + * as soon as the homeserver is reachable. + */ +export class ToDeviceMessageQueue { + private sending = false; + private running = true; + private retryTimeout: number = null; + private retryAttempts = 0; + + constructor(private client: MatrixClient) { + } + + public start(): void { + this.running = true; + this.sendQueue(); + } + + public stop(): void { + this.running = false; + if (this.retryTimeout !== null) clearTimeout(this.retryTimeout); + this.retryTimeout = null; + } + + public async queueBatch(batch: ToDeviceBatch): Promise { + const batches: ToDeviceBatchWithTxnId[] = []; + for (let i = 0; i < batch.batch.length; i += MAX_BATCH_SIZE) { + batches.push({ + eventType: batch.eventType, + batch: batch.batch.slice(i, i + MAX_BATCH_SIZE), + txnId: this.client.makeTxnId(), + }); + } + + await this.client.store.saveToDeviceBatches(batches); + this.sendQueue(); + } + + public sendQueue = async (): Promise => { + if (this.retryTimeout !== null) clearTimeout(this.retryTimeout); + this.retryTimeout = null; + + if (this.sending || !this.running) return; + + logger.debug("Attempting to send queued to-device messages"); + + this.sending = true; + let headBatch; + try { + while (this.running) { + headBatch = await this.client.store.getOldestToDeviceBatch(); + if (headBatch === null) break; + await this.sendBatch(headBatch); + await this.client.store.removeToDeviceBatch(headBatch.id); + this.retryAttempts = 0; + } + + // Make sure we're still running after the async tasks: if not, stop. + if (!this.running) return; + + logger.debug("All queued to-device messages sent"); + } catch (e) { + ++this.retryAttempts; + // eslint-disable-next-line @typescript-eslint/naming-convention + // eslint-disable-next-line new-cap + const retryDelay = MatrixScheduler.RETRY_BACKOFF_RATELIMIT(null, this.retryAttempts, e); + if (retryDelay === -1) { + // the scheduler function doesn't differentiate between fatal errors and just getting + // bored and giving up for now + if (Math.floor(e.httpStatus / 100) === 4) { + logger.error("Fatal error when sending to-device message - dropping to-device batch!", e); + await this.client.store.removeToDeviceBatch(headBatch.id); + } else { + logger.info("Automatic retry limit reached for to-device messages."); + } + return; + } + + logger.info(`Failed to send batch of to-device messages. Will retry in ${retryDelay}ms`, e); + this.retryTimeout = setTimeout(this.sendQueue, retryDelay); + } finally { + this.sending = false; + } + }; + + /** + * Attempts to send a batch of to-device messages. + */ + private async sendBatch(batch: IndexedToDeviceBatch): Promise { + const contentMap: Record> = {}; + for (const item of batch.batch) { + if (!contentMap[item.userId]) { + contentMap[item.userId] = {}; + } + contentMap[item.userId][item.deviceId] = item.payload; + } + + logger.info(`Sending batch of ${batch.batch.length} to-device messages with ID ${batch.id}`); + + await this.client.sendToDevice(batch.eventType, contentMap, batch.txnId); + } +} diff --git a/src/client.ts b/src/client.ts index 73ca5144ef0..69c8ffdc051 100644 --- a/src/client.ts +++ b/src/client.ts @@ -194,6 +194,8 @@ import { MSC3575SlidingSyncRequest, MSC3575SlidingSyncResponse, SlidingSync } fr import { SlidingSyncSdk } from "./sliding-sync-sdk"; import { Thread, THREAD_RELATION_TYPE } from "./models/thread"; import { MBeaconInfoEventContent, M_BEACON_INFO } from "./@types/beacon"; +import { ToDeviceMessageQueue } from "./ToDeviceMessageQueue"; +import { ToDeviceBatch } from "./models/ToDeviceMessage"; export type Store = IStore; @@ -939,6 +941,8 @@ export class MatrixClient extends TypedEventEmitter>(); + private toDeviceMessageQueue: ToDeviceMessageQueue; + constructor(opts: IMatrixClientCreateOpts) { super(); @@ -1033,6 +1037,8 @@ export class MatrixClient extends TypedEventEmitterexplicitly attempts to retry their lost connection. + * Will also retry any outbound to-device messages currently in the queue to be sent + * (retries of regular outgoing events are handled separately, per-event). * @return {boolean} True if this resulted in a request being retried. */ public retryImmediately(): boolean { + // don't await for this promise: we just want to kick it off + this.toDeviceMessageQueue.sendQueue(); return this.syncApi.retryImmediately(); } @@ -3500,7 +3514,7 @@ export class MatrixClient extends TypedEventEmitter { + // also kick the to-device queue to retry + this.toDeviceMessageQueue.sendQueue(); + this.updatePendingEventStatus(room, event, EventStatus.SENDING); return this.encryptAndSendEvent(room, event); } @@ -8694,7 +8711,10 @@ export class MatrixClient extends TypedEventEmitter>} contentMap @@ -8726,6 +8746,17 @@ export class MatrixClient extends TypedEventEmitter { + return this.toDeviceMessageQueue.queueBatch(batch); + } + /** * Get the third party protocols that can be reached using * this HS diff --git a/src/crypto/algorithms/megolm.ts b/src/crypto/algorithms/megolm.ts index c16187fd622..9233c4debb9 100644 --- a/src/crypto/algorithms/megolm.ts +++ b/src/crypto/algorithms/megolm.ts @@ -22,6 +22,7 @@ limitations under the License. import { logger } from '../../logger'; import * as olmlib from "../olmlib"; +import { EventType } from '../../@types/event'; import { DecryptionAlgorithm, DecryptionError, @@ -37,6 +38,7 @@ import { IOlmSessionResult } from "../olmlib"; import { DeviceInfoMap } from "../DeviceList"; import { MatrixEvent } from "../.."; import { IEventDecryptionResult, IMegolmSessionData, IncomingRoomKeyRequest } from "../index"; +import { ToDeviceBatch, ToDeviceMessage } from '../../models/ToDeviceMessage'; // determine whether the key can be shared with invitees export function isRoomSharedHistory(room: Room): boolean { @@ -609,7 +611,11 @@ class MegolmEncryption extends EncryptionAlgorithm { userDeviceMap: IOlmDevice[], payload: IPayload, ): Promise { - const contentMap: Record> = {}; + const toDeviceBatch: ToDeviceBatch = { + eventType: EventType.RoomMessageEncrypted, + batch: [], + }; + // Map from userId to a map of deviceId to deviceInfo const deviceInfoByUserIdAndDeviceId = new Map>(); @@ -637,10 +643,11 @@ class MegolmEncryption extends EncryptionAlgorithm { // We hold by reference, this updates deviceInfoByUserIdAndDeviceId[userId] userIdDeviceInfo.set(deviceId, deviceInfo); - if (!contentMap[userId]) { - contentMap[userId] = {}; - } - contentMap[userId][deviceId] = encryptedContent; + toDeviceBatch.batch.push({ + userId, + deviceId, + payload: encryptedContent, + }); promises.push( olmlib.encryptMessageForDevice( @@ -660,40 +667,29 @@ class MegolmEncryption extends EncryptionAlgorithm { // in which case it will have just not added anything to the ciphertext object. // There's no point sending messages to devices if we couldn't encrypt to them, // since that's effectively a blank message. - for (const userId of Object.keys(contentMap)) { - for (const deviceId of Object.keys(contentMap[userId])) { - if (Object.keys(contentMap[userId][deviceId].ciphertext).length === 0) { - logger.log( - "No ciphertext for device " + - userId + ":" + deviceId + ": pruning", - ); - delete contentMap[userId][deviceId]; - } - } - // No devices left for that user? Strip that too. - if (Object.keys(contentMap[userId]).length === 0) { - logger.log("Pruned all devices for user " + userId); - delete contentMap[userId]; + const prunedBatch: ToDeviceMessage[] = []; + for (const msg of toDeviceBatch.batch) { + if (Object.keys(msg.payload.ciphertext).length > 0) { + prunedBatch.push(msg); + } else { + logger.log( + "No ciphertext for device " + + msg.userId + ":" + msg.deviceId + ": pruning", + ); } } - // Is there anything left? - if (Object.keys(contentMap).length === 0) { - logger.log("No users left to send to: aborting"); - return; - } + toDeviceBatch.batch = prunedBatch; - return this.baseApis.sendToDevice("m.room.encrypted", contentMap).then(() => { + return this.baseApis.queueToDevice(toDeviceBatch).then(() => { // store that we successfully uploaded the keys of the current slice - for (const userId of Object.keys(contentMap)) { - for (const deviceId of Object.keys(contentMap[userId])) { - session.markSharedWithDevice( - userId, - deviceId, - deviceInfoByUserIdAndDeviceId.get(userId).get(deviceId).getIdentityKey(), - chainIndex, - ); - } + for (const msg of toDeviceBatch.batch) { + session.markSharedWithDevice( + msg.userId, + msg.deviceId, + deviceInfoByUserIdAndDeviceId.get(msg.userId).get(msg.deviceId).getIdentityKey(), + chainIndex, + ); } }); }); diff --git a/src/models/ToDeviceMessage.ts b/src/models/ToDeviceMessage.ts new file mode 100644 index 00000000000..8efc3ed4e31 --- /dev/null +++ b/src/models/ToDeviceMessage.ts @@ -0,0 +1,38 @@ +/* +Copyright 2022 The Matrix.org Foundation C.I.C. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +export type ToDevicePayload = Record; + +export interface ToDeviceMessage { + userId: string; + deviceId: string; + payload: ToDevicePayload; +} + +export interface ToDeviceBatch { + eventType: string; + batch: ToDeviceMessage[]; +} + +// Only used internally +export interface ToDeviceBatchWithTxnId extends ToDeviceBatch { + txnId: string; +} + +// Only used internally +export interface IndexedToDeviceBatch extends ToDeviceBatchWithTxnId { + id: number; +} diff --git a/src/scheduler.ts b/src/scheduler.ts index d0249b6cc02..271982b745a 100644 --- a/src/scheduler.ts +++ b/src/scheduler.ts @@ -57,7 +57,7 @@ export class MatrixScheduler { * failure was due to a rate limited request, the time specified in the error is * waited before being retried. * @param {MatrixEvent} event - * @param {Number} attempts + * @param {Number} attempts Number of attempts that have been made, including the one that just failed (ie. starting at 1) * @param {MatrixError} err * @return {Number} * @see module:scheduler~retryAlgorithm diff --git a/src/store/index.ts b/src/store/index.ts index 3f4a0dadeb7..ee3137cc57d 100644 --- a/src/store/index.ts +++ b/src/store/index.ts @@ -23,6 +23,7 @@ import { RoomSummary } from "../models/room-summary"; import { IMinimalEvent, IRooms, ISyncResponse } from "../sync-accumulator"; import { IStartClientOpts } from "../client"; import { IStateEventWithRoomId } from "../@types/search"; +import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage"; export interface ISavedSync { nextBatch: string; @@ -31,8 +32,7 @@ export interface ISavedSync { } /** - * Construct a stub store. This does no-ops on most store methods. - * @constructor + * A store for most of the data js-sdk needs to store, apart from crypto data */ export interface IStore { readonly accountData: Record; // type : content @@ -57,21 +57,21 @@ export interface IStore { setSyncToken(token: string): void; /** - * No-op. - * @param {Room} room + * Store the given room. + * @param {Room} room The room to be stored. All properties must be stored. */ storeRoom(room: Room): void; /** - * No-op. - * @param {string} roomId - * @return {null} + * Retrieve a room by its' room ID. + * @param {string} roomId The room ID. + * @return {Room} The room or null. */ getRoom(roomId: string): Room | null; /** - * No-op. - * @return {Array} An empty array. + * Retrieve all known rooms. + * @return {Room[]} A list of rooms, which may be empty. */ getRooms(): Room[]; @@ -82,35 +82,36 @@ export interface IStore { removeRoom(roomId: string): void; /** - * No-op. - * @return {Array} An empty array. + * Retrieve a summary of all the rooms. + * @return {RoomSummary[]} A summary of each room. */ getRoomSummaries(): RoomSummary[]; /** - * No-op. - * @param {User} user + * Store a User. + * @param {User} user The user to store. */ storeUser(user: User): void; /** - * No-op. - * @param {string} userId - * @return {null} + * Retrieve a User by its' user ID. + * @param {string} userId The user ID. + * @return {User} The user or null. */ getUser(userId: string): User | null; /** - * No-op. - * @return {User[]} + * Retrieve all known users. + * @return {User[]} A list of users, which may be empty. */ getUsers(): User[]; /** - * No-op. - * @param {Room} room - * @param {number} limit - * @return {Array} + * Retrieve scrollback for this room. + * @param {Room} room The matrix room + * @param {number} limit The max number of old events to retrieve. + * @return {Array} An array of objects which will be at most 'limit' + * length and at least 0. The objects are the raw event JSON. */ scrollback(room: Room, limit: number): MatrixEvent[]; @@ -209,8 +210,23 @@ export interface IStore { */ deleteAllData(): Promise; + /** + * Returns the out-of-band membership events for this room that + * were previously loaded. + * @param {string} roomId + * @returns {event[]} the events, potentially an empty array if OOB loading didn't yield any new members + * @returns {null} in case the members for this room haven't been stored yet + */ getOutOfBandMembers(roomId: string): Promise; + /** + * Stores the out-of-band membership events for this room. Note that + * it still makes sense to store an empty array as the OOB status for the room is + * marked as fetched, and getOutOfBandMembers will return an empty array instead of null + * @param {string} roomId + * @param {event[]} membershipEvents the membership events to store + * @returns {Promise} when all members have been stored + */ setOutOfBandMembers(roomId: string, membershipEvents: IStateEventWithRoomId[]): Promise; clearOutOfBandMembers(roomId: string): Promise; @@ -222,4 +238,19 @@ export interface IStore { getPendingEvents(roomId: string): Promise[]>; setPendingEvents(roomId: string, events: Partial[]): Promise; + + /** + * Stores batches of outgoing to-device messages + */ + saveToDeviceBatches(batch: ToDeviceBatchWithTxnId[]): Promise; + + /** + * Fetches the oldest batch of to-device messages in the queue + */ + getOldestToDeviceBatch(): Promise; + + /** + * Removes a specific batch of to-device messages from the queue + */ + removeToDeviceBatch(id: number): Promise; } diff --git a/src/store/indexeddb-backend.ts b/src/store/indexeddb-backend.ts index 83470d72a19..93d1cb3ab19 100644 --- a/src/store/indexeddb-backend.ts +++ b/src/store/indexeddb-backend.ts @@ -16,6 +16,7 @@ limitations under the License. import { ISavedSync } from "./index"; import { IEvent, IStartClientOpts, IStateEventWithRoomId, ISyncResponse } from ".."; +import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage"; export interface IIndexedDBBackend { connect(): Promise; @@ -31,6 +32,9 @@ export interface IIndexedDBBackend { getUserPresenceEvents(): Promise; getClientOptions(): Promise; storeClientOptions(options: IStartClientOpts): Promise; + saveToDeviceBatches(batches: ToDeviceBatchWithTxnId[]): Promise; + getOldestToDeviceBatch(): Promise; + removeToDeviceBatch(id: number): Promise; } export type UserTuple = [userId: string, presenceEvent: Partial]; diff --git a/src/store/indexeddb-local-backend.ts b/src/store/indexeddb-local-backend.ts index 178931ff857..bd646c37202 100644 --- a/src/store/indexeddb-local-backend.ts +++ b/src/store/indexeddb-local-backend.ts @@ -21,8 +21,9 @@ import { logger } from '../logger'; import { IStartClientOpts, IStateEventWithRoomId } from ".."; import { ISavedSync } from "./index"; import { IIndexedDBBackend, UserTuple } from "./indexeddb-backend"; +import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage"; -const VERSION = 3; +const VERSION = 4; function createDatabase(db: IDBDatabase): void { // Make user store, clobber based on user ID. (userId property of User objects) @@ -49,6 +50,10 @@ function upgradeSchemaV3(db: IDBDatabase): void { { keyPath: ["clobber"] }); } +function upgradeSchemaV4(db: IDBDatabase): void { + db.createObjectStore("to_device_queue", { autoIncrement: true }); +} + /** * Helper method to collect results from a Cursor and promiseify it. * @param {ObjectStore|Index} store The store to perform openCursor on. @@ -112,7 +117,7 @@ function reqAsPromise(req: IDBRequest): Promise { }); } -function reqAsCursorPromise(req: IDBRequest): Promise { +function reqAsCursorPromise(req: IDBRequest): Promise { return reqAsEventPromise(req).then((event) => req.result); } @@ -177,6 +182,9 @@ export class LocalIndexedDBStoreBackend implements IIndexedDBBackend { if (oldVersion < 3) { upgradeSchemaV3(db); } + if (oldVersion < 4) { + upgradeSchemaV4(db); + } // Expand as needed. }; @@ -561,4 +569,36 @@ export class LocalIndexedDBStoreBackend implements IIndexedDBBackend { }); // put == UPSERT await txnAsPromise(txn); } + + public async saveToDeviceBatches(batches: ToDeviceBatchWithTxnId[]): Promise { + const txn = this.db.transaction(["to_device_queue"], "readwrite"); + const store = txn.objectStore("to_device_queue"); + for (const batch of batches) { + store.add(batch); + } + await txnAsPromise(txn); + } + + public async getOldestToDeviceBatch(): Promise { + const txn = this.db.transaction(["to_device_queue"], "readonly"); + const store = txn.objectStore("to_device_queue"); + const cursor = await reqAsCursorPromise(store.openCursor()); + if (!cursor) return null; + + const resultBatch = cursor.value as ToDeviceBatchWithTxnId; + + return { + id: cursor.key as number, + txnId: resultBatch.txnId, + eventType: resultBatch.eventType, + batch: resultBatch.batch, + }; + } + + public async removeToDeviceBatch(id: number): Promise { + const txn = this.db.transaction(["to_device_queue"], "readwrite"); + const store = txn.objectStore("to_device_queue"); + store.delete(id); + await txnAsPromise(txn); + } } diff --git a/src/store/indexeddb-remote-backend.ts b/src/store/indexeddb-remote-backend.ts index 9c06105a1c5..67ab2ccd293 100644 --- a/src/store/indexeddb-remote-backend.ts +++ b/src/store/indexeddb-remote-backend.ts @@ -20,6 +20,7 @@ import { ISavedSync } from "./index"; import { IStartClientOpts } from "../client"; import { IStateEventWithRoomId, ISyncResponse } from ".."; import { IIndexedDBBackend, UserTuple } from "./indexeddb-backend"; +import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage"; export class RemoteIndexedDBStoreBackend implements IIndexedDBBackend { private worker: Worker; @@ -133,6 +134,18 @@ export class RemoteIndexedDBStoreBackend implements IIndexedDBBackend { return this.doCmd('getUserPresenceEvents'); } + public async saveToDeviceBatches(batches: ToDeviceBatchWithTxnId[]): Promise { + return this.doCmd('saveToDeviceBatches', [batches]); + } + + public async getOldestToDeviceBatch(): Promise { + return this.doCmd('getOldestToDeviceBatch'); + } + + public async removeToDeviceBatch(id: number): Promise { + return this.doCmd('removeToDeviceBatch', [id]); + } + private ensureStarted(): Promise { if (this.startPromise === null) { this.worker = this.workerFactory(); diff --git a/src/store/indexeddb-store-worker.ts b/src/store/indexeddb-store-worker.ts index 0d37dbce935..ced77696176 100644 --- a/src/store/indexeddb-store-worker.ts +++ b/src/store/indexeddb-store-worker.ts @@ -103,6 +103,15 @@ export class IndexedDBStoreWorker { case 'storeClientOptions': prom = this.backend.storeClientOptions(msg.args[0]); break; + case 'saveToDeviceBatches': + prom = this.backend.saveToDeviceBatches(msg.args[0]); + break; + case 'getOldestToDeviceBatch': + prom = this.backend.getOldestToDeviceBatch(); + break; + case 'removeToDeviceBatch': + prom = this.backend.removeToDeviceBatch(msg.args[0]); + break; } if (prom === undefined) { diff --git a/src/store/indexeddb.ts b/src/store/indexeddb.ts index 09a85fd1b54..44f684bdf8f 100644 --- a/src/store/indexeddb.ts +++ b/src/store/indexeddb.ts @@ -27,6 +27,7 @@ import { IIndexedDBBackend } from "./indexeddb-backend"; import { ISyncResponse } from "../sync-accumulator"; import { TypedEventEmitter } from "../models/typed-event-emitter"; import { IStateEventWithRoomId } from "../@types/search"; +import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage"; /** * This is an internal module. See {@link IndexedDBStore} for the public class. @@ -351,6 +352,18 @@ export class IndexedDBStore extends MemoryStore { this.localStorage.removeItem(pendingEventsKey(roomId)); } } + + public saveToDeviceBatches(batches: ToDeviceBatchWithTxnId[]): Promise { + return this.backend.saveToDeviceBatches(batches); + } + + public getOldestToDeviceBatch(): Promise { + return this.backend.getOldestToDeviceBatch(); + } + + public removeToDeviceBatch(id: number): Promise { + return this.backend.removeToDeviceBatch(id); + } } /** diff --git a/src/store/memory.ts b/src/store/memory.ts index cb49e425fdb..0ed43a5b5ac 100644 --- a/src/store/memory.ts +++ b/src/store/memory.ts @@ -30,6 +30,7 @@ import { ISavedSync, IStore } from "./index"; import { RoomSummary } from "../models/room-summary"; import { ISyncResponse } from "../sync-accumulator"; import { IStateEventWithRoomId } from "../@types/search"; +import { IndexedToDeviceBatch, ToDeviceBatchWithTxnId } from "../models/ToDeviceMessage"; function isValidFilterId(filterId: string): boolean { const isValidStr = typeof filterId === "string" && @@ -64,6 +65,8 @@ export class MemoryStore implements IStore { private oobMembers: Record = {}; // roomId: [member events] private pendingEvents: { [roomId: string]: Partial[] } = {}; private clientOptions = {}; + private pendingToDeviceBatches: IndexedToDeviceBatch[] = []; + private nextToDeviceBatchId = 0; constructor(opts: IOpts = {}) { this.localStorage = opts.localStorage; @@ -429,4 +432,26 @@ export class MemoryStore implements IStore { public async setPendingEvents(roomId: string, events: Partial[]): Promise { this.pendingEvents[roomId] = events; } + + public saveToDeviceBatches(batches: ToDeviceBatchWithTxnId[]): Promise { + for (const batch of batches) { + this.pendingToDeviceBatches.push({ + id: this.nextToDeviceBatchId++, + eventType: batch.eventType, + txnId: batch.txnId, + batch: batch.batch, + }); + } + return Promise.resolve(); + } + + public async getOldestToDeviceBatch(): Promise { + if (this.pendingToDeviceBatches.length === 0) return null; + return this.pendingToDeviceBatches[0]; + } + + public removeToDeviceBatch(id: number): Promise { + this.pendingToDeviceBatches = this.pendingToDeviceBatches.filter(batch => batch.id !== id); + return Promise.resolve(); + } } diff --git a/src/store/stub.ts b/src/store/stub.ts index c9fc57055fd..eb988a9733b 100644 --- a/src/store/stub.ts +++ b/src/store/stub.ts @@ -28,6 +28,7 @@ import { ISavedSync, IStore } from "./index"; import { RoomSummary } from "../models/room-summary"; import { ISyncResponse } from "../sync-accumulator"; import { IStateEventWithRoomId } from "../@types/search"; +import { IndexedToDeviceBatch, ToDeviceBatch } from "../models/ToDeviceMessage"; /** * Construct a stub store. This does no-ops on most store methods. @@ -270,4 +271,16 @@ export class StubStore implements IStore { public setPendingEvents(roomId: string, events: Partial[]): Promise { return Promise.resolve(); } + + public async saveToDeviceBatches(batch: ToDeviceBatch[]): Promise { + return Promise.resolve(); + } + + public getOldestToDeviceBatch(): Promise { + return Promise.resolve(null); + } + + public async removeToDeviceBatch(id: number): Promise { + return Promise.resolve(); + } } diff --git a/yarn.lock b/yarn.lock index 437517387ed..a8d843cceea 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4802,10 +4802,10 @@ matrix-events-sdk@^0.0.1-beta.7: resolved "https://registry.yarnpkg.com/matrix-events-sdk/-/matrix-events-sdk-0.0.1-beta.7.tgz#5ffe45eba1f67cc8d7c2377736c728b322524934" integrity sha512-9jl4wtWanUFSy2sr2lCjErN/oC8KTAtaeaozJtrgot1JiQcEI4Rda9OLgQ7nLKaqb4Z/QUx/fR3XpDzm5Jy1JA== -matrix-mock-request@^2.1.0: - version "2.1.0" - resolved "https://registry.yarnpkg.com/matrix-mock-request/-/matrix-mock-request-2.1.0.tgz#86f5b0ef846865d0767d3a8e64f5bcd6ca94c178" - integrity sha512-Cjpl3yP6h0yu5GKG89m1XZXZlm69Kg/qHV41N/t6SrQsgcfM3Bfavqx9YrtG0UnuXGy4bBSZIe1QiWVeFPZw1A== +matrix-mock-request@^2.1.1: + version "2.1.1" + resolved "https://registry.yarnpkg.com/matrix-mock-request/-/matrix-mock-request-2.1.1.tgz#a8fc03a2816464bb95445df4cc8885ac36786b23" + integrity sha512-CxdaUPRVB4o8JxTBMASstS2loRe+hlqeJu0Q7yyS1r36LkSSo/KAP4AuomsqxuKqaqYYnEJFJzkG0gOhxV7aqA== dependencies: expect "^28.1.0"