From 46eeac152bbc9373263cf9186e8e0c2f0215af44 Mon Sep 17 00:00:00 2001 From: Tim Date: Fri, 26 Jul 2024 13:48:23 +1200 Subject: [PATCH] switch to object options for Stream.async apis (#3213) --- .changeset/chilled-ducks-sniff.md | 24 ++++++++++++++++ packages/effect/src/Stream.ts | 6 ++-- packages/effect/src/internal/stream.ts | 34 +++++++++++++---------- packages/effect/test/Stream/async.test.ts | 6 ++-- 4 files changed, 50 insertions(+), 20 deletions(-) create mode 100644 .changeset/chilled-ducks-sniff.md diff --git a/.changeset/chilled-ducks-sniff.md b/.changeset/chilled-ducks-sniff.md new file mode 100644 index 0000000000..2f65736f7e --- /dev/null +++ b/.changeset/chilled-ducks-sniff.md @@ -0,0 +1,24 @@ +--- +"effect": major +--- + +Use object options for Stream.async apis + +Instead of: + +```ts +Stream.async((emit) => { + //... +}, 16); +``` + +You can now write: + +```ts +Stream.async( + (emit) => { + //... + }, + { bufferSize: 16 }, +); +``` diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index bbcf0bd702..1a4ca72069 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -314,7 +314,7 @@ export const as: { const _async: ( register: (emit: Emit.Emit) => Effect.Effect | void, - bufferSize?: number | "unbounded" | { + options?: { readonly bufferSize: "unbounded" } | { readonly bufferSize?: number | undefined readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } | undefined @@ -369,7 +369,7 @@ export { */ export const asyncEffect: ( register: (emit: Emit.Emit) => Effect.Effect, - bufferSize?: number | "unbounded" | { + options?: { readonly bufferSize: "unbounded" } | { readonly bufferSize?: number | undefined readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } | undefined @@ -426,7 +426,7 @@ export const asyncPush: ( */ export const asyncScoped: ( register: (emit: Emit.Emit) => Effect.Effect, - bufferSize?: number | "unbounded" | { + options?: { readonly bufferSize: "unbounded" } | { readonly bufferSize?: number | undefined readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } | undefined diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 7680583983..2596ac63ac 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -466,23 +466,23 @@ export const as = dual< >(2, (self: Stream.Stream, value: B): Stream.Stream => map(self, () => value)) const queueFromBufferOptions = ( - bufferSize?: number | "unbounded" | { + options?: { + readonly bufferSize: "unbounded" + } | { readonly bufferSize?: number | undefined readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } | undefined ): Effect.Effect>> => { - if (bufferSize === "unbounded") { + if (options?.bufferSize === "unbounded") { return Queue.unbounded() - } else if (typeof bufferSize === "number" || bufferSize === undefined) { - return Queue.bounded(bufferSize ?? 16) } - switch (bufferSize.strategy) { + switch (options?.strategy) { case "dropping": - return Queue.dropping(bufferSize.bufferSize ?? 16) + return Queue.dropping(options.bufferSize ?? 16) case "sliding": - return Queue.sliding(bufferSize.bufferSize ?? 16) + return Queue.sliding(options.bufferSize ?? 16) default: - return Queue.bounded(bufferSize.bufferSize ?? 16) + return Queue.bounded(options?.bufferSize ?? 16) } } @@ -491,13 +491,15 @@ export const _async = ( register: ( emit: Emit.Emit ) => Effect.Effect | void, - bufferSize?: number | "unbounded" | { + options?: { + readonly bufferSize: "unbounded" + } | { readonly bufferSize?: number | undefined readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } | undefined ): Stream.Stream => Effect.acquireRelease( - queueFromBufferOptions(bufferSize), + queueFromBufferOptions(options), (queue) => Queue.shutdown(queue) ).pipe( Effect.flatMap((output) => @@ -546,14 +548,16 @@ export const _async = ( /** @internal */ export const asyncEffect = ( register: (emit: Emit.Emit) => Effect.Effect, - bufferSize?: number | "unbounded" | { + options?: { + readonly bufferSize: "unbounded" + } | { readonly bufferSize?: number | undefined readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } | undefined ): Stream.Stream => pipe( Effect.acquireRelease( - queueFromBufferOptions(bufferSize), + queueFromBufferOptions(options), (queue) => Queue.shutdown(queue) ), Effect.flatMap((output) => @@ -649,14 +653,16 @@ export const asyncPush = ( /** @internal */ export const asyncScoped = ( register: (emit: Emit.Emit) => Effect.Effect, - bufferSize?: number | "unbounded" | { + options?: { + readonly bufferSize: "unbounded" + } | { readonly bufferSize?: number | undefined readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } | undefined ): Stream.Stream> => pipe( Effect.acquireRelease( - queueFromBufferOptions(bufferSize), + queueFromBufferOptions(options), (queue) => Queue.shutdown(queue) ), Effect.flatMap((output) => diff --git a/packages/effect/test/Stream/async.test.ts b/packages/effect/test/Stream/async.test.ts index 7b7e979f6d..3a4a100f6b 100644 --- a/packages/effect/test/Stream/async.test.ts +++ b/packages/effect/test/Stream/async.test.ts @@ -110,7 +110,7 @@ describe("Stream", () => { ) ) return Effect.void - }, 5) + }, { bufferSize: 5 }) const sink = pipe(Sink.take(1), Sink.zipRight(Sink.never)) const fiber = yield* $(stream, Stream.run(sink), Effect.fork) yield* $(Ref.get(refCount), Effect.repeat({ while: (n) => n !== 7 })) @@ -205,7 +205,7 @@ describe("Stream", () => { ) ) return Effect.void - }, 5) + }, { bufferSize: 5 }) const sink = pipe(Sink.take(1), Sink.zipRight(Sink.never)) const fiber = yield* $(stream, Stream.run(sink), Effect.fork) yield* $(Ref.get(refCount), Effect.repeat({ while: (n) => n !== 7 })) @@ -398,7 +398,7 @@ describe("Stream", () => { ) ) return Effect.void - }, 5) + }, { bufferSize: 5 }) const sink = pipe(Sink.take(1), Sink.zipRight(Sink.never)) const fiber = yield* $(stream, Stream.run(sink), Effect.fork) yield* $(Ref.get(refCount), Effect.repeat({ while: (n) => n !== 7 }))