Skip to content

Commit

Permalink
Add queuing strategy option for Stream.toReadableStream (#2864)
Browse files Browse the repository at this point in the history
Co-authored-by: Tim <hello@timsmart.co>
  • Loading branch information
mattrossman and tim-smart committed Jun 6, 2024
1 parent 5bd549e commit 9305b76
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 57 deletions.
5 changes: 5 additions & 0 deletions .changeset/empty-islands-pump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

Add queuing strategy option for Stream.toReadableStream
35 changes: 28 additions & 7 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3860,7 +3860,17 @@ export const toQueueOfElements: {
* @since 2.0.0
* @category destructors
*/
export const toReadableStream: <A, E>(self: Stream<A, E>) => ReadableStream<A> = internal.toReadableStream
export const toReadableStream: {
<A>(
options?: { readonly strategy?: QueuingStrategy<A> | undefined }
): <E>(
self: Stream<A, E>
) => ReadableStream<A>
<A, E>(
self: Stream<A, E>,
options?: { readonly strategy?: QueuingStrategy<A> | undefined }
): ReadableStream<A>
} = internal.toReadableStream

/**
* Converts the stream to a `Effect<ReadableStream>`.
Expand All @@ -3870,8 +3880,17 @@ export const toReadableStream: <A, E>(self: Stream<A, E>) => ReadableStream<A> =
* @since 2.0.0
* @category destructors
*/
export const toReadableStreamEffect: <A, E, R>(self: Stream<A, E, R>) => Effect.Effect<ReadableStream<A>, never, R> =
internal.toReadableStreamEffect
export const toReadableStreamEffect: {
<A>(
options?: { readonly strategy?: QueuingStrategy<A> | undefined }
): <E, R>(
self: Stream<A, E, R>
) => Effect.Effect<ReadableStream<A>, never, R>
<A, E, R>(
self: Stream<A, E, R>,
options?: { readonly strategy?: QueuingStrategy<A> | undefined }
): Effect.Effect<ReadableStream<A>, never, R>
} = internal.toReadableStreamEffect

/**
* Converts the stream to a `ReadableStream` using the provided runtime.
Expand All @@ -3882,12 +3901,14 @@ export const toReadableStreamEffect: <A, E, R>(self: Stream<A, E, R>) => Effect.
* @category destructors
*/
export const toReadableStreamRuntime: {
<XR>(
runtime: Runtime<XR>
): <A, E, R extends XR>(self: Stream<A, E, R>) => ReadableStream<A>
<A, XR>(
runtime: Runtime<XR>,
options?: { readonly strategy?: QueuingStrategy<A> | undefined }
): <E, R extends XR>(self: Stream<A, E, R>) => ReadableStream<A>
<A, E, XR, R extends XR>(
self: Stream<A, E, R>,
runtime: Runtime<XR>
runtime: Runtime<XR>,
options?: { readonly strategy?: QueuingStrategy<A> | undefined }
): ReadableStream<A>
} = internal.toReadableStreamRuntime

Expand Down
140 changes: 90 additions & 50 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6542,63 +6542,103 @@ export const toQueueOfElements = dual<
))

/** @internal */
export const toReadableStream = <A, E>(self: Stream.Stream<A, E>) =>
toReadableStreamRuntime(self, Runtime.defaultRuntime)
export const toReadableStream = dual<
<A>(
options?: { readonly strategy?: QueuingStrategy<A> | undefined }
) => <E>(self: Stream.Stream<A, E>) => ReadableStream<A>,
<A, E>(
self: Stream.Stream<A, E>,
options?: { readonly strategy?: QueuingStrategy<A> | undefined }
) => ReadableStream<A>
>(
(args) => isStream(args[0]),
<A, E>(
self: Stream.Stream<A, E>,
options?: { readonly strategy?: QueuingStrategy<A> | undefined }
) => toReadableStreamRuntime(self, Runtime.defaultRuntime, options)
)

