From 1b6f5d9500587ec6b984d5e91cc88d81230029dd Mon Sep 17 00:00:00 2001 From: Matt Hillsdon Date: Fri, 26 Jul 2024 15:27:10 +0100 Subject: [PATCH] Replace adhoc promise queueing I think we should be doing this more widely to deal with lifecycle event overlap (disconnect/connect etc) but this is a start. --- lib/accelerometer-service.ts | 37 +++++-------- lib/bluetooth-device-wrapper.ts | 92 ++++++++------------------------- lib/bluetooth-utils.ts | 18 ------- lib/button-service.ts | 4 +- lib/promise-queue.test.ts | 77 +++++++++++++++++++++++++++ lib/promise-queue.ts | 75 +++++++++++++++++++++++++++ lib/usb.ts | 20 +++---- 7 files changed, 195 insertions(+), 128 deletions(-) delete mode 100644 lib/bluetooth-utils.ts create mode 100644 lib/promise-queue.test.ts create mode 100644 lib/promise-queue.ts diff --git a/lib/accelerometer-service.ts b/lib/accelerometer-service.ts index d513570..6478cb1 100644 --- a/lib/accelerometer-service.ts +++ b/lib/accelerometer-service.ts @@ -1,7 +1,6 @@ import { AccelerometerData, AccelerometerDataEvent } from "./accelerometer.js"; -import { GattOperation, Service } from "./bluetooth-device-wrapper.js"; +import { Service } from "./bluetooth-device-wrapper.js"; import { profile } from "./bluetooth-profile.js"; -import { createGattOperationPromise } from "./bluetooth-utils.js"; import { BackgroundErrorEvent, DeviceError } from "./device.js"; import { CharacteristicDataTarget, @@ -14,7 +13,7 @@ export class AccelerometerService implements Service { private accelerometerDataCharacteristic: BluetoothRemoteGATTCharacteristic, private accelerometerPeriodCharacteristic: BluetoothRemoteGATTCharacteristic, private dispatchTypedEvent: TypedServiceEventDispatcher, - private queueGattOperation: (gattOperation: GattOperation) => void, + private queueGattOperation: (action: () => Promise) => Promise, ) { this.accelerometerDataCharacteristic.addEventListener( "characteristicvaluechanged", @@ -32,7 +31,7 @@ export class AccelerometerService implements Service { static async createService( gattServer: BluetoothRemoteGATTServer, dispatcher: TypedServiceEventDispatcher, - queueGattOperation: (gattOperation: GattOperation) => void, + queueGattOperation: (action: () => Promise) => Promise, listenerInit: boolean, ): Promise { let accelerometerService: BluetoothRemoteGATTService; @@ -76,22 +75,16 @@ export class AccelerometerService implements Service { } async getData(): Promise { - const { callback, gattOperationPromise } = createGattOperationPromise(); - this.queueGattOperation({ - callback, - operation: () => this.accelerometerDataCharacteristic.readValue(), - }); - const dataView = (await gattOperationPromise) as DataView; + const dataView = await this.queueGattOperation(() => + this.accelerometerDataCharacteristic.readValue(), + ); return this.dataViewToData(dataView); } async getPeriod(): Promise { - const { callback, gattOperationPromise } = createGattOperationPromise(); - this.queueGattOperation({ - callback, - operation: () => this.accelerometerPeriodCharacteristic.readValue(), - }); - const dataView = (await gattOperationPromise) as DataView; + const dataView = await this.queueGattOperation(() => + this.accelerometerPeriodCharacteristic.readValue(), + ); return dataView.getUint16(0, true); } @@ -104,17 +97,11 @@ export class AccelerometerService implements Service { // Values passed are rounded up to the allowed values on device. // Documentation for allowed values looks wrong. // https://lancaster-university.github.io/microbit-docs/resources/bluetooth/bluetooth_profile.html - const { callback, gattOperationPromise } = createGattOperationPromise(); const dataView = new DataView(new ArrayBuffer(2)); dataView.setUint16(0, value, true); - this.queueGattOperation({ - callback, - operation: () => - this.accelerometerPeriodCharacteristic.writeValueWithoutResponse( - dataView, - ), - }); - await gattOperationPromise; + return this.queueGattOperation(() => + this.accelerometerDataCharacteristic.writeValueWithoutResponse(dataView), + ); } async startNotifications(type: TypedServiceEvent): Promise { diff --git a/lib/bluetooth-device-wrapper.ts b/lib/bluetooth-device-wrapper.ts index 8f2dba7..7a5b1c7 100644 --- a/lib/bluetooth-device-wrapper.ts +++ b/lib/bluetooth-device-wrapper.ts @@ -9,27 +9,13 @@ import { profile } from "./bluetooth-profile.js"; import { ButtonService } from "./button-service.js"; import { BoardVersion, DeviceError } from "./device.js"; import { Logging, NullLogging } from "./logging.js"; +import { PromiseQueue } from "./promise-queue.js"; import { ServiceConnectionEventMap, TypedServiceEvent, TypedServiceEventDispatcher, } from "./service-events.js"; -export interface GattOperationCallback { - resolve: (result: DataView | void) => void; - reject: (error: DeviceError) => void; -} - -export interface GattOperation { - operation: () => Promise; - callback: GattOperationCallback; -} - -interface GattOperations { - busy: boolean; - queue: GattOperation[]; -} - const deviceIdToWrapper: Map = new Map(); const connectTimeoutDuration: number = 10000; @@ -62,7 +48,7 @@ class ServiceInfo { private serviceFactory: ( gattServer: BluetoothRemoteGATTServer, dispatcher: TypedServiceEventDispatcher, - queueGattOperation: (gattOperation: GattOperation) => void, + queueGattOperation: (action: () => Promise) => Promise, listenerInit: boolean, ) => Promise, public events: TypedServiceEvent[], @@ -75,7 +61,7 @@ class ServiceInfo { async createIfNeeded( gattServer: BluetoothRemoteGATTServer, dispatcher: TypedServiceEventDispatcher, - queueGattOperation: (gattOperation: GattOperation) => void, + queueGattOperation: (action: () => Promise) => Promise, listenerInit: boolean, ): Promise { this.service = @@ -125,11 +111,22 @@ export class BluetoothDeviceWrapper { boardVersion: BoardVersion | undefined; - private gattOperations: GattOperations = { - busy: false, - queue: [], + private disconnectedRejectionErrorFactory = () => { + return new DeviceError({ + code: "device-disconnected", + message: "Error processing gatt operations queue - device disconnected", + }); }; + private gattOperations = new PromiseQueue({ + abortCheck: () => { + if (!this.device.gatt?.connected) { + return this.disconnectedRejectionErrorFactory; + } + return undefined; + }, + }); + constructor( public readonly device: BluetoothDevice, private logging: Logging = new NullLogging(), @@ -340,55 +337,10 @@ export class BluetoothDeviceWrapper { } } - private queueGattOperation(gattOperation: GattOperation) { - this.gattOperations.queue.push(gattOperation); - this.processGattOperationQueue(); - } - - private processGattOperationQueue = (): void => { - if (!this.device.gatt?.connected) { - // No longer connected. Drop queue. - this.clearGattQueueOnDisconnect(); - return; - } - if (this.gattOperations.busy) { - // We will finish processing the current operation, then - // pick up processing the queue in the finally block. - return; - } - const gattOperation = this.gattOperations.queue.shift(); - if (!gattOperation) { - return; - } - this.gattOperations.busy = true; - gattOperation - .operation() - .then((result) => { - gattOperation.callback.resolve(result); - }) - .catch((err) => { - gattOperation.callback.reject( - new DeviceError({ code: "background-comms-error", message: err }), - ); - this.logging.error("Error processing gatt operations queue", err); - }) - .finally(() => { - this.gattOperations.busy = false; - this.processGattOperationQueue(); - }); - }; - - private clearGattQueueOnDisconnect() { - this.gattOperations.queue.forEach((op) => { - op.callback.reject( - new DeviceError({ - code: "device-disconnected", - message: - "Error processing gatt operations queue - device disconnected", - }), - ); - }); - this.gattOperations = { busy: false, queue: [] }; + private queueGattOperation(action: () => Promise): Promise { + // Previously we wrapped rejections with: + // new DeviceError({ code: "background-comms-error", message: err }), + return this.gattOperations.add(action); } private createIfNeeded( @@ -424,7 +376,7 @@ export class BluetoothDeviceWrapper { private disposeServices() { this.serviceInfo.forEach((s) => s.dispose()); - this.clearGattQueueOnDisconnect(); + this.gattOperations.clear(this.disconnectedRejectionErrorFactory); } } diff --git a/lib/bluetooth-utils.ts b/lib/bluetooth-utils.ts deleted file mode 100644 index d6057c1..0000000 --- a/lib/bluetooth-utils.ts +++ /dev/null @@ -1,18 +0,0 @@ -import { GattOperationCallback } from "./bluetooth-device-wrapper.js"; - -export const createGattOperationPromise = (): { - callback: GattOperationCallback; - gattOperationPromise: Promise; -} => { - let resolve: (result: DataView | void) => void; - let reject: () => void; - const gattOperationPromise = new Promise((res, rej) => { - resolve = res; - reject = rej; - }); - const callback = { - resolve: resolve!, - reject: reject!, - }; - return { callback, gattOperationPromise }; -}; diff --git a/lib/button-service.ts b/lib/button-service.ts index e414d9f..f1591ba 100644 --- a/lib/button-service.ts +++ b/lib/button-service.ts @@ -1,4 +1,4 @@ -import { GattOperation, Service } from "./bluetooth-device-wrapper.js"; +import { Service } from "./bluetooth-device-wrapper.js"; import { profile } from "./bluetooth-profile.js"; import { ButtonEvent, ButtonState } from "./buttons.js"; import { BackgroundErrorEvent, DeviceError } from "./device.js"; @@ -29,7 +29,7 @@ export class ButtonService implements Service { static async createService( gattServer: BluetoothRemoteGATTServer, dispatcher: TypedServiceEventDispatcher, - queueGattOperation: (gattOperation: GattOperation) => void, + queueGattOperation: (action: () => Promise) => Promise, listenerInit: boolean, ): Promise { let buttonService: BluetoothRemoteGATTService; diff --git a/lib/promise-queue.test.ts b/lib/promise-queue.test.ts new file mode 100644 index 0000000..dcbafae --- /dev/null +++ b/lib/promise-queue.test.ts @@ -0,0 +1,77 @@ +import { describe, expect, it } from "vitest"; +import { PromiseQueue } from "./promise-queue.js"; + +describe("PromiseQueue", () => { + it("waits for previous items", async () => { + const sequence: number[] = []; + const queue = new PromiseQueue(); + queue.add(async () => { + expect(sequence).toEqual([]); + sequence.push(1); + }); + queue.add(async () => { + expect(sequence).toEqual([1]); + sequence.push(2); + }); + expect( + await queue.add(async () => { + expect(sequence).toEqual([1, 2]); + sequence.push(3); + return 3; + }), + ).toEqual(3); + expect(sequence).toEqual([1, 2, 3]); + }); + + it("copes with errors", async () => { + const queue = new PromiseQueue(); + const sequence: (number | Error)[] = []; + const p1 = queue.add(() => { + sequence.push(1); + throw new Error("Oops"); + }); + const p2 = queue.add(() => { + sequence.push(2); + return Promise.resolve(2); + }); + expect(await p2).toEqual(2); + await expect(p1).rejects.toThrow("Oops"); + expect(sequence).toEqual([1, 2]); + }); + + it("clears", async () => { + const queue = new PromiseQueue(); + const rejected: Promise[] = []; + const p1 = queue.add( + () => new Promise((resolve) => setTimeout(resolve, 1000)), + ); + const p2 = queue.add( + () => new Promise((resolve) => setTimeout(resolve, 1000)), + ); + const p3 = queue.add( + () => new Promise((resolve) => setTimeout(resolve, 1000)), + ); + p1.catch(() => rejected.push(p1)); + p2.catch(() => rejected.push(p2)); + p3.catch(() => rejected.push(p3)); + queue.clear(() => new Error("Cleared!")); + await expect(p2).rejects.toThrow("Cleared!"); + await expect(p3).rejects.toThrow("Cleared!"); + expect(rejected).toEqual([p2, p3]); + }); + + it("detects abort", async () => { + let abort = false; + const queue = new PromiseQueue({ + abortCheck: () => (abort ? () => new Error("Aborted") : undefined), + }); + const p1 = queue.add(async () => { + abort = true; + }); + const p2 = queue.add(async () => { + throw new Error("Does not happen"); + }); + expect(await p1).toBeUndefined(); + await expect(p2).rejects.toThrow("Aborted"); + }); +}); diff --git a/lib/promise-queue.ts b/lib/promise-queue.ts new file mode 100644 index 0000000..2a0eb64 --- /dev/null +++ b/lib/promise-queue.ts @@ -0,0 +1,75 @@ +interface QueueEntry { + action: () => Promise; + resolve: (v: T) => void; + reject: (r: any) => void; +} + +interface Options { + /** + * If we should clear the queue return a function to create errors to reject all promises. + * Otherwise return undefined. Called before processing each entry. + */ + abortCheck?: () => (() => Error) | undefined; +} + +export class PromiseQueue { + private busy: boolean = false; + private entries: QueueEntry[] = []; + private abortCheck: () => (() => Error) | undefined; + + constructor(options: Options = {}) { + this.abortCheck = options.abortCheck ?? (() => undefined); + } + + /** + * Queue an action. + * + * @param action Async action to perform. + * @returns A promise that resolves when all prior added actions and this action have been performed. + */ + public add(action: () => Promise): Promise { + return new Promise((resolve, reject) => { + const entry: QueueEntry = { + resolve, + reject, + action, + }; + this.entries.push(entry); + if (!this.busy) { + void this.processQueue(); + } + }); + } + + private async processQueue(): Promise { + const rejection = this.abortCheck(); + if (rejection) { + this.clear(rejection); + return; + } + const entry = this.entries.shift(); + if (!entry) { + return; + } + this.busy = true; + try { + entry.resolve(await entry.action()); + } catch (e) { + entry.reject(e); + } + this.busy = false; + return this.processQueue(); + } + + /** + * Skips any queued actions that aren't in progress and rejects their + * promises with errors created with the supplied function. + */ + public clear(rejection: () => Error) { + const entries = this.entries; + this.entries = []; + entries.forEach((e) => { + e.reject(rejection()); + }); + } +} diff --git a/lib/usb.ts b/lib/usb.ts index 6bdb83e..25b76ff 100644 --- a/lib/usb.ts +++ b/lib/usb.ts @@ -22,6 +22,7 @@ import { } from "./device.js"; import { TypedEventTarget } from "./events.js"; import { Logging, NullLogging } from "./logging.js"; +import { PromiseQueue } from "./promise-queue.js"; import { DAPWrapper } from "./usb-device-wrapper.js"; import { PartialFlashing } from "./usb-partial-flashing.js"; @@ -62,7 +63,8 @@ export class MicrobitWebUSBConnection private connection: DAPWrapper | undefined; private serialState: boolean = false; - private serialStateChange: Promise | undefined; + + private serialStateChangeQueue = new PromiseQueue(); private serialListener = (data: string) => { this.dispatchTypedEvent("serialdata", new SerialDataEvent(data)); @@ -287,10 +289,7 @@ export class MicrobitWebUSBConnection } private async startSerialInternal() { - const prev = this.serialStateChange; - this.serialStateChange = (async () => { - await prev; - + return this.serialStateChangeQueue.add(async () => { if (!this.connection || this.serialState) { return; } @@ -307,22 +306,17 @@ export class MicrobitWebUSBConnection .finally(() => { this.serialState = false; }); - })(); - await this.serialStateChange; + }); } private async stopSerialInternal() { - const prev = this.serialStateChange; - this.serialStateChange = (async () => { - await prev; - + return this.serialStateChangeQueue.add(async () => { if (!this.connection || !this.serialState) { return; } this.connection.stopSerial(this.serialListener); this.dispatchTypedEvent("serialreset", new SerialResetEvent()); - })(); - await this.serialStateChange; + }); } async disconnect(): Promise {