Skip to content

Commit

Permalink
Replace adhoc promise queueing (#19)
Browse files Browse the repository at this point in the history
I think we should be doing this more widely to deal with lifecycle
event overlap (disconnect/connect etc) but this is a start.
  • Loading branch information
microbit-matt-hillsdon authored Jul 29, 2024
1 parent 5943a10 commit 06289c9
Show file tree
Hide file tree
Showing 7 changed files with 195 additions and 128 deletions.
37 changes: 12 additions & 25 deletions lib/accelerometer-service.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { AccelerometerData, AccelerometerDataEvent } from "./accelerometer.js";
import { GattOperation, Service } from "./bluetooth-device-wrapper.js";
import { Service } from "./bluetooth-device-wrapper.js";
import { profile } from "./bluetooth-profile.js";
import { createGattOperationPromise } from "./bluetooth-utils.js";
import { BackgroundErrorEvent, DeviceError } from "./device.js";
import {
CharacteristicDataTarget,
Expand All @@ -14,7 +13,7 @@ export class AccelerometerService implements Service {
private accelerometerDataCharacteristic: BluetoothRemoteGATTCharacteristic,
private accelerometerPeriodCharacteristic: BluetoothRemoteGATTCharacteristic,
private dispatchTypedEvent: TypedServiceEventDispatcher,
private queueGattOperation: (gattOperation: GattOperation) => void,
private queueGattOperation: <R>(action: () => Promise<R>) => Promise<R>,
) {
this.accelerometerDataCharacteristic.addEventListener(
"characteristicvaluechanged",
Expand All @@ -32,7 +31,7 @@ export class AccelerometerService implements Service {
static async createService(
gattServer: BluetoothRemoteGATTServer,
dispatcher: TypedServiceEventDispatcher,
queueGattOperation: (gattOperation: GattOperation) => void,
queueGattOperation: <R>(action: () => Promise<R>) => Promise<R>,
listenerInit: boolean,
): Promise<AccelerometerService | undefined> {
let accelerometerService: BluetoothRemoteGATTService;
Expand Down Expand Up @@ -76,22 +75,16 @@ export class AccelerometerService implements Service {
}

async getData(): Promise<AccelerometerData> {
const { callback, gattOperationPromise } = createGattOperationPromise();
this.queueGattOperation({
callback,
operation: () => this.accelerometerDataCharacteristic.readValue(),
});
const dataView = (await gattOperationPromise) as DataView;
const dataView = await this.queueGattOperation(() =>
this.accelerometerDataCharacteristic.readValue(),
);
return this.dataViewToData(dataView);
}

async getPeriod(): Promise<number> {
const { callback, gattOperationPromise } = createGattOperationPromise();
this.queueGattOperation({
callback,
operation: () => this.accelerometerPeriodCharacteristic.readValue(),
});
const dataView = (await gattOperationPromise) as DataView;
const dataView = await this.queueGattOperation(() =>
this.accelerometerPeriodCharacteristic.readValue(),
);
return dataView.getUint16(0, true);
}

Expand All @@ -104,17 +97,11 @@ export class AccelerometerService implements Service {
// Values passed are rounded up to the allowed values on device.
// Documentation for allowed values looks wrong.
// https://lancaster-university.github.io/microbit-docs/resources/bluetooth/bluetooth_profile.html
const { callback, gattOperationPromise } = createGattOperationPromise();
const dataView = new DataView(new ArrayBuffer(2));
dataView.setUint16(0, value, true);
this.queueGattOperation({
callback,
operation: () =>
this.accelerometerPeriodCharacteristic.writeValueWithoutResponse(
dataView,
),
});
await gattOperationPromise;
return this.queueGattOperation(() =>
this.accelerometerDataCharacteristic.writeValueWithoutResponse(dataView),
);
}

async startNotifications(type: TypedServiceEvent): Promise<void> {
Expand Down
92 changes: 22 additions & 70 deletions lib/bluetooth-device-wrapper.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,13 @@ import { profile } from "./bluetooth-profile.js";
import { ButtonService } from "./button-service.js";
import { BoardVersion, DeviceError } from "./device.js";
import { Logging, NullLogging } from "./logging.js";
import { PromiseQueue } from "./promise-queue.js";
import {
ServiceConnectionEventMap,
TypedServiceEvent,
TypedServiceEventDispatcher,
} from "./service-events.js";

export interface GattOperationCallback {
resolve: (result: DataView | void) => void;
reject: (error: DeviceError) => void;
}

export interface GattOperation {
operation: () => Promise<DataView | void>;
callback: GattOperationCallback;
}

interface GattOperations {
busy: boolean;
queue: GattOperation[];
}

const deviceIdToWrapper: Map<string, BluetoothDeviceWrapper> = new Map();

const connectTimeoutDuration: number = 10000;
Expand Down Expand Up @@ -62,7 +48,7 @@ class ServiceInfo<T extends Service> {
private serviceFactory: (
gattServer: BluetoothRemoteGATTServer,
dispatcher: TypedServiceEventDispatcher,
queueGattOperation: (gattOperation: GattOperation) => void,
queueGattOperation: <R>(action: () => Promise<R>) => Promise<R>,
listenerInit: boolean,
) => Promise<T | undefined>,
public events: TypedServiceEvent[],
Expand All @@ -75,7 +61,7 @@ class ServiceInfo<T extends Service> {
async createIfNeeded(
gattServer: BluetoothRemoteGATTServer,
dispatcher: TypedServiceEventDispatcher,
queueGattOperation: (gattOperation: GattOperation) => void,
queueGattOperation: <R>(action: () => Promise<R>) => Promise<R>,
listenerInit: boolean,
): Promise<T | undefined> {
this.service =
Expand Down Expand Up @@ -132,11 +118,22 @@ export class BluetoothDeviceWrapper {

boardVersion: BoardVersion | undefined;

private gattOperations: GattOperations = {
busy: false,
queue: [],
private disconnectedRejectionErrorFactory = () => {
return new DeviceError({
code: "device-disconnected",
message: "Error processing gatt operations queue - device disconnected",
});
};

private gattOperations = new PromiseQueue({
abortCheck: () => {
if (!this.device.gatt?.connected) {
return this.disconnectedRejectionErrorFactory;
}
return undefined;
},
});

constructor(
public readonly device: BluetoothDevice,
private logging: Logging = new NullLogging(),
Expand Down Expand Up @@ -357,55 +354,10 @@ export class BluetoothDeviceWrapper {
}
}

private queueGattOperation(gattOperation: GattOperation) {
this.gattOperations.queue.push(gattOperation);
this.processGattOperationQueue();
}

private processGattOperationQueue = (): void => {
if (!this.device.gatt?.connected) {
// No longer connected. Drop queue.
this.clearGattQueueOnDisconnect();
return;
}
if (this.gattOperations.busy) {
// We will finish processing the current operation, then
// pick up processing the queue in the finally block.
return;
}
const gattOperation = this.gattOperations.queue.shift();
if (!gattOperation) {
return;
}
this.gattOperations.busy = true;
gattOperation
.operation()
.then((result) => {
gattOperation.callback.resolve(result);
})
.catch((err) => {
gattOperation.callback.reject(
new DeviceError({ code: "background-comms-error", message: err }),
);
this.logging.error("Error processing gatt operations queue", err);
})
.finally(() => {
this.gattOperations.busy = false;
this.processGattOperationQueue();
});
};

private clearGattQueueOnDisconnect() {
this.gattOperations.queue.forEach((op) => {
op.callback.reject(
new DeviceError({
code: "device-disconnected",
message:
"Error processing gatt operations queue - device disconnected",
}),
);
});
this.gattOperations = { busy: false, queue: [] };
private queueGattOperation<T>(action: () => Promise<T>): Promise<T> {
// Previously we wrapped rejections with:
// new DeviceError({ code: "background-comms-error", message: err }),
return this.gattOperations.add(action);
}

private createIfNeeded<T extends Service>(
Expand Down Expand Up @@ -441,7 +393,7 @@ export class BluetoothDeviceWrapper {

private disposeServices() {
this.serviceInfo.forEach((s) => s.dispose());
this.clearGattQueueOnDisconnect();
this.gattOperations.clear(this.disconnectedRejectionErrorFactory);
}
}

Expand Down
18 changes: 0 additions & 18 deletions lib/bluetooth-utils.ts

This file was deleted.

4 changes: 2 additions & 2 deletions lib/button-service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { GattOperation, Service } from "./bluetooth-device-wrapper.js";
import { Service } from "./bluetooth-device-wrapper.js";
import { profile } from "./bluetooth-profile.js";
import { ButtonEvent, ButtonState } from "./buttons.js";
import { BackgroundErrorEvent, DeviceError } from "./device.js";
Expand Down Expand Up @@ -29,7 +29,7 @@ export class ButtonService implements Service {
static async createService(
gattServer: BluetoothRemoteGATTServer,
dispatcher: TypedServiceEventDispatcher,
queueGattOperation: (gattOperation: GattOperation) => void,
queueGattOperation: <R>(action: () => Promise<R>) => Promise<R>,
listenerInit: boolean,
): Promise<ButtonService | undefined> {
let buttonService: BluetoothRemoteGATTService;
Expand Down
77 changes: 77 additions & 0 deletions lib/promise-queue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import { describe, expect, it } from "vitest";
import { PromiseQueue } from "./promise-queue.js";

describe("PromiseQueue", () => {
it("waits for previous items", async () => {
const sequence: number[] = [];
const queue = new PromiseQueue();
queue.add(async () => {
expect(sequence).toEqual([]);
sequence.push(1);
});
queue.add(async () => {
expect(sequence).toEqual([1]);
sequence.push(2);
});
expect(
await queue.add(async () => {
expect(sequence).toEqual([1, 2]);
sequence.push(3);
return 3;
}),
).toEqual(3);
expect(sequence).toEqual([1, 2, 3]);
});

it("copes with errors", async () => {
const queue = new PromiseQueue();
const sequence: (number | Error)[] = [];
const p1 = queue.add(() => {
sequence.push(1);
throw new Error("Oops");
});
const p2 = queue.add(() => {
sequence.push(2);
return Promise.resolve(2);
});
expect(await p2).toEqual(2);
await expect(p1).rejects.toThrow("Oops");
expect(sequence).toEqual([1, 2]);
});

it("clears", async () => {
const queue = new PromiseQueue();
const rejected: Promise<unknown>[] = [];
const p1 = queue.add(
() => new Promise((resolve) => setTimeout(resolve, 1000)),
);
const p2 = queue.add(
() => new Promise((resolve) => setTimeout(resolve, 1000)),
);
const p3 = queue.add(
() => new Promise((resolve) => setTimeout(resolve, 1000)),
);
p1.catch(() => rejected.push(p1));
p2.catch(() => rejected.push(p2));
p3.catch(() => rejected.push(p3));
queue.clear(() => new Error("Cleared!"));
await expect(p2).rejects.toThrow("Cleared!");
await expect(p3).rejects.toThrow("Cleared!");
expect(rejected).toEqual([p2, p3]);
});

it("detects abort", async () => {
let abort = false;
const queue = new PromiseQueue({
abortCheck: () => (abort ? () => new Error("Aborted") : undefined),
});
const p1 = queue.add(async () => {
abort = true;
});
const p2 = queue.add(async () => {
throw new Error("Does not happen");
});
expect(await p1).toBeUndefined();
await expect(p2).rejects.toThrow("Aborted");
});
});
Loading

0 comments on commit 06289c9

Please sign in to comment.