Skip to content

Commit

Permalink
fea(asynciterable-operators): Add bufferCountOrTime operator (#324)
Browse files Browse the repository at this point in the history
  • Loading branch information
nordfjord authored Jun 9, 2021
1 parent aa40ab1 commit ee7c43e
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 1 deletion.
16 changes: 16 additions & 0 deletions docs/asynciterable/transforming.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Buffer - count or time

An operator that's useful when you need to handle messages in batches.
Let's say you have an ongoing subscription to something but want to handle batches of messages.

```ts
await subscription.pipe(
// emit when buffer hits 16 items, or every 100ms
bufferCountOrTime(16, 100)
)
.forEach(handleBatch)
```

Using this operator makes sure that if messages slow down you'll still
handle them in a reasonable time whereas using `buffer` would leave you stuck until you get
the right amount of messages.
2 changes: 1 addition & 1 deletion docs/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ This just scratches the surface with the capabilities of using async iterables w
- Converting AsyncIterables
- `toArray`, `toDOMStream`, `toMap`, `toNodeStream`, `toObservable`, `toSet`
- [Transforming Sequences](asynciterable/transforming.md)
- `buffer`, `flat`, `flatMap`, `groupBy`, `map`, `pluck`, `scan`, `scanRight`
- `buffer`, `bufferCountOrTime`, `flat`, `flatMap`, `groupBy`, `map`, `pluck`, `scan`, `scanRight`
- Filtering Sequences
- `debounce`, `distinct`, `distinctUntilChanged`, `elementAt`, `find`, `findIndex`, `first`, `ignoreElements`, `last`, `single`, `slice`
- Combining Sequences
Expand Down
28 changes: 28 additions & 0 deletions spec/asynciterable/buffercountortime-spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import '../asynciterablehelpers';
import { of, concat } from 'ix/asynciterable';
import { bufferCountOrTime, delay } from 'ix/asynciterable/operators';

test('buffer count behaviour', async () => {
const result: number[][] = [];

await of(1, 2, 3, 4, 5, 6, 7, 8, 9, 0)
.pipe(bufferCountOrTime(5, 10))
.forEach((buf) => {
result.push(buf);
});

expect(result).toEqual([
[1, 2, 3, 4, 5],
[6, 7, 8, 9, 0],
]);
});

test('buffer time behaviour', async () => {
const result: number[][] = [];
const seq = concat(of(1, 2, 3, 4, 5), of(6, 7, 8, 9), of(0).pipe(delay(11)));
await seq.pipe(bufferCountOrTime(5, 10)).forEach((buf) => {
result.push(buf);
});

expect(result).toEqual([[1, 2, 3, 4, 5], [6, 7, 8, 9], [0]]);
});
64 changes: 64 additions & 0 deletions src/asynciterable/operators/buffercountortime.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { OperatorAsyncFunction } from '../../interfaces';
import { AsyncIterableX, interval, concat, of } from '../';
import { map } from './map';
import { merge } from '../merge';
import { wrapWithAbort } from './withabort';

const timerEvent = {};
const ended = {};

class BufferCountOrTime<TSource> extends AsyncIterableX<TSource[]> {
constructor(
private readonly source: AsyncIterable<TSource>,
private readonly bufferSize: number,
private readonly maxWaitTime: number
) {
super();
}

async *[Symbol.asyncIterator](signal?: AbortSignal) {
const buffer: TSource[] = [];
const timer = interval(this.maxWaitTime).pipe(map(() => timerEvent));
const source = concat(this.source, of(ended));
const merged = merge(source, timer);

for await (const item of wrapWithAbort(merged, signal)) {
if (item === ended) {
break;
}
if (item !== timerEvent) {
buffer.push(item as TSource);
}
if (buffer.length >= this.bufferSize || (buffer.length && item === timerEvent)) {
yield buffer.slice();
buffer.length = 0;
}
}

if (buffer.length) {
yield buffer;
}
}
}

/**
* Projects each element of an async-iterable sequence into consecutive buffers
* which are emitted when either the threshold count or time is met.
*
* @export
* @template TSource The type of elements in the source sequence.
* @param {number} count The size of the buffer.
* @param {number} time The threshold number of milliseconds to wait before flushing a non-full buffer
* @returns {OperatorAsyncFunction<TSource, TSource[]>} An operator which returns an async-iterable sequence
* of buffers
*/
export function bufferCountOrTime<TSource>(
count: number,
time: number
): OperatorAsyncFunction<TSource, TSource[]> {
return function bufferOperatorFunction(
source: AsyncIterable<TSource>
): AsyncIterableX<TSource[]> {
return new BufferCountOrTime<TSource>(source, count, time);
};
}
1 change: 1 addition & 0 deletions src/asynciterable/operators/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export * from './batch';
export * from './buffer';
export * from './buffercountortime';
export * from './catcherror';
export * from './combinelatestwith';
export * from './concatall';
Expand Down

0 comments on commit ee7c43e

Please sign in to comment.