From b5bd4b49763bc41cb71a1ef578d5b1cc781bdaf2 Mon Sep 17 00:00:00 2001 From: Tim Date: Wed, 29 May 2024 10:59:34 +1200 Subject: [PATCH] ensure defects don't prevent clean shutdown --- packages/effect/src/internal/pool.ts | 34 +++++++++++++++++++--------- packages/effect/test/Pool.test.ts | 15 ++++++++++++ 2 files changed, 38 insertions(+), 11 deletions(-) diff --git a/packages/effect/src/internal/pool.ts b/packages/effect/src/internal/pool.ts index dc98b78365..bfafe4a3d9 100644 --- a/packages/effect/src/internal/pool.ts +++ b/packages/effect/src/internal/pool.ts @@ -1,8 +1,9 @@ +import type { Cause } from "effect/Cause" import type { Queue } from "effect/Queue" import * as Context from "../Context.js" import * as Duration from "../Duration.js" import type { Effect } from "../Effect.js" -import type * as Exit from "../Exit.js" +import type { Exit } from "../Exit.js" import { dual, identity } from "../Function.js" import { pipeArguments } from "../Pipeable.js" import type { Pool, PoolTypeId as PoolTypeId_ } from "../Pool.js" @@ -109,7 +110,7 @@ export const invalidate: { } = dual(2, (self: Pool, item: A): Effect => self.invalidate(item)) interface PoolItem { - readonly exit: Exit.Exit + readonly exit: Exit finalizer: Effect refCount: number } @@ -144,7 +145,7 @@ class PoolImpl implements Pool { core.flatMap(({ exit, scope }) => { const item: PoolItem = { exit, - finalizer: scope.close(exit), + finalizer: core.catchAllCause(scope.close(exit), reportUnhandledError), refCount: 0 } this.items.add(item) @@ -250,7 +251,7 @@ class PoolImpl implements Pool { const semaphore = circular.unsafeMakeSemaphore(size) return core.forEachSequentialDiscard(this.items, (item) => { if (item.refCount > 0) { - item.finalizer = core.zipRight(item.finalizer, semaphore.release(1)) + item.finalizer = core.zipLeft(item.finalizer, semaphore.release(1)) this.invalidated.add(item) return semaphore.take(1) } @@ -308,18 +309,29 @@ const strategyCreationTTL = (ttl: Duration.DurationInput) => const strategyAccessTTL = (ttl: Duration.DurationInput) => core.map(internalQueue.unbounded>(), (queue) => { return identity>({ - run: (pool) => - core.suspend(() => { + run: (pool) => { + const process: Effect = core.suspend(() => { const excess = pool.items.size - pool.targetSize if (excess <= 0) return core.void - return core.flatMap( - queue.takeUpTo(excess), - core.forEachSequentialDiscard((item) => pool.invalidatePoolItem(item)) + return queue.take.pipe( + core.tap((item) => pool.invalidatePoolItem(item)), + core.zipRight(process) ) - }).pipe( + }) + return process.pipe( coreEffect.delay(ttl), coreEffect.forever - ), + ) + }, onAcquire: (item) => queue.offer(item) }) }) + +const reportUnhandledError = (cause: Cause) => + core.withFiberRuntime((fiber) => { + const unhandledLogLevel = fiber.getFiberRef(core.currentUnhandledErrorLogLevel) + if (unhandledLogLevel._tag === "Some") { + fiber.log("Unhandled error in pool finalizer", cause, unhandledLogLevel) + } + return core.void + }) diff --git a/packages/effect/test/Pool.test.ts b/packages/effect/test/Pool.test.ts index 63f344e80d..663bd7aec4 100644 --- a/packages/effect/test/Pool.test.ts +++ b/packages/effect/test/Pool.test.ts @@ -30,6 +30,21 @@ describe("Pool", () => { assert.strictEqual(result, 0) })) + it.scoped("defects don't prevent cleanup", () => + Effect.gen(function*() { + const count = yield* Ref.make(0) + const get = Effect.acquireRelease( + Ref.updateAndGet(count, (n) => n + 1), + () => Effect.zipRight(Ref.update(count, (n) => n - 1), Effect.die("boom")) + ) + const scope = yield* Scope.make() + yield* Scope.extend(Pool.make({ acquire: get, size: 10 }), scope) + yield* Effect.repeat(Ref.get(count), { until: (n) => n === 10 }) + yield* Scope.close(scope, Exit.succeed(void 0)) + const result = yield* Ref.get(count) + assert.strictEqual(result, 0) + })) + it.scoped("acquire one item", () => Effect.gen(function*($) { const count = yield* $(Ref.make(0))