From a054cb04fe590f9dfbf26bc9062abff6dc560d6e Mon Sep 17 00:00:00 2001 From: Tim Date: Wed, 7 Feb 2024 17:36:42 +1300 Subject: [PATCH] add releaseAll api to Semaphore --- .changeset/chilly-birds-eat.md | 8 +++++ packages/effect/src/Effect.ts | 1 + .../effect/src/internal/effect/circular.ts | 32 ++++++++++++------- packages/effect/test/Effect/semaphore.test.ts | 8 +++++ 4 files changed, 38 insertions(+), 11 deletions(-) create mode 100644 .changeset/chilly-birds-eat.md diff --git a/.changeset/chilly-birds-eat.md b/.changeset/chilly-birds-eat.md new file mode 100644 index 0000000000..4ff61292fd --- /dev/null +++ b/.changeset/chilly-birds-eat.md @@ -0,0 +1,8 @@ +--- +"effect": patch +--- + +add releaseAll api to Semaphore + +You can use `semphore.releaseAll` to atomically release all the permits of a +Semaphore. diff --git a/packages/effect/src/Effect.ts b/packages/effect/src/Effect.ts index 4ec45641f8..029b31c602 100644 --- a/packages/effect/src/Effect.ts +++ b/packages/effect/src/Effect.ts @@ -4689,6 +4689,7 @@ export interface Semaphore { withPermits(permits: number): (self: Effect) => Effect take(permits: number): Effect release(permits: number): Effect + releaseAll: Effect } /** diff --git a/packages/effect/src/internal/effect/circular.ts b/packages/effect/src/internal/effect/circular.ts index 1f0221ee80..f35a41a706 100644 --- a/packages/effect/src/internal/effect/circular.ts +++ b/packages/effect/src/internal/effect/circular.ts @@ -35,7 +35,7 @@ import * as supervisor from "../supervisor.js" /** @internal */ class Semaphore { - public waiters = new Array<() => void>() + public waiters = new Array<() => boolean>() public taken = 0 constructor(readonly permits: number) {} @@ -48,14 +48,16 @@ class Semaphore { core.asyncEither((resume) => { if (this.free < n) { const observer = () => { - if (this.free >= n) { - const observerIndex = this.waiters.findIndex((cb) => cb === observer) - if (observerIndex !== -1) { - this.waiters.splice(observerIndex, 1) - } - this.taken += n - resume(core.succeed(n)) + if (this.free < n) { + return false + } + const observerIndex = this.waiters.findIndex((cb) => cb === observer) + if (observerIndex !== -1) { + this.waiters.splice(observerIndex, 1) } + this.taken += n + resume(core.succeed(n)) + return true } this.waiters.push(observer) return Either.left(core.sync(() => { @@ -69,15 +71,23 @@ class Semaphore { return Either.right(core.succeed(n)) }) - readonly release = (n: number): Effect.Effect => + readonly updateTaken = (f: (n: number) => number): Effect.Effect => core.withFiberRuntime((fiber) => { - this.taken -= n + this.taken = f(this.taken) fiber.getFiberRef(currentScheduler).scheduleTask(() => { - this.waiters.forEach((wake) => wake()) + while (this.waiters.length > 0) { + if (this.waiters[0]() === false) { + break + } + } }, fiber.getFiberRef(core.currentSchedulingPriority)) return core.unit }) + readonly release = (n: number): Effect.Effect => this.updateTaken((taken) => taken - n) + + readonly releaseAll: Effect.Effect = this.updateTaken((_) => 0) + readonly withPermits = (n: number) => (self: Effect.Effect) => core.uninterruptibleMask((restore) => core.flatMap( diff --git a/packages/effect/test/Effect/semaphore.test.ts b/packages/effect/test/Effect/semaphore.test.ts index d2fa9b851d..386af6b650 100644 --- a/packages/effect/test/Effect/semaphore.test.ts +++ b/packages/effect/test/Effect/semaphore.test.ts @@ -34,4 +34,12 @@ describe("Effect", () => { yield* $(TestClock.adjust(D.seconds(3))) assert.equal(messages.length, 8) })) + + it.effect("releaseAll", () => + Effect.gen(function*(_) { + const sem = yield* _(Effect.makeSemaphore(4)) + yield* _(sem.take(4)) + yield* _(sem.releaseAll) + yield* _(sem.take(1)) + })) })