diff --git a/packages/connect-web-bench/README.md b/packages/connect-web-bench/README.md index 89848def3..9b0bfc36c 100644 --- a/packages/connect-web-bench/README.md +++ b/packages/connect-web-bench/README.md @@ -10,5 +10,5 @@ it like a web server would usually do. | code generator | bundle size | minified | compressed | |----------------|-------------------:|-----------------------:|---------------------:| -| connect | 112,346 b | 49,541 b | 13,333 b | +| connect | 112,346 b | 49,541 b | 13,335 b | | grpc-web | 414,906 b | 301,127 b | 53,279 b | diff --git a/packages/connect/src/protocol/async-iterable-story.spec.ts b/packages/connect/src/protocol/async-iterable-story.spec.ts index c6e93b510..ed458fb6f 100644 --- a/packages/connect/src/protocol/async-iterable-story.spec.ts +++ b/packages/connect/src/protocol/async-iterable-story.spec.ts @@ -160,7 +160,7 @@ describe("full story", function () { .then(() => writer.write({ value: "gamma", end: false })) .then(() => writer.write({ value: "delta", end: false })) .finally(() => { - void writer.close(); + writer.close(); }); const resp = await readAll(readerIt); @@ -194,22 +194,19 @@ describe("full story", function () { { value: "delta", end: false }, ]); }); - it("should throw if close is called on a writer that is already closed", function () { - void writer.close(); - - writer - .close() - .catch((e) => - expect(e).toEqual(new ConnectError("cannot close, already closed")) - ); + it("should allow close on a writer that is already closed", function () { + writer.close(); + writer.close(); }); it("should throw if write is called on a writer that is closed", function () { - void writer.close(); + writer.close(); writer .write({ value: "alpha", end: false }) .catch((e) => - expect(e).toEqual(new ConnectError("cannot write, already closed")) + expect(e).toEqual( + new Error("cannot write, WritableIterable already closed") + ) ); }); it("should correctly behave when consumer fails and throw is invoked", async function () { @@ -230,11 +227,11 @@ describe("full story", function () { )[] = []; try { // Iterate over the reader and purposely fail after the first read. - for (;;) { - const result = await readerIt[Symbol.asyncIterator]().next(); - resp.push(result.value as { end: false; value: string }); - throw "READER_ERROR"; - } + const itr = readerIt[Symbol.asyncIterator](); + const result = await itr.next(); + resp.push(result.value as { end: false; value: string }); + await itr.next(); + throw "READER_ERROR"; } catch (e) { // Verify we got the first send only and then verify we caught the expected error. expect(resp).toEqual([{ value: "alpha", end: false }]); @@ -265,11 +262,16 @@ describe("full story", function () { .then(() => fail("send was unexpectedly resolved.")) .catch((e) => expect(e).toBe("READER_ERROR")); - // The reader's internal writer is closed so any future reads will be rejected. + // The reader's internal writer is closed so any future reads should result in the + // done. readerIt[Symbol.asyncIterator]() .next() - .then(() => fail("reads were unexpectedly resolved")) - .catch((e) => expect(e).toBe("READER_ERROR")); + .then((result) => + expect(result).toEqual({ done: true, value: undefined }) + ) + .catch(() => + fail("expected successful done result but unexpectedly rejected") + ); }); }); diff --git a/packages/connect/src/protocol/async-iterable.spec.ts b/packages/connect/src/protocol/async-iterable.spec.ts index a291ec05a..2daba5328 100644 --- a/packages/connect/src/protocol/async-iterable.spec.ts +++ b/packages/connect/src/protocol/async-iterable.spec.ts @@ -14,6 +14,7 @@ import { createAsyncIterable, + createWritableIterable, makeIterableAbortable, pipe, pipeTo, @@ -1096,3 +1097,183 @@ describe("transforming asynchronous iterables", () => { }); }); }); + +describe("createWritableIterable()", function () { + it("works like a regular iterable on the happy path", async () => { + const wIterable = createWritableIterable(); + let readCount = 0; + const read = (async () => { + for await (const next of wIterable) { + expect(next).toBe(readCount); + readCount++; + } + })(); + const writCount = 5; + for (let i = 0; i < writCount; i++) { + await wIterable.write(i); + } + wIterable.close(); + await read; + expect(readCount).toEqual(writCount); + }); + it("write is interrupted when read fails", async () => { + const wIterable = createWritableIterable(); + const read = (async () => { + const itr = wIterable[Symbol.asyncIterator](); + const next = await itr.next(); + if (next.done === true) { + fail("expected at least one value"); + } else { + expect(await itr.throw?.(new Error("read failed"))).toEqual({ + done: true, + value: undefined, + }); + } + // All further calls to next should also result in done results. + expect(await itr.next()).toEqual({ + done: true, + value: undefined, + }); + })(); + await expectAsync(wIterable.write(1)).toBeRejected(); + await expectAsync(wIterable.write(2)).toBeRejected(); + await read; + }); + it("queues writes", async () => { + const wIterable = createWritableIterable(); + const writCount = 50; + const writes = []; + for (let i = 0; i < writCount; i++) { + writes.push(wIterable.write(i)); + } + let readCount = 0; + const read = (async () => { + for await (const next of wIterable) { + expect(next).toBe(readCount); + readCount++; + } + })(); + wIterable.close(); + await read; + expect(readCount).toEqual(writCount); + await Promise.all(writes); + }); + it("queues reads", async () => { + const wIterable = createWritableIterable(); + const writCount = 50; + const read = (async () => { + const itr = wIterable[Symbol.asyncIterator](); + const readPromises: Promise>[] = []; + for (let i = 0; i < writCount; i++) { + readPromises.push(itr.next()); + } + const reads = await Promise.all(readPromises); + expect(reads.find((r) => r.done === true)).toBeUndefined(); + expect(reads.map((r) => r.value)).toEqual([...Array(writCount).keys()]); + await expectAsync(itr.next()).toBeResolvedTo({ + done: true, + value: undefined, + }); + })(); + for (let i = 0; i < writCount; i++) { + await wIterable.write(i); + } + wIterable.close(); + await read; + }); + it("queues reads and writes", async () => { + const wIterable = createWritableIterable(); + const writCount = 50; + const read = (async () => { + const itr = wIterable[Symbol.asyncIterator](); + const readPromises: Promise>[] = []; + for (let i = 0; i < writCount; i++) { + readPromises.push(itr.next()); + } + const reads = await Promise.all(readPromises); + expect(reads.find((r) => r.done === true)).toBeUndefined(); + expect(reads.map((r) => r.value)).toEqual([...Array(writCount).keys()]); + await expectAsync(itr.next()).toBeResolvedTo({ + done: true, + value: undefined, + }); + })(); + const writes: Promise[] = []; + for (let i = 0; i < writCount; i++) { + writes.push(wIterable.write(i)); + } + wIterable.close(); + await Promise.all(writes); + await read; + }); + it("queued writes are rejected when reader throws", async () => { + const wIterable = createWritableIterable(); + const writes = []; + for (let i = 0; i < 50; i++) { + writes.push(wIterable.write(i)); + } + wIterable.close(); + const readError = new Error("read failed"); + const read = (async () => { + const it = wIterable[Symbol.asyncIterator](); + await it.throw?.(readError); + })(); + await read; + expect(await Promise.allSettled(writes)).toEqual( + new Array(50).fill({ status: "rejected", reason: readError }) + ); + }); + it("queued writes are rejected when reader calls return", async () => { + const wIterable = createWritableIterable(); + const writes = []; + for (let i = 0; i < 50; i++) { + writes.push(wIterable.write(i)); + } + wIterable.close(); + const read = (async () => { + const it = wIterable[Symbol.asyncIterator](); + await it.return?.(); + })(); + await read; + expect(await Promise.allSettled(writes)).toEqual( + new Array(50).fill({ + status: "rejected", + reason: new Error("cannot write, consumer called return"), + }) + ); + }); + it("throw before first write stops writes", async () => { + const wIterable = createWritableIterable(); + const readError = new Error("read failed"); + const read = (async () => { + const it = wIterable[Symbol.asyncIterator](); + await it.throw?.(readError); + })(); + await expectAsync(wIterable.write(1)).toBeRejectedWith(readError); + await read; + }); + it("resolves already written value and rejects future writes on return", async () => { + const wIterable = createWritableIterable(); + const read = (async () => { + const itr = wIterable[Symbol.asyncIterator](); + const next = await itr.next(); + if (next.done === true) { + fail("expected at least one value"); + } else { + expect(next.value).toEqual(1); + expect(await itr.return?.()).toEqual({ + done: true, + value: undefined, + }); + } + // All further calls to next should also result in done results. + expect(await itr.next()).toEqual({ + done: true, + value: undefined, + }); + })(); + await expectAsync(wIterable.write(1)).toBeResolved(); + await expectAsync(wIterable.write(2)).toBeRejected(); + await read; + }); +}); diff --git a/packages/connect/src/protocol/async-iterable.ts b/packages/connect/src/protocol/async-iterable.ts index 7dee67908..b258c1b7e 100644 --- a/packages/connect/src/protocol/async-iterable.ts +++ b/packages/connect/src/protocol/async-iterable.ts @@ -1278,107 +1278,140 @@ export function makeIterableAbortable( }; } -// QueueElement represents an element in the writer queue, which consists of the payload being written as well as an -// associated resolve function to be invoked/resolved when the written element is read from the queue via the async -// iterator. -interface QueueElement { - payload: IteratorResult; - resolve?: () => void; - reject?: (reason?: Error) => void; -} - -// WritableIterable represents an AsyncIterable that is able to be written to. +/** + * WritableIterable is an AsyncIterable that can be used + * to supply values imperatively to the consumer of the + * AsyncIterable. + */ export interface WritableIterable extends AsyncIterable { + /** + * Makes the payload available to the consumer of the + * iterable. + */ write: (payload: T) => Promise; - close: () => Promise; - isClosed: () => boolean; + /** + * Closes the writer indicating to its consumer that no further + * payloads will be received. + * + * Any writes that happen after close is called will return an error. + */ + close: () => void; } -// Create an instance of a WritableIterable of type T +/** + * Create a new WritableIterable. + */ export function createWritableIterable(): WritableIterable { - let queue: QueueElement[] = []; - // Represents the resolve function of the promise returned by the async iterator if no values exist in the queue at - // the time of request. It is resolved when a value is successfully received into the queue. - let queueResolve: ((val: IteratorResult) => void) | undefined; - let error: Error | undefined = undefined; - - const process = async (payload: IteratorResult) => { - // // If the writer's internal error was set, then reject any attempts at processing a payload. - if (error) { - return Promise.reject(String(error)); - } - // If there is an iterator resolver then a consumer of the async iterator is waiting on a value. So resolve that - // promise with the new value being sent and return a promise that is immediately resolved - if (queueResolve) { - queueResolve(payload); - queueResolve = undefined; - return Promise.resolve(); - } - const elem: QueueElement = { - payload, - }; - const prom = new Promise((resolve, reject) => { - elem.resolve = resolve; - elem.reject = reject; - }); - // Otherwise no one is waiting on a value yet so add it to the queue and return a promise that will be resolved - // when someone reads this value - queue.push(elem); - - return prom; - }; + // We start with two queues to capture the read and write attempts. + // + // The writes and reads each check of their counterpart is + // already available and either interact/add themselves to the queue. + const readQueue: ((result: IteratorResult) => void)[] = []; + const writeQueue: T[] = []; + let err: unknown = undefined; + let nextResolve: () => void; + let nextReject: (err: unknown) => void; + let nextPromise = new Promise((resolve, reject) => { + nextResolve = resolve; + nextReject = reject; + }); let closed = false; + // drain the readQueue in case of error/writer is closed by sending a + // done result. + function drain() { + for (const next of readQueue.splice(0, readQueue.length)) { + next({ done: true, value: undefined }); + } + } return { - isClosed() { - return closed; + close() { + closed = true; + drain(); }, - async write(payload) { + async write(payload: T) { if (closed) { - throw new ConnectError("cannot write, already closed"); + throw err ?? new Error("cannot write, WritableIterable already closed"); } - return process({ value: payload, done: false }); - }, - async close() { - if (closed) { - throw new ConnectError("cannot close, already closed"); + const read = readQueue.shift(); + if (read === undefined) { + // We didn't find a pending read so we add the payload to the write queue. + writeQueue.push(payload); + } else { + // We found a pending read so we respond with the payload. + read({ done: false, value: payload }); + if (readQueue.length > 0) { + // If there are more in the read queue we can mark the write as complete. + // as the error reporting is not guaranteed to be sequential and therefore cannot + // to linked to a specific write. + return; + } + } + // We await the next call for as many times as there are items in the queue + 1 + // + // If there are no items in the write queue that means write happened and we just have + // to wait for one more call likewise if we are the nth write in the queue we + // have to wait for n writes to complete and one more. + const limit = writeQueue.length + 1; + for (let i = 0; i < limit; i++) { + await nextPromise; } - closed = true; - return process({ value: undefined, done: true }); }, [Symbol.asyncIterator](): AsyncIterator { return { - next: async () => { - // If the writer's internal error was set, then reject any attempts at processing a payload. - if (error) { - return Promise.reject(String(error)); - } - const elem = queue.shift(); - if (!elem) { - // We don't have any payloads ready to be sent (i.e. the consumer of the iterator is consuming faster than - // senders are sending). So return a Promise ensuring we'll resolve it when we get something. - return new Promise>((resolve) => { - queueResolve = resolve; - }); + next() { + // Resolve the nextPromise to indicate + // pending writes that a read attempt has been made + // after their write. + // + // We also need to reset the promise for future writes. + nextResolve(); + nextPromise = new Promise((resolve, reject) => { + nextResolve = resolve; + nextReject = reject; + }); + const write = writeQueue.shift(); + if (write !== undefined) { + // We found a pending write so response with the payload. + return Promise.resolve({ done: false, value: write }); } - // Resolve the send promise on a successful send/close. - if (elem.resolve) { - elem.resolve(); + if (closed) { + return Promise.resolve({ done: true, value: undefined }); } - return elem.payload; + // We return a promise immediately that is either resolved/rejected + // as writes happen. + let readResolve: (result: IteratorResult) => void; + const readPromise = new Promise>( + (resolve) => (readResolve = resolve) + ); + readQueue.push(readResolve!); // eslint-disable-line @typescript-eslint/no-non-null-assertion + return readPromise; }, - throw: async (e: Error) => { - error = e; - // The reader of this iterator has failed with the given error. So anything left in the queue should be - // drained and rejected with the given error - for (const item of queue) { - if (item.reject) { - item.reject(e); - } - } - queue = []; - return new Promise>((resolve) => { - resolve({ value: undefined, done: true }); + throw(throwErr: unknown) { + err = throwErr; + closed = true; + writeQueue.splice(0, writeQueue.length); + nextPromise.catch(() => { + // To make sure that the nextPromise is always resolved. + }); + // This will reject all pending writes. + nextReject(err); + drain(); + return Promise.resolve({ done: true, value: undefined }); + }, + return() { + closed = true; + writeQueue.splice(0, writeQueue.length); + // Resolve once for the write awaiting confirmation. + nextResolve(); + // Reject all future writes. + nextPromise = Promise.reject( + new Error("cannot write, consumer called return") + ); + nextPromise.catch(() => { + // To make sure that the nextPromise is always resolved. }); + drain(); + return Promise.resolve({ done: true, value: undefined }); }, }; },