Skip to content

Commit

Permalink
add releaseAll api to Semaphore
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Feb 7, 2024
1 parent 296bc1c commit e2c5e8c
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 9 deletions.
8 changes: 8 additions & 0 deletions .changeset/chilly-birds-eat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"effect": patch
---

add releaseAll api to Semaphore

You can use `semphore.releaseAll` to atomically release all the permits of a
Semaphore.
1 change: 1 addition & 0 deletions packages/effect/src/Effect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4689,6 +4689,7 @@ export interface Semaphore {
withPermits(permits: number): <R, E, A>(self: Effect<R, E, A>) => Effect<R, E, A>
take(permits: number): Effect<never, never, number>
release(permits: number): Effect<never, never, void>
releaseAll: Effect<never, never, void>
}

/**
Expand Down
26 changes: 17 additions & 9 deletions packages/effect/src/internal/effect/circular.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import * as supervisor from "../supervisor.js"

/** @internal */
class Semaphore {
public waiters = new Array<() => void>()
public waiters = new Array<() => boolean>()
public taken = 0

constructor(readonly permits: number) {}
Expand All @@ -48,14 +48,16 @@ class Semaphore {
core.asyncEither<never, never, number>((resume) => {
if (this.free < n) {
const observer = () => {
if (this.free >= n) {
const observerIndex = this.waiters.findIndex((cb) => cb === observer)
if (observerIndex !== -1) {
this.waiters.splice(observerIndex, 1)
}
this.taken += n
resume(core.succeed(n))
if (this.free < n) {
return false
}
const observerIndex = this.waiters.findIndex((cb) => cb === observer)
if (observerIndex !== -1) {
this.waiters.splice(observerIndex, 1)
}
this.taken += n
resume(core.succeed(n))
return true
}
this.waiters.push(observer)
return Either.left(core.sync(() => {
Expand All @@ -73,11 +75,17 @@ class Semaphore {
core.withFiberRuntime<never, never, void>((fiber) => {
this.taken -= n
fiber.getFiberRef(currentScheduler).scheduleTask(() => {
this.waiters.forEach((wake) => wake())
while (this.free > 0 && this.waiters.length > 0) {
if (this.waiters[0]() === false) {
break
}
}
}, fiber.getFiberRef(core.currentSchedulingPriority))
return core.unit
})

readonly releaseAll: Effect.Effect<never, never, void> = core.suspend(() => this.release(this.taken))

readonly withPermits = (n: number) => <R, E, A>(self: Effect.Effect<R, E, A>) =>
core.uninterruptibleMask((restore) =>
core.flatMap(
Expand Down
8 changes: 8 additions & 0 deletions packages/effect/test/Effect/semaphore.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,12 @@ describe("Effect", () => {
yield* $(TestClock.adjust(D.seconds(3)))
assert.equal(messages.length, 8)
}))

it.effect("releaseAll", () =>
Effect.gen(function*(_) {
const sem = yield* _(Effect.makeSemaphore(4))
yield* _(sem.take(4))
yield* _(sem.releaseAll)
yield* _(sem.take(1))
}))
})

0 comments on commit e2c5e8c

Please sign in to comment.