Skip to content

Commit

Permalink
add PubSub replay option (#3135)
Browse files Browse the repository at this point in the history
Co-authored-by: Tim Smart <tim.smart@arisechurch.com>
  • Loading branch information
tim-smart and Tim Smart authored Jul 8, 2024
1 parent 6e1a8bc commit 188e23a
Show file tree
Hide file tree
Showing 4 changed files with 357 additions and 63 deletions.
18 changes: 18 additions & 0 deletions .changeset/seven-ghosts-move.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
---
"effect": minor
---

add `replay` option to PubSub constructors

This option adds a replay buffer in front of the given PubSub. The buffer will
replay the last `n` messages to any new subscriber.

```ts
Effect.gen(function*() {
const messages = [1, 2, 3, 4, 5]
const pubsub = yield* PubSub.bounded<number>({ capacity: 16, replay: 3 })
yield* PubSub.publishAll(pubsub, messages)
const sub = yield* PubSub.subscribe(pubsub)
assert.deepStrictEqual(Chunk.toReadonlyArray(yield* Queue.takeAll(sub)), [3, 4, 5])
}))
```
15 changes: 11 additions & 4 deletions packages/effect/src/PubSub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ export interface PubSub<in out A> extends Queue.Enqueue<A>, Pipeable {
* @since 2.0.0
* @category constructors
*/
export const bounded: <A>(requestedCapacity: number) => Effect.Effect<PubSub<A>> = internal.bounded
export const bounded: <A>(
capacity: number | { readonly capacity: number; readonly replay?: number | undefined }
) => Effect.Effect<PubSub<A>> = internal.bounded

/**
* Creates a bounded `PubSub` with the dropping strategy. The `PubSub` will drop new
Expand All @@ -57,7 +59,9 @@ export const bounded: <A>(requestedCapacity: number) => Effect.Effect<PubSub<A>>
* @since 2.0.0
* @category constructors
*/
export const dropping: <A>(requestedCapacity: number) => Effect.Effect<PubSub<A>> = internal.dropping
export const dropping: <A>(
capacity: number | { readonly capacity: number; readonly replay?: number | undefined }
) => Effect.Effect<PubSub<A>> = internal.dropping

/**
* Creates a bounded `PubSub` with the sliding strategy. The `PubSub` will add new
Expand All @@ -68,15 +72,18 @@ export const dropping: <A>(requestedCapacity: number) => Effect.Effect<PubSub<A>
* @since 2.0.0
* @category constructors
*/
export const sliding: <A>(requestedCapacity: number) => Effect.Effect<PubSub<A>> = internal.sliding
export const sliding: <A>(
capacity: number | { readonly capacity: number; readonly replay?: number | undefined }
) => Effect.Effect<PubSub<A>> = internal.sliding

/**
* Creates an unbounded `PubSub`.
*
* @since 2.0.0
* @category constructors
*/
export const unbounded: <A>() => Effect.Effect<PubSub<A>> = internal.unbounded
export const unbounded: <A>(options?: { readonly replay?: number | undefined }) => Effect.Effect<PubSub<A>> =
internal.unbounded

/**
* Returns the number of elements the queue can hold.
Expand Down
Loading

0 comments on commit 188e23a

Please sign in to comment.