Skip to content

Commit

Permalink
allow customizing the output buffer for the Stream.async* apis
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Jul 10, 2024
1 parent 349b890 commit 042e956
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 9 deletions.
19 changes: 19 additions & 0 deletions .changeset/ten-swans-rest.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
---
"effect": minor
---

allow customizing the output buffer for the Stream.async\* apis

```ts
import { Stream } from "effect";

Stream.async<string>(
(emit) => {
// ...
},
{
bufferSize: 16,
strategy: "dropping", // you can also use "sliding" or "suspend"
},
);
```
15 changes: 12 additions & 3 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,10 @@ export const as: {

const _async: <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<void, never, R> | void,
outputBuffer?: number
bufferSize?: number | "unbounded" | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
) => Stream<A, E, R> = internal._async

export {
Expand Down Expand Up @@ -364,7 +367,10 @@ export {
*/
export const asyncEffect: <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R>,
outputBuffer?: number
bufferSize?: number | "unbounded" | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
) => Stream<A, E, R> = internal.asyncEffect

/**
Expand All @@ -378,7 +384,10 @@ export const asyncEffect: <A, E = never, R = never>(
*/
export const asyncScoped: <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R | Scope.Scope>,
outputBuffer?: number
bufferSize?: number | "unbounded" | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
) => Stream<A, E, Exclude<R, Scope.Scope>> = internal.asyncScoped

/**
Expand Down
42 changes: 36 additions & 6 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -461,15 +461,39 @@ export const as = dual<
<A, E, R, B>(self: Stream.Stream<A, E, R>, value: B) => Stream.Stream<B, E, R>
>(2, <A, E, R, B>(self: Stream.Stream<A, E, R>, value: B): Stream.Stream<B, E, R> => map(self, () => value))

const queueFromBufferOptions = <A, E>(
bufferSize?: number | "unbounded" | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
): Effect.Effect<Queue.Queue<Take.Take<A, E>>> => {
if (bufferSize === "unbounded") {
return Queue.unbounded()
} else if (typeof bufferSize === "number" || bufferSize === undefined) {
return Queue.bounded(bufferSize ?? 16)
}
switch (bufferSize.strategy) {
case "dropping":
return Queue.dropping(bufferSize.bufferSize ?? 16)
case "sliding":
return Queue.sliding(bufferSize.bufferSize ?? 16)
default:
return Queue.bounded(bufferSize.bufferSize ?? 16)
}
}

/** @internal */
export const _async = <A, E = never, R = never>(
register: (
emit: Emit.Emit<R, E, A, void>
) => Effect.Effect<void, never, R> | void,
outputBuffer = 16
bufferSize?: number | "unbounded" | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
): Stream.Stream<A, E, R> =>
Effect.acquireRelease(
Queue.bounded<Take.Take<A, E>>(outputBuffer),
queueFromBufferOptions<A, E>(bufferSize),
(queue) => Queue.shutdown(queue)
).pipe(
Effect.flatMap((output) =>
Expand Down Expand Up @@ -518,11 +542,14 @@ export const _async = <A, E = never, R = never>(
/** @internal */
export const asyncEffect = <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R>,
outputBuffer = 16
bufferSize?: number | "unbounded" | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
): Stream.Stream<A, E, R> =>
pipe(
Effect.acquireRelease(
Queue.bounded<Take.Take<A, E>>(outputBuffer),
queueFromBufferOptions<A, E>(bufferSize),
(queue) => Queue.shutdown(queue)
),
Effect.flatMap((output) =>
Expand Down Expand Up @@ -573,11 +600,14 @@ export const asyncEffect = <A, E = never, R = never>(
/** @internal */
export const asyncScoped = <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R | Scope.Scope>,
outputBuffer = 16
bufferSize?: number | "unbounded" | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
): Stream.Stream<A, E, Exclude<R, Scope.Scope>> =>
pipe(
Effect.acquireRelease(
Queue.bounded<Take.Take<A, E>>(outputBuffer),
queueFromBufferOptions<A, E>(bufferSize),
(queue) => Queue.shutdown(queue)
),
Effect.flatMap((output) =>
Expand Down

0 comments on commit 042e956

Please sign in to comment.