From 4788847d9cc29be3baed685e1a05a82e73d58ff1 Mon Sep 17 00:00:00 2001
From: Matt Rossman <22670878+mattrossman@users.noreply.github.com>
Date: Mon, 27 May 2024 09:27:39 -0400
Subject: [PATCH 1/3] Add queuing strategy option for Stream.toReadableStream
Resolves #2840
---
.changeset/empty-islands-pump.md | 5 +
packages/effect/src/Stream.ts | 33 ++++--
packages/effect/src/internal/stream.ts | 140 ++++++++++++++++---------
3 files changed, 122 insertions(+), 56 deletions(-)
create mode 100644 .changeset/empty-islands-pump.md
diff --git a/.changeset/empty-islands-pump.md b/.changeset/empty-islands-pump.md
new file mode 100644
index 0000000000..357bcb8896
--- /dev/null
+++ b/.changeset/empty-islands-pump.md
@@ -0,0 +1,5 @@
+---
+"effect": patch
+---
+
+Add queuing strategy option for Stream.toReadableStream
diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts
index ea1e2b8c12..3dd51082e2 100644
--- a/packages/effect/src/Stream.ts
+++ b/packages/effect/src/Stream.ts
@@ -3860,7 +3860,17 @@ export const toQueueOfElements: {
* @since 2.0.0
* @category destructors
*/
-export const toReadableStream: (self: Stream) => ReadableStream = internal.toReadableStream
+export const toReadableStream: {
+ (
+ options?: { readonly strategy?: QueuingStrategy | undefined }
+ ): (
+ self: Stream
+ ) => ReadableStream
+ (
+ self: Stream,
+ options?: { readonly strategy?: QueuingStrategy | undefined }
+ ): ReadableStream
+} = internal.toReadableStream
/**
* Converts the stream to a `Effect`.
@@ -3870,8 +3880,17 @@ export const toReadableStream: (self: Stream) => ReadableStream =
* @since 2.0.0
* @category destructors
*/
-export const toReadableStreamEffect: (self: Stream) => Effect.Effect, never, R> =
- internal.toReadableStreamEffect
+export const toReadableStreamEffect: {
+ (
+ options?: { readonly strategy?: QueuingStrategy | undefined }
+ ): (
+ self: Stream
+ ) => Effect.Effect, never, R>
+ (
+ self: Stream,
+ options?: { readonly strategy?: QueuingStrategy | undefined }
+ ): Effect.Effect, never, R>
+} = internal.toReadableStreamEffect
/**
* Converts the stream to a `ReadableStream` using the provided runtime.
@@ -3882,12 +3901,14 @@ export const toReadableStreamEffect: (self: Stream) => Effect.
* @category destructors
*/
export const toReadableStreamRuntime: {
- (
- runtime: Runtime
+ (
+ runtime: Runtime,
+ options?: { readonly strategy?: QueuingStrategy | undefined }
): (self: Stream) => ReadableStream
(
self: Stream,
- runtime: Runtime
+ runtime: Runtime,
+ options?: { readonly strategy?: QueuingStrategy | undefined }
): ReadableStream
} = internal.toReadableStreamRuntime
diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts
index 88d77497c6..2739a7f55f 100644
--- a/packages/effect/src/internal/stream.ts
+++ b/packages/effect/src/internal/stream.ts
@@ -6542,63 +6542,103 @@ export const toQueueOfElements = dual<
))
/** @internal */
-export const toReadableStream = (self: Stream.Stream) =>
- toReadableStreamRuntime(self, Runtime.defaultRuntime)
+export const toReadableStream = dual<
+ (
+ options?: { readonly strategy?: QueuingStrategy | undefined }
+ ) => (self: Stream.Stream) => ReadableStream,
+ (
+ self: Stream.Stream,
+ options?: { readonly strategy?: QueuingStrategy | undefined }
+ ) => ReadableStream
+>(
+ (args) => isStream(args[0]),
+ (
+ self: Stream.Stream,
+ options?: { readonly strategy?: QueuingStrategy | undefined }
+ ) => toReadableStreamRuntime(self, Runtime.defaultRuntime, options)
+)
/** @internal */
-export const toReadableStreamEffect = (self: Stream.Stream) =>
- Effect.map(Effect.runtime(), (runtime) => toReadableStreamRuntime(self, runtime))
+export const toReadableStreamEffect = dual<
+ (
+ options?: { readonly strategy?: QueuingStrategy | undefined }
+ ) => (self: Stream.Stream) => Effect.Effect, never, R>,
+ (
+ self: Stream.Stream,
+ options?: { readonly strategy?: QueuingStrategy | undefined }
+ ) => Effect.Effect, never, R>
+>(
+ (args) => isStream(args[0]),
+ (
+ self: Stream.Stream,
+ options?: { readonly strategy?: QueuingStrategy | undefined }
+ ) => Effect.map(Effect.runtime(), (runtime) => toReadableStreamRuntime(self, runtime, options))
+)
/** @internal */
export const toReadableStreamRuntime = dual<
- (runtime: Runtime.Runtime) => (self: Stream.Stream) => ReadableStream,
- (self: Stream.Stream, runtime: Runtime.Runtime) => ReadableStream
->(2, (self: Stream.Stream, runtime: Runtime.Runtime): ReadableStream => {
- const runSync = Runtime.runSync(runtime)
- const runFork = Runtime.runFork(runtime)
-
- let pull: Effect.Effect
- let scope: Scope.CloseableScope
- return new ReadableStream({
- start(controller) {
- scope = runSync(Scope.make())
- pull = pipe(
- toPull(self),
- Scope.extend(scope),
- runSync,
- Effect.tap((chunk) =>
- Effect.sync(() => {
- Chunk.map(chunk, (a) => {
- controller.enqueue(a)
- })
- })
- ),
- Effect.tapErrorCause(() => Scope.close(scope, Exit.void)),
- Effect.catchTags({
- "None": () =>
- Effect.sync(() => {
- controller.close()
- }),
- "Some": (error) =>
+ (
+ runtime: Runtime.Runtime,
+ options?: { readonly strategy?: QueuingStrategy | undefined }
+ ) => (self: Stream.Stream) => ReadableStream,
+ (
+ self: Stream.Stream,
+ runtime: Runtime.Runtime,
+ options?: { readonly strategy?: QueuingStrategy | undefined }
+ ) => ReadableStream
+>(
+ (args) => isStream(args[0]),
+ (
+ self: Stream.Stream,
+ runtime: Runtime.Runtime,
+ options?: { readonly strategy?: QueuingStrategy | undefined }
+ ): ReadableStream => {
+ const runSync = Runtime.runSync(runtime)
+ const runFork = Runtime.runFork(runtime)
+
+ let pull: Effect.Effect
+ let scope: Scope.CloseableScope
+ return new ReadableStream({
+ start(controller) {
+ scope = runSync(Scope.make())
+ pull = pipe(
+ toPull(self),
+ Scope.extend(scope),
+ runSync,
+ Effect.tap((chunk) =>
Effect.sync(() => {
- controller.error(error.value)
+ Chunk.map(chunk, (a) => {
+ controller.enqueue(a)
+ })
})
- }),
- Effect.asVoid
- )
- },
- pull() {
- return new Promise((resolve) => {
- runFork(pull, { scope }).addObserver((_) => resolve())
- })
- },
- cancel() {
- return new Promise((resolve) => {
- runFork(Scope.close(scope, Exit.void)).addObserver((_) => resolve())
- })
- }
- })
-})
+ ),
+ Effect.tapErrorCause(() => Scope.close(scope, Exit.void)),
+ Effect.catchTags({
+ "None": () =>
+ Effect.sync(() => {
+ controller.close()
+ }),
+ "Some": (error) =>
+ Effect.sync(() => {
+ controller.error(error.value)
+ })
+ }),
+ Effect.asVoid
+ )
+ },
+ pull() {
+ return new Promise((resolve) => {
+ runFork(pull, { scope }).addObserver((_) => resolve())
+ })
+ },
+ cancel() {
+ return new Promise((resolve) => {
+ runFork(Scope.close(scope, Exit.void)).addObserver((_) => resolve())
+ })
+ }
+ }, options?.strategy)
+ }
+)
/** @internal */
export const transduce = dual<
From ebfa2d07859e300ce9c51d32be91eaba49f2fafc Mon Sep 17 00:00:00 2001
From: Matt Rossman <22670878+mattrossman@users.noreply.github.com>
Date: Mon, 27 May 2024 10:35:03 -0400
Subject: [PATCH 2/3] Fix inference on QueuingStrategy type argument
---
packages/effect/src/Stream.ts | 6 +++---
packages/effect/src/internal/stream.ts | 6 +++---
2 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts
index 3dd51082e2..7e84661c66 100644
--- a/packages/effect/src/Stream.ts
+++ b/packages/effect/src/Stream.ts
@@ -3863,7 +3863,7 @@ export const toQueueOfElements: {
export const toReadableStream: {
(
options?: { readonly strategy?: QueuingStrategy | undefined }
- ): (
+ ): (
self: Stream
) => ReadableStream
(
@@ -3883,7 +3883,7 @@ export const toReadableStream: {
export const toReadableStreamEffect: {
(
options?: { readonly strategy?: QueuingStrategy | undefined }
- ): (
+ ): (
self: Stream
) => Effect.Effect, never, R>
(
@@ -3904,7 +3904,7 @@ export const toReadableStreamRuntime: {
(
runtime: Runtime,
options?: { readonly strategy?: QueuingStrategy | undefined }
- ): (self: Stream) => ReadableStream
+ ): (self: Stream) => ReadableStream
(
self: Stream,
runtime: Runtime,
diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts
index 2739a7f55f..45e29f1af4 100644
--- a/packages/effect/src/internal/stream.ts
+++ b/packages/effect/src/internal/stream.ts
@@ -6545,7 +6545,7 @@ export const toQueueOfElements = dual<
export const toReadableStream = dual<
(
options?: { readonly strategy?: QueuingStrategy | undefined }
- ) => (self: Stream.Stream) => ReadableStream,
+ ) => (self: Stream.Stream) => ReadableStream,
(
self: Stream.Stream,
options?: { readonly strategy?: QueuingStrategy | undefined }
@@ -6562,7 +6562,7 @@ export const toReadableStream = dual<
export const toReadableStreamEffect = dual<
(
options?: { readonly strategy?: QueuingStrategy | undefined }
- ) => (self: Stream.Stream) => Effect.Effect, never, R>,
+ ) => (self: Stream.Stream) => Effect.Effect, never, R>,
(
self: Stream.Stream,
options?: { readonly strategy?: QueuingStrategy | undefined }
@@ -6580,7 +6580,7 @@ export const toReadableStreamRuntime = dual<
(
runtime: Runtime.Runtime,
options?: { readonly strategy?: QueuingStrategy | undefined }
- ) => (self: Stream.Stream) => ReadableStream,
+ ) => (self: Stream.Stream) => ReadableStream,
(
self: Stream.Stream,
runtime: Runtime.Runtime,
From f730c1a48844d96a807d02f21ea1d41c368d16d4 Mon Sep 17 00:00:00 2001
From: Matt Rossman <22670878+mattrossman@users.noreply.github.com>
Date: Tue, 28 May 2024 08:32:09 -0400
Subject: [PATCH 3/3] Update changeset (minor)
Co-authored-by: Tim
---
.changeset/empty-islands-pump.md | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/.changeset/empty-islands-pump.md b/.changeset/empty-islands-pump.md
index 357bcb8896..e6ee5bab20 100644
--- a/.changeset/empty-islands-pump.md
+++ b/.changeset/empty-islands-pump.md
@@ -1,5 +1,5 @@
---
-"effect": patch
+"effect": minor
---
Add queuing strategy option for Stream.toReadableStream