Skip to content

Commit

Permalink
allowing customizing Stream pubsub strategy (#3216)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored Jul 10, 2024
1 parent d9cdf23 commit 62a9a3f
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 25 deletions.
25 changes: 25 additions & 0 deletions .changeset/hungry-drinks-pretend.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
---
"effect": minor
---

allowing customizing Stream pubsub strategy

```ts
import { Schedule, Stream } from "effect";

// toPubSub
Stream.fromSchedule(Schedule.spaced(1000)).pipe(
Stream.toPubSub({
capacity: 16, // or "unbounded"
strategy: "dropping", // or "sliding" / "suspend"
}),
);

// also for the broadcast apis
Stream.fromSchedule(Schedule.spaced(1000)).pipe(
Stream.broadcastDynamic({
capacity: 16,
strategy: "dropping",
}),
);
```
173 changes: 148 additions & 25 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -712,19 +712,40 @@ export const branchAfter = dual<
export const broadcast = dual<
<N extends number>(
n: N,
maximumLag: number
maximumLag: number | {
readonly capacity: "unbounded"
readonly replay?: number | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
) => <A, E, R>(
self: Stream.Stream<A, E, R>
) => Effect.Effect<TupleOf<N, Stream.Stream<A, E>>, never, Scope.Scope | R>,
<A, E, R, N extends number>(
self: Stream.Stream<A, E, R>,
n: N,
maximumLag: number
maximumLag: number | {
readonly capacity: "unbounded"
readonly replay?: number | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
) => Effect.Effect<TupleOf<N, Stream.Stream<A, E>>, never, Scope.Scope | R>
>(3, <A, E, R, N extends number>(
self: Stream.Stream<A, E, R>,
n: N,
maximumLag: number
maximumLag: number | {
readonly capacity: "unbounded"
readonly replay?: number | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
): Effect.Effect<TupleOf<N, Stream.Stream<A, E>>, never, Scope.Scope | R> =>
pipe(
self,
Expand All @@ -737,41 +758,79 @@ export const broadcast = dual<
/** @internal */
export const broadcastDynamic = dual<
(
maximumLag: number
maximumLag: number | {
readonly capacity: "unbounded"
readonly replay?: number | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
) => <A, E, R>(self: Stream.Stream<A, E, R>) => Effect.Effect<Stream.Stream<A, E>, never, Scope.Scope | R>,
<A, E, R>(
self: Stream.Stream<A, E, R>,
maximumLag: number
maximumLag: number | {
readonly capacity: "unbounded"
readonly replay?: number | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
) => Effect.Effect<Stream.Stream<A, E>, never, Scope.Scope | R>
>(2, <A, E, R>(
self: Stream.Stream<A, E, R>,
maximumLag: number
maximumLag: number | {
readonly capacity: "unbounded"
readonly replay?: number | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
): Effect.Effect<Stream.Stream<A, E>, never, Scope.Scope | R> =>
pipe(
self,
broadcastedQueuesDynamic(maximumLag),
Effect.map((effect) => flattenTake(flatMap(scoped(effect), fromQueue)))
))
Effect.map(toPubSub(self, maximumLag), (pubsub) => flattenTake(fromPubSub(pubsub))))

/** @internal */
export const broadcastedQueues = dual<
<N extends number>(
n: N,
maximumLag: number
maximumLag: number | {
readonly capacity: "unbounded"
readonly replay?: number | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
) => <A, E, R>(
self: Stream.Stream<A, E, R>
) => Effect.Effect<TupleOf<N, Queue.Dequeue<Take.Take<A, E>>>, never, Scope.Scope | R>,
<A, E, R, N extends number>(
self: Stream.Stream<A, E, R>,
n: N,
maximumLag: number
maximumLag: number | {
readonly capacity: "unbounded"
readonly replay?: number | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
) => Effect.Effect<TupleOf<N, Queue.Dequeue<Take.Take<A, E>>>, never, Scope.Scope | R>
>(3, <A, E, R, N extends number>(
self: Stream.Stream<A, E, R>,
n: N,
maximumLag: number
maximumLag: number | {
readonly capacity: "unbounded"
readonly replay?: number | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
): Effect.Effect<TupleOf<N, Queue.Dequeue<Take.Take<A, E>>>, never, Scope.Scope | R> =>
Effect.flatMap(PubSub.bounded<Take.Take<A, E>>(maximumLag), (pubsub) =>
Effect.flatMap(pubsubFromOptions(maximumLag), (pubsub) =>
pipe(
Effect.all(Array.from({ length: n }, () => PubSub.subscribe(pubsub))) as Effect.Effect<
TupleOf<N, Queue.Dequeue<Take.Take<A, E>>>,
Expand All @@ -784,17 +843,38 @@ export const broadcastedQueues = dual<
/** @internal */
export const broadcastedQueuesDynamic = dual<
(
maximumLag: number
maximumLag: number | {
readonly capacity: "unbounded"
readonly replay?: number | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
) => <A, E, R>(
self: Stream.Stream<A, E, R>
) => Effect.Effect<Effect.Effect<Queue.Dequeue<Take.Take<A, E>>, never, Scope.Scope>, never, Scope.Scope | R>,
<A, E, R>(
self: Stream.Stream<A, E, R>,
maximumLag: number
maximumLag: number | {
readonly capacity: "unbounded"
readonly replay?: number | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
) => Effect.Effect<Effect.Effect<Queue.Dequeue<Take.Take<A, E>>, never, Scope.Scope>, never, Scope.Scope | R>
>(2, <A, E, R>(
self: Stream.Stream<A, E, R>,
maximumLag: number
maximumLag: number | {
readonly capacity: "unbounded"
readonly replay?: number | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
): Effect.Effect<Effect.Effect<Queue.Dequeue<Take.Take<A, E>>, never, Scope.Scope>, never, Scope.Scope | R> =>
Effect.map(toPubSub(self, maximumLag), PubSub.subscribe))

Expand Down Expand Up @@ -6526,24 +6606,67 @@ export const timeoutTo = dual<
}
)

const pubsubFromOptions = <A, E>(
options: number | {
readonly capacity: "unbounded"
readonly replay?: number | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
): Effect.Effect<PubSub.PubSub<Take.Take<A, E>>> => {
if (typeof options === "number") {
return PubSub.bounded(options)
} else if (options.capacity === "unbounded") {
return PubSub.unbounded({ replay: options.replay })
}
switch (options.strategy) {
case "dropping":
return PubSub.dropping(options)
case "sliding":
return PubSub.sliding(options)
default:
return PubSub.bounded(options)
}
}

/** @internal */
export const toPubSub = dual<
(
capacity: number
capacity: number | {
readonly capacity: "unbounded"
readonly replay?: number | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
) => <A, E, R>(self: Stream.Stream<A, E, R>) => Effect.Effect<PubSub.PubSub<Take.Take<A, E>>, never, Scope.Scope | R>,
<A, E, R>(
self: Stream.Stream<A, E, R>,
capacity: number
capacity: number | {
readonly capacity: "unbounded"
readonly replay?: number | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
) => Effect.Effect<PubSub.PubSub<Take.Take<A, E>>, never, Scope.Scope | R>
>(2, <A, E, R>(
self: Stream.Stream<A, E, R>,
capacity: number
capacity: number | {
readonly capacity: "unbounded"
readonly replay?: number | undefined
} | {
readonly capacity: number
readonly strategy?: "sliding" | "dropping" | "suspend" | undefined
readonly replay?: number | undefined
}
): Effect.Effect<PubSub.PubSub<Take.Take<A, E>>, never, Scope.Scope | R> =>
pipe(
Effect.acquireRelease(
PubSub.bounded<Take.Take<A, E>>(capacity),
(pubsub) => PubSub.shutdown(pubsub)
),
Effect.acquireRelease(pubsubFromOptions<A, E>(capacity), (pubsub) => PubSub.shutdown(pubsub)),
Effect.tap((pubsub) => pipe(self, runIntoPubSubScoped(pubsub), Effect.forkScoped))
))

Expand Down

0 comments on commit 62a9a3f

Please sign in to comment.