Skip to content

Commit

Permalink
ensure defects don't prevent clean shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed May 28, 2024
1 parent 0ae5ee7 commit b5bd4b4
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 11 deletions.
34 changes: 23 additions & 11 deletions packages/effect/src/internal/pool.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import type { Cause } from "effect/Cause"
import type { Queue } from "effect/Queue"
import * as Context from "../Context.js"
import * as Duration from "../Duration.js"
import type { Effect } from "../Effect.js"
import type * as Exit from "../Exit.js"
import type { Exit } from "../Exit.js"
import { dual, identity } from "../Function.js"
import { pipeArguments } from "../Pipeable.js"
import type { Pool, PoolTypeId as PoolTypeId_ } from "../Pool.js"
Expand Down Expand Up @@ -109,7 +110,7 @@ export const invalidate: {
} = dual(2, <A, E>(self: Pool<A, E>, item: A): Effect<void> => self.invalidate(item))

interface PoolItem<A, E> {
readonly exit: Exit.Exit<A, E>
readonly exit: Exit<A, E>
finalizer: Effect<void>
refCount: number
}
Expand Down Expand Up @@ -144,7 +145,7 @@ class PoolImpl<A, E> implements Pool<A, E> {
core.flatMap(({ exit, scope }) => {
const item: PoolItem<A, E> = {
exit,
finalizer: scope.close(exit),
finalizer: core.catchAllCause(scope.close(exit), reportUnhandledError),
refCount: 0
}
this.items.add(item)
Expand Down Expand Up @@ -250,7 +251,7 @@ class PoolImpl<A, E> implements Pool<A, E> {
const semaphore = circular.unsafeMakeSemaphore(size)
return core.forEachSequentialDiscard(this.items, (item) => {
if (item.refCount > 0) {
item.finalizer = core.zipRight(item.finalizer, semaphore.release(1))
item.finalizer = core.zipLeft(item.finalizer, semaphore.release(1))
this.invalidated.add(item)
return semaphore.take(1)
}
Expand Down Expand Up @@ -308,18 +309,29 @@ const strategyCreationTTL = <A, E>(ttl: Duration.DurationInput) =>
const strategyAccessTTL = <A, E>(ttl: Duration.DurationInput) =>
core.map(internalQueue.unbounded<PoolItem<A, E>>(), (queue) => {
return identity<Strategy<A, E>>({
run: (pool) =>
core.suspend(() => {
run: (pool) => {
const process: Effect<void> = core.suspend(() => {
const excess = pool.items.size - pool.targetSize
if (excess <= 0) return core.void
return core.flatMap(
queue.takeUpTo(excess),
core.forEachSequentialDiscard((item) => pool.invalidatePoolItem(item))
return queue.take.pipe(
core.tap((item) => pool.invalidatePoolItem(item)),
core.zipRight(process)
)
}).pipe(
})
return process.pipe(
coreEffect.delay(ttl),
coreEffect.forever
),
)
},
onAcquire: (item) => queue.offer(item)
})
})

const reportUnhandledError = <E>(cause: Cause<E>) =>
core.withFiberRuntime<void>((fiber) => {
const unhandledLogLevel = fiber.getFiberRef(core.currentUnhandledErrorLogLevel)
if (unhandledLogLevel._tag === "Some") {
fiber.log("Unhandled error in pool finalizer", cause, unhandledLogLevel)
}
return core.void
})
15 changes: 15 additions & 0 deletions packages/effect/test/Pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,21 @@ describe("Pool", () => {
assert.strictEqual(result, 0)
}))

it.scoped("defects don't prevent cleanup", () =>
Effect.gen(function*() {
const count = yield* Ref.make(0)
const get = Effect.acquireRelease(
Ref.updateAndGet(count, (n) => n + 1),
() => Effect.zipRight(Ref.update(count, (n) => n - 1), Effect.die("boom"))
)
const scope = yield* Scope.make()
yield* Scope.extend(Pool.make({ acquire: get, size: 10 }), scope)
yield* Effect.repeat(Ref.get(count), { until: (n) => n === 10 })
yield* Scope.close(scope, Exit.succeed(void 0))
const result = yield* Ref.get(count)
assert.strictEqual(result, 0)
}))

it.scoped("acquire one item", () =>
Effect.gen(function*($) {
const count = yield* $(Ref.make(0))
Expand Down

0 comments on commit b5bd4b4

Please sign in to comment.