Skip to content

Commit

Permalink
ensure PubSub.publishAll does not increase size while there are no su…
Browse files Browse the repository at this point in the history
…bscribers (#3161)
  • Loading branch information
tim-smart authored Jul 4, 2024
1 parent 75a6c97 commit a5737d6
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 8 deletions.
5 changes: 5 additions & 0 deletions .changeset/sweet-games-sell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": patch
---

ensure PubSub.publishAll does not increase size while there are no subscribers
15 changes: 13 additions & 2 deletions packages/effect/src/internal/pubsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ class BoundedPubSubArb<in out A> implements AtomicPubSub<A> {
}

publishAll(elements: Iterable<A>): Chunk.Chunk<A> {
if (this.subscriberCount === 0) {
return Chunk.empty()
}
const chunk = Chunk.fromIterable(elements)
const n = chunk.length
const size = this.publisherIndex - this.subscribersIndex
Expand Down Expand Up @@ -400,6 +403,9 @@ class BoundedPubSubPow2<in out A> implements AtomicPubSub<A> {
}

publishAll(elements: Iterable<A>): Chunk.Chunk<A> {
if (this.subscriberCount === 0) {
return Chunk.empty()
}
const chunk = Chunk.fromIterable(elements)
const n = chunk.length
const size = this.publisherIndex - this.subscribersIndex
Expand Down Expand Up @@ -560,6 +566,9 @@ class BoundedPubSubSingle<in out A> implements AtomicPubSub<A> {
}

publishAll(elements: Iterable<A>): Chunk.Chunk<A> {
if (this.subscriberCount === 0) {
return Chunk.empty()
}
const chunk = Chunk.fromIterable(elements)
if (Chunk.isEmpty(chunk)) {
return chunk
Expand Down Expand Up @@ -692,8 +701,10 @@ class UnboundedPubSub<in out A> implements AtomicPubSub<A> {
}

publishAll(elements: Iterable<A>): Chunk.Chunk<A> {
for (const a of elements) {
this.publish(a)
if (this.publisherTail.subscribers !== 0) {
for (const a of elements) {
this.publish(a)
}
}
return Chunk.empty()
}
Expand Down
28 changes: 22 additions & 6 deletions packages/effect/test/PubSub.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Option } from "effect"
import * as Array from "effect/Array"
import * as Deferred from "effect/Deferred"
import * as Effect from "effect/Effect"
Expand Down Expand Up @@ -632,17 +633,32 @@ describe("PubSub", () => {
return PubSub.unbounded<number | null>().pipe(
Effect.flatMap((pubsub) =>
Effect.scoped(
Effect.gen(function*(_) {
const dequeue1 = yield* _(PubSub.subscribe(pubsub))
const dequeue2 = yield* _(PubSub.subscribe(pubsub))
yield* _(PubSub.publishAll(pubsub, messages))
const takes1 = yield* _(Queue.takeAll(dequeue1))
const takes2 = yield* _(Queue.takeAll(dequeue2))
Effect.gen(function*() {
const dequeue1 = yield* PubSub.subscribe(pubsub)
const dequeue2 = yield* PubSub.subscribe(pubsub)
yield* PubSub.publishAll(pubsub, messages)
const takes1 = yield* Queue.takeAll(dequeue1)
const takes2 = yield* Queue.takeAll(dequeue2)
assert.deepStrictEqual([...takes1], messages)
assert.deepStrictEqual([...takes2], messages)
})
)
)
)
})

it.scoped("publish does not increase size while no subscribers", () =>
Effect.gen(function*() {
const pubsub = yield* PubSub.dropping<number>(2)
yield* PubSub.publish(pubsub, 1)
yield* PubSub.publish(pubsub, 2)
assert.deepStrictEqual(pubsub.unsafeSize(), Option.some(0))
}))

it.scoped("publishAll does not increase size while no subscribers", () =>
Effect.gen(function*() {
const pubsub = yield* PubSub.dropping<number>(2)
yield* PubSub.publishAll(pubsub, [1, 2])
assert.deepStrictEqual(pubsub.unsafeSize(), Option.some(0))
}))
})

0 comments on commit a5737d6

Please sign in to comment.