Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update the WritableIterable behavior #724

Merged
merged 12 commits into from
Jul 31, 2023
2 changes: 1 addition & 1 deletion packages/connect-web-bench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ it like a web server would usually do.

| code generator | bundle size | minified | compressed |
|----------------|-------------------:|-----------------------:|---------------------:|
| connect | 112,346 b | 49,541 b | 13,333 b |
| connect | 112,346 b | 49,541 b | 13,335 b |
| grpc-web | 414,906 b | 301,127 b | 53,279 b |
40 changes: 21 additions & 19 deletions packages/connect/src/protocol/async-iterable-story.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ describe("full story", function () {
.then(() => writer.write({ value: "gamma", end: false }))
.then(() => writer.write({ value: "delta", end: false }))
.finally(() => {
void writer.close();
writer.close();
});

const resp = await readAll(readerIt);
Expand Down Expand Up @@ -194,22 +194,19 @@ describe("full story", function () {
{ value: "delta", end: false },
]);
});
it("should throw if close is called on a writer that is already closed", function () {
void writer.close();

writer
.close()
.catch((e) =>
expect(e).toEqual(new ConnectError("cannot close, already closed"))
);
it("should allow close on a writer that is already closed", function () {
writer.close();
writer.close();
});
it("should throw if write is called on a writer that is closed", function () {
void writer.close();
writer.close();

writer
.write({ value: "alpha", end: false })
.catch((e) =>
expect(e).toEqual(new ConnectError("cannot write, already closed"))
expect(e).toEqual(
new Error("cannot write, WritableIterable already closed")
)
timostamm marked this conversation as resolved.
Show resolved Hide resolved
);
});
it("should correctly behave when consumer fails and throw is invoked", async function () {
Expand All @@ -230,11 +227,11 @@ describe("full story", function () {
)[] = [];
try {
// Iterate over the reader and purposely fail after the first read.
for (;;) {
const result = await readerIt[Symbol.asyncIterator]().next();
resp.push(result.value as { end: false; value: string });
throw "READER_ERROR";
}
const itr = readerIt[Symbol.asyncIterator]();
const result = await itr.next();
resp.push(result.value as { end: false; value: string });
await itr.next();
throw "READER_ERROR";
} catch (e) {
// Verify we got the first send only and then verify we caught the expected error.
expect(resp).toEqual([{ value: "alpha", end: false }]);
Expand Down Expand Up @@ -265,11 +262,16 @@ describe("full story", function () {
.then(() => fail("send was unexpectedly resolved."))
.catch((e) => expect(e).toBe("READER_ERROR"));

// The reader's internal writer is closed so any future reads will be rejected.
// The reader's internal writer is closed so any future reads should result in the
// done.
readerIt[Symbol.asyncIterator]()
.next()
.then(() => fail("reads were unexpectedly resolved"))
.catch((e) => expect(e).toBe("READER_ERROR"));
.then((result) =>
expect(result).toEqual({ done: true, value: undefined })
)
.catch(() =>
fail("expected successful done result but unexpectedly rejected")
);
Comment on lines +269 to +274
Copy link
Member Author

@srikrsna-buf srikrsna-buf Jul 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed the behavior for next to return {done: true} when the writer is closed either due to an error or with call toclose. This is what I think should be the behavior of Iterables outlined here, based on these two paragraphs:

If an iterator returns a result with done: true, any subsequent calls to next() are expected to return done: true as well, although this is not enforced on the language level.

throw(exception) Optional
A function that accepts zero or one argument and returns an object conforming to the IteratorResult interface, typically with done equal to true. Calling this method tells the iterator that the caller detects an error condition, and exception is typically an Error instance.

});
});

Expand Down
181 changes: 181 additions & 0 deletions packages/connect/src/protocol/async-iterable.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import {
createAsyncIterable,
createWritableIterable,
makeIterableAbortable,
pipe,
pipeTo,
Expand Down Expand Up @@ -1096,3 +1097,183 @@ describe("transforming asynchronous iterables", () => {
});
});
});

describe("createWritableIterable()", function () {
it("works like a regular iterable on the happy path", async () => {
const wIterable = createWritableIterable<number>();
let readCount = 0;
const read = (async () => {
for await (const next of wIterable) {
expect(next).toBe(readCount);
readCount++;
}
})();
const writCount = 5;
for (let i = 0; i < writCount; i++) {
await wIterable.write(i);
}
wIterable.close();
await read;
expect(readCount).toEqual(writCount);
});
it("write is interrupted when read fails", async () => {
const wIterable = createWritableIterable<number>();
const read = (async () => {
const itr = wIterable[Symbol.asyncIterator]();
const next = await itr.next();
if (next.done === true) {
fail("expected at least one value");
} else {
expect(await itr.throw?.(new Error("read failed"))).toEqual({
done: true,
value: undefined,
});
}
// All further calls to next should also result in done results.
expect(await itr.next()).toEqual({
done: true,
value: undefined,
});
})();
await expectAsync(wIterable.write(1)).toBeRejected();
await expectAsync(wIterable.write(2)).toBeRejected();
await read;
});
it("queues writes", async () => {
const wIterable = createWritableIterable<number>();
const writCount = 50;
const writes = [];
for (let i = 0; i < writCount; i++) {
writes.push(wIterable.write(i));
}
let readCount = 0;
const read = (async () => {
for await (const next of wIterable) {
expect(next).toBe(readCount);
readCount++;
}
})();
wIterable.close();
await read;
expect(readCount).toEqual(writCount);
await Promise.all(writes);
});
it("queues reads", async () => {
const wIterable = createWritableIterable<number>();
const writCount = 50;
const read = (async () => {
const itr = wIterable[Symbol.asyncIterator]();
const readPromises: Promise<IteratorResult<number, number>>[] = [];
for (let i = 0; i < writCount; i++) {
readPromises.push(itr.next());
}
const reads = await Promise.all(readPromises);
expect(reads.find((r) => r.done === true)).toBeUndefined();
expect(reads.map((r) => r.value)).toEqual([...Array(writCount).keys()]);
await expectAsync(itr.next()).toBeResolvedTo({
done: true,
value: undefined,
});
})();
for (let i = 0; i < writCount; i++) {
await wIterable.write(i);
}
wIterable.close();
await read;
});
it("queues reads and writes", async () => {
const wIterable = createWritableIterable<number>();
const writCount = 50;
const read = (async () => {
const itr = wIterable[Symbol.asyncIterator]();
const readPromises: Promise<IteratorResult<number, number>>[] = [];
for (let i = 0; i < writCount; i++) {
readPromises.push(itr.next());
}
const reads = await Promise.all(readPromises);
expect(reads.find((r) => r.done === true)).toBeUndefined();
expect(reads.map((r) => r.value)).toEqual([...Array(writCount).keys()]);
await expectAsync(itr.next()).toBeResolvedTo({
done: true,
value: undefined,
});
})();
const writes: Promise<void>[] = [];
for (let i = 0; i < writCount; i++) {
writes.push(wIterable.write(i));
}
wIterable.close();
await Promise.all(writes);
await read;
});
it("queued writes are rejected when reader throws", async () => {
const wIterable = createWritableIterable<number>();
const writes = [];
for (let i = 0; i < 50; i++) {
writes.push(wIterable.write(i));
}
wIterable.close();
const readError = new Error("read failed");
const read = (async () => {
const it = wIterable[Symbol.asyncIterator]();
await it.throw?.(readError);
})();
await read;
expect(await Promise.allSettled(writes)).toEqual(
new Array(50).fill({ status: "rejected", reason: readError })
);
});
it("queued writes are rejected when reader calls return", async () => {
const wIterable = createWritableIterable<number>();
const writes = [];
for (let i = 0; i < 50; i++) {
writes.push(wIterable.write(i));
}
wIterable.close();
const read = (async () => {
const it = wIterable[Symbol.asyncIterator]();
await it.return?.();
})();
await read;
expect(await Promise.allSettled(writes)).toEqual(
new Array(50).fill({
status: "rejected",
reason: new Error("cannot write, consumer called return"),
})
);
});
it("throw before first write stops writes", async () => {
const wIterable = createWritableIterable<number>();
const readError = new Error("read failed");
const read = (async () => {
const it = wIterable[Symbol.asyncIterator]();
await it.throw?.(readError);
})();
await expectAsync(wIterable.write(1)).toBeRejectedWith(readError);
await read;
});
it("resolves already written value and rejects future writes on return", async () => {
const wIterable = createWritableIterable<number>();
const read = (async () => {
const itr = wIterable[Symbol.asyncIterator]();
const next = await itr.next();
if (next.done === true) {
fail("expected at least one value");
} else {
expect(next.value).toEqual(1);
expect(await itr.return?.()).toEqual({
done: true,
value: undefined,
});
}
// All further calls to next should also result in done results.
expect(await itr.next()).toEqual({
done: true,
value: undefined,
});
})();
await expectAsync(wIterable.write(1)).toBeResolved();
await expectAsync(wIterable.write(2)).toBeRejected();
await read;
});
});
Loading