Skip to content

Commit

Permalink
use "dropping" strategy by default for Stream.async apis
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Jul 10, 2024
1 parent 9c8cde8 commit 71005fc
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 21 deletions.
24 changes: 24 additions & 0 deletions .changeset/chilled-ducks-sniff.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
"effect": major
---

Use object options for Stream.async apis

Instead of:

```ts
Stream.async((emit) => {
//...
}, 16);
```

You can now write:

```ts
Stream.async(
(emit) => {
//...
},
{ bufferSize: 16 },
);
```
5 changes: 5 additions & 0 deletions .changeset/cool-birds-exercise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": major
---

use "dropping" strategy by default for Stream.async apis
6 changes: 3 additions & 3 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ export const as: {

const _async: <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<void, never, R> | void,
bufferSize?: number | "unbounded" | {
options?: { readonly bufferSize: "unbounded" } | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
Expand Down Expand Up @@ -367,7 +367,7 @@ export {
*/
export const asyncEffect: <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R>,
bufferSize?: number | "unbounded" | {
options?: { readonly bufferSize: "unbounded" } | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
Expand All @@ -384,7 +384,7 @@ export const asyncEffect: <A, E = never, R = never>(
*/
export const asyncScoped: <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R | Scope.Scope>,
bufferSize?: number | "unbounded" | {
options?: { readonly bufferSize: "unbounded" } | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
Expand Down
36 changes: 21 additions & 15 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -462,23 +462,23 @@ export const as = dual<
>(2, <A, E, R, B>(self: Stream.Stream<A, E, R>, value: B): Stream.Stream<B, E, R> => map(self, () => value))

const queueFromBufferOptions = <A, E>(
bufferSize?: number | "unbounded" | {
options?: {
readonly bufferSize: "unbounded"
} | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
): Effect.Effect<Queue.Queue<Take.Take<A, E>>> => {
if (bufferSize === "unbounded") {
if (options?.bufferSize === "unbounded") {
return Queue.unbounded()
} else if (typeof bufferSize === "number" || bufferSize === undefined) {
return Queue.bounded(bufferSize ?? 16)
}
switch (bufferSize.strategy) {
case "dropping":
return Queue.dropping(bufferSize.bufferSize ?? 16)
switch (options?.strategy) {
case "sliding":
return Queue.sliding(bufferSize.bufferSize ?? 16)
return Queue.sliding(options.bufferSize ?? 16)
case "suspend":
return Queue.bounded(options.bufferSize ?? 16)
default:
return Queue.bounded(bufferSize.bufferSize ?? 16)
return Queue.dropping(options?.bufferSize ?? 16)
}
}

Expand All @@ -487,13 +487,15 @@ export const _async = <A, E = never, R = never>(
register: (
emit: Emit.Emit<R, E, A, void>
) => Effect.Effect<void, never, R> | void,
bufferSize?: number | "unbounded" | {
options?: {
readonly bufferSize: "unbounded"
} | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
): Stream.Stream<A, E, R> =>
Effect.acquireRelease(
queueFromBufferOptions<A, E>(bufferSize),
queueFromBufferOptions<A, E>(options),
(queue) => Queue.shutdown(queue)
).pipe(
Effect.flatMap((output) =>
Expand Down Expand Up @@ -542,14 +544,16 @@ export const _async = <A, E = never, R = never>(
/** @internal */
export const asyncEffect = <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R>,
bufferSize?: number | "unbounded" | {
options?: {
readonly bufferSize: "unbounded"
} | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
): Stream.Stream<A, E, R> =>
pipe(
Effect.acquireRelease(
queueFromBufferOptions<A, E>(bufferSize),
queueFromBufferOptions<A, E>(options),
(queue) => Queue.shutdown(queue)
),
Effect.flatMap((output) =>
Expand Down Expand Up @@ -600,14 +604,16 @@ export const asyncEffect = <A, E = never, R = never>(
/** @internal */
export const asyncScoped = <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R | Scope.Scope>,
bufferSize?: number | "unbounded" | {
options?: {
readonly bufferSize: "unbounded"
} | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | "suspend" | undefined
} | undefined
): Stream.Stream<A, E, Exclude<R, Scope.Scope>> =>
pipe(
Effect.acquireRelease(
queueFromBufferOptions<A, E>(bufferSize),
queueFromBufferOptions<A, E>(options),
(queue) => Queue.shutdown(queue)
),
Effect.flatMap((output) =>
Expand Down
6 changes: 3 additions & 3 deletions packages/effect/test/Stream/async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ describe("Stream", () => {
)
)
return Effect.void
}, 5)
}, { bufferSize: 5 })
const sink = pipe(Sink.take<number>(1), Sink.zipRight(Sink.never))
const fiber = yield* $(stream, Stream.run(sink), Effect.fork)
yield* $(Ref.get(refCount), Effect.repeat({ while: (n) => n !== 7 }))
Expand Down Expand Up @@ -205,7 +205,7 @@ describe("Stream", () => {
)
)
return Effect.void
}, 5)
}, { bufferSize: 5 })
const sink = pipe(Sink.take<number>(1), Sink.zipRight(Sink.never))
const fiber = yield* $(stream, Stream.run(sink), Effect.fork)
yield* $(Ref.get(refCount), Effect.repeat({ while: (n) => n !== 7 }))
Expand Down Expand Up @@ -398,7 +398,7 @@ describe("Stream", () => {
)
)
return Effect.void
}, 5)
}, { bufferSize: 5 })
const sink = pipe(Sink.take<number>(1), Sink.zipRight(Sink.never))
const fiber = yield* $(stream, Stream.run(sink), Effect.fork)
yield* $(Ref.get(refCount), Effect.repeat({ while: (n) => n !== 7 }))
Expand Down

0 comments on commit 71005fc

Please sign in to comment.