diff --git a/.changeset/stream-share.md b/.changeset/stream-share.md new file mode 100644 index 0000000000..f1dee66bd5 --- /dev/null +++ b/.changeset/stream-share.md @@ -0,0 +1,10 @@ +--- +"effect": minor +--- + +add `Stream.share` api + +The `Stream.share` api is a ref counted variant of the broadcast apis. + +It allows you to share a stream between multiple consumers, and will close the +upstream when the last consumer ends. diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index 532668cab9..adbc0c9242 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -549,6 +549,42 @@ export const broadcast: { ): Effect.Effect>, never, Scope.Scope | R> } = internal.broadcast +/** + * Returns a new Stream that multicasts the original Stream, subscribing to it as soon as the first consumer subscribes. + * As long as there is at least one consumer, the upstream will continue running and emitting data. + * When all consumers have exited, the upstream will be finalized. + * + * @since 3.8.0 + * @category utils + */ +export const share: { + ( + config: { + readonly capacity: "unbounded" + readonly replay?: number | undefined + readonly idleTimeToLive?: Duration.DurationInput | undefined + } | { + readonly capacity: number + readonly strategy?: "sliding" | "dropping" | "suspend" | undefined + readonly replay?: number | undefined + readonly idleTimeToLive?: Duration.DurationInput | undefined + } + ): (self: Stream) => Effect.Effect, never, R | Scope.Scope> + ( + self: Stream, + config: { + readonly capacity: "unbounded" + readonly replay?: number | undefined + readonly idleTimeToLive?: Duration.DurationInput | undefined + } | { + readonly capacity: number + readonly strategy?: "sliding" | "dropping" | "suspend" | undefined + readonly replay?: number | undefined + readonly idleTimeToLive?: Duration.DurationInput | undefined + } + ): Effect.Effect, never, R | Scope.Scope> +} = internal.share + /** * Fan out the stream, producing a dynamic number of streams that have the * same elements as this stream. The driver stream will only ever advance the diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 1524d0e221..3a4b6d0bb9 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -21,6 +21,7 @@ import { pipeArguments } from "../Pipeable.js" import { hasProperty, isTagged, type Predicate, type Refinement } from "../Predicate.js" import * as PubSub from "../PubSub.js" import * as Queue from "../Queue.js" +import * as RcRef from "../RcRef.js" import * as Ref from "../Ref.js" import * as Runtime from "../Runtime.js" import * as Schedule from "../Schedule.js" @@ -837,6 +838,58 @@ export const broadcastDynamic = dual< ): Effect.Effect, never, Scope.Scope | R> => Effect.map(toPubSub(self, maximumLag), (pubsub) => flattenTake(fromPubSub(pubsub)))) +export const share = dual< + ( + config: { + readonly capacity: "unbounded" + readonly replay?: number | undefined + readonly idleTimeToLive?: Duration.DurationInput | undefined + } | { + readonly capacity: number + readonly strategy?: "sliding" | "dropping" | "suspend" | undefined + readonly replay?: number | undefined + readonly idleTimeToLive?: Duration.DurationInput | undefined + } + ) => ( + self: Stream.Stream + ) => Effect.Effect, never, R | Scope.Scope>, + ( + self: Stream.Stream, + config: { + readonly capacity: "unbounded" + readonly replay?: number | undefined + readonly idleTimeToLive?: Duration.DurationInput | undefined + } | { + readonly capacity: number + readonly strategy?: "sliding" | "dropping" | "suspend" | undefined + readonly replay?: number | undefined + readonly idleTimeToLive?: Duration.DurationInput | undefined + } + ) => Effect.Effect, never, R | Scope.Scope> +>( + 2, + ( + self: Stream.Stream, + options: { + readonly capacity: "unbounded" + readonly replay?: number | undefined + readonly idleTimeToLive?: Duration.DurationInput | undefined + } | { + readonly capacity: number + readonly strategy?: "sliding" | "dropping" | "suspend" | undefined + readonly replay?: number | undefined + readonly idleTimeToLive?: Duration.DurationInput | undefined + } + ): Effect.Effect, never, R | Scope.Scope> => + Effect.map( + RcRef.make({ + acquire: broadcastDynamic(self, options), + idleTimeToLive: options.idleTimeToLive + }), + (rcRef) => unwrapScoped(RcRef.get(rcRef)) + ) +) + /** @internal */ export const broadcastedQueues = dual< ( diff --git a/packages/effect/test/Stream/broadcasting.test.ts b/packages/effect/test/Stream/broadcasting.test.ts index b9fedaed0e..e83cb7b220 100644 --- a/packages/effect/test/Stream/broadcasting.test.ts +++ b/packages/effect/test/Stream/broadcasting.test.ts @@ -5,8 +5,11 @@ import * as Either from "effect/Either" import * as Fiber from "effect/Fiber" import { pipe } from "effect/Function" import * as Ref from "effect/Ref" +import * as Schedule from "effect/Schedule" +import * as Sink from "effect/Sink" import * as Stream from "effect/Stream" import * as it from "effect/test/utils/extend" +import * as TestClock from "effect/TestClock" import { assert, describe } from "vitest" describe("Stream", () => { @@ -101,4 +104,95 @@ describe("Stream", () => { ) assert.deepStrictEqual(Array.from(result), [0, 1, 2, 3, 4]) })) + + it.scoped("share sequenced", () => + Effect.gen(function*() { + const sharedStream = yield* Stream.fromSchedule(Schedule.spaced("1 seconds")).pipe( + Stream.share({ capacity: 16 }) + ) + + const firstFiber = yield* sharedStream.pipe( + Stream.take(1), + Stream.run(Sink.collectAll()), + Effect.map(Array.from), + Effect.fork + ) + + yield* TestClock.adjust("1 second") + + const first = yield* Fiber.join(firstFiber) + assert.deepStrictEqual(first, [0]) + + const secondFiber = yield* sharedStream.pipe( + Stream.take(1), + Stream.run(Sink.collectAll()), + Effect.map(Array.from), + Effect.fork + ) + + yield* TestClock.adjust("1 second") + + const second = yield* Fiber.join(secondFiber) + assert.deepStrictEqual(second, [0]) + })) + + it.scoped("share sequenced with idleTimeToLive", () => + Effect.gen(function*() { + const sharedStream = yield* Stream.fromSchedule(Schedule.spaced("1 seconds")).pipe( + Stream.share({ + capacity: 16, + idleTimeToLive: "1 second" + }) + ) + + const firstFiber = yield* sharedStream.pipe( + Stream.take(1), + Stream.run(Sink.collectAll()), + Effect.map(Array.from), + Effect.fork + ) + + yield* TestClock.adjust("1 second") + + const first = yield* Fiber.join(firstFiber) + assert.deepStrictEqual(first, [0]) + + const secondFiber = yield* sharedStream.pipe( + Stream.take(1), + Stream.run(Sink.collectAll()), + Effect.map(Array.from), + Effect.fork + ) + + yield* TestClock.adjust("1 second") + + const second = yield* Fiber.join(secondFiber) + assert.deepStrictEqual(second, [1]) + })) + + it.scoped("share parallel", () => + Effect.gen(function*() { + const sharedStream = yield* Stream.fromSchedule(Schedule.spaced("1 seconds")).pipe( + Stream.share({ capacity: 16 }) + ) + + const fiber1 = yield* sharedStream.pipe( + Stream.take(1), + Stream.run(Sink.collectAll()), + Effect.map((x) => Array.from(x)), + Effect.fork + ) + const fiber2 = yield* sharedStream.pipe( + Stream.take(2), + Stream.run(Sink.collectAll()), + Effect.map((x) => Array.from(x)), + Effect.fork + ) + + yield* TestClock.adjust("2 second") + const [result1, result2] = yield* Fiber.joinAll([fiber1, fiber2]) + + assert.deepStrictEqual(result1, [0]) + assert.deepStrictEqual(result2, [0, 1]) + })) })