diff --git a/packages/effect/src/internal/pool.ts b/packages/effect/src/internal/pool.ts
index dc98b78365..bfafe4a3d9 100644
--- a/packages/effect/src/internal/pool.ts
+++ b/packages/effect/src/internal/pool.ts
@@ -1,8 +1,9 @@
+import type { Cause } from "effect/Cause"
import type { Queue } from "effect/Queue"
import * as Context from "../Context.js"
import * as Duration from "../Duration.js"
import type { Effect } from "../Effect.js"
-import type * as Exit from "../Exit.js"
+import type { Exit } from "../Exit.js"
import { dual, identity } from "../Function.js"
import { pipeArguments } from "../Pipeable.js"
import type { Pool, PoolTypeId as PoolTypeId_ } from "../Pool.js"
@@ -109,7 +110,7 @@ export const invalidate: {
} = dual(2, (self: Pool, item: A): Effect => self.invalidate(item))
interface PoolItem {
- readonly exit: Exit.Exit
+ readonly exit: Exit
finalizer: Effect
refCount: number
}
@@ -144,7 +145,7 @@ class PoolImpl implements Pool {
core.flatMap(({ exit, scope }) => {
const item: PoolItem = {
exit,
- finalizer: scope.close(exit),
+ finalizer: core.catchAllCause(scope.close(exit), reportUnhandledError),
refCount: 0
}
this.items.add(item)
@@ -250,7 +251,7 @@ class PoolImpl implements Pool {
const semaphore = circular.unsafeMakeSemaphore(size)
return core.forEachSequentialDiscard(this.items, (item) => {
if (item.refCount > 0) {
- item.finalizer = core.zipRight(item.finalizer, semaphore.release(1))
+ item.finalizer = core.zipLeft(item.finalizer, semaphore.release(1))
this.invalidated.add(item)
return semaphore.take(1)
}
@@ -308,18 +309,29 @@ const strategyCreationTTL = (ttl: Duration.DurationInput) =>
const strategyAccessTTL = (ttl: Duration.DurationInput) =>
core.map(internalQueue.unbounded>(), (queue) => {
return identity>({
- run: (pool) =>
- core.suspend(() => {
+ run: (pool) => {
+ const process: Effect = core.suspend(() => {
const excess = pool.items.size - pool.targetSize
if (excess <= 0) return core.void
- return core.flatMap(
- queue.takeUpTo(excess),
- core.forEachSequentialDiscard((item) => pool.invalidatePoolItem(item))
+ return queue.take.pipe(
+ core.tap((item) => pool.invalidatePoolItem(item)),
+ core.zipRight(process)
)
- }).pipe(
+ })
+ return process.pipe(
coreEffect.delay(ttl),
coreEffect.forever
- ),
+ )
+ },
onAcquire: (item) => queue.offer(item)
})
})
+
+const reportUnhandledError = (cause: Cause) =>
+ core.withFiberRuntime((fiber) => {
+ const unhandledLogLevel = fiber.getFiberRef(core.currentUnhandledErrorLogLevel)
+ if (unhandledLogLevel._tag === "Some") {
+ fiber.log("Unhandled error in pool finalizer", cause, unhandledLogLevel)
+ }
+ return core.void
+ })
diff --git a/packages/effect/test/Pool.test.ts b/packages/effect/test/Pool.test.ts
index 63f344e80d..663bd7aec4 100644
--- a/packages/effect/test/Pool.test.ts
+++ b/packages/effect/test/Pool.test.ts
@@ -30,6 +30,21 @@ describe("Pool", () => {
assert.strictEqual(result, 0)
}))
+ it.scoped("defects don't prevent cleanup", () =>
+ Effect.gen(function*() {
+ const count = yield* Ref.make(0)
+ const get = Effect.acquireRelease(
+ Ref.updateAndGet(count, (n) => n + 1),
+ () => Effect.zipRight(Ref.update(count, (n) => n - 1), Effect.die("boom"))
+ )
+ const scope = yield* Scope.make()
+ yield* Scope.extend(Pool.make({ acquire: get, size: 10 }), scope)
+ yield* Effect.repeat(Ref.get(count), { until: (n) => n === 10 })
+ yield* Scope.close(scope, Exit.succeed(void 0))
+ const result = yield* Ref.get(count)
+ assert.strictEqual(result, 0)
+ }))
+
it.scoped("acquire one item", () =>
Effect.gen(function*($) {
const count = yield* $(Ref.make(0))