diff --git a/.changeset/empty-islands-pump.md b/.changeset/empty-islands-pump.md new file mode 100644 index 0000000000..e6ee5bab20 --- /dev/null +++ b/.changeset/empty-islands-pump.md @@ -0,0 +1,5 @@ +--- +"effect": minor +--- + +Add queuing strategy option for Stream.toReadableStream diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index ea1e2b8c12..7e84661c66 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -3860,7 +3860,17 @@ export const toQueueOfElements: { * @since 2.0.0 * @category destructors */ -export const toReadableStream: (self: Stream) => ReadableStream = internal.toReadableStream +export const toReadableStream: { + ( + options?: { readonly strategy?: QueuingStrategy | undefined } + ): ( + self: Stream + ) => ReadableStream + ( + self: Stream, + options?: { readonly strategy?: QueuingStrategy | undefined } + ): ReadableStream +} = internal.toReadableStream /** * Converts the stream to a `Effect`. @@ -3870,8 +3880,17 @@ export const toReadableStream: (self: Stream) => ReadableStream = * @since 2.0.0 * @category destructors */ -export const toReadableStreamEffect: (self: Stream) => Effect.Effect, never, R> = - internal.toReadableStreamEffect +export const toReadableStreamEffect: { + ( + options?: { readonly strategy?: QueuingStrategy | undefined } + ): ( + self: Stream + ) => Effect.Effect, never, R> + ( + self: Stream, + options?: { readonly strategy?: QueuingStrategy | undefined } + ): Effect.Effect, never, R> +} = internal.toReadableStreamEffect /** * Converts the stream to a `ReadableStream` using the provided runtime. @@ -3882,12 +3901,14 @@ export const toReadableStreamEffect: (self: Stream) => Effect. * @category destructors */ export const toReadableStreamRuntime: { - ( - runtime: Runtime - ): (self: Stream) => ReadableStream + ( + runtime: Runtime, + options?: { readonly strategy?: QueuingStrategy | undefined } + ): (self: Stream) => ReadableStream ( self: Stream, - runtime: Runtime + runtime: Runtime, + options?: { readonly strategy?: QueuingStrategy | undefined } ): ReadableStream } = internal.toReadableStreamRuntime diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 88d77497c6..45e29f1af4 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -6542,63 +6542,103 @@ export const toQueueOfElements = dual< )) /** @internal */ -export const toReadableStream = (self: Stream.Stream) => - toReadableStreamRuntime(self, Runtime.defaultRuntime) +export const toReadableStream = dual< + ( + options?: { readonly strategy?: QueuingStrategy | undefined } + ) => (self: Stream.Stream) => ReadableStream, + ( + self: Stream.Stream, + options?: { readonly strategy?: QueuingStrategy | undefined } + ) => ReadableStream +>( + (args) => isStream(args[0]), + ( + self: Stream.Stream, + options?: { readonly strategy?: QueuingStrategy | undefined } + ) => toReadableStreamRuntime(self, Runtime.defaultRuntime, options) +) /** @internal */ -export const toReadableStreamEffect = (self: Stream.Stream) => - Effect.map(Effect.runtime(), (runtime) => toReadableStreamRuntime(self, runtime)) +export const toReadableStreamEffect = dual< + ( + options?: { readonly strategy?: QueuingStrategy | undefined } + ) => (self: Stream.Stream) => Effect.Effect, never, R>, + ( + self: Stream.Stream, + options?: { readonly strategy?: QueuingStrategy | undefined } + ) => Effect.Effect, never, R> +>( + (args) => isStream(args[0]), + ( + self: Stream.Stream, + options?: { readonly strategy?: QueuingStrategy | undefined } + ) => Effect.map(Effect.runtime(), (runtime) => toReadableStreamRuntime(self, runtime, options)) +) /** @internal */ export const toReadableStreamRuntime = dual< - (runtime: Runtime.Runtime) => (self: Stream.Stream) => ReadableStream, - (self: Stream.Stream, runtime: Runtime.Runtime) => ReadableStream ->(2, (self: Stream.Stream, runtime: Runtime.Runtime): ReadableStream => { - const runSync = Runtime.runSync(runtime) - const runFork = Runtime.runFork(runtime) - - let pull: Effect.Effect - let scope: Scope.CloseableScope - return new ReadableStream({ - start(controller) { - scope = runSync(Scope.make()) - pull = pipe( - toPull(self), - Scope.extend(scope), - runSync, - Effect.tap((chunk) => - Effect.sync(() => { - Chunk.map(chunk, (a) => { - controller.enqueue(a) - }) - }) - ), - Effect.tapErrorCause(() => Scope.close(scope, Exit.void)), - Effect.catchTags({ - "None": () => - Effect.sync(() => { - controller.close() - }), - "Some": (error) => + ( + runtime: Runtime.Runtime, + options?: { readonly strategy?: QueuingStrategy | undefined } + ) => (self: Stream.Stream) => ReadableStream, + ( + self: Stream.Stream, + runtime: Runtime.Runtime, + options?: { readonly strategy?: QueuingStrategy | undefined } + ) => ReadableStream +>( + (args) => isStream(args[0]), + ( + self: Stream.Stream, + runtime: Runtime.Runtime, + options?: { readonly strategy?: QueuingStrategy | undefined } + ): ReadableStream => { + const runSync = Runtime.runSync(runtime) + const runFork = Runtime.runFork(runtime) + + let pull: Effect.Effect + let scope: Scope.CloseableScope + return new ReadableStream({ + start(controller) { + scope = runSync(Scope.make()) + pull = pipe( + toPull(self), + Scope.extend(scope), + runSync, + Effect.tap((chunk) => Effect.sync(() => { - controller.error(error.value) + Chunk.map(chunk, (a) => { + controller.enqueue(a) + }) }) - }), - Effect.asVoid - ) - }, - pull() { - return new Promise((resolve) => { - runFork(pull, { scope }).addObserver((_) => resolve()) - }) - }, - cancel() { - return new Promise((resolve) => { - runFork(Scope.close(scope, Exit.void)).addObserver((_) => resolve()) - }) - } - }) -}) + ), + Effect.tapErrorCause(() => Scope.close(scope, Exit.void)), + Effect.catchTags({ + "None": () => + Effect.sync(() => { + controller.close() + }), + "Some": (error) => + Effect.sync(() => { + controller.error(error.value) + }) + }), + Effect.asVoid + ) + }, + pull() { + return new Promise((resolve) => { + runFork(pull, { scope }).addObserver((_) => resolve()) + }) + }, + cancel() { + return new Promise((resolve) => { + runFork(Scope.close(scope, Exit.void)).addObserver((_) => resolve()) + }) + } + }, options?.strategy) + } +) /** @internal */ export const transduce = dual<