diff --git a/streams/early_zip_readable_streams.ts b/streams/early_zip_readable_streams.ts index 800392dabc6a..cb857244de30 100644 --- a/streams/early_zip_readable_streams.ts +++ b/streams/early_zip_readable_streams.ts @@ -84,26 +84,25 @@ export function earlyZipReadableStreams( ...streams: ReadableStream[] ): ReadableStream { - const readers = streams.map((s) => s.getReader()); + const readers = streams.map((stream) => stream.getReader()); return new ReadableStream({ - async start(controller) { - try { - loop: - while (true) { - for (const reader of readers) { - const { value, done } = await reader.read(); - if (!done) { - controller.enqueue(value!); - } else { - await Promise.all(readers.map((reader) => reader.cancel())); - break loop; - } - } + async pull(controller) { + for (let i = 0; i < readers.length; ++i) { + const { done, value } = await readers[i]!.read(); + if (done) { + await Promise.all( + readers.map((reader) => + reader.cancel(`Stream at index ${i} ended`) + ), + ); + controller.close(); + return; } - controller.close(); - } catch (e) { - controller.error(e); + controller.enqueue(value); } }, + async cancel(reason) { + await Promise.all(readers.map((reader) => reader.cancel(reason))); + }, }); } diff --git a/streams/early_zip_readable_streams_test.ts b/streams/early_zip_readable_streams_test.ts index 315a176007e1..0efdf55a1661 100644 --- a/streams/early_zip_readable_streams_test.ts +++ b/streams/early_zip_readable_streams_test.ts @@ -61,6 +61,27 @@ Deno.test("earlyZipReadableStreams() can zip three streams", async () => { ]); }); +Deno.test("earlyZipReadableStreams() forwards cancel()", async () => { + const num = 10; + let cancelled = 0; + const streams = new Array(num).fill(false).map(() => + new ReadableStream( + { + pull(controller) { + controller.enqueue("chunk"); + }, + cancel(reason) { + cancelled++; + assertEquals(reason, "I was cancelled!"); + }, + }, + ) + ); + + await earlyZipReadableStreams(...streams).cancel("I was cancelled!"); + assertEquals(cancelled, num); +}); + Deno.test("earlyZipReadableStreams() controller error", async () => { const errorMsg = "Test error"; const stream = new ReadableStream({