diff --git a/.changeset/lucky-eagles-speak.md b/.changeset/lucky-eagles-speak.md new file mode 100644 index 0000000000..c25560826a --- /dev/null +++ b/.changeset/lucky-eagles-speak.md @@ -0,0 +1,24 @@ +--- +"effect": minor +--- + +add Effect.makeLatch, for creating a simple async latch + +```ts +import { Effect } from "effect" + +Effect.gen(function* () { + // Create a latch, starting in the closed state + const latch = yield* Effect.makeLatch(false) + + // Fork a fiber that logs "open sesame" when the latch is opened + const fiber = yield* Effect.log("open sesame").pipe( + latch.whenOpen, + Effect.fork + ) + + // Open the latch + yield* latch.open + yield* fiber.await +}) +``` diff --git a/packages/effect/src/Effect.ts b/packages/effect/src/Effect.ts index f58f064728..090039deb2 100644 --- a/packages/effect/src/Effect.ts +++ b/packages/effect/src/Effect.ts @@ -5400,6 +5400,56 @@ export const unsafeMakeSemaphore: (permits: number) => Semaphore = circular.unsa */ export const makeSemaphore: (permits: number) => Effect = circular.makeSemaphore +// ------------------------------------------------------------------------------------- +// latch +// ------------------------------------------------------------------------------------- + +/** + * @category latch + * @since 3.8.0 + */ +export interface Latch { + /** open the latch, releasing all fibers waiting on it */ + readonly open: Effect + /** release all fibers waiting on the latch, without opening it */ + readonly release: Effect + /** wait for the latch to be opened */ + readonly await: Effect + /** close the latch */ + readonly close: Effect + /** only run the given effect when the latch is open */ + readonly whenOpen: (self: Effect) => Effect +} + +/** + * @category latch + * @since 3.8.0 + */ +export const unsafeMakeLatch: (open?: boolean | undefined) => Latch = circular.unsafeMakeLatch + +/** + * @category latch + * @since 3.8.0 + * @example + * import { Effect } from "effect" + * + * Effect.gen(function*() { + * // Create a latch, starting in the closed state + * const latch = yield* Effect.makeLatch(false) + * + * // Fork a fiber that logs "open sesame" when the latch is opened + * const fiber = yield* Effect.log("open sesame").pipe( + * latch.whenOpen, + * Effect.fork + * ) + * + * // Open the latch + * yield* latch.open + * yield* fiber.await + * }) + */ +export const makeLatch: (open?: boolean | undefined) => Effect = circular.makeLatch + // ------------------------------------------------------------------------------------- // execution // ------------------------------------------------------------------------------------- diff --git a/packages/effect/src/internal/effect/circular.ts b/packages/effect/src/internal/effect/circular.ts index a1c790964f..53a75fea5c 100644 --- a/packages/effect/src/internal/effect/circular.ts +++ b/packages/effect/src/internal/effect/circular.ts @@ -112,6 +112,67 @@ export const unsafeMakeSemaphore = (permits: number): Semaphore => new Semaphore /** @internal */ export const makeSemaphore = (permits: number) => core.sync(() => unsafeMakeSemaphore(permits)) +class Latch implements Effect.Latch { + waiters: Array<(_: Effect.Effect) => void> = [] + scheduled = false + constructor(private isOpen: boolean) {} + + private unsafeSchedule(fiber: Fiber.RuntimeFiber) { + if (this.scheduled || this.waiters.length === 0) { + return core.void + } + this.scheduled = true + fiber.currentScheduler.scheduleTask(this.flushWaiters, fiber.getFiberRef(core.currentSchedulingPriority)) + return core.void + } + private flushWaiters = () => { + this.scheduled = false + const waiters = this.waiters + this.waiters = [] + for (let i = 0; i < waiters.length; i++) { + waiters[i](core.exitVoid) + } + } + + open = core.withFiberRuntime((fiber) => { + if (this.isOpen) { + return core.void + } + this.isOpen = true + return this.unsafeSchedule(fiber) + }) + release = core.withFiberRuntime((fiber) => { + if (this.isOpen) { + return core.void + } + return this.unsafeSchedule(fiber) + }) + await = core.unsafeAsync((resume) => { + if (this.isOpen) { + return resume(core.void) + } + this.waiters.push(resume) + return core.sync(() => { + const index = this.waiters.indexOf(resume) + if (index !== -1) { + this.waiters.splice(index, 1) + } + }) + }) + close = core.sync(() => { + this.isOpen = false + }) + whenOpen = (self: Effect.Effect): Effect.Effect => { + return core.zipRight(this.await, self) + } +} + +/** @internal */ +export const unsafeMakeLatch = (open?: boolean | undefined): Effect.Latch => new Latch(open ?? false) + +/** @internal */ +export const makeLatch = (open?: boolean | undefined) => core.sync(() => unsafeMakeLatch(open)) + /** @internal */ export const awaitAllChildren = (self: Effect.Effect): Effect.Effect => ensuringChildren(self, fiberRuntime.fiberAwaitAll) diff --git a/packages/effect/test/Effect/latch.test.ts b/packages/effect/test/Effect/latch.test.ts new file mode 100644 index 0000000000..e85d56184e --- /dev/null +++ b/packages/effect/test/Effect/latch.test.ts @@ -0,0 +1,33 @@ +import { Effect, Exit } from "effect" +import { assert, describe, it } from "effect/test/utils/extend" + +describe("Latch", () => { + it.effect("open works", () => + Effect.gen(function*() { + const latch = yield* Effect.makeLatch() + let fiber = yield* latch.await.pipe( + Effect.fork + ) + yield* Effect.yieldNow() + assert.isNull(fiber.unsafePoll()) + yield* latch.open + assert.deepStrictEqual(yield* fiber.await, Exit.void) + + fiber = yield* latch.await.pipe( + Effect.fork + ) + yield* Effect.yieldNow() + assert.deepStrictEqual(fiber.unsafePoll(), Exit.void) + + yield* latch.close + fiber = yield* Effect.void.pipe( + latch.whenOpen, + Effect.fork + ) + yield* Effect.yieldNow() + assert.isNull(fiber.unsafePoll()) + + yield* latch.release + assert.deepStrictEqual(yield* fiber.await, Exit.void) + })) +})