From 042e9569d4f4237dff480bc16ad92f7d3d998a66 Mon Sep 17 00:00:00 2001 From: Tim Date: Wed, 10 Jul 2024 22:32:15 +1200 Subject: [PATCH] allow customizing the output buffer for the Stream.async* apis --- .changeset/ten-swans-rest.md | 19 ++++++++++++ packages/effect/src/Stream.ts | 15 +++++++-- packages/effect/src/internal/stream.ts | 42 ++++++++++++++++++++++---- 3 files changed, 67 insertions(+), 9 deletions(-) create mode 100644 .changeset/ten-swans-rest.md diff --git a/.changeset/ten-swans-rest.md b/.changeset/ten-swans-rest.md new file mode 100644 index 0000000000..f6c39fd38c --- /dev/null +++ b/.changeset/ten-swans-rest.md @@ -0,0 +1,19 @@ +--- +"effect": minor +--- + +allow customizing the output buffer for the Stream.async\* apis + +```ts +import { Stream } from "effect"; + +Stream.async( + (emit) => { + // ... + }, + { + bufferSize: 16, + strategy: "dropping", // you can also use "sliding" or "suspend" + }, +); +``` diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index cfa7761bea..ecb35c6d47 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -312,7 +312,10 @@ export const as: { const _async: ( register: (emit: Emit.Emit) => Effect.Effect | void, - outputBuffer?: number + bufferSize?: number | "unbounded" | { + readonly bufferSize?: number | undefined + readonly strategy?: "dropping" | "sliding" | "suspend" | undefined + } | undefined ) => Stream = internal._async export { @@ -364,7 +367,10 @@ export { */ export const asyncEffect: ( register: (emit: Emit.Emit) => Effect.Effect, - outputBuffer?: number + bufferSize?: number | "unbounded" | { + readonly bufferSize?: number | undefined + readonly strategy?: "dropping" | "sliding" | "suspend" | undefined + } | undefined ) => Stream = internal.asyncEffect /** @@ -378,7 +384,10 @@ export const asyncEffect: ( */ export const asyncScoped: ( register: (emit: Emit.Emit) => Effect.Effect, - outputBuffer?: number + bufferSize?: number | "unbounded" | { + readonly bufferSize?: number | undefined + readonly strategy?: "dropping" | "sliding" | "suspend" | undefined + } | undefined ) => Stream> = internal.asyncScoped /** diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 1d32a02a16..ec5dc3b54a 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -461,15 +461,39 @@ export const as = dual< (self: Stream.Stream, value: B) => Stream.Stream >(2, (self: Stream.Stream, value: B): Stream.Stream => map(self, () => value)) +const queueFromBufferOptions = ( + bufferSize?: number | "unbounded" | { + readonly bufferSize?: number | undefined + readonly strategy?: "dropping" | "sliding" | "suspend" | undefined + } | undefined +): Effect.Effect>> => { + if (bufferSize === "unbounded") { + return Queue.unbounded() + } else if (typeof bufferSize === "number" || bufferSize === undefined) { + return Queue.bounded(bufferSize ?? 16) + } + switch (bufferSize.strategy) { + case "dropping": + return Queue.dropping(bufferSize.bufferSize ?? 16) + case "sliding": + return Queue.sliding(bufferSize.bufferSize ?? 16) + default: + return Queue.bounded(bufferSize.bufferSize ?? 16) + } +} + /** @internal */ export const _async = ( register: ( emit: Emit.Emit ) => Effect.Effect | void, - outputBuffer = 16 + bufferSize?: number | "unbounded" | { + readonly bufferSize?: number | undefined + readonly strategy?: "dropping" | "sliding" | "suspend" | undefined + } | undefined ): Stream.Stream => Effect.acquireRelease( - Queue.bounded>(outputBuffer), + queueFromBufferOptions(bufferSize), (queue) => Queue.shutdown(queue) ).pipe( Effect.flatMap((output) => @@ -518,11 +542,14 @@ export const _async = ( /** @internal */ export const asyncEffect = ( register: (emit: Emit.Emit) => Effect.Effect, - outputBuffer = 16 + bufferSize?: number | "unbounded" | { + readonly bufferSize?: number | undefined + readonly strategy?: "dropping" | "sliding" | "suspend" | undefined + } | undefined ): Stream.Stream => pipe( Effect.acquireRelease( - Queue.bounded>(outputBuffer), + queueFromBufferOptions(bufferSize), (queue) => Queue.shutdown(queue) ), Effect.flatMap((output) => @@ -573,11 +600,14 @@ export const asyncEffect = ( /** @internal */ export const asyncScoped = ( register: (emit: Emit.Emit) => Effect.Effect, - outputBuffer = 16 + bufferSize?: number | "unbounded" | { + readonly bufferSize?: number | undefined + readonly strategy?: "dropping" | "sliding" | "suspend" | undefined + } | undefined ): Stream.Stream> => pipe( Effect.acquireRelease( - Queue.bounded>(outputBuffer), + queueFromBufferOptions(bufferSize), (queue) => Queue.shutdown(queue) ), Effect.flatMap((output) =>