Skip to content

Commit

Permalink
add Stream.share api (#3080)
Browse files Browse the repository at this point in the history
Co-authored-by: Tim <hello@timsmart.co>
  • Loading branch information
2 people authored and mikearnaldi committed Sep 10, 2024
1 parent b4dd4b8 commit 68cf4ce
Show file tree
Hide file tree
Showing 4 changed files with 193 additions and 0 deletions.
10 changes: 10 additions & 0 deletions .changeset/stream-share.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"effect": minor
---

add `Stream.share` api

The `Stream.share` api is a ref counted variant of the broadcast apis.

It allows you to share a stream between multiple consumers, and will close the
upstream when the last consumer ends.
36 changes: 36 additions & 0 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -549,6 +549,42 @@ export const broadcast: {
): Effect.Effect<TupleOf<N, Stream<A, E>>, never, Scope.Scope | R>
} = internal.broadcast

/**
* Returns a new Stream that multicasts the original Stream, subscribing to it as soon as the first consumer subscribes.
* As long as there is at least one consumer, the upstream will continue running and emitting data.
* When all consumers have exited, the upstream will be finalized.
*
* @since 3.8.0
* @category utils
*/
export const share: {
<A, E>(
config: {
readonly capacity: "unbounded"
readonly replay?: number | undefined
readonly idleTimeToLive?: Duration.DurationInput | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
readonly idleTimeToLive?: Duration.DurationInput | undefined
}
): <R>(self: Stream<A, E, R>) => Effect.Effect<Stream<A, E>, never, R | Scope.Scope>
<A, E, R>(
self: Stream<A, E, R>,
config: {
readonly capacity: "unbounded"
readonly replay?: number | undefined
readonly idleTimeToLive?: Duration.DurationInput | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
readonly idleTimeToLive?: Duration.DurationInput | undefined
}
): Effect.Effect<Stream<A, E>, never, R | Scope.Scope>
} = internal.share

/**
* Fan out the stream, producing a dynamic number of streams that have the
* same elements as this stream. The driver stream will only ever advance the
Expand Down
53 changes: 53 additions & 0 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { pipeArguments } from "../Pipeable.js"
import { hasProperty, isTagged, type Predicate, type Refinement } from "../Predicate.js"
import * as PubSub from "../PubSub.js"
import * as Queue from "../Queue.js"
import * as RcRef from "../RcRef.js"
import * as Ref from "../Ref.js"
import * as Runtime from "../Runtime.js"
import * as Schedule from "../Schedule.js"
Expand Down Expand Up @@ -837,6 +838,58 @@ export const broadcastDynamic = dual<
): Effect.Effect<Stream.Stream<A, E>, never, Scope.Scope | R> =>
Effect.map(toPubSub(self, maximumLag), (pubsub) => flattenTake(fromPubSub(pubsub))))

export const share = dual<
<A, E>(
config: {
readonly capacity: "unbounded"
readonly replay?: number | undefined
readonly idleTimeToLive?: Duration.DurationInput | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
readonly idleTimeToLive?: Duration.DurationInput | undefined
}
) => <R>(
self: Stream.Stream<A, E, R>
) => Effect.Effect<Stream.Stream<A, E>, never, R | Scope.Scope>,
<A, E, R>(
self: Stream.Stream<A, E, R>,
config: {
readonly capacity: "unbounded"
readonly replay?: number | undefined
readonly idleTimeToLive?: Duration.DurationInput | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
readonly idleTimeToLive?: Duration.DurationInput | undefined
}
) => Effect.Effect<Stream.Stream<A, E>, never, R | Scope.Scope>
>(
2,
<A, E, R>(
self: Stream.Stream<A, E, R>,
options: {
readonly capacity: "unbounded"
readonly replay?: number | undefined
readonly idleTimeToLive?: Duration.DurationInput | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
readonly idleTimeToLive?: Duration.DurationInput | undefined
}
): Effect.Effect<Stream.Stream<A, E>, never, R | Scope.Scope> =>
Effect.map(
RcRef.make({
acquire: broadcastDynamic(self, options),
idleTimeToLive: options.idleTimeToLive
}),
(rcRef) => unwrapScoped(RcRef.get(rcRef))
)
)

/** @internal */
export const broadcastedQueues = dual<
<N extends number>(
Expand Down
94 changes: 94 additions & 0 deletions packages/effect/test/Stream/broadcasting.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ import * as Either from "effect/Either"
import * as Fiber from "effect/Fiber"
import { pipe } from "effect/Function"
import * as Ref from "effect/Ref"
import * as Schedule from "effect/Schedule"
import * as Sink from "effect/Sink"
import * as Stream from "effect/Stream"
import * as it from "effect/test/utils/extend"
import * as TestClock from "effect/TestClock"
import { assert, describe } from "vitest"

describe("Stream", () => {
Expand Down Expand Up @@ -101,4 +104,95 @@ describe("Stream", () => {
)
assert.deepStrictEqual(Array.from(result), [0, 1, 2, 3, 4])
}))

it.scoped("share sequenced", () =>
Effect.gen(function*() {
const sharedStream = yield* Stream.fromSchedule(Schedule.spaced("1 seconds")).pipe(
Stream.share({ capacity: 16 })
)

const firstFiber = yield* sharedStream.pipe(
Stream.take(1),
Stream.run(Sink.collectAll()),
Effect.map(Array.from),
Effect.fork
)

yield* TestClock.adjust("1 second")

const first = yield* Fiber.join(firstFiber)
assert.deepStrictEqual(first, [0])

const secondFiber = yield* sharedStream.pipe(
Stream.take(1),
Stream.run(Sink.collectAll()),
Effect.map(Array.from),
Effect.fork
)

yield* TestClock.adjust("1 second")

const second = yield* Fiber.join(secondFiber)
assert.deepStrictEqual(second, [0])
}))

it.scoped("share sequenced with idleTimeToLive", () =>
Effect.gen(function*() {
const sharedStream = yield* Stream.fromSchedule(Schedule.spaced("1 seconds")).pipe(
Stream.share({
capacity: 16,
idleTimeToLive: "1 second"
})
)

const firstFiber = yield* sharedStream.pipe(
Stream.take(1),
Stream.run(Sink.collectAll()),
Effect.map(Array.from),
Effect.fork
)

yield* TestClock.adjust("1 second")

const first = yield* Fiber.join(firstFiber)
assert.deepStrictEqual(first, [0])

const secondFiber = yield* sharedStream.pipe(
Stream.take(1),
Stream.run(Sink.collectAll()),
Effect.map(Array.from),
Effect.fork
)

yield* TestClock.adjust("1 second")

const second = yield* Fiber.join(secondFiber)
assert.deepStrictEqual(second, [1])
}))

it.scoped("share parallel", () =>
Effect.gen(function*() {
const sharedStream = yield* Stream.fromSchedule(Schedule.spaced("1 seconds")).pipe(
Stream.share({ capacity: 16 })
)

const fiber1 = yield* sharedStream.pipe(
Stream.take(1),
Stream.run(Sink.collectAll()),
Effect.map((x) => Array.from(x)),
Effect.fork
)
const fiber2 = yield* sharedStream.pipe(
Stream.take(2),
Stream.run(Sink.collectAll()),
Effect.map((x) => Array.from(x)),
Effect.fork
)

yield* TestClock.adjust("2 second")
const [result1, result2] = yield* Fiber.joinAll([fiber1, fiber2])

assert.deepStrictEqual(result1, [0])
assert.deepStrictEqual(result2, [0, 1])
}))
})

0 comments on commit 68cf4ce

Please sign in to comment.