From c8945d7c538defdf770e9119bae8a42ea5743815 Mon Sep 17 00:00:00 2001 From: Sri Krishna Paritala Date: Thu, 13 Jul 2023 09:16:28 -0600 Subject: [PATCH 1/9] Add basic tests for WritableIterable --- .../src/protocol/async-iterable.spec.ts | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/packages/connect/src/protocol/async-iterable.spec.ts b/packages/connect/src/protocol/async-iterable.spec.ts index 9be720acc..44299515a 100644 --- a/packages/connect/src/protocol/async-iterable.spec.ts +++ b/packages/connect/src/protocol/async-iterable.spec.ts @@ -14,6 +14,7 @@ import { createAsyncIterable, + createWritableIterable, makeIterableAbortable, pipe, pipeTo, @@ -1071,3 +1072,57 @@ describe("transforming asynchronous iterables", () => { }); }); }); + +describe("createWritableIterable()", function () { + it("works like a regular iterable on the happy path", async () => { + const wIterable = createWritableIterable(); + let readCount = 0; + const read = (async () => { + for await (const next of wIterable) { + expect(next).toBe(readCount); + readCount++; + } + })(); + const writCount = 5; + for (let i = 0; i < writCount; i++) { + await wIterable.write(i); + } + await wIterable.close(); + await read; + expect(readCount).toEqual(writCount); + expect(wIterable.isClosed()).toBe(true); + }); + it("write is interrupted when read fails", async () => { + const wIterable = createWritableIterable(); + (async () => { + const itr = wIterable[Symbol.asyncIterator](); + const next = await itr.next(); + if (next.done === true) { + fail("expected at least one value"); + } else { + await itr.throw?.(new Error("read failed")); + } + })(); + // Ideally the first one has to be rejected, but there may not be a way to do that. + await expectAsync(wIterable.write(1)).toBeResolved(); + await expectAsync(wIterable.write(2)).toBeRejected(); + }); + it("queues writes", async () => { + const wIterable = createWritableIterable(); + const writCount = 50; + for (let i = 0; i < writCount; i++) { + wIterable.write(i); + } + let readCount = 0; + const read = (async () => { + for await (const next of wIterable) { + expect(next).toBe(readCount); + readCount++; + } + })(); + await wIterable.close(); + await read; + expect(readCount).toEqual(writCount); + expect(wIterable.isClosed()).toBe(true); + }); +}); From f1cb2bc692e6989324478f665bc9ab657e5ac4b1 Mon Sep 17 00:00:00 2001 From: Sri Krishna Paritala Date: Wed, 19 Jul 2023 18:06:55 -0400 Subject: [PATCH 2/9] Queued implementation --- .../src/protocol/async-iterable.spec.ts | 64 +++++++- .../connect/src/protocol/async-iterable.ts | 139 ++++++++---------- 2 files changed, 120 insertions(+), 83 deletions(-) diff --git a/packages/connect/src/protocol/async-iterable.spec.ts b/packages/connect/src/protocol/async-iterable.spec.ts index 44299515a..419c0e0be 100644 --- a/packages/connect/src/protocol/async-iterable.spec.ts +++ b/packages/connect/src/protocol/async-iterable.spec.ts @@ -1100,12 +1100,20 @@ describe("createWritableIterable()", function () { if (next.done === true) { fail("expected at least one value"); } else { - await itr.throw?.(new Error("read failed")); + expect(await itr.throw?.(new Error("read failed"))).toEqual({ + done: true, + value: undefined, + }); } + // All further calls to next should also result in done results. + expect(await itr.next()).toEqual({ + done: true, + value: undefined, + }); })(); - // Ideally the first one has to be rejected, but there may not be a way to do that. - await expectAsync(wIterable.write(1)).toBeResolved(); + await expectAsync(wIterable.write(1)).toBeRejected(); await expectAsync(wIterable.write(2)).toBeRejected(); + expect(wIterable.isClosed()).toBe(true); }); it("queues writes", async () => { const wIterable = createWritableIterable(); @@ -1125,4 +1133,54 @@ describe("createWritableIterable()", function () { expect(readCount).toEqual(writCount); expect(wIterable.isClosed()).toBe(true); }); + xit("queues reads", async () => { + const wIterable = createWritableIterable(); + const writCount = 50; + const read = (async () => { + const itr = wIterable[Symbol.asyncIterator](); + const readPromises: Promise>[] = []; + for (let i = 0; i < writCount; i++) { + readPromises.push(itr.next()); + } + const reads = await Promise.all(readPromises); + expect(reads.find((r) => r.done === true)).toBeUndefined(); + expect(reads.map((r) => r.value)).toEqual([...Array(writCount).keys()]); + await expectAsync(itr.next()).toBeResolvedTo({ + done: true, + value: undefined, + }); + })(); + for (let i = 0; i < writCount; i++) { + await wIterable.write(i); + } + await wIterable.close(); + await read; + expect(wIterable.isClosed()).toBe(true); + }); + it("queues reads and writes", async () => { + const wIterable = createWritableIterable(); + const writCount = 50; + const read = (async () => { + const itr = wIterable[Symbol.asyncIterator](); + const readPromises: Promise>[] = []; + for (let i = 0; i < writCount; i++) { + readPromises.push(itr.next()); + } + const reads = await Promise.all(readPromises); + expect(reads.find((r) => r.done === true)).toBeUndefined(); + expect(reads.map((r) => r.value)).toEqual([...Array(writCount).keys()]); + await expectAsync(itr.next()).toBeResolvedTo({ + done: true, + value: undefined, + }); + })(); + const writes: Promise[] = []; + for (let i = 0; i < writCount; i++) { + writes.push(wIterable.write(i)); + } + await wIterable.close(); + await Promise.all(writes); + await read; + expect(wIterable.isClosed()).toBe(true); + }); }); diff --git a/packages/connect/src/protocol/async-iterable.ts b/packages/connect/src/protocol/async-iterable.ts index fd9341862..07296ea99 100644 --- a/packages/connect/src/protocol/async-iterable.ts +++ b/packages/connect/src/protocol/async-iterable.ts @@ -1283,107 +1283,86 @@ export function makeIterableAbortable( }; } -// QueueElement represents an element in the writer queue, which consists of the payload being written as well as an -// associated resolve function to be invoked/resolved when the written element is read from the queue via the async -// iterator. -interface QueueElement { - payload: IteratorResult; - resolve?: () => void; - reject?: (reason?: Error) => void; -} - -// WritableIterable represents an AsyncIterable that is able to be written to. +/** + * WritableIterable is an AsyncIterable that can be used + * to supply values imperatively to the reader of the + * AsyncIterable. + */ export interface WritableIterable extends AsyncIterable { write: (payload: T) => Promise; close: () => Promise; isClosed: () => boolean; } -// Create an instance of a WritableIterable of type T +/** + * Create a new WritableIterable. + */ export function createWritableIterable(): WritableIterable { - let queue: QueueElement[] = []; - // Represents the resolve function of the promise returned by the async iterator if no values exist in the queue at - // the time of request. It is resolved when a value is successfully received into the queue. - let queueResolve: ((val: IteratorResult) => void) | undefined; - let error: Error | undefined = undefined; - - const process = async (payload: IteratorResult) => { - // // If the writer's internal error was set, then reject any attempts at processing a payload. - if (error) { - return Promise.reject(String(error)); - } - // If there is an iterator resolver then a consumer of the async iterator is waiting on a value. So resolve that - // promise with the new value being sent and return a promise that is immediately resolved - if (queueResolve) { - queueResolve(payload); - queueResolve = undefined; - return Promise.resolve(); - } - const elem: QueueElement = { - payload, - }; - const prom = new Promise((resolve, reject) => { - elem.resolve = resolve; - elem.reject = reject; - }); - // Otherwise no one is waiting on a value yet so add it to the queue and return a promise that will be resolved - // when someone reads this value - queue.push(elem); - - return prom; - }; + const readQueue: ((result: IteratorResult) => void)[] = []; + const writeQueue: T[] = []; + let err: unknown = undefined; + let nextResolve: () => void; + let nextReject: (err: unknown) => void; + let nextPromise = new Promise((resolve, reject) => { + nextResolve = resolve; + nextReject = reject; + }); let closed = false; + function drain() { + for (let next of readQueue.splice(0, readQueue.length)) { + next({ done: true, value: undefined }); + } + } return { + async close() { + closed = true; + drain(); + }, isClosed() { return closed; }, - async write(payload) { + async write(payload: T) { if (closed) { - throw new ConnectError("cannot write, already closed"); + throw err ?? new Error("write failed: WritableIterable is closed"); } - return process({ value: payload, done: false }); - }, - async close() { - if (closed) { - throw new ConnectError("cannot close, already closed"); + const read = readQueue.shift(); + if (read === undefined) { + writeQueue.push(payload); + } else { + read({ done: false, value: payload }); } - closed = true; - return process({ value: undefined, done: true }); + await nextPromise; }, [Symbol.asyncIterator](): AsyncIterator { return { - next: async () => { - // If the writer's internal error was set, then reject any attempts at processing a payload. - if (error) { - return Promise.reject(String(error)); - } - const elem = queue.shift(); - if (!elem) { - // We don't have any payloads ready to be sent (i.e. the consumer of the iterator is consuming faster than - // senders are sending). So return a Promise ensuring we'll resolve it when we get something. - return new Promise>((resolve) => { - queueResolve = resolve; - }); + next() { + nextResolve(); + nextPromise = new Promise((resolve, reject) => { + nextResolve = resolve; + nextReject = reject; + }); + const write = writeQueue.shift(); + if (write !== undefined) { + return Promise.resolve({ done: false, value: write }); } - // Resolve the send promise on a successful send/close. - if (elem.resolve) { - elem.resolve(); + if (closed) { + return Promise.resolve({ done: true, value: undefined }); } - return elem.payload; + // We return a promise immediately that is either resolved/rejected + // as writes happen. + let readResolve: (result: IteratorResult) => void; + const readPromise = new Promise>( + (resolve) => (readResolve = resolve) + ); + readQueue.push(readResolve!); + return readPromise; }, - throw: async (e: Error) => { - error = e; - // The reader of this iterator has failed with the given error. So anything left in the queue should be - // drained and rejected with the given error - for (const item of queue) { - if (item.reject) { - item.reject(e); - } - } - queue = []; - return new Promise>((resolve) => { - resolve({ value: undefined, done: true }); - }); + throw(throwErr: unknown) { + err = throwErr; + closed = true; + nextReject(err); + drain(); + return Promise.resolve({ done: true, value: undefined }); }, }; }, From 6eb4fe85ca3103737e8e576cde0aeca9cc945976 Mon Sep 17 00:00:00 2001 From: Sri Krishna Paritala Date: Thu, 20 Jul 2023 02:57:34 -0400 Subject: [PATCH 3/9] Add comments --- .../src/protocol/async-iterable.spec.ts | 2 +- .../connect/src/protocol/async-iterable.ts | 24 +++++++++++++++++++ 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/packages/connect/src/protocol/async-iterable.spec.ts b/packages/connect/src/protocol/async-iterable.spec.ts index 419c0e0be..d52d64bd2 100644 --- a/packages/connect/src/protocol/async-iterable.spec.ts +++ b/packages/connect/src/protocol/async-iterable.spec.ts @@ -1133,7 +1133,7 @@ describe("createWritableIterable()", function () { expect(readCount).toEqual(writCount); expect(wIterable.isClosed()).toBe(true); }); - xit("queues reads", async () => { + it("queues reads", async () => { const wIterable = createWritableIterable(); const writCount = 50; const read = (async () => { diff --git a/packages/connect/src/protocol/async-iterable.ts b/packages/connect/src/protocol/async-iterable.ts index 07296ea99..053a44c57 100644 --- a/packages/connect/src/protocol/async-iterable.ts +++ b/packages/connect/src/protocol/async-iterable.ts @@ -1298,6 +1298,10 @@ export interface WritableIterable extends AsyncIterable { * Create a new WritableIterable. */ export function createWritableIterable(): WritableIterable { + // We start with two queues to capture the read and write attempts. + // + // The writes and reads each check of their counterpart is + // already available and either interact/add themselves to the queue. const readQueue: ((result: IteratorResult) => void)[] = []; const writeQueue: T[] = []; let err: unknown = undefined; @@ -1308,6 +1312,8 @@ export function createWritableIterable(): WritableIterable { nextReject = reject; }); let closed = false; + // drain the readQueue in case of error/writer is closed by sending a + // done result. function drain() { for (let next of readQueue.splice(0, readQueue.length)) { next({ done: true, value: undefined }); @@ -1327,15 +1333,32 @@ export function createWritableIterable(): WritableIterable { } const read = readQueue.shift(); if (read === undefined) { + // We didn't find a pending read so we add the payload to the write queue. writeQueue.push(payload); } else { + // We found a pending read so we respond with the payload. read({ done: false, value: payload }); } + if (readQueue.length > 0) { + // If there are more in the read queue we can mark the write as complete. + // as the error reporting is not guaranteed to be sequential and therefore cannot + // to linked to a specific write. + return; + } + // Wait for the next read or throw to happen to mark the write as complete. + // + // This will be the most common case of consumers calling `next` sequentially + // and reporting any error using `throw`. await nextPromise; }, [Symbol.asyncIterator](): AsyncIterator { return { next() { + // Resolve the nextPromise to indicate + // pending writes that a read attempt has been made + // after their write. + // + // We also need to reset the promise for future writes. nextResolve(); nextPromise = new Promise((resolve, reject) => { nextResolve = resolve; @@ -1343,6 +1366,7 @@ export function createWritableIterable(): WritableIterable { }); const write = writeQueue.shift(); if (write !== undefined) { + // We found a pending write so response with the payload. return Promise.resolve({ done: false, value: write }); } if (closed) { From 8465df3716490255381bf9f528ff9aa5c79a8c59 Mon Sep 17 00:00:00 2001 From: Sri Krishna Paritala Date: Thu, 20 Jul 2023 03:27:20 -0400 Subject: [PATCH 4/9] Fix tests --- .../src/protocol/async-iterable-story.spec.ts | 25 ++++++++++------ .../connect/src/protocol/async-iterable.ts | 29 ++++++++++++------- 2 files changed, 34 insertions(+), 20 deletions(-) diff --git a/packages/connect/src/protocol/async-iterable-story.spec.ts b/packages/connect/src/protocol/async-iterable-story.spec.ts index c6e93b510..4006a80f7 100644 --- a/packages/connect/src/protocol/async-iterable-story.spec.ts +++ b/packages/connect/src/protocol/async-iterable-story.spec.ts @@ -209,7 +209,9 @@ describe("full story", function () { writer .write({ value: "alpha", end: false }) .catch((e) => - expect(e).toEqual(new ConnectError("cannot write, already closed")) + expect(e).toEqual( + new Error("cannot write, WritableIterable already closed") + ) ); }); it("should correctly behave when consumer fails and throw is invoked", async function () { @@ -230,11 +232,11 @@ describe("full story", function () { )[] = []; try { // Iterate over the reader and purposely fail after the first read. - for (;;) { - const result = await readerIt[Symbol.asyncIterator]().next(); - resp.push(result.value as { end: false; value: string }); - throw "READER_ERROR"; - } + const itr = readerIt[Symbol.asyncIterator](); + const result = await itr.next(); + resp.push(result.value as { end: false; value: string }); + await itr.next(); + throw "READER_ERROR"; } catch (e) { // Verify we got the first send only and then verify we caught the expected error. expect(resp).toEqual([{ value: "alpha", end: false }]); @@ -265,11 +267,16 @@ describe("full story", function () { .then(() => fail("send was unexpectedly resolved.")) .catch((e) => expect(e).toBe("READER_ERROR")); - // The reader's internal writer is closed so any future reads will be rejected. + // The reader's internal writer is closed so any future reads should result in the + // done. readerIt[Symbol.asyncIterator]() .next() - .then(() => fail("reads were unexpectedly resolved")) - .catch((e) => expect(e).toBe("READER_ERROR")); + .then((result) => + expect(result).toEqual({ done: true, value: undefined }) + ) + .catch(() => + fail("expected successful done result but unexpectedly rejected") + ); }); }); diff --git a/packages/connect/src/protocol/async-iterable.ts b/packages/connect/src/protocol/async-iterable.ts index 053a44c57..0ca409c5e 100644 --- a/packages/connect/src/protocol/async-iterable.ts +++ b/packages/connect/src/protocol/async-iterable.ts @@ -1329,7 +1329,7 @@ export function createWritableIterable(): WritableIterable { }, async write(payload: T) { if (closed) { - throw err ?? new Error("write failed: WritableIterable is closed"); + throw err ?? new Error("cannot write, WritableIterable already closed"); } const read = readQueue.shift(); if (read === undefined) { @@ -1338,18 +1338,22 @@ export function createWritableIterable(): WritableIterable { } else { // We found a pending read so we respond with the payload. read({ done: false, value: payload }); + if (readQueue.length > 0) { + // If there are more in the read queue we can mark the write as complete. + // as the error reporting is not guaranteed to be sequential and therefore cannot + // to linked to a specific write. + return; + } } - if (readQueue.length > 0) { - // If there are more in the read queue we can mark the write as complete. - // as the error reporting is not guaranteed to be sequential and therefore cannot - // to linked to a specific write. - return; - } - // Wait for the next read or throw to happen to mark the write as complete. + // We await the next call for as many times as there are items in the queue + 1 // - // This will be the most common case of consumers calling `next` sequentially - // and reporting any error using `throw`. - await nextPromise; + // If there are no items in the write queue that means write happened and we just have + // to wait for one more call likewise if we are the nth write in the queue we + // have to wait for n writes to complete and one more. + const limit = writeQueue.length + 1; + for (let i = 0; i < limit; i++) { + await nextPromise; + } }, [Symbol.asyncIterator](): AsyncIterator { return { @@ -1384,6 +1388,9 @@ export function createWritableIterable(): WritableIterable { throw(throwErr: unknown) { err = throwErr; closed = true; + // Empty the write queue only in the case of an error. + writeQueue.splice(0, writeQueue.length); + // This will reject all pending writes. nextReject(err); drain(); return Promise.resolve({ done: true, value: undefined }); From b21deacff407c6a48bb47d837120dc668b3f0401 Mon Sep 17 00:00:00 2001 From: Sri Krishna Paritala Date: Thu, 20 Jul 2023 03:49:21 -0400 Subject: [PATCH 5/9] Minimize interface --- .../src/protocol/async-iterable-story.spec.ts | 15 ++++------- .../src/protocol/async-iterable.spec.ts | 24 ++++++++--------- .../connect/src/protocol/async-iterable.ts | 26 ++++++++++++------- 3 files changed, 32 insertions(+), 33 deletions(-) diff --git a/packages/connect/src/protocol/async-iterable-story.spec.ts b/packages/connect/src/protocol/async-iterable-story.spec.ts index 4006a80f7..ed458fb6f 100644 --- a/packages/connect/src/protocol/async-iterable-story.spec.ts +++ b/packages/connect/src/protocol/async-iterable-story.spec.ts @@ -160,7 +160,7 @@ describe("full story", function () { .then(() => writer.write({ value: "gamma", end: false })) .then(() => writer.write({ value: "delta", end: false })) .finally(() => { - void writer.close(); + writer.close(); }); const resp = await readAll(readerIt); @@ -194,17 +194,12 @@ describe("full story", function () { { value: "delta", end: false }, ]); }); - it("should throw if close is called on a writer that is already closed", function () { - void writer.close(); - - writer - .close() - .catch((e) => - expect(e).toEqual(new ConnectError("cannot close, already closed")) - ); + it("should allow close on a writer that is already closed", function () { + writer.close(); + writer.close(); }); it("should throw if write is called on a writer that is closed", function () { - void writer.close(); + writer.close(); writer .write({ value: "alpha", end: false }) diff --git a/packages/connect/src/protocol/async-iterable.spec.ts b/packages/connect/src/protocol/async-iterable.spec.ts index d52d64bd2..44614cff2 100644 --- a/packages/connect/src/protocol/async-iterable.spec.ts +++ b/packages/connect/src/protocol/async-iterable.spec.ts @@ -1087,14 +1087,13 @@ describe("createWritableIterable()", function () { for (let i = 0; i < writCount; i++) { await wIterable.write(i); } - await wIterable.close(); + wIterable.close(); await read; expect(readCount).toEqual(writCount); - expect(wIterable.isClosed()).toBe(true); }); it("write is interrupted when read fails", async () => { const wIterable = createWritableIterable(); - (async () => { + const read = (async () => { const itr = wIterable[Symbol.asyncIterator](); const next = await itr.next(); if (next.done === true) { @@ -1113,13 +1112,14 @@ describe("createWritableIterable()", function () { })(); await expectAsync(wIterable.write(1)).toBeRejected(); await expectAsync(wIterable.write(2)).toBeRejected(); - expect(wIterable.isClosed()).toBe(true); + await read; }); it("queues writes", async () => { const wIterable = createWritableIterable(); const writCount = 50; + const writes = []; for (let i = 0; i < writCount; i++) { - wIterable.write(i); + writes.push(wIterable.write(i)); } let readCount = 0; const read = (async () => { @@ -1128,17 +1128,17 @@ describe("createWritableIterable()", function () { readCount++; } })(); - await wIterable.close(); + wIterable.close(); await read; expect(readCount).toEqual(writCount); - expect(wIterable.isClosed()).toBe(true); + await Promise.all(writes); }); it("queues reads", async () => { const wIterable = createWritableIterable(); const writCount = 50; const read = (async () => { const itr = wIterable[Symbol.asyncIterator](); - const readPromises: Promise>[] = []; + const readPromises: Promise>[] = []; for (let i = 0; i < writCount; i++) { readPromises.push(itr.next()); } @@ -1153,16 +1153,15 @@ describe("createWritableIterable()", function () { for (let i = 0; i < writCount; i++) { await wIterable.write(i); } - await wIterable.close(); + wIterable.close(); await read; - expect(wIterable.isClosed()).toBe(true); }); it("queues reads and writes", async () => { const wIterable = createWritableIterable(); const writCount = 50; const read = (async () => { const itr = wIterable[Symbol.asyncIterator](); - const readPromises: Promise>[] = []; + const readPromises: Promise>[] = []; for (let i = 0; i < writCount; i++) { readPromises.push(itr.next()); } @@ -1178,9 +1177,8 @@ describe("createWritableIterable()", function () { for (let i = 0; i < writCount; i++) { writes.push(wIterable.write(i)); } - await wIterable.close(); + wIterable.close(); await Promise.all(writes); await read; - expect(wIterable.isClosed()).toBe(true); }); }); diff --git a/packages/connect/src/protocol/async-iterable.ts b/packages/connect/src/protocol/async-iterable.ts index 0ca409c5e..dd2332a67 100644 --- a/packages/connect/src/protocol/async-iterable.ts +++ b/packages/connect/src/protocol/async-iterable.ts @@ -1285,13 +1285,22 @@ export function makeIterableAbortable( /** * WritableIterable is an AsyncIterable that can be used - * to supply values imperatively to the reader of the + * to supply values imperatively to the consumer of the * AsyncIterable. */ export interface WritableIterable extends AsyncIterable { + /** + * Makes the payload available to the consumer of the + * iterable. + */ write: (payload: T) => Promise; - close: () => Promise; - isClosed: () => boolean; + /** + * Closes the writer indicating to its consumer that no further + * payloads will be received. + * + * Any writes that happen after close is called will return an error. + */ + close: () => void; } /** @@ -1302,7 +1311,7 @@ export function createWritableIterable(): WritableIterable { // // The writes and reads each check of their counterpart is // already available and either interact/add themselves to the queue. - const readQueue: ((result: IteratorResult) => void)[] = []; + const readQueue: ((result: IteratorResult) => void)[] = []; const writeQueue: T[] = []; let err: unknown = undefined; let nextResolve: () => void; @@ -1315,18 +1324,15 @@ export function createWritableIterable(): WritableIterable { // drain the readQueue in case of error/writer is closed by sending a // done result. function drain() { - for (let next of readQueue.splice(0, readQueue.length)) { + for (const next of readQueue.splice(0, readQueue.length)) { next({ done: true, value: undefined }); } } return { - async close() { + close() { closed = true; drain(); }, - isClosed() { - return closed; - }, async write(payload: T) { if (closed) { throw err ?? new Error("cannot write, WritableIterable already closed"); @@ -1382,7 +1388,7 @@ export function createWritableIterable(): WritableIterable { const readPromise = new Promise>( (resolve) => (readResolve = resolve) ); - readQueue.push(readResolve!); + readQueue.push(readResolve!); // eslint-disable-line @typescript-eslint/no-non-null-assertion return readPromise; }, throw(throwErr: unknown) { From 7e35bec5097d704427d74836a1a94e2cbeab81e3 Mon Sep 17 00:00:00 2001 From: Sri Krishna Paritala Date: Thu, 20 Jul 2023 03:56:46 -0400 Subject: [PATCH 6/9] size --- packages/connect-web-bench/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/connect-web-bench/README.md b/packages/connect-web-bench/README.md index 8d57fceb3..1a0849f09 100644 --- a/packages/connect-web-bench/README.md +++ b/packages/connect-web-bench/README.md @@ -10,5 +10,5 @@ it like a web server would usually do. | code generator | bundle size | minified | compressed | |----------------|-------------------:|-----------------------:|---------------------:| -| connect | 113,771 b | 49,913 b | 13,375 b | +| connect | 113,771 b | 49,913 b | 13,377 b | | grpc-web | 414,906 b | 301,127 b | 53,279 b | From 63244ff0c7c6a8412def45b7714e08cd986f3bce Mon Sep 17 00:00:00 2001 From: Sri Krishna Paritala Date: Wed, 26 Jul 2023 14:42:07 +0530 Subject: [PATCH 7/9] Implement `return` --- packages/connect/src/protocol/async-iterable.ts | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/packages/connect/src/protocol/async-iterable.ts b/packages/connect/src/protocol/async-iterable.ts index dd2332a67..5e8ec389e 100644 --- a/packages/connect/src/protocol/async-iterable.ts +++ b/packages/connect/src/protocol/async-iterable.ts @@ -1394,13 +1394,19 @@ export function createWritableIterable(): WritableIterable { throw(throwErr: unknown) { err = throwErr; closed = true; - // Empty the write queue only in the case of an error. writeQueue.splice(0, writeQueue.length); // This will reject all pending writes. nextReject(err); drain(); return Promise.resolve({ done: true, value: undefined }); }, + return() { + closed = true; + writeQueue.splice(0, writeQueue.length); + nextReject(new Error("cannot write, consumer called return")); + drain(); + return Promise.resolve({ done: true, value: undefined }); + }, }; }, }; From 92437dc72aef43b1cc3429624ac2246d22fa05d1 Mon Sep 17 00:00:00 2001 From: Sri Krishna Paritala Date: Mon, 31 Jul 2023 06:37:37 +0530 Subject: [PATCH 8/9] fix and add test for return --- .../src/protocol/async-iterable.spec.ts | 23 +++++++++++++++++++ .../connect/src/protocol/async-iterable.ts | 8 ++++++- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/packages/connect/src/protocol/async-iterable.spec.ts b/packages/connect/src/protocol/async-iterable.spec.ts index 44614cff2..853b554b4 100644 --- a/packages/connect/src/protocol/async-iterable.spec.ts +++ b/packages/connect/src/protocol/async-iterable.spec.ts @@ -1181,4 +1181,27 @@ describe("createWritableIterable()", function () { await Promise.all(writes); await read; }); + it("honors return of the iterator", async () => { + const wIterable = createWritableIterable(); + const read = (async () => { + const itr = wIterable[Symbol.asyncIterator](); + const next = await itr.next(); + if (next.done === true) { + fail("expected at least one value"); + } else { + expect(await itr.return?.()).toEqual({ + done: true, + value: undefined, + }); + } + // All further calls to next should also result in done results. + expect(await itr.next()).toEqual({ + done: true, + value: undefined, + }); + })(); + await expectAsync(wIterable.write(1)).toBeResolved(); + await expectAsync(wIterable.write(2)).toBeRejected(); + await read; + }); }); diff --git a/packages/connect/src/protocol/async-iterable.ts b/packages/connect/src/protocol/async-iterable.ts index 5e8ec389e..380535d46 100644 --- a/packages/connect/src/protocol/async-iterable.ts +++ b/packages/connect/src/protocol/async-iterable.ts @@ -1403,7 +1403,13 @@ export function createWritableIterable(): WritableIterable { return() { closed = true; writeQueue.splice(0, writeQueue.length); - nextReject(new Error("cannot write, consumer called return")); + nextResolve(); // Resolve once for the write awaiting confirmation. + // Reject all future writes. + nextPromise = Promise.reject( + new Error("cannot write, consumer called return") + ).catch(() => { + // This is needed because there could be no more writes. + }); drain(); return Promise.resolve({ done: true, value: undefined }); }, From dffe5a6773b87a800ec653425463089f37c09074 Mon Sep 17 00:00:00 2001 From: Sri Krishna Paritala Date: Mon, 31 Jul 2023 11:31:44 +0530 Subject: [PATCH 9/9] Add tests to cover promise rejection scenarios --- .../src/protocol/async-iterable.spec.ts | 49 ++++++++++++++++++- .../connect/src/protocol/async-iterable.ts | 11 +++-- 2 files changed, 56 insertions(+), 4 deletions(-) diff --git a/packages/connect/src/protocol/async-iterable.spec.ts b/packages/connect/src/protocol/async-iterable.spec.ts index 853b554b4..d9a097fc0 100644 --- a/packages/connect/src/protocol/async-iterable.spec.ts +++ b/packages/connect/src/protocol/async-iterable.spec.ts @@ -1181,7 +1181,53 @@ describe("createWritableIterable()", function () { await Promise.all(writes); await read; }); - it("honors return of the iterator", async () => { + it("queued writes are rejected when reader throws", async () => { + const wIterable = createWritableIterable(); + const writes = []; + for (let i = 0; i < 50; i++) { + writes.push(wIterable.write(i)); + } + wIterable.close(); + const readError = new Error("read failed"); + const read = (async () => { + const it = wIterable[Symbol.asyncIterator](); + await it.throw?.(readError); + })(); + await read; + expect(await Promise.allSettled(writes)).toEqual( + new Array(50).fill({ status: "rejected", reason: readError }) + ); + }); + it("queued writes are rejected when reader calls return", async () => { + const wIterable = createWritableIterable(); + const writes = []; + for (let i = 0; i < 50; i++) { + writes.push(wIterable.write(i)); + } + wIterable.close(); + const read = (async () => { + const it = wIterable[Symbol.asyncIterator](); + await it.return?.(); + })(); + await read; + expect(await Promise.allSettled(writes)).toEqual( + new Array(50).fill({ + status: "rejected", + reason: new Error("cannot write, consumer called return"), + }) + ); + }); + it("throw before first write stops writes", async () => { + const wIterable = createWritableIterable(); + const readError = new Error("read failed"); + const read = (async () => { + const it = wIterable[Symbol.asyncIterator](); + await it.throw?.(readError); + })(); + await expectAsync(wIterable.write(1)).toBeRejectedWith(readError); + await read; + }); + it("resolves already written value and rejects future writes on return", async () => { const wIterable = createWritableIterable(); const read = (async () => { const itr = wIterable[Symbol.asyncIterator](); @@ -1189,6 +1235,7 @@ describe("createWritableIterable()", function () { if (next.done === true) { fail("expected at least one value"); } else { + expect(next.value).toEqual(1); expect(await itr.return?.()).toEqual({ done: true, value: undefined, diff --git a/packages/connect/src/protocol/async-iterable.ts b/packages/connect/src/protocol/async-iterable.ts index 380535d46..356680180 100644 --- a/packages/connect/src/protocol/async-iterable.ts +++ b/packages/connect/src/protocol/async-iterable.ts @@ -1395,6 +1395,9 @@ export function createWritableIterable(): WritableIterable { err = throwErr; closed = true; writeQueue.splice(0, writeQueue.length); + nextPromise.catch(() => { + // To make sure that the nextPromise is always resolved. + }); // This will reject all pending writes. nextReject(err); drain(); @@ -1403,12 +1406,14 @@ export function createWritableIterable(): WritableIterable { return() { closed = true; writeQueue.splice(0, writeQueue.length); - nextResolve(); // Resolve once for the write awaiting confirmation. + // Resolve once for the write awaiting confirmation. + nextResolve(); // Reject all future writes. nextPromise = Promise.reject( new Error("cannot write, consumer called return") - ).catch(() => { - // This is needed because there could be no more writes. + ); + nextPromise.catch(() => { + // To make sure that the nextPromise is always resolved. }); drain(); return Promise.resolve({ done: true, value: undefined });