From 6f61854b4705fbed8de7e49a777655a7b772b0dc Mon Sep 17 00:00:00 2001 From: Tim Date: Mon, 27 May 2024 17:16:20 +1200 Subject: [PATCH] support concurrent access for Pool items --- .changeset/quiet-maps-judge.md | 5 + packages/effect/src/Pool.ts | 17 +- packages/effect/src/internal/pool.ts | 743 +++++++++++---------------- packages/effect/test/Pool.test.ts | 92 ++-- 4 files changed, 369 insertions(+), 488 deletions(-) create mode 100644 .changeset/quiet-maps-judge.md diff --git a/.changeset/quiet-maps-judge.md b/.changeset/quiet-maps-judge.md new file mode 100644 index 0000000000..3c4622af7f --- /dev/null +++ b/.changeset/quiet-maps-judge.md @@ -0,0 +1,5 @@ +--- +"effect": minor +--- + +support concurrent access for Pool items diff --git a/packages/effect/src/Pool.ts b/packages/effect/src/Pool.ts index 6eb6042771..d2e4f58b8b 100644 --- a/packages/effect/src/Pool.ts +++ b/packages/effect/src/Pool.ts @@ -81,6 +81,7 @@ export const make: ( options: { readonly acquire: Effect.Effect readonly size: number + readonly permits?: number | undefined } ) => Effect.Effect, never, Scope.Scope | R> = internal.make @@ -115,12 +116,16 @@ export const make: ( * @since 2.0.0 * @category constructors */ -export const makeWithTTL: (options: { - readonly acquire: Effect.Effect - readonly min: number - readonly max: number - readonly timeToLive: Duration.DurationInput -}) => Effect.Effect, never, Scope.Scope | R> = internal.makeWithTTL +export const makeWithTTL: ( + options: { + readonly acquire: Effect.Effect + readonly min: number + readonly max: number + readonly permits?: number | undefined + readonly timeToLive: Duration.DurationInput + readonly strategy?: "creation" | "access" | undefined + } +) => Effect.Effect, never, Scope.Scope | R> = internal.makeWithTTL /** * Retrieves an item from the pool in a scoped effect. Note that if diff --git a/packages/effect/src/internal/pool.ts b/packages/effect/src/internal/pool.ts index ff9d00fab4..8c402c4286 100644 --- a/packages/effect/src/internal/pool.ts +++ b/packages/effect/src/internal/pool.ts @@ -1,31 +1,20 @@ -import type * as Clock from "../Clock.js" +/** + * @since 1.0.0 + */ import * as Context from "../Context.js" import * as Duration from "../Duration.js" -import type * as Effect from "../Effect.js" -import * as Equal from "../Equal.js" +import * as Effect from "../Effect.js" import type * as Exit from "../Exit.js" -import { dual, pipe } from "../Function.js" -import * as Hash from "../Hash.js" -import * as HashSet from "../HashSet.js" +import * as Fiber from "../Fiber.js" +import { dual, identity } from "../Function.js" import { pipeArguments } from "../Pipeable.js" -import type * as Pool from "../Pool.js" +import type { Pool, PoolTypeId as PoolTypeId_ } from "../Pool.js" import { hasProperty } from "../Predicate.js" -import type * as Queue from "../Queue.js" -import type * as Ref from "../Ref.js" -import type * as Scope from "../Scope.js" -import * as effect from "./core-effect.js" -import * as core from "./core.js" -import * as fiberRuntime from "./fiberRuntime.js" -import * as queue from "./queue.js" -import * as ref from "./ref.js" - -/** @internal */ -const PoolSymbolKey = "effect/Pool" +import * as Queue from "../Queue.js" +import * as Scope from "../Scope.js" /** @internal */ -export const PoolTypeId: Pool.PoolTypeId = Symbol.for( - PoolSymbolKey -) as Pool.PoolTypeId +export const PoolTypeId: PoolTypeId_ = Symbol.for("effect/Pool") as PoolTypeId_ const poolVariance = { /* c8 ignore next */ @@ -34,467 +23,327 @@ const poolVariance = { _A: (_: any) => _ } -interface PoolState { - readonly size: number - readonly free: number -} +/** @internal */ +export const isPool = (u: unknown): u is Pool => hasProperty(u, PoolTypeId) -interface Attempted { - readonly result: Exit.Exit - readonly finalizer: Effect.Effect -} +/** @internal */ +export const makeWith = (options: { + readonly acquire: Effect.Effect + readonly min: number + readonly max: number + readonly strategy: Strategy + readonly permits?: number | undefined +}): Effect.Effect, never, Scope.Scope | R> => + Effect.uninterruptibleMask((restore) => + Effect.flatMap(Effect.context(), (context) => { + const scope = Context.get(context, Scope.Scope) + const acquire = Effect.mapInputContext( + options.acquire, + (input) => Context.merge(context, input) + ) as Effect.Effect< + A, + E, + Scope.Scope + > + return Queue.unbounded>().pipe( + Effect.map((queue) => + new PoolImpl( + acquire, + options.permits ?? 1, + options.min, + options.max, + queue, + options.strategy + ) + ), + Effect.tap((pool) => + restore(pool.reconcile).pipe( + Effect.forkDaemon, + Effect.tap(Scope.addFinalizer(scope, pool.shutdown)), + Effect.tap((fiber) => Scope.addFinalizer(scope, Fiber.interrupt(fiber))) + ) + ), + Effect.tap((pool) => + restore(options.strategy.run(pool)).pipe( + Effect.forkDaemon, + Effect.tap((fiber) => Scope.addFinalizer(scope, Fiber.interrupt(fiber))) + ) + ) + ) + }) + ) /** - * A `Strategy` describes the protocol for how a pool whose excess items are - * not being used should shrink down to the minimum pool size. + * @since 1.0.0 + * @category constructors */ -interface Strategy { - /** - * Describes how the initial state of the strategy should be allocated. - */ - initial(): Effect.Effect - /** - * Describes how the state of the strategy should be updated when an item is - * added to the pool or returned to the pool. - */ - track(state: S, attempted: Exit.Exit): Effect.Effect - /** - * Describes how excess items that are not being used should shrink down. - */ - run( - state: S, - getExcess: Effect.Effect, - shrink: Effect.Effect - ): Effect.Effect -} +export const make = (options: { + readonly acquire: Effect.Effect + readonly size: number + readonly permits?: number | undefined +}): Effect.Effect, never, R | Scope.Scope> => + makeWith({ ...options, min: options.size, max: options.size, strategy: strategyNoop() }) /** - * A strategy that does nothing to shrink excess items. This is useful when - * the minimum size of the pool is equal to its maximum size and so there is - * nothing to do. + * @since 1.0.0 + * @category constructors */ -class NoneStrategy implements Strategy { - initial(): Effect.Effect { - return core.void - } - track(): Effect.Effect { - return core.void - } - run(): Effect.Effect { - return core.void - } -} +export const makeWithTTL = (options: { + readonly acquire: Effect.Effect + readonly min: number + readonly max: number + readonly permits?: number | undefined + readonly timeToLive: Duration.DurationInput + readonly mode?: "creation" | "access" | undefined +}): Effect.Effect, never, R | Scope.Scope> => + Effect.flatMap( + options.mode === "creation" ? + strategyCreationTTL(options.timeToLive) : + strategyAccessTTL(options.timeToLive), + (strategy) => makeWith({ ...options, strategy }) + ) /** - * A strategy that shrinks the pool down to its minimum size if items in the - * pool have not been used for the specified duration. + * @since 1.0.0 + * @category combinators */ -class TimeToLiveStrategy implements Strategy], never, never, never> { - constructor(readonly timeToLive: Duration.Duration) {} - initial(): Effect.Effect]> { - return core.flatMap(effect.clock, (clock) => - core.flatMap(clock.currentTimeMillis, (now) => - core.map( - ref.make(now), - (ref) => [clock, ref] as const - ))) - } - track(state: readonly [Clock.Clock, Ref.Ref]): Effect.Effect { - return core.asVoid(core.flatMap( - state[0].currentTimeMillis, - (now) => ref.set(state[1], now) - )) - } - run( - state: readonly [Clock.Clock, Ref.Ref], - getExcess: Effect.Effect, - shrink: Effect.Effect - ): Effect.Effect { - return core.flatMap(getExcess, (excess) => - excess <= 0 - ? core.zipRight( - state[0].sleep(this.timeToLive), - this.run(state, getExcess, shrink) - ) - : pipe( - core.zipWith( - ref.get(state[1]), - state[0].currentTimeMillis, - (start, end) => end - start - ), - core.flatMap((duration) => { - if (duration >= Duration.toMillis(this.timeToLive)) { - return core.zipRight(shrink, this.run(state, getExcess, shrink)) - } else { - return core.zipRight(state[0].sleep(this.timeToLive), this.run(state, getExcess, shrink)) - } - }) - )) - } +export const get = (self: Pool): Effect.Effect => self.get + +/** + * @since 1.0.0 + * @category combinators + */ +export const invalidate: { + (item: A): (self: Pool) => Effect.Effect + (self: Pool, item: A): Effect.Effect +} = dual(2, (self: Pool, item: A): Effect.Effect => self.invalidate(item)) + +// ---------------------------------------------------------------------------- +// Implementation +// ---------------------------------------------------------------------------- + +interface PoolItem { + readonly exit: Exit.Exit + finalizer: Effect.Effect + refCount: number } -class PoolImpl implements Pool.Pool { - readonly [PoolTypeId] = poolVariance - constructor( - readonly creator: Effect.Effect, - readonly min: number, - readonly max: number, - readonly isShuttingDown: Ref.Ref, - readonly state: Ref.Ref, - readonly items: Queue.Queue>, - readonly invalidated: Ref.Ref>, - readonly track: (exit: Exit.Exit) => Effect.Effect - ) {} +interface Strategy { + readonly run: (pool: PoolImpl) => Effect.Effect + readonly onAcquire: (item: PoolItem) => Effect.Effect +} - [Hash.symbol](): number { - return pipe( - Hash.hash(this.creator), - Hash.combine(Hash.number(this.min)), - Hash.combine(Hash.number(this.max)), - Hash.combine(Hash.hash(this.isShuttingDown)), - Hash.combine(Hash.hash(this.state)), - Hash.combine(Hash.hash(this.items)), - Hash.combine(Hash.hash(this.invalidated)), - Hash.combine(Hash.hash(this.track)), - Hash.cached(this) - ) - } +class PoolImpl implements Pool { + readonly [PoolTypeId]: Pool.Variance[PoolTypeId_] - [Equal.symbol](that: unknown): boolean { - return isPool(that) && - Equal.equals(this.creator, (that as PoolImpl).creator) && - this.min === (that as PoolImpl).min && - this.max === (that as PoolImpl).max && - Equal.equals(this.isShuttingDown, (that as PoolImpl).isShuttingDown) && - Equal.equals(this.state, (that as PoolImpl).state) && - Equal.equals(this.items, (that as PoolImpl).items) && - Equal.equals(this.invalidated, (that as PoolImpl).invalidated) && - Equal.equals(this.track, (that as PoolImpl).track) - } + private isShuttingDown = false + readonly items = new Set>() + readonly invalidated = new Set>() + private waiters = 0 - pipe() { - return pipeArguments(this, arguments) + constructor( + readonly acquire: Effect.Effect, + readonly permits: number, + readonly minSize: number, + readonly maxSize: number, + readonly queue: Queue.Queue>, + readonly strategy: Strategy + ) { + this[PoolTypeId] = poolVariance } - get get(): Effect.Effect { - const acquire = ( - restore: (effect: Effect.Effect) => Effect.Effect - ): Effect.Effect> => - core.flatMap(ref.get(this.isShuttingDown), (down) => - down - ? core.interrupt - : core.flatten(ref.modify(this.state, (state) => { - if (state.free > 0 || state.size >= this.max) { - return [ - core.flatMap( - queue.take(this.items), - (attempted) => - core.exitMatch(attempted.result, { - onFailure: () => core.succeed(attempted), - onSuccess: (item) => - core.flatMap( - ref.get(this.invalidated), - (set) => { - if (pipe(set, HashSet.has(item))) { - return core.zipRight(finalizeInvalid(this, attempted), acquire(restore)) - } - return core.succeed(attempted) - } - ) - }) - ), - { ...state, free: state.free - 1 } - ] as const - } - if (state.size >= 0) { - return [ - core.zipRight(allocate(this, restore), acquire(restore)), - { size: state.size + 1, free: state.free + 1 } - ] as const - } - return [core.interrupt, state] as const - }))) - - const release = (attempted: Attempted): Effect.Effect => - core.exitMatch(attempted.result, { - onFailure: () => - core.zipRight( - attempted.finalizer, - core.flatten(ref.modify(this.state, (state) => { - if (state.size <= this.min) { - return [allocateUinterruptible(this), { ...state, free: state.free + 1 }] as const - } - return [core.void, { ...state, size: state.size - 1 }] as const - })) - ), - onSuccess: (item) => - core.flatMap(ref.get(this.invalidated), (set) => { - if (pipe(set, HashSet.has(item))) { - return finalizeInvalid(this, attempted) - } - return pipe( - ref.update(this.state, (state) => ({ ...state, free: state.free + 1 })), - core.zipRight(queue.offer(this.items, attempted)), - core.zipRight(this.track(attempted.result)), - core.zipRight(core.whenEffect(getAndShutdown(this), ref.get(this.isShuttingDown))) - ) - }) - }) + allocate: Effect.Effect = Scope.make().pipe( + Effect.bindTo("scope"), + Effect.bind("exit", ({ scope }) => Effect.exit(Scope.extend(this.acquire, scope))), + Effect.flatMap(({ exit, scope }) => { + const item: PoolItem = { + exit, + finalizer: Scope.close(scope, exit), + refCount: 0 + } + this.items.add(item) + const offer = Effect.zipRight(this.strategy.onAcquire(item), this.queue.offer(item)) + return exit._tag === "Success" ? offer : Effect.zipRight(item.finalizer, offer) + }) + ) - return pipe( - core.uninterruptibleMask((restore) => - core.tap(acquire(restore), (a) => fiberRuntime.addFinalizer((_exit) => release(a))) - ), - fiberRuntime.withEarlyRelease, - fiberRuntime.disconnect, - core.flatMap(([release, attempted]) => - pipe( - effect.when(release, () => isFailure(attempted)), - core.zipRight(toEffect(attempted)) - ) - ) - ) + get currentUsage() { + let count = this.waiters + for (const item of this.items) { + count += item.refCount + } + return count } - invalidate(item: A): Effect.Effect { - return ref.update(this.invalidated, HashSet.add(item)) + get targetSize() { + if (this.isShuttingDown) return 0 + const target = Math.ceil(this.currentUsage / this.permits) + return Math.min(Math.max(this.minSize, target), this.maxSize) } -} - -const allocate = ( - self: PoolImpl, - restore: (effect: Effect.Effect) => Effect.Effect -): Effect.Effect => - core.flatMap(fiberRuntime.scopeMake(), (scope) => - core.flatMap( - core.exit(restore(fiberRuntime.scopeExtend(self.creator, scope))), - (exit) => - core.flatMap( - core.succeed>({ - result: exit as Exit.Exit, - finalizer: core.scopeClose(scope, core.exitSucceed(void 0)) - }), - (attempted) => - pipe( - queue.offer(self.items, attempted), - core.zipRight(self.track(attempted.result)), - core.zipRight(core.whenEffect(getAndShutdown(self), ref.get(self.isShuttingDown))), - core.as(attempted) - ) - ) - )) - -const allocateUinterruptible = ( - self: PoolImpl -): Effect.Effect => core.uninterruptibleMask((restore) => allocate(self, restore)) - -/** - * Returns the number of items in the pool in excess of the minimum size. - */ -const excess = (self: PoolImpl): Effect.Effect => - core.map(ref.get(self.state), (state) => state.size - Math.min(self.min, state.free)) -const finalizeInvalid = ( - self: PoolImpl, - attempted: Attempted -): Effect.Effect => - pipe( - forEach(attempted, (a) => ref.update(self.invalidated, HashSet.remove(a))), - core.zipRight(attempted.finalizer), - core.zipRight( - core.flatten(ref.modify(self.state, (state) => { - if (state.size <= self.min) { - return [allocateUinterruptible(self), { ...state, free: state.free + 1 }] as const + private reconcileSemaphore = Effect.unsafeMakeSemaphore(1) + reconcileLoop: Effect.Effect = Effect.suspend(() => { + if (this.items.size >= this.targetSize) { + return Effect.void + } + return Effect.zipRight(this.allocate, this.reconcileLoop) + }) + reconcile = this.reconcileSemaphore.withPermits(1)(this.reconcileLoop) + + getPoolItem: Effect.Effect, never, Scope.Scope> = Effect.uninterruptibleMask((restore) => + Effect.suspend(() => { + this.waiters++ + return restore(this.reconcile).pipe( + Effect.zipRight(restore(this.queue.take)), + Effect.ensuring(Effect.sync(() => this.waiters--)), + Effect.flatMap((poolItem) => { + if (!this.items.has(poolItem) || this.invalidated.has(poolItem)) { + return this.getPoolItem + } + return Effect.succeed(poolItem) + }) + ) + }).pipe( + Effect.tap((poolItem) => { + if (poolItem.exit._tag === "Failure") { + this.items.delete(poolItem) + return Effect.void } - return [core.void, { ...state, size: state.size - 1 }] as const - })) + poolItem.refCount++ + return Effect.flatMap(Effect.scope, (scope) => { + const addFinalizer = Scope.addFinalizer( + scope, + Effect.suspend(() => { + poolItem.refCount-- + if (this.invalidated.has(poolItem)) { + return this.invalidatePoolItem(poolItem) + } + return poolItem.refCount === (this.permits - 1) ? + this.queue.offer(poolItem) : + Effect.void + }) + ) + return poolItem.refCount < this.permits + ? Effect.zipRight(addFinalizer, this.queue.offer(poolItem)) + : addFinalizer + }) + }) ) ) -/** - * Gets items from the pool and shuts them down as long as there are items - * free, signalling shutdown of the pool if the pool is empty. - */ -const getAndShutdown = (self: PoolImpl): Effect.Effect => - core.flatten(ref.modify(self.state, (state) => { - if (state.free > 0) { - return [ - core.matchCauseEffect(queue.take(self.items), { - onFailure: () => core.void, - onSuccess: (attempted) => - pipe( - forEach(attempted, (a) => ref.update(self.invalidated, HashSet.remove(a))), - core.zipRight(attempted.finalizer), - core.zipRight(ref.update(self.state, (state) => ({ ...state, size: state.size - 1 }))), - core.flatMap(() => getAndShutdown(self)) - ) - }), - { ...state, free: state.free - 1 } - ] as const - } - if (state.size > 0) { - return [core.void, state] as const - } - return [queue.shutdown(self.items), { ...state, size: state.size - 1 }] as const - })) - -/** - * Begins pre-allocating pool entries based on minimum pool size. - */ -const initialize = (self: PoolImpl): Effect.Effect => - fiberRuntime.replicateEffect( - core.uninterruptibleMask((restore) => - core.flatten(ref.modify(self.state, (state) => { - if (state.size < self.min && state.size >= 0) { - return [ - allocate(self, restore), - { size: state.size + 1, free: state.free + 1 } - ] as const - } - return [core.void, state] as const - })) - ), - self.min, - { discard: true } + get: Effect.Effect = Effect.flatMap( + Effect.suspend(() => this.isShuttingDown ? Effect.interrupt : this.getPoolItem), + (_) => _.exit ) -/** - * Shrinks the pool down, but never to less than the minimum size. - */ -const shrink = (self: PoolImpl): Effect.Effect => - core.uninterruptible( - core.flatten(ref.modify(self.state, (state) => { - if (state.size > self.min && state.free > 0) { - return [ - pipe( - queue.take(self.items), - core.flatMap((attempted) => - pipe( - forEach(attempted, (a) => ref.update(self.invalidated, HashSet.remove(a))), - core.zipRight(attempted.finalizer), - core.zipRight(ref.update(self.state, (state) => ({ ...state, size: state.size - 1 }))) - ) - ) - ), - { ...state, free: state.free - 1 } - ] as const + invalidate(item: A): Effect.Effect { + return Effect.uninterruptible(Effect.suspend(() => { + if (this.isShuttingDown) return Effect.void + for (const poolItem of this.items) { + if (poolItem.exit._tag === "Success" && poolItem.exit.value === item) { + return this.invalidatePoolItem(poolItem) + } } - return [core.void, state] as const + return Effect.void })) - ) - -const shutdown = (self: PoolImpl): Effect.Effect => - core.flatten(ref.modify(self.isShuttingDown, (down) => - down - ? [queue.awaitShutdown(self.items), true] as const - : [core.zipRight(getAndShutdown(self), queue.awaitShutdown(self.items)), true])) - -const isFailure = (self: Attempted): boolean => core.exitIsFailure(self.result) - -const forEach = ( - self: Attempted, - f: (a: A) => Effect.Effect -): Effect.Effect => - core.exitMatch(self.result, { - onFailure: () => core.void, - onSuccess: f - }) - -const toEffect = (self: Attempted): Effect.Effect => self.result - -/** - * A more powerful variant of `make` that allows specifying a `Strategy` that - * describes how a pool whose excess items are not being used will be shrunk - * down to the minimum size. - */ -const makeWith = ( - options: { - readonly acquire: Effect.Effect - readonly min: number - readonly max: number - readonly strategy: Strategy } -): Effect.Effect, never, R | R2 | Scope.Scope> => - core.uninterruptibleMask((restore) => - pipe( - fiberRuntime.all([ - core.context(), - ref.make(false), - ref.make({ size: 0, free: 0 }), - queue.bounded>(options.max), - ref.make(HashSet.empty()), - options.strategy.initial() - ]), - core.flatMap(([context, down, state, items, inv, initial]) => { - const pool = new PoolImpl( - core.mapInputContext(options.acquire, (old) => Context.merge(old)(context)), - options.min, - options.max, - down, - state, - items, - inv, - (exit) => options.strategy.track(initial, exit) - ) - return pipe( - fiberRuntime.forkDaemon(restore(initialize(pool))), - core.flatMap((fiber) => - core.flatMap( - fiberRuntime.forkDaemon(restore(options.strategy.run(initial, excess(pool), shrink(pool)))), - (shrink) => - fiberRuntime.addFinalizer(() => - pipe( - shutdown(pool), - core.zipRight(core.interruptFiber(fiber)), - core.zipRight(core.interruptFiber(shrink)) - ) - ) - ) - ), - core.as>(pool) - ) - }) - ) - ) -/** @internal */ -export const isPool = (u: unknown): u is Pool.Pool => hasProperty(u, PoolTypeId) + invalidatePoolItem(poolItem: PoolItem): Effect.Effect { + return Effect.suspend(() => { + if (poolItem.refCount === 0) { + this.items.delete(poolItem) + this.invalidated.delete(poolItem) + return poolItem.finalizer + } + this.invalidated.add(poolItem) + return Effect.void + }) + } -/** @internal */ -export const make = ( - options: { - readonly acquire: Effect.Effect - readonly size: number + get shutdown(): Effect.Effect { + return Effect.uninterruptible(Effect.suspend(() => { + this.isShuttingDown = true + const size = this.items.size + const semaphore = Effect.unsafeMakeSemaphore(size) + return Effect.zipRight( + Effect.forEach(this.items, (item) => { + if (item.refCount > 0) { + item.finalizer = Effect.zipRight(item.finalizer, semaphore.release(1)) + this.invalidated.add(item) + return Effect.zipRight(semaphore.take(1), item.finalizer) + } + return this.invalidatePoolItem(item) + }), + semaphore.take(size) + ) + })) } -): Effect.Effect, never, R | Scope.Scope> => - makeWith({ - acquire: options.acquire, - min: options.size, - max: options.size, - strategy: new NoneStrategy() - }) -/** @internal */ -export const makeWithTTL = ( - options: { - readonly acquire: Effect.Effect - readonly min: number - readonly max: number - readonly timeToLive: Duration.DurationInput + pipe() { + return pipeArguments(this, arguments) } -): Effect.Effect, never, R | Scope.Scope> => - makeWith({ - acquire: options.acquire, - min: options.min, - max: options.max, - strategy: new TimeToLiveStrategy(Duration.decode(options.timeToLive)) - }) +} -/** @internal */ -export const get = (self: Pool.Pool): Effect.Effect => self.get +const strategyNoop = (): Strategy => ({ + run: (_) => Effect.void, + onAcquire: (_) => Effect.void +}) + +const strategyCreationTTL = (ttl: Duration.DurationInput) => + Effect.gen(function*() { + const ttlMillis = Duration.toMillis(ttl) + const clock = yield* Effect.clock + const creationTimes = new WeakMap, number>() + const queue = yield* Queue.unbounded>() + + return identity>({ + run: (pool) => { + const process = (item: PoolItem): Effect.Effect => + Effect.suspend(() => { + if (pool.invalidated.has(item)) return Effect.void + const now = clock.unsafeCurrentTimeMillis() + const created = creationTimes.get(item)! + const remaining = ttlMillis - (now - created) + return remaining > 0 + ? Effect.delay(process(item), remaining) + : pool.invalidatePoolItem(item) + }) + return queue.take.pipe( + Effect.tap(process), + Effect.forever + ) + }, + onAcquire: (item) => + Effect.suspend(() => { + creationTimes.set(item, clock.unsafeCurrentTimeMillis()) + return queue.offer(item) + }) + }) + }) -/** @internal */ -export const invalidate = dual< - (value: A) => (self: Pool.Pool) => Effect.Effect, - (self: Pool.Pool, value: A) => Effect.Effect ->(2, (self, value) => self.invalidate(value)) +const strategyAccessTTL = (ttl: Duration.DurationInput) => + Effect.gen(function*() { + const clock = yield* Effect.clock + const queue = yield* Queue.unbounded>() + const accessTimes = new WeakMap, number>() + + return identity>({ + run: (pool) => { + return Effect.suspend(() => { + const excess = pool.items.size - pool.targetSize + if (excess <= 0) return Effect.void + return queue.takeUpTo(excess).pipe( + Effect.flatMap(Effect.forEach((item) => pool.invalidatePoolItem(item), { discard: true })) + ) + }).pipe( + Effect.delay(ttl), + Effect.forever + ) + }, + onAcquire: (item) => + Effect.suspend(() => { + accessTimes.set(item, clock.unsafeCurrentTimeMillis()) + return queue.offer(item) + }) + }) + }) diff --git a/packages/effect/test/Pool.test.ts b/packages/effect/test/Pool.test.ts index b557d8d50c..63f344e80d 100644 --- a/packages/effect/test/Pool.test.ts +++ b/packages/effect/test/Pool.test.ts @@ -1,44 +1,33 @@ -import * as Deferred from "effect/Deferred" -import * as Duration from "effect/Duration" -import * as Effect from "effect/Effect" -import * as Exit from "effect/Exit" -import * as Fiber from "effect/Fiber" -import * as Option from "effect/Option" -import * as Pool from "effect/Pool" -import * as Ref from "effect/Ref" -import * as Scope from "effect/Scope" -import * as it from "effect/test/utils/extend" -import * as TestClock from "effect/TestClock" -import * as TestServices from "effect/TestServices" -import { describe, expect } from "vitest" +import { Deferred, Duration, Effect, Exit, Fiber, Option, Pool, Ref, Scope, TestClock, TestServices } from "effect" +import { assert, describe, expect, it } from "effect/test/utils/extend" describe("Pool", () => { it.scoped("preallocates pool items", () => - Effect.gen(function*($) { - const count = yield* $(Ref.make(0)) + Effect.gen(function*() { + const count = yield* Ref.make(0) const get = Effect.acquireRelease( Ref.updateAndGet(count, (n) => n + 1), () => Ref.update(count, (n) => n - 1) ) - yield* $(Pool.make({ acquire: get, size: 10 })) - yield* $(Effect.repeat(Ref.get(count), { until: (n) => n === 10 })) - const result = yield* $(Ref.get(count)) - expect(result).toBe(10) + yield* Pool.make({ acquire: get, size: 10 }) + yield* Effect.repeat(Ref.get(count), { until: (n) => n === 10 }) + const result = yield* Ref.get(count) + assert.strictEqual(result, 10) })) it.scoped("cleans up items when shut down", () => - Effect.gen(function*($) { - const count = yield* $(Ref.make(0)) + Effect.gen(function*() { + const count = yield* Ref.make(0) const get = Effect.acquireRelease( Ref.updateAndGet(count, (n) => n + 1), () => Ref.update(count, (n) => n - 1) ) - const scope = yield* $(Scope.make()) - yield* $(Scope.extend(Pool.make({ acquire: get, size: 10 }), scope)) - yield* $(Effect.repeat(Ref.get(count), { until: (n) => n === 10 })) - yield* $(Scope.close(scope, Exit.succeed(void 0))) - const result = yield* $(Ref.get(count)) - expect(result).toBe(0) + const scope = yield* Scope.make() + yield* Scope.extend(Pool.make({ acquire: get, size: 10 }), scope) + yield* Effect.repeat(Ref.get(count), { until: (n) => n === 10 }) + yield* Scope.close(scope, Exit.succeed(void 0)) + const result = yield* Ref.get(count) + assert.strictEqual(result, 0) })) it.scoped("acquire one item", () => @@ -51,7 +40,7 @@ describe("Pool", () => { const pool = yield* $(Pool.make({ acquire: get, size: 10 })) yield* $(Effect.repeat(Ref.get(count), { until: (n) => n === 10 })) const item = yield* $(Pool.get(pool)) - expect(item).toBe(1) + assert.strictEqual(item, 1) })) it.scoped("reports failures via get", () => @@ -202,6 +191,38 @@ describe("Pool", () => { expect(max).toBe(15) })) + it.scoped("max pool size with permits: 3", () => + Effect.gen(function*($) { + const deferred = yield* $(Deferred.make()) + const count = yield* $(Ref.make(0)) + const acquire = Effect.acquireRelease( + Ref.updateAndGet(count, (n) => n + 1), + () => Ref.update(count, (n) => n - 1) + ) + const pool = yield* $(Pool.makeWithTTL({ + acquire, + min: 10, + max: 15, + permits: 3, + timeToLive: Duration.seconds(60) + })) + yield* $( + Effect.scoped(Effect.zipRight( + Pool.get(pool), + Deferred.await(deferred) + )), + Effect.fork, + Effect.repeatN(14 * 3) + ) + yield* $(Effect.repeat(Ref.get(count), { until: (n) => n === 15 })) + yield* $(Deferred.succeed(deferred, void 0)) + const max = yield* $(Ref.get(count)) + yield* $(TestClock.adjust(Duration.seconds(60))) + const min = yield* $(Ref.get(count)) + expect(min).toBe(10) + expect(max).toBe(15) + })) + it.scoped("shutdown robustness", () => Effect.gen(function*($) { const count = yield* $(Ref.make(0)) @@ -236,7 +257,7 @@ describe("Pool", () => { expect(result).toEqual(Exit.interrupt(fiberId)) })) - it.scoped("get is interruptible with dynamic size", () => + it.effect("get is interruptible with dynamic size", () => Effect.gen(function*($) { const get = Effect.never.pipe(Effect.forkScoped) const fiberId = yield* $(Effect.fiberId) @@ -245,15 +266,16 @@ describe("Pool", () => { const fiber = yield* $(Effect.fork(Pool.get(pool))) const result = yield* $(Fiber.interrupt(fiber)) expect(result).toEqual(Exit.interrupt(fiberId)) - })) + }).pipe(Effect.scoped)) it.scoped("finalizer is called for failed allocations", () => Effect.gen(function*() { const scope = yield* Scope.make() - const count = yield* Ref.make(0) + const allocations = yield* Ref.make(0) + const released = yield* Ref.make(0) const get = Effect.acquireRelease( - Ref.updateAndGet(count, (n) => n + 1), - () => Ref.update(count, (n) => n - 1) + Ref.updateAndGet(allocations, (n) => n + 1), + () => Ref.update(released, (n) => n + 1) ).pipe( Effect.andThen(Effect.fail("boom")) ) @@ -263,8 +285,8 @@ describe("Pool", () => { yield* Effect.scoped(pool.get).pipe( Effect.ignore ) - expect(yield* Ref.get(count)).toBe(10) yield* Scope.close(scope, Exit.void) - expect(yield* Ref.get(count)).toBe(0) + expect(yield* Ref.get(allocations)).toBe(11) + expect(yield* Ref.get(released)).toBe(11) })) })