Skip to content

Commit

Permalink
Update the WritableIterable behavior (#724)
Browse files Browse the repository at this point in the history
  • Loading branch information
srikrsna-buf authored Jul 31, 2023
1 parent 24375b7 commit 1fe3501
Show file tree
Hide file tree
Showing 4 changed files with 319 additions and 103 deletions.
2 changes: 1 addition & 1 deletion packages/connect-web-bench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ it like a web server would usually do.

| code generator | bundle size | minified | compressed |
|----------------|-------------------:|-----------------------:|---------------------:|
| connect | 112,346 b | 49,541 b | 13,333 b |
| connect | 112,346 b | 49,541 b | 13,335 b |
| grpc-web | 414,906 b | 301,127 b | 53,279 b |
40 changes: 21 additions & 19 deletions packages/connect/src/protocol/async-iterable-story.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ describe("full story", function () {
.then(() => writer.write({ value: "gamma", end: false }))
.then(() => writer.write({ value: "delta", end: false }))
.finally(() => {
void writer.close();
writer.close();
});

const resp = await readAll(readerIt);
Expand Down Expand Up @@ -194,22 +194,19 @@ describe("full story", function () {
{ value: "delta", end: false },
]);
});
it("should throw if close is called on a writer that is already closed", function () {
void writer.close();

writer
.close()
.catch((e) =>
expect(e).toEqual(new ConnectError("cannot close, already closed"))
);
it("should allow close on a writer that is already closed", function () {
writer.close();
writer.close();
});
it("should throw if write is called on a writer that is closed", function () {
void writer.close();
writer.close();

writer
.write({ value: "alpha", end: false })
.catch((e) =>
expect(e).toEqual(new ConnectError("cannot write, already closed"))
expect(e).toEqual(
new Error("cannot write, WritableIterable already closed")
)
);
});
it("should correctly behave when consumer fails and throw is invoked", async function () {
Expand All @@ -230,11 +227,11 @@ describe("full story", function () {
)[] = [];
try {
// Iterate over the reader and purposely fail after the first read.
for (;;) {
const result = await readerIt[Symbol.asyncIterator]().next();
resp.push(result.value as { end: false; value: string });
throw "READER_ERROR";
}
const itr = readerIt[Symbol.asyncIterator]();
const result = await itr.next();
resp.push(result.value as { end: false; value: string });
await itr.next();
throw "READER_ERROR";
} catch (e) {
// Verify we got the first send only and then verify we caught the expected error.
expect(resp).toEqual([{ value: "alpha", end: false }]);
Expand Down Expand Up @@ -265,11 +262,16 @@ describe("full story", function () {
.then(() => fail("send was unexpectedly resolved."))
.catch((e) => expect(e).toBe("READER_ERROR"));

// The reader's internal writer is closed so any future reads will be rejected.
// The reader's internal writer is closed so any future reads should result in the
// done.
readerIt[Symbol.asyncIterator]()
.next()
.then(() => fail("reads were unexpectedly resolved"))
.catch((e) => expect(e).toBe("READER_ERROR"));
.then((result) =>
expect(result).toEqual({ done: true, value: undefined })
)
.catch(() =>
fail("expected successful done result but unexpectedly rejected")
);
});
});

Expand Down
181 changes: 181 additions & 0 deletions packages/connect/src/protocol/async-iterable.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

import {
createAsyncIterable,
createWritableIterable,
makeIterableAbortable,
pipe,
pipeTo,
Expand Down Expand Up @@ -1096,3 +1097,183 @@ describe("transforming asynchronous iterables", () => {
});
});
});

describe("createWritableIterable()", function () {
it("works like a regular iterable on the happy path", async () => {
const wIterable = createWritableIterable<number>();
let readCount = 0;
const read = (async () => {
for await (const next of wIterable) {
expect(next).toBe(readCount);
readCount++;
}
})();
const writCount = 5;
for (let i = 0; i < writCount; i++) {
await wIterable.write(i);
}
wIterable.close();
await read;
expect(readCount).toEqual(writCount);
});
it("write is interrupted when read fails", async () => {
const wIterable = createWritableIterable<number>();
const read = (async () => {
const itr = wIterable[Symbol.asyncIterator]();
const next = await itr.next();
if (next.done === true) {
fail("expected at least one value");
} else {
expect(await itr.throw?.(new Error("read failed"))).toEqual({
done: true,
value: undefined,
});
}
// All further calls to next should also result in done results.
expect(await itr.next()).toEqual({
done: true,
value: undefined,
});
})();
await expectAsync(wIterable.write(1)).toBeRejected();
await expectAsync(wIterable.write(2)).toBeRejected();
await read;
});
it("queues writes", async () => {
const wIterable = createWritableIterable<number>();
const writCount = 50;
const writes = [];
for (let i = 0; i < writCount; i++) {
writes.push(wIterable.write(i));
}
let readCount = 0;
const read = (async () => {
for await (const next of wIterable) {
expect(next).toBe(readCount);
readCount++;
}
})();
wIterable.close();
await read;
expect(readCount).toEqual(writCount);
await Promise.all(writes);
});
it("queues reads", async () => {
const wIterable = createWritableIterable<number>();
const writCount = 50;
const read = (async () => {
const itr = wIterable[Symbol.asyncIterator]();
const readPromises: Promise<IteratorResult<number, number>>[] = [];
for (let i = 0; i < writCount; i++) {
readPromises.push(itr.next());
}
const reads = await Promise.all(readPromises);
expect(reads.find((r) => r.done === true)).toBeUndefined();
expect(reads.map((r) => r.value)).toEqual([...Array(writCount).keys()]);
await expectAsync(itr.next()).toBeResolvedTo({
done: true,
value: undefined,
});
})();
for (let i = 0; i < writCount; i++) {
await wIterable.write(i);
}
wIterable.close();
await read;
});
it("queues reads and writes", async () => {
const wIterable = createWritableIterable<number>();
const writCount = 50;
const read = (async () => {
const itr = wIterable[Symbol.asyncIterator]();
const readPromises: Promise<IteratorResult<number, number>>[] = [];
for (let i = 0; i < writCount; i++) {
readPromises.push(itr.next());
}
const reads = await Promise.all(readPromises);
expect(reads.find((r) => r.done === true)).toBeUndefined();
expect(reads.map((r) => r.value)).toEqual([...Array(writCount).keys()]);
await expectAsync(itr.next()).toBeResolvedTo({
done: true,
value: undefined,
});
})();
const writes: Promise<void>[] = [];
for (let i = 0; i < writCount; i++) {
writes.push(wIterable.write(i));
}
wIterable.close();
await Promise.all(writes);
await read;
});
it("queued writes are rejected when reader throws", async () => {
const wIterable = createWritableIterable<number>();
const writes = [];
for (let i = 0; i < 50; i++) {
writes.push(wIterable.write(i));
}
wIterable.close();
const readError = new Error("read failed");
const read = (async () => {
const it = wIterable[Symbol.asyncIterator]();
await it.throw?.(readError);
})();
await read;
expect(await Promise.allSettled(writes)).toEqual(
new Array(50).fill({ status: "rejected", reason: readError })
);
});
it("queued writes are rejected when reader calls return", async () => {
const wIterable = createWritableIterable<number>();
const writes = [];
for (let i = 0; i < 50; i++) {
writes.push(wIterable.write(i));
}
wIterable.close();
const read = (async () => {
const it = wIterable[Symbol.asyncIterator]();
await it.return?.();
})();
await read;
expect(await Promise.allSettled(writes)).toEqual(
new Array(50).fill({
status: "rejected",
reason: new Error("cannot write, consumer called return"),
})
);
});
it("throw before first write stops writes", async () => {
const wIterable = createWritableIterable<number>();
const readError = new Error("read failed");
const read = (async () => {
const it = wIterable[Symbol.asyncIterator]();
await it.throw?.(readError);
})();
await expectAsync(wIterable.write(1)).toBeRejectedWith(readError);
await read;
});
it("resolves already written value and rejects future writes on return", async () => {
const wIterable = createWritableIterable<number>();
const read = (async () => {
const itr = wIterable[Symbol.asyncIterator]();
const next = await itr.next();
if (next.done === true) {
fail("expected at least one value");
} else {
expect(next.value).toEqual(1);
expect(await itr.return?.()).toEqual({
done: true,
value: undefined,
});
}
// All further calls to next should also result in done results.
expect(await itr.next()).toEqual({
done: true,
value: undefined,
});
})();
await expectAsync(wIterable.write(1)).toBeResolved();
await expectAsync(wIterable.write(2)).toBeRejected();
await read;
});
});
Loading

0 comments on commit 1fe3501

Please sign in to comment.