Skip to content

Commit

Permalink
refactor(Batching)!: Relax arbitrary arrays constraint
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Mar 16, 2024
1 parent 856c655 commit 4f6f300
Showing 1 changed file with 7 additions and 7 deletions.
14 changes: 7 additions & 7 deletions src/Equinox.Core/Batching.fs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,16 @@ type internal AsyncBatch<'Req, 'Res>() =
// sadly there's no way to detect without a try/catch
try queue.TryAdd(item)
with :? InvalidOperationException -> false
let mutable attempt = Unchecked.defaultof<Lazy<Task<'Res[]>>>
let mutable attempt = Unchecked.defaultof<Lazy<Task<'Res>>>

/// Attempt to add a request to the flight
/// Succeeds during linger interval (which commences when the first caller triggers the workflow via AwaitResult)
/// Fails if this flight has closed (caller should initialize a fresh Batch, potentially holding off until the current attempt completes)
member _.TryAdd(req, dispatch: Func<'Req[], CancellationToken, Task<'Res[]>>, lingerMs: int, limiter: System.Threading.SemaphoreSlim voption, ct) =
member _.TryAdd(req, dispatch: Func<'Req[], CancellationToken, Task<'Res>>, lingerMs: int, limiter: System.Threading.SemaphoreSlim voption, ct) =
if not (tryEnqueue req) then false else

// Prepare a new instance, with cancellation under our control (it won't start until the Force triggers it though)
let newInstance: Lazy<Task<'Res[]>> = lazy task {
// Prepare a new instance, with cancellation under our control (it won't start until the .Value triggers it though)
let newInstance: Lazy<Task<'Res>> = lazy task {
do! Task.Delay(lingerMs, ct)
match limiter with ValueNone -> () | ValueSome s -> do! s.WaitAsync(ct)
try queue.CompleteAdding()
Expand All @@ -45,12 +45,12 @@ type internal AsyncBatch<'Req, 'Res>() =
/// Requests are added to pending batch during the wait period, which consists of two phases:
/// 1. a defined linger period (min 1ms)
/// 2. (optionally) a wait to acquire capacity on a limiter semaphore (e.g. one might have a limit on concurrent dispatches across a pool)
type Batcher<'Req, 'Res> private (tryInclude: Func<AsyncBatch<_, _>, 'Req, CancellationToken, bool>) =
type Batcher<'Req, 'Res> private (tryInclude: Func<AsyncBatch<'Req, 'Res>, 'Req, CancellationToken, bool>) =
let mutable cell = AsyncBatch<'Req, 'Res>()
new(dispatch: Func<'Req[], CancellationToken, Task<'Res[]>>, lingerMs, limiter) =
new(dispatch: Func<'Req[], CancellationToken, Task<'Res>>, lingerMs, limiter) =
if lingerMs < 1 then invalidArg (nameof(lingerMs)) "Minimum linger period is 1ms" // concurrent waiters need to add work to the batch across their threads
Batcher(fun cell req ct -> cell.TryAdd(req, dispatch, lingerMs, limiter, ct = ct))
new(dispatch: 'Req[] -> Async<'Res[]>, ?linger : TimeSpan,
new(dispatch: 'Req[] -> Async<'Res>, ?linger: TimeSpan,
// Extends the linger phase to include a period during which we await capacity on an externally managed Semaphore
// The Batcher doesn't care, but a typical use is to enable limiting the number of concurrent in-flight dispatches
?limiter) =
Expand Down

0 comments on commit 4f6f300

Please sign in to comment.