From 1ac5efa9ee79c99d2b24531e7648ea568df230c4 Mon Sep 17 00:00:00 2001 From: BlackAsLight <44320105+BlackAsLight@users.noreply.github.com> Date: Wed, 19 Jun 2024 16:34:21 +1000 Subject: [PATCH 1/8] refactor(streams): `earlyZipReadableStreams` --- streams/early_zip_readable_streams.ts | 30 ++++++++++++--------------- 1 file changed, 13 insertions(+), 17 deletions(-) diff --git a/streams/early_zip_readable_streams.ts b/streams/early_zip_readable_streams.ts index 800392dabc6a..3d58e5de2384 100644 --- a/streams/early_zip_readable_streams.ts +++ b/streams/early_zip_readable_streams.ts @@ -84,26 +84,22 @@ export function earlyZipReadableStreams( ...streams: ReadableStream[] ): ReadableStream { - const readers = streams.map((s) => s.getReader()); + const readers = streams.map((stream) => stream.getReader()); return new ReadableStream({ - async start(controller) { - try { - loop: - while (true) { - for (const reader of readers) { - const { value, done } = await reader.read(); - if (!done) { - controller.enqueue(value!); - } else { - await Promise.all(readers.map((reader) => reader.cancel())); - break loop; - } - } + async pull(controller) { + for (const reader of readers) { + const { done, value } = await reader.read(); + if (done) { + await Promise.all( + readers.map((reader) => reader.cancel("early_zip_ended")), // A better cancel message should maybe be put here? + ); + return controller.close(); } - controller.close(); - } catch (e) { - controller.error(e); + controller.enqueue(value); } }, + async cancel(reason) { + await Promise.all(readers.map((reader) => reader.cancel(reason))); + }, }); } From 8fb9eb0f4d9abc6962e27ea677aaa7fc07045fd3 Mon Sep 17 00:00:00 2001 From: BlackAsLight <44320105+BlackAsLight@users.noreply.github.com> Date: Wed, 19 Jun 2024 16:50:26 +1000 Subject: [PATCH 2/8] add(streams): test for cancelling stream --- streams/early_zip_readable_streams_test.ts | 123 ++++++++++++--------- 1 file changed, 69 insertions(+), 54 deletions(-) diff --git a/streams/early_zip_readable_streams_test.ts b/streams/early_zip_readable_streams_test.ts index b6d4529ffe9e..5eff0d1aa0e6 100644 --- a/streams/early_zip_readable_streams_test.ts +++ b/streams/early_zip_readable_streams_test.ts @@ -1,62 +1,77 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. -import { earlyZipReadableStreams } from "./early_zip_readable_streams.ts"; -import { assertEquals } from "@std/assert"; +import { earlyZipReadableStreams } from "./early_zip_readable_streams.ts" +import { assertEquals } from "@std/assert" Deno.test("earlyZipReadableStreams() handles short first", async () => { - const textStream = ReadableStream.from(["1", "2", "3"]); - const textStream2 = ReadableStream.from(["a", "b", "c", "d", "e"]); - - const buf = await Array.fromAsync( - earlyZipReadableStreams(textStream, textStream2), - ); - - assertEquals(buf, [ - "1", - "a", - "2", - "b", - "3", - "c", - ]); -}); + const textStream = ReadableStream.from(["1", "2", "3"]) + const textStream2 = ReadableStream.from(["a", "b", "c", "d", "e"]) + + const buf = await Array.fromAsync( + earlyZipReadableStreams(textStream, textStream2), + ) + + assertEquals(buf, [ + "1", + "a", + "2", + "b", + "3", + "c", + ]) +}) Deno.test("earlyZipReadableStreams() handles long first", async () => { - const textStream = ReadableStream.from(["a", "b", "c", "d", "e"]); - const textStream2 = ReadableStream.from(["1", "2", "3"]); - - const buf = await Array.fromAsync( - earlyZipReadableStreams(textStream, textStream2), - ); - - assertEquals(buf, [ - "a", - "1", - "b", - "2", - "c", - "3", - "d", - ]); -}); + const textStream = ReadableStream.from(["a", "b", "c", "d", "e"]) + const textStream2 = ReadableStream.from(["1", "2", "3"]) + + const buf = await Array.fromAsync( + earlyZipReadableStreams(textStream, textStream2), + ) + + assertEquals(buf, [ + "a", + "1", + "b", + "2", + "c", + "3", + "d", + ]) +}) Deno.test("earlyZipReadableStreams() can zip three streams", async () => { - const textStream = ReadableStream.from(["a", "b", "c", "d", "e"]); - const textStream2 = ReadableStream.from(["1", "2", "3"]); - const textStream3 = ReadableStream.from(["x", "y"]); - - const buf = await Array.fromAsync( - earlyZipReadableStreams(textStream, textStream2, textStream3), - ); - - assertEquals(buf, [ - "a", - "1", - "x", - "b", - "2", - "y", - "c", - "3", - ]); -}); + const textStream = ReadableStream.from(["a", "b", "c", "d", "e"]) + const textStream2 = ReadableStream.from(["1", "2", "3"]) + const textStream3 = ReadableStream.from(["x", "y"]) + + const buf = await Array.fromAsync( + earlyZipReadableStreams(textStream, textStream2, textStream3), + ) + + assertEquals(buf, [ + "a", + "1", + "x", + "b", + "2", + "y", + "c", + "3", + ]) +}) + +Deno.test("earlyZipReadableStreams() forwards cancel()", async () => { + const streams = new Array(10).fill(false).map(() => new ReadableStream( + { + pull(controller) { + controller.enqueue('chunk') + }, + cancel(reason) { + assertEquals(reason, 'I was cancelled!') + } + } + )) + + await earlyZipReadableStreams(...streams).cancel('I was cancelled!') +}) From 807945de71170022c9f8585a9e913e3296042af8 Mon Sep 17 00:00:00 2001 From: BlackAsLight <44320105+BlackAsLight@users.noreply.github.com> Date: Wed, 19 Jun 2024 16:51:05 +1000 Subject: [PATCH 3/8] chore(streams): fmt --- streams/early_zip_readable_streams_test.ts | 122 +++++++++++---------- 1 file changed, 62 insertions(+), 60 deletions(-) diff --git a/streams/early_zip_readable_streams_test.ts b/streams/early_zip_readable_streams_test.ts index 5eff0d1aa0e6..a91ec94f4258 100644 --- a/streams/early_zip_readable_streams_test.ts +++ b/streams/early_zip_readable_streams_test.ts @@ -1,77 +1,79 @@ // Copyright 2018-2024 the Deno authors. All rights reserved. MIT license. -import { earlyZipReadableStreams } from "./early_zip_readable_streams.ts" -import { assertEquals } from "@std/assert" +import { earlyZipReadableStreams } from "./early_zip_readable_streams.ts"; +import { assertEquals } from "@std/assert"; Deno.test("earlyZipReadableStreams() handles short first", async () => { - const textStream = ReadableStream.from(["1", "2", "3"]) - const textStream2 = ReadableStream.from(["a", "b", "c", "d", "e"]) + const textStream = ReadableStream.from(["1", "2", "3"]); + const textStream2 = ReadableStream.from(["a", "b", "c", "d", "e"]); - const buf = await Array.fromAsync( - earlyZipReadableStreams(textStream, textStream2), - ) + const buf = await Array.fromAsync( + earlyZipReadableStreams(textStream, textStream2), + ); - assertEquals(buf, [ - "1", - "a", - "2", - "b", - "3", - "c", - ]) -}) + assertEquals(buf, [ + "1", + "a", + "2", + "b", + "3", + "c", + ]); +}); Deno.test("earlyZipReadableStreams() handles long first", async () => { - const textStream = ReadableStream.from(["a", "b", "c", "d", "e"]) - const textStream2 = ReadableStream.from(["1", "2", "3"]) + const textStream = ReadableStream.from(["a", "b", "c", "d", "e"]); + const textStream2 = ReadableStream.from(["1", "2", "3"]); - const buf = await Array.fromAsync( - earlyZipReadableStreams(textStream, textStream2), - ) + const buf = await Array.fromAsync( + earlyZipReadableStreams(textStream, textStream2), + ); - assertEquals(buf, [ - "a", - "1", - "b", - "2", - "c", - "3", - "d", - ]) -}) + assertEquals(buf, [ + "a", + "1", + "b", + "2", + "c", + "3", + "d", + ]); +}); Deno.test("earlyZipReadableStreams() can zip three streams", async () => { - const textStream = ReadableStream.from(["a", "b", "c", "d", "e"]) - const textStream2 = ReadableStream.from(["1", "2", "3"]) - const textStream3 = ReadableStream.from(["x", "y"]) + const textStream = ReadableStream.from(["a", "b", "c", "d", "e"]); + const textStream2 = ReadableStream.from(["1", "2", "3"]); + const textStream3 = ReadableStream.from(["x", "y"]); - const buf = await Array.fromAsync( - earlyZipReadableStreams(textStream, textStream2, textStream3), - ) + const buf = await Array.fromAsync( + earlyZipReadableStreams(textStream, textStream2, textStream3), + ); - assertEquals(buf, [ - "a", - "1", - "x", - "b", - "2", - "y", - "c", - "3", - ]) -}) + assertEquals(buf, [ + "a", + "1", + "x", + "b", + "2", + "y", + "c", + "3", + ]); +}); Deno.test("earlyZipReadableStreams() forwards cancel()", async () => { - const streams = new Array(10).fill(false).map(() => new ReadableStream( - { - pull(controller) { - controller.enqueue('chunk') - }, - cancel(reason) { - assertEquals(reason, 'I was cancelled!') - } - } - )) + const streams = new Array(10).fill(false).map(() => + new ReadableStream( + { + pull(controller) { + controller.enqueue("chunk"); + }, + cancel(reason) { + assertEquals(reason, "I was cancelled!"); + }, + }, + ) + ); - await earlyZipReadableStreams(...streams).cancel('I was cancelled!') -}) + await earlyZipReadableStreams(...streams).cancel("I was cancelled!"); +}); From 5c5f4fbce0ca89e453fba419fa4d5caf5bf81883 Mon Sep 17 00:00:00 2001 From: Doctor <44320105+BlackAsLight@users.noreply.github.com> Date: Thu, 20 Jun 2024 10:30:29 +1000 Subject: [PATCH 4/8] nit(streams): Make one line into two Co-authored-by: Asher Gomez --- streams/early_zip_readable_streams.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/streams/early_zip_readable_streams.ts b/streams/early_zip_readable_streams.ts index 3d58e5de2384..333a8a1a7ff6 100644 --- a/streams/early_zip_readable_streams.ts +++ b/streams/early_zip_readable_streams.ts @@ -93,7 +93,8 @@ export function earlyZipReadableStreams( await Promise.all( readers.map((reader) => reader.cancel("early_zip_ended")), // A better cancel message should maybe be put here? ); - return controller.close(); + controller.close(); + return; } controller.enqueue(value); } From d30a6757f4f853267e625d1bf42921ed316512b1 Mon Sep 17 00:00:00 2001 From: BlackAsLight <44320105+BlackAsLight@users.noreply.github.com> Date: Thu, 20 Jun 2024 10:33:00 +1000 Subject: [PATCH 5/8] improve(streams): test to assert all streams were actually cancelled --- streams/early_zip_readable_streams_test.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/streams/early_zip_readable_streams_test.ts b/streams/early_zip_readable_streams_test.ts index a91ec94f4258..1a0dc936c146 100644 --- a/streams/early_zip_readable_streams_test.ts +++ b/streams/early_zip_readable_streams_test.ts @@ -62,13 +62,16 @@ Deno.test("earlyZipReadableStreams() can zip three streams", async () => { }); Deno.test("earlyZipReadableStreams() forwards cancel()", async () => { - const streams = new Array(10).fill(false).map(() => + const num = 10; + let cancelled = 0; + const streams = new Array(num).fill(false).map(() => new ReadableStream( { pull(controller) { controller.enqueue("chunk"); }, cancel(reason) { + cancelled++; assertEquals(reason, "I was cancelled!"); }, }, @@ -76,4 +79,5 @@ Deno.test("earlyZipReadableStreams() forwards cancel()", async () => { ); await earlyZipReadableStreams(...streams).cancel("I was cancelled!"); + assertEquals(cancelled, num); }); From 9977b1419fdca4d6a81173b3466fada636e072a1 Mon Sep 17 00:00:00 2001 From: BlackAsLight <44320105+BlackAsLight@users.noreply.github.com> Date: Thu, 20 Jun 2024 11:03:41 +1000 Subject: [PATCH 6/8] adjust(streams): reason for cancelling streams --- streams/early_zip_readable_streams.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/streams/early_zip_readable_streams.ts b/streams/early_zip_readable_streams.ts index 333a8a1a7ff6..9e8ede21f466 100644 --- a/streams/early_zip_readable_streams.ts +++ b/streams/early_zip_readable_streams.ts @@ -87,11 +87,11 @@ export function earlyZipReadableStreams( const readers = streams.map((stream) => stream.getReader()); return new ReadableStream({ async pull(controller) { - for (const reader of readers) { - const { done, value } = await reader.read(); + for (let i = 0; i < readers.length; ++i) { + const { done, value } = await readers[i]!.read(); if (done) { await Promise.all( - readers.map((reader) => reader.cancel("early_zip_ended")), // A better cancel message should maybe be put here? + readers.map((reader) => reader.cancel(`Stream ended at ${i}`)), ); controller.close(); return; From 8e8e28048d71db9e8c2303bb7a79d49774dd891e Mon Sep 17 00:00:00 2001 From: Asher Gomez Date: Thu, 20 Jun 2024 15:32:50 +1000 Subject: [PATCH 7/8] tweak cancel reason --- streams/early_zip_readable_streams.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/streams/early_zip_readable_streams.ts b/streams/early_zip_readable_streams.ts index 9e8ede21f466..7a2b0a451826 100644 --- a/streams/early_zip_readable_streams.ts +++ b/streams/early_zip_readable_streams.ts @@ -91,7 +91,7 @@ export function earlyZipReadableStreams( const { done, value } = await readers[i]!.read(); if (done) { await Promise.all( - readers.map((reader) => reader.cancel(`Stream ended at ${i}`)), + readers.map((reader) => reader.cancel(`Stream ${i} ended`)), ); controller.close(); return; From c59b2b6bbfa6effd82b46c029435e070f0f24af9 Mon Sep 17 00:00:00 2001 From: Asher Gomez Date: Thu, 20 Jun 2024 15:34:18 +1000 Subject: [PATCH 8/8] tweak --- streams/early_zip_readable_streams.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/streams/early_zip_readable_streams.ts b/streams/early_zip_readable_streams.ts index 7a2b0a451826..cb857244de30 100644 --- a/streams/early_zip_readable_streams.ts +++ b/streams/early_zip_readable_streams.ts @@ -91,7 +91,9 @@ export function earlyZipReadableStreams( const { done, value } = await readers[i]!.read(); if (done) { await Promise.all( - readers.map((reader) => reader.cancel(`Stream ${i} ended`)), + readers.map((reader) => + reader.cancel(`Stream at index ${i} ended`) + ), ); controller.close(); return;