Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(streams): prevent earlyZipReadableStreams() from possibly using excessive memory #5082

Merged
33 changes: 16 additions & 17 deletions streams/early_zip_readable_streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,26 +84,25 @@
export function earlyZipReadableStreams<T>(
...streams: ReadableStream<T>[]
): ReadableStream<T> {
const readers = streams.map((s) => s.getReader());
const readers = streams.map((stream) => stream.getReader());
return new ReadableStream<T>({
async start(controller) {
try {
loop:
while (true) {
for (const reader of readers) {
const { value, done } = await reader.read();
if (!done) {
controller.enqueue(value!);
} else {
await Promise.all(readers.map((reader) => reader.cancel()));
break loop;
}
}
async pull(controller) {
for (let i = 0; i < readers.length; ++i) {
const { done, value } = await readers[i]!.read();
if (done) {
await Promise.all(
readers.map((reader) =>
reader.cancel(`Stream at index ${i} ended`)
),
);
controller.close();
return;
}
controller.close();
} catch (e) {
controller.error(e);
controller.enqueue(value);
}
},
async cancel(reason) {
await Promise.all(readers.map((reader) => reader.cancel(reason)));
},
});
}
21 changes: 21 additions & 0 deletions streams/early_zip_readable_streams_test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,27 @@ Deno.test("earlyZipReadableStreams() can zip three streams", async () => {
]);
});

Deno.test("earlyZipReadableStreams() forwards cancel()", async () => {
iuioiua marked this conversation as resolved.
Show resolved Hide resolved
const num = 10;
let cancelled = 0;
const streams = new Array(num).fill(false).map(() =>
new ReadableStream(
{
pull(controller) {
controller.enqueue("chunk");
},
cancel(reason) {
cancelled++;
assertEquals(reason, "I was cancelled!");
},
},
)
);

await earlyZipReadableStreams(...streams).cancel("I was cancelled!");
assertEquals(cancelled, num);
});

Deno.test("earlyZipReadableStreams() controller error", async () => {
const errorMsg = "Test error";
const stream = new ReadableStream({
Expand Down
Loading