Skip to content

Commit

Permalink
add releaseAll api to Semaphore (#2067)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored Feb 7, 2024
1 parent 330e1a4 commit d0b911c
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 16 deletions.
8 changes: 8 additions & 0 deletions .changeset/chilly-birds-eat.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
"effect": patch
---

add releaseAll api to Semaphore

You can use `semphore.releaseAll` to atomically release all the permits of a
Semaphore.
1 change: 1 addition & 0 deletions packages/effect/src/Effect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4689,6 +4689,7 @@ export interface Semaphore {
withPermits(permits: number): <R, E, A>(self: Effect<R, E, A>) => Effect<R, E, A>
take(permits: number): Effect<never, never, number>
release(permits: number): Effect<never, never, void>
releaseAll: Effect<never, never, void>
}

/**
Expand Down
36 changes: 20 additions & 16 deletions packages/effect/src/internal/effect/circular.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import * as supervisor from "../supervisor.js"

/** @internal */
class Semaphore {
public waiters = new Array<() => void>()
public waiters = new Set<() => boolean>()
public taken = 0

constructor(readonly permits: number) {}
Expand All @@ -48,36 +48,40 @@ class Semaphore {
core.asyncEither<never, never, number>((resume) => {
if (this.free < n) {
const observer = () => {
if (this.free >= n) {
const observerIndex = this.waiters.findIndex((cb) => cb === observer)
if (observerIndex !== -1) {
this.waiters.splice(observerIndex, 1)
}
this.taken += n
resume(core.succeed(n))
if (this.free < n) {
return false
}
this.waiters.delete(observer)
this.taken += n
resume(core.succeed(n))
return true
}
this.waiters.push(observer)
this.waiters.add(observer)
return Either.left(core.sync(() => {
const observerIndex = this.waiters.findIndex((cb) => cb === observer)
if (observerIndex !== -1) {
this.waiters.splice(observerIndex, 1)
}
this.waiters.delete(observer)
}))
}
this.taken += n
return Either.right(core.succeed(n))
})

readonly release = (n: number): Effect.Effect<never, never, void> =>
readonly updateTaken = (f: (n: number) => number): Effect.Effect<never, never, void> =>
core.withFiberRuntime<never, never, void>((fiber) => {
this.taken -= n
this.taken = f(this.taken)
fiber.getFiberRef(currentScheduler).scheduleTask(() => {
this.waiters.forEach((wake) => wake())
const iter = this.waiters.values()
let item = iter.next()
while (item.done === false && item.value() === true) {
item = iter.next()
}
}, fiber.getFiberRef(core.currentSchedulingPriority))
return core.unit
})

readonly release = (n: number): Effect.Effect<never, never, void> => this.updateTaken((taken) => taken - n)

readonly releaseAll: Effect.Effect<never, never, void> = this.updateTaken((_) => 0)

readonly withPermits = (n: number) => <R, E, A>(self: Effect.Effect<R, E, A>) =>
core.uninterruptibleMask((restore) =>
core.flatMap(
Expand Down
8 changes: 8 additions & 0 deletions packages/effect/test/Effect/semaphore.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,12 @@ describe("Effect", () => {
yield* $(TestClock.adjust(D.seconds(3)))
assert.equal(messages.length, 8)
}))

it.effect("releaseAll", () =>
Effect.gen(function*(_) {
const sem = yield* _(Effect.makeSemaphore(4))
yield* _(sem.take(4))
yield* _(sem.releaseAll)
yield* _(sem.take(1))
}))
})

0 comments on commit d0b911c

Please sign in to comment.