Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(streams): prevent earlyZipReadableStreams() from possibly using excessive memory #5082

Merged
30 changes: 13 additions & 17 deletions streams/early_zip_readable_streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,26 +84,22 @@
export function earlyZipReadableStreams<T>(
...streams: ReadableStream<T>[]
): ReadableStream<T> {
const readers = streams.map((s) => s.getReader());
const readers = streams.map((stream) => stream.getReader());
return new ReadableStream<T>({
async start(controller) {
try {
loop:
while (true) {
for (const reader of readers) {
const { value, done } = await reader.read();
if (!done) {
controller.enqueue(value!);
} else {
await Promise.all(readers.map((reader) => reader.cancel()));
break loop;
}
}
async pull(controller) {
for (const reader of readers) {
const { done, value } = await reader.read();
if (done) {
await Promise.all(
readers.map((reader) => reader.cancel("early_zip_ended")), // A better cancel message should maybe be put here?
BlackAsLight marked this conversation as resolved.
Show resolved Hide resolved
);
return controller.close();
BlackAsLight marked this conversation as resolved.
Show resolved Hide resolved
}
controller.close();
} catch (e) {
controller.error(e);
controller.enqueue(value);
}
},
async cancel(reason) {
await Promise.all(readers.map((reader) => reader.cancel(reason)));
},
});
}
17 changes: 17 additions & 0 deletions streams/early_zip_readable_streams_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,20 @@ Deno.test("earlyZipReadableStreams() can zip three streams", async () => {
"3",
]);
});

Deno.test("earlyZipReadableStreams() forwards cancel()", async () => {
iuioiua marked this conversation as resolved.
Show resolved Hide resolved
const streams = new Array(10).fill(false).map(() =>
new ReadableStream(
{
pull(controller) {
controller.enqueue("chunk");
},
cancel(reason) {
assertEquals(reason, "I was cancelled!");
},
},
)
);

await earlyZipReadableStreams(...streams).cancel("I was cancelled!");
});
Loading