Skip to content

Commit

Permalink
add Effect.makeLatch (#3571)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Sep 15, 2024
1 parent fcfa6ee commit 273565e
Show file tree
Hide file tree
Showing 4 changed files with 168 additions and 0 deletions.
24 changes: 24 additions & 0 deletions .changeset/lucky-eagles-speak.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
---
"effect": minor
---

add Effect.makeLatch, for creating a simple async latch

```ts
import { Effect } from "effect"

Effect.gen(function* () {
// Create a latch, starting in the closed state
const latch = yield* Effect.makeLatch(false)

// Fork a fiber that logs "open sesame" when the latch is opened
const fiber = yield* Effect.log("open sesame").pipe(
latch.whenOpen,
Effect.fork
)

// Open the latch
yield* latch.open
yield* fiber.await
})
```
50 changes: 50 additions & 0 deletions packages/effect/src/Effect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5400,6 +5400,56 @@ export const unsafeMakeSemaphore: (permits: number) => Semaphore = circular.unsa
*/
export const makeSemaphore: (permits: number) => Effect<Semaphore> = circular.makeSemaphore

// -------------------------------------------------------------------------------------
// latch
// -------------------------------------------------------------------------------------

/**
* @category latch
* @since 3.8.0
*/
export interface Latch {
/** open the latch, releasing all fibers waiting on it */
readonly open: Effect<void>
/** release all fibers waiting on the latch, without opening it */
readonly release: Effect<void>
/** wait for the latch to be opened */
readonly await: Effect<void>
/** close the latch */
readonly close: Effect<void>
/** only run the given effect when the latch is open */
readonly whenOpen: <A, E, R>(self: Effect<A, E, R>) => Effect<A, E, R>
}

/**
* @category latch
* @since 3.8.0
*/
export const unsafeMakeLatch: (open?: boolean | undefined) => Latch = circular.unsafeMakeLatch

/**
* @category latch
* @since 3.8.0
* @example
* import { Effect } from "effect"
*
* Effect.gen(function*() {
* // Create a latch, starting in the closed state
* const latch = yield* Effect.makeLatch(false)
*
* // Fork a fiber that logs "open sesame" when the latch is opened
* const fiber = yield* Effect.log("open sesame").pipe(
* latch.whenOpen,
* Effect.fork
* )
*
* // Open the latch
* yield* latch.open
* yield* fiber.await
* })
*/
export const makeLatch: (open?: boolean | undefined) => Effect<Latch, never, never> = circular.makeLatch

// -------------------------------------------------------------------------------------
// execution
// -------------------------------------------------------------------------------------
Expand Down
61 changes: 61 additions & 0 deletions packages/effect/src/internal/effect/circular.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,67 @@ export const unsafeMakeSemaphore = (permits: number): Semaphore => new Semaphore
/** @internal */
export const makeSemaphore = (permits: number) => core.sync(() => unsafeMakeSemaphore(permits))

class Latch implements Effect.Latch {
waiters: Array<(_: Effect.Effect<void>) => void> = []
scheduled = false
constructor(private isOpen: boolean) {}

private unsafeSchedule(fiber: Fiber.RuntimeFiber<void>) {
if (this.scheduled || this.waiters.length === 0) {
return core.void
}
this.scheduled = true
fiber.currentScheduler.scheduleTask(this.flushWaiters, fiber.getFiberRef(core.currentSchedulingPriority))
return core.void
}
private flushWaiters = () => {
this.scheduled = false
const waiters = this.waiters
this.waiters = []
for (let i = 0; i < waiters.length; i++) {
waiters[i](core.exitVoid)
}
}

open = core.withFiberRuntime<void>((fiber) => {
if (this.isOpen) {
return core.void
}
this.isOpen = true
return this.unsafeSchedule(fiber)
})
release = core.withFiberRuntime<void>((fiber) => {
if (this.isOpen) {
return core.void
}
return this.unsafeSchedule(fiber)
})
await = core.unsafeAsync<void>((resume) => {
if (this.isOpen) {
return resume(core.void)
}
this.waiters.push(resume)
return core.sync(() => {
const index = this.waiters.indexOf(resume)
if (index !== -1) {
this.waiters.splice(index, 1)
}
})
})
close = core.sync(() => {
this.isOpen = false
})
whenOpen = <A, E, R>(self: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> => {
return core.zipRight(this.await, self)
}
}

/** @internal */
export const unsafeMakeLatch = (open?: boolean | undefined): Effect.Latch => new Latch(open ?? false)

/** @internal */
export const makeLatch = (open?: boolean | undefined) => core.sync(() => unsafeMakeLatch(open))

/** @internal */
export const awaitAllChildren = <A, E, R>(self: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> =>
ensuringChildren(self, fiberRuntime.fiberAwaitAll)
Expand Down
33 changes: 33 additions & 0 deletions packages/effect/test/Effect/latch.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { Effect, Exit } from "effect"
import { assert, describe, it } from "effect/test/utils/extend"

describe("Latch", () => {
it.effect("open works", () =>
Effect.gen(function*() {
const latch = yield* Effect.makeLatch()
let fiber = yield* latch.await.pipe(
Effect.fork
)
yield* Effect.yieldNow()
assert.isNull(fiber.unsafePoll())
yield* latch.open
assert.deepStrictEqual(yield* fiber.await, Exit.void)

fiber = yield* latch.await.pipe(
Effect.fork
)
yield* Effect.yieldNow()
assert.deepStrictEqual(fiber.unsafePoll(), Exit.void)

yield* latch.close
fiber = yield* Effect.void.pipe(
latch.whenOpen,
Effect.fork
)
yield* Effect.yieldNow()
assert.isNull(fiber.unsafePoll())

yield* latch.release
assert.deepStrictEqual(yield* fiber.await, Exit.void)
}))
})

0 comments on commit 273565e

Please sign in to comment.