From ee7c43e32c6aebacd649b0ca107b2fc5096c3fd5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Einar=20Nor=C3=B0fj=C3=B6r=C3=B0?= Date: Wed, 9 Jun 2021 16:13:04 -0400 Subject: [PATCH] fea(asynciterable-operators): Add `bufferCountOrTime` operator (#324) --- docs/asynciterable/transforming.md | 16 +++++ docs/readme.md | 2 +- spec/asynciterable/buffercountortime-spec.ts | 28 ++++++++ .../operators/buffercountortime.ts | 64 +++++++++++++++++++ src/asynciterable/operators/index.ts | 1 + 5 files changed, 110 insertions(+), 1 deletion(-) create mode 100644 spec/asynciterable/buffercountortime-spec.ts create mode 100644 src/asynciterable/operators/buffercountortime.ts diff --git a/docs/asynciterable/transforming.md b/docs/asynciterable/transforming.md index e69de29b..b249cba4 100644 --- a/docs/asynciterable/transforming.md +++ b/docs/asynciterable/transforming.md @@ -0,0 +1,16 @@ +# Buffer - count or time + +An operator that's useful when you need to handle messages in batches. +Let's say you have an ongoing subscription to something but want to handle batches of messages. + +```ts +await subscription.pipe( + // emit when buffer hits 16 items, or every 100ms + bufferCountOrTime(16, 100) +) +.forEach(handleBatch) +``` + +Using this operator makes sure that if messages slow down you'll still +handle them in a reasonable time whereas using `buffer` would leave you stuck until you get +the right amount of messages. diff --git a/docs/readme.md b/docs/readme.md index 005679e0..b05684e8 100644 --- a/docs/readme.md +++ b/docs/readme.md @@ -152,7 +152,7 @@ This just scratches the surface with the capabilities of using async iterables w - Converting AsyncIterables - `toArray`, `toDOMStream`, `toMap`, `toNodeStream`, `toObservable`, `toSet` - [Transforming Sequences](asynciterable/transforming.md) - - `buffer`, `flat`, `flatMap`, `groupBy`, `map`, `pluck`, `scan`, `scanRight` + - `buffer`, `bufferCountOrTime`, `flat`, `flatMap`, `groupBy`, `map`, `pluck`, `scan`, `scanRight` - Filtering Sequences - `debounce`, `distinct`, `distinctUntilChanged`, `elementAt`, `find`, `findIndex`, `first`, `ignoreElements`, `last`, `single`, `slice` - Combining Sequences diff --git a/spec/asynciterable/buffercountortime-spec.ts b/spec/asynciterable/buffercountortime-spec.ts new file mode 100644 index 00000000..e2938bf2 --- /dev/null +++ b/spec/asynciterable/buffercountortime-spec.ts @@ -0,0 +1,28 @@ +import '../asynciterablehelpers'; +import { of, concat } from 'ix/asynciterable'; +import { bufferCountOrTime, delay } from 'ix/asynciterable/operators'; + +test('buffer count behaviour', async () => { + const result: number[][] = []; + + await of(1, 2, 3, 4, 5, 6, 7, 8, 9, 0) + .pipe(bufferCountOrTime(5, 10)) + .forEach((buf) => { + result.push(buf); + }); + + expect(result).toEqual([ + [1, 2, 3, 4, 5], + [6, 7, 8, 9, 0], + ]); +}); + +test('buffer time behaviour', async () => { + const result: number[][] = []; + const seq = concat(of(1, 2, 3, 4, 5), of(6, 7, 8, 9), of(0).pipe(delay(11))); + await seq.pipe(bufferCountOrTime(5, 10)).forEach((buf) => { + result.push(buf); + }); + + expect(result).toEqual([[1, 2, 3, 4, 5], [6, 7, 8, 9], [0]]); +}); diff --git a/src/asynciterable/operators/buffercountortime.ts b/src/asynciterable/operators/buffercountortime.ts new file mode 100644 index 00000000..f1294610 --- /dev/null +++ b/src/asynciterable/operators/buffercountortime.ts @@ -0,0 +1,64 @@ +import { OperatorAsyncFunction } from '../../interfaces'; +import { AsyncIterableX, interval, concat, of } from '../'; +import { map } from './map'; +import { merge } from '../merge'; +import { wrapWithAbort } from './withabort'; + +const timerEvent = {}; +const ended = {}; + +class BufferCountOrTime extends AsyncIterableX { + constructor( + private readonly source: AsyncIterable, + private readonly bufferSize: number, + private readonly maxWaitTime: number + ) { + super(); + } + + async *[Symbol.asyncIterator](signal?: AbortSignal) { + const buffer: TSource[] = []; + const timer = interval(this.maxWaitTime).pipe(map(() => timerEvent)); + const source = concat(this.source, of(ended)); + const merged = merge(source, timer); + + for await (const item of wrapWithAbort(merged, signal)) { + if (item === ended) { + break; + } + if (item !== timerEvent) { + buffer.push(item as TSource); + } + if (buffer.length >= this.bufferSize || (buffer.length && item === timerEvent)) { + yield buffer.slice(); + buffer.length = 0; + } + } + + if (buffer.length) { + yield buffer; + } + } +} + +/** + * Projects each element of an async-iterable sequence into consecutive buffers + * which are emitted when either the threshold count or time is met. + * + * @export + * @template TSource The type of elements in the source sequence. + * @param {number} count The size of the buffer. + * @param {number} time The threshold number of milliseconds to wait before flushing a non-full buffer + * @returns {OperatorAsyncFunction} An operator which returns an async-iterable sequence + * of buffers + */ +export function bufferCountOrTime( + count: number, + time: number +): OperatorAsyncFunction { + return function bufferOperatorFunction( + source: AsyncIterable + ): AsyncIterableX { + return new BufferCountOrTime(source, count, time); + }; +} diff --git a/src/asynciterable/operators/index.ts b/src/asynciterable/operators/index.ts index e82095ef..f36cc7df 100644 --- a/src/asynciterable/operators/index.ts +++ b/src/asynciterable/operators/index.ts @@ -1,5 +1,6 @@ export * from './batch'; export * from './buffer'; +export * from './buffercountortime'; export * from './catcherror'; export * from './combinelatestwith'; export * from './concatall';