diff --git a/.changeset/chilly-birds-eat.md b/.changeset/chilly-birds-eat.md new file mode 100644 index 0000000000..4ff61292fd --- /dev/null +++ b/.changeset/chilly-birds-eat.md @@ -0,0 +1,8 @@ +--- +"effect": patch +--- + +add releaseAll api to Semaphore + +You can use `semphore.releaseAll` to atomically release all the permits of a +Semaphore. diff --git a/packages/effect/src/Effect.ts b/packages/effect/src/Effect.ts index 4ec45641f8..029b31c602 100644 --- a/packages/effect/src/Effect.ts +++ b/packages/effect/src/Effect.ts @@ -4689,6 +4689,7 @@ export interface Semaphore { withPermits(permits: number): (self: Effect) => Effect take(permits: number): Effect release(permits: number): Effect + releaseAll: Effect } /** diff --git a/packages/effect/src/internal/effect/circular.ts b/packages/effect/src/internal/effect/circular.ts index 1f0221ee80..fbcffe05cd 100644 --- a/packages/effect/src/internal/effect/circular.ts +++ b/packages/effect/src/internal/effect/circular.ts @@ -35,7 +35,7 @@ import * as supervisor from "../supervisor.js" /** @internal */ class Semaphore { - public waiters = new Array<() => void>() + public waiters = new Array<() => boolean>() public taken = 0 constructor(readonly permits: number) {} @@ -48,14 +48,16 @@ class Semaphore { core.asyncEither((resume) => { if (this.free < n) { const observer = () => { - if (this.free >= n) { - const observerIndex = this.waiters.findIndex((cb) => cb === observer) - if (observerIndex !== -1) { - this.waiters.splice(observerIndex, 1) - } - this.taken += n - resume(core.succeed(n)) + if (this.free < n) { + return false + } + const observerIndex = this.waiters.findIndex((cb) => cb === observer) + if (observerIndex !== -1) { + this.waiters.splice(observerIndex, 1) } + this.taken += n + resume(core.succeed(n)) + return true } this.waiters.push(observer) return Either.left(core.sync(() => { @@ -73,11 +75,17 @@ class Semaphore { core.withFiberRuntime((fiber) => { this.taken -= n fiber.getFiberRef(currentScheduler).scheduleTask(() => { - this.waiters.forEach((wake) => wake()) + while (this.free > 0 && this.waiters.length > 0) { + if (this.waiters[0]() === false) { + break + } + } }, fiber.getFiberRef(core.currentSchedulingPriority)) return core.unit }) + readonly releaseAll: Effect.Effect = core.suspend(() => this.release(this.taken)) + readonly withPermits = (n: number) => (self: Effect.Effect) => core.uninterruptibleMask((restore) => core.flatMap( diff --git a/packages/effect/test/Effect/semaphore.test.ts b/packages/effect/test/Effect/semaphore.test.ts index d2fa9b851d..386af6b650 100644 --- a/packages/effect/test/Effect/semaphore.test.ts +++ b/packages/effect/test/Effect/semaphore.test.ts @@ -34,4 +34,12 @@ describe("Effect", () => { yield* $(TestClock.adjust(D.seconds(3))) assert.equal(messages.length, 8) })) + + it.effect("releaseAll", () => + Effect.gen(function*(_) { + const sem = yield* _(Effect.makeSemaphore(4)) + yield* _(sem.take(4)) + yield* _(sem.releaseAll) + yield* _(sem.take(1)) + })) })