/** @internal */
export const toReadableStreamEffect = <A, E, R>(self: Stream.Stream<A, E, R>) =>
Effect.map(Effect.runtime<R>(), (runtime) => toReadableStreamRuntime(self, runtime))
export const toReadableStreamEffect = dual<
<A>(
options?: { readonly strategy?: QueuingStrategy<A> | undefined }
) => <E, R>(self: Stream.Stream<A, E, R>) => Effect.Effect<ReadableStream<A>, never, R>,
<A, E, R>(
self: Stream.Stream<A, E, R>,
options?: { readonly strategy?: QueuingStrategy<A> | undefined }
) => Effect.Effect<ReadableStream<A>, never, R>
>(
(args) => isStream(args[0]),
<A, E, R>(
self: Stream.Stream<A, E, R>,
options?: { readonly strategy?: QueuingStrategy<A> | undefined }
) => Effect.map(Effect.runtime<R>(), (runtime) => toReadableStreamRuntime(self, runtime, options))
)

/** @internal */
export const toReadableStreamRuntime = dual<
<XR>(runtime: Runtime.Runtime<XR>) => <A, E, R extends XR>(self: Stream.Stream<A, E, R>) => ReadableStream<A>,
<A, E, XR, R extends XR>(self: Stream.Stream<A, E, R>, runtime: Runtime.Runtime<XR>) => ReadableStream<A>
>(2, <A, E, XR, R extends XR>(self: Stream.Stream<A, E, R>, runtime: Runtime.Runtime<XR>): ReadableStream<A> => {
const runSync = Runtime.runSync(runtime)
const runFork = Runtime.runFork(runtime)

let pull: Effect.Effect<void, never, R>
let scope: Scope.CloseableScope
return new ReadableStream<A>({
start(controller) {
scope = runSync(Scope.make())
pull = pipe(
toPull(self),
Scope.extend(scope),
runSync,
Effect.tap((chunk) =>
Effect.sync(() => {
Chunk.map(chunk, (a) => {
controller.enqueue(a)
})
})
),
Effect.tapErrorCause(() => Scope.close(scope, Exit.void)),
Effect.catchTags({
"None": () =>
Effect.sync(() => {
controller.close()
}),
"Some": (error) =>
<A, XR>(
runtime: Runtime.Runtime<XR>,
options?: { readonly strategy?: QueuingStrategy<A> | undefined }
) => <E, R extends XR>(self: Stream.Stream<A, E, R>) => ReadableStream<A>,
<A, E, XR, R extends XR>(
self: Stream.Stream<A, E, R>,
runtime: Runtime.Runtime<XR>,
options?: { readonly strategy?: QueuingStrategy<A> | undefined }
) => ReadableStream<A>
>(
(args) => isStream(args[0]),
<A, E, XR, R extends XR>(
self: Stream.Stream<A, E, R>,
runtime: Runtime.Runtime<XR>,
options?: { readonly strategy?: QueuingStrategy<A> | undefined }
): ReadableStream<A> => {
const runSync = Runtime.runSync(runtime)
const runFork = Runtime.runFork(runtime)

let pull: Effect.Effect<void, never, R>
let scope: Scope.CloseableScope
return new ReadableStream<A>({
start(controller) {
scope = runSync(Scope.make())
pull = pipe(
toPull(self),
Scope.extend(scope),
runSync,
Effect.tap((chunk) =>
Effect.sync(() => {
controller.error(error.value)
Chunk.map(chunk, (a) => {
controller.enqueue(a)
})
})
}),
Effect.asVoid
)
},
pull() {
return new Promise<void>((resolve) => {
runFork(pull, { scope }).addObserver((_) => resolve())
})
},
cancel() {
return new Promise<void>((resolve) => {
runFork(Scope.close(scope, Exit.void)).addObserver((_) => resolve())
})
}
})
})
),
Effect.tapErrorCause(() => Scope.close(scope, Exit.void)),
Effect.catchTags({
"None": () =>
Effect.sync(() => {
controller.close()
}),
"Some": (error) =>
Effect.sync(() => {
controller.error(error.value)
})
}),
Effect.asVoid
)
},
pull() {
return new Promise<void>((resolve) => {
runFork(pull, { scope }).addObserver((_) => resolve())
})
},
cancel() {
return new Promise<void>((resolve) => {
runFork(Scope.close(scope, Exit.void)).addObserver((_) => resolve())
})
}
}, options?.strategy)
}
)

/** @internal */
export const transduce = dual<
Expand Down

0 comments on commit 9305b76

Please sign in to comment.