From 72c37ecc232fb9e256d0e272d7861ce8eed8135e Mon Sep 17 00:00:00 2001 From: Paul Taylor Date: Tue, 26 Jul 2022 17:03:14 -0500 Subject: [PATCH] [FIX] Ensure `return` is called on AsyncIterators (#348) * fix(tests): await values in concatMap and flatMap tests * test(debounce): test that debounce triggers finalize * fix(timeout): ensure timeout triggers finalize * fix(tap): ensure tap triggers finalize * fix(flattenconcurrentasynciterable): ensure FlattenConcurrentAsyncIterable triggers finalize * ignore non-js files when debugging --- .vscode/launch.json | 4 +- .../asynciterable-operators/concatmap-spec.ts | 32 +++---- spec/asynciterable-operators/debounce-spec.ts | 57 +++++++++++- spec/asynciterable-operators/finalize-spec.ts | 74 ++++++++++++++-- spec/asynciterable-operators/flatmap-spec.ts | 46 +++++++--- spec/asynciterable-operators/timeout-spec.ts | 26 +++++- spec/asynciterablehelpers.ts | 19 ++-- src/asynciterable/asynciterablex.ts | 4 +- src/asynciterable/operators/_flatten.ts | 87 ++++++++++--------- src/asynciterable/operators/tap.ts | 41 ++++----- src/asynciterable/operators/timeout.ts | 35 ++++---- 11 files changed, 300 insertions(+), 125 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index f0d23921..c3545883 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -24,8 +24,8 @@ "console": "integratedTerminal", "program": "${workspaceFolder}/node_modules/.bin/jest", "skipFiles": [ - "/**/*.js", - "${workspaceFolder}/node_modules/**/*.js" + "/**/*", + "${workspaceFolder}/node_modules/**/*", ], "env": { "NODE_NO_WARNINGS": "1", diff --git a/spec/asynciterable-operators/concatmap-spec.ts b/spec/asynciterable-operators/concatmap-spec.ts index 63e2cb75..e0effa62 100644 --- a/spec/asynciterable-operators/concatmap-spec.ts +++ b/spec/asynciterable-operators/concatmap-spec.ts @@ -1,4 +1,4 @@ -import { hasNext, noNext } from '../asynciterablehelpers'; +import { hasNext, hasErr, noNext } from '../asynciterablehelpers'; import { of, range, sequenceEqual, throwError } from 'ix/asynciterable'; import { map, tap, concatMap } from 'ix/asynciterable/operators'; @@ -7,13 +7,13 @@ test('AsyncIterable#concatMap with range', async () => { const ys = xs.pipe(concatMap(async (x) => range(0, x))); const it = ys[Symbol.asyncIterator](); - hasNext(it, 0); - hasNext(it, 0); - hasNext(it, 1); - hasNext(it, 0); - hasNext(it, 1); - hasNext(it, 2); - noNext(it); + await hasNext(it, 0); + await hasNext(it, 0); + await hasNext(it, 1); + await hasNext(it, 0); + await hasNext(it, 1); + await hasNext(it, 2); + await noNext(it); }); test('AsyncIterable#concatMap order of effects', async () => { @@ -35,10 +35,10 @@ test('AsyncIterable#concatMap selector returns throw', async () => { const ys = xs.pipe(concatMap(async (x) => (x < 3 ? range(0, x) : throwError(err)))); const it = ys[Symbol.asyncIterator](); - hasNext(it, 0); - hasNext(it, 0); - hasNext(it, 1); - await expect(it.next()).rejects.toThrow(err); + await hasNext(it, 0); + await hasNext(it, 0); + await hasNext(it, 1); + await hasErr(it, err); }); test('AsyncIterable#concatMap with error throws', async () => { @@ -63,8 +63,8 @@ test('AsyncIterable#concatMap selector throws error', async () => { ); const it = ys[Symbol.asyncIterator](); - hasNext(it, 0); - hasNext(it, 0); - hasNext(it, 1); - await expect(it.next()).rejects.toThrow(err); + await hasNext(it, 0); + await hasNext(it, 0); + await hasNext(it, 1); + await hasErr(it, err); }); diff --git a/spec/asynciterable-operators/debounce-spec.ts b/spec/asynciterable-operators/debounce-spec.ts index 32915a93..3633f8d2 100644 --- a/spec/asynciterable-operators/debounce-spec.ts +++ b/spec/asynciterable-operators/debounce-spec.ts @@ -1,5 +1,5 @@ -import { hasNext, noNext, delayValue } from '../asynciterablehelpers'; -import { debounce } from 'ix/asynciterable/operators'; +import { hasNext, hasErr, noNext, delayError, delayValue } from '../asynciterablehelpers'; +import { debounce, finalize } from 'ix/asynciterable/operators'; import { as } from 'ix/asynciterable'; import { AbortError } from 'ix/Ix'; @@ -53,8 +53,59 @@ test( const it = ys[Symbol.asyncIterator](controller.signal); await hasNext(it, 1); setImmediate(() => controller.abort()); - await expect(hasNext(it, 3)).rejects.toThrow(AbortError); + await hasErr(it, AbortError); await noNext(it); }, 10 * 1000 ); + +test( + 'AsyncIterable#debounce triggers finalize on error', + async () => { + let done = false; + const e = new Error(); + const xs = async function* () { + yield await delayValue(1, 100); + yield await delayError(e, 100); + yield await delayValue(3, 100); + }; + const ys = as(xs()).pipe( + finalize(() => { + done = true; + }), + debounce(50) + ); + + const it = ys[Symbol.asyncIterator](); + await hasNext(it, 1); + await hasErr(it, e); + await noNext(it); + expect(done).toBeTruthy(); + }, + 10 * 1000 +); + +test( + 'AsyncIterable#debounce triggers finalize on complete', + async () => { + let done = false; + const xs = async function* () { + yield await delayValue(1, 200); + yield await delayValue(2, 400); + yield await delayValue(3, 200); + }; + const ys = as(xs()).pipe( + finalize(() => { + done = true; + }), + debounce(300) + ); + + const it = ys[Symbol.asyncIterator](); + await hasNext(it, 1); + await hasNext(it, 3); + await noNext(it); + expect(done).toBeTruthy(); + }, + 10 * 1000 +); diff --git a/spec/asynciterable-operators/finalize-spec.ts b/spec/asynciterable-operators/finalize-spec.ts index 6d6dba3e..ae935fcd 100644 --- a/spec/asynciterable-operators/finalize-spec.ts +++ b/spec/asynciterable-operators/finalize-spec.ts @@ -1,8 +1,8 @@ -import { hasNext, noNext } from '../asynciterablehelpers'; +import { hasNext, hasErr, noNext } from '../asynciterablehelpers'; import { range, throwError } from 'ix/asynciterable'; -import { finalize } from 'ix/asynciterable/operators'; +import { flatMap, finalize, tap } from 'ix/asynciterable/operators'; -test('AsyncIterable#finally defers behavior', async () => { +test('AsyncIterable#finalize defers behavior', async () => { let done = false; const xs = range(0, 2).pipe( @@ -25,7 +25,7 @@ test('AsyncIterable#finally defers behavior', async () => { expect(done).toBeTruthy(); }); -test('AsyncIterable#finally calls even with error', async () => { +test('AsyncIterable#finalize calls even with error', async () => { let done = false; const err = new Error(); @@ -34,12 +34,76 @@ test('AsyncIterable#finally calls even with error', async () => { done = true; }) ); + expect(done).toBeFalsy(); const it = xs[Symbol.asyncIterator](); + + expect(done).toBeFalsy(); + + await hasErr(it, err); + + expect(done).toBeTruthy(); +}); + +test('AsyncIterable#finalize calls with downstream error', async () => { + let done = false; + + const err = new Error(); + const xs = range(0, 2).pipe( + finalize(async () => { + done = true; + }), + tap(async () => { + throw err; + }) + ); + + expect(done).toBeFalsy(); + + const it = xs[Symbol.asyncIterator](); + expect(done).toBeFalsy(); - await expect(hasNext(it, 0)).rejects.toThrow(err); + await hasErr(it, err); + + expect(done).toBeTruthy(); +}); + +test('AsyncIterable#finalize calls with downstream error from flattening', async () => { + let done = false; + // let srcValues = [] as number[]; + + const err = new Error(); + const xs = range(0, 4).pipe( + finalize(async () => { + done = true; + }), + flatMap(async (x) => { + // srcValues.push(x); + if (x === 1) { + return throwError(err); + } + return [x]; + }) + ); + + expect(done).toBeFalsy(); + + const it = xs[Symbol.asyncIterator](); + + expect(done).toBeFalsy(); + + await hasNext(it, 0); + await hasErr(it, err); + await noNext(it); expect(done).toBeTruthy(); + // The source will yield one more value after the throwError(err), + // because the internal Promise.race([outer, inner]) call resolves + // to the last outer value before the inner error. This is an artifact + // of the JS Promise scheduler's breadth-first scheduling behavior, not + // a bug in IxJS. + // TODO: This is broken in google-closure-compiler? + // expect(srcValues).toEqual([0, 1, 2]); }); diff --git a/spec/asynciterable-operators/flatmap-spec.ts b/spec/asynciterable-operators/flatmap-spec.ts index dd634471..e3fccb02 100644 --- a/spec/asynciterable-operators/flatmap-spec.ts +++ b/spec/asynciterable-operators/flatmap-spec.ts @@ -1,4 +1,4 @@ -import { hasNext } from '../asynciterablehelpers'; +import { hasNext, hasErr } from '../asynciterablehelpers'; import { of, range, throwError, toArray } from 'ix/asynciterable'; import { flatMap } from 'ix/asynciterable/operators'; @@ -10,15 +10,25 @@ test('AsyncIterable#flatMap with range', async () => { }); test('AsyncIterable#flatMap selector returns throw', async () => { + const err = new Error(); + const xs = of(1, 2, 3); + const ys = xs.pipe(flatMap((x) => (x < 3 ? range(0, x) : throwError(err)))); + + const it = ys[Symbol.asyncIterator](); + await hasNext(it, 0); + await hasNext(it, 0); + await hasErr(it, err); +}); + +test('AsyncIterable#flatMap async selector returns throw', async () => { const err = new Error(); const xs = of(1, 2, 3); const ys = xs.pipe(flatMap(async (x) => (x < 3 ? range(0, x) : throwError(err)))); const it = ys[Symbol.asyncIterator](); - hasNext(it, 0); - hasNext(it, 0); - hasNext(it, 1); - await expect(it.next()).rejects.toThrow(err); + await hasNext(it, 0); + await hasNext(it, 0); + await hasErr(it, err); }); test('AsyncIterable#flatMap with error throws', async () => { @@ -27,10 +37,27 @@ test('AsyncIterable#flatMap with error throws', async () => { const ys = xs.pipe(flatMap((x) => range(0, x))); const it = ys[Symbol.asyncIterator](); - await expect(it.next()).rejects.toThrow(err); + await hasErr(it, err); }); test('AsyncIterable#flatMap selector throws error', async () => { + const err = new Error(); + const xs = of(1, 2, 3); + const ys = xs.pipe( + flatMap((x) => { + if (x < 3) { + return range(0, x); + } + throw err; + }) + ); + + const it = ys[Symbol.asyncIterator](); + await hasNext(it, 0); + await hasErr(it, err); +}); + +test('AsyncIterable#flatMap async selector throws error', async () => { const err = new Error(); const xs = of(1, 2, 3); const ys = xs.pipe( @@ -43,8 +70,7 @@ test('AsyncIterable#flatMap selector throws error', async () => { ); const it = ys[Symbol.asyncIterator](); - hasNext(it, 0); - hasNext(it, 0); - hasNext(it, 1); - await expect(it.next()).rejects.toThrow(err); + await hasNext(it, 0); + await hasNext(it, 0); + await hasErr(it, err); }); diff --git a/spec/asynciterable-operators/timeout-spec.ts b/spec/asynciterable-operators/timeout-spec.ts index 3d724d40..fecc5517 100644 --- a/spec/asynciterable-operators/timeout-spec.ts +++ b/spec/asynciterable-operators/timeout-spec.ts @@ -1,5 +1,5 @@ -import { hasNext, noNext, delayValue } from '../asynciterablehelpers'; -import { timeout } from 'ix/asynciterable/operators'; +import { hasNext, hasErr, noNext, delayValue } from '../asynciterablehelpers'; +import { timeout, finalize } from 'ix/asynciterable/operators'; import { as } from 'ix/asynciterable'; import { TimeoutError } from 'ix/asynciterable/operators/timeout'; @@ -27,6 +27,26 @@ test('AsyncIterable#timeout throws when delayed', async () => { const it = ys[Symbol.asyncIterator](); await hasNext(it, 1); - await expect(it.next()).rejects.toThrow(TimeoutError); + await hasErr(it, TimeoutError); await noNext(it); }); + +test('AsyncIterable#timeout triggers finalize', async () => { + let done = false; + const xs = async function* () { + yield await delayValue(1, 50); + yield await delayValue(2, 200); + }; + const ys = as(xs()).pipe( + finalize(() => { + done = true; + }), + timeout(100) + ); + + const it = ys[Symbol.asyncIterator](); + await hasNext(it, 1); + await hasErr(it, TimeoutError); + await noNext(it); + expect(done).toBeTruthy(); +}); diff --git a/spec/asynciterablehelpers.ts b/spec/asynciterablehelpers.ts index a46997a9..a87ea5e5 100644 --- a/spec/asynciterablehelpers.ts +++ b/spec/asynciterablehelpers.ts @@ -3,14 +3,15 @@ import { AsyncIterableX } from 'ix/asynciterable'; import { Observer, PartialObserver } from '../src/observer'; export async function hasNext(source: AsyncIterator, expected: T) { - const { done, value } = await source.next(); - expect(done).toBeFalsy(); - expect(value).toEqual(expected); + await expect(source.next()).resolves.toEqual({ done: false, value: expected }); +} + +export async function hasErr(source: AsyncIterator, expected: any) { + await expect(source.next()).rejects.toThrow(expected); } export async function noNext(source: AsyncIterator) { - const next = await source.next(); - expect(next.done).toBeTruthy(); + await expect(source.next()).resolves.toEqual({ done: true, value: undefined }); } export function delayValue(item: T, delay: number): Promise { @@ -21,6 +22,14 @@ export function delayValue(item: T, delay: number): Promise { }); } +export function delayError(item: T, delay: number): Promise { + return new Promise((_, reject) => { + setTimeout(() => { + reject(item); + }, delay); + }); +} + const noop = (_?: any) => { /**/ }; diff --git a/src/asynciterable/asynciterablex.ts b/src/asynciterable/asynciterablex.ts index 583a4ba3..ec1dda72 100644 --- a/src/asynciterable/asynciterablex.ts +++ b/src/asynciterable/asynciterablex.ts @@ -82,13 +82,13 @@ export abstract class AsyncIterableX implements AsyncIterable { */ static as(source: string): AsyncIterableX; /** - * Converts the async iterable like input into an async-iterable. + * Converts the AsyncIterable-like input or single element into an AsyncIterable. * * @template T The type of elements in the async-iterable like sequence. * @param {AsyncIterableInput} source The async-iterable like input to convert to an async-iterable. * @returns {AsyncIterableX} An async-iterable stream from elements in the async-iterable like sequence. */ - static as(source: AsyncIterableInput): AsyncIterableX; + static as(source: AsyncIterableInput | T): AsyncIterableX; /** * Converts the single element into an async-iterable sequence. * diff --git a/src/asynciterable/operators/_flatten.ts b/src/asynciterable/operators/_flatten.ts index 4386b33f..4a7557cc 100644 --- a/src/asynciterable/operators/_flatten.ts +++ b/src/asynciterable/operators/_flatten.ts @@ -3,6 +3,7 @@ import { wrapWithAbort } from '../operators/withabort'; import { AbortError, throwIfAborted } from '../../aborterror'; import { safeRace } from '../../util/safeRace'; import { isPromise } from '../../util/isiterable'; +import { returnAsyncIterator } from '../../util/returniterator'; export type FlattenConcurrentSelector = ( value: TSource, @@ -17,6 +18,8 @@ const enum Type { INNER = 1, } +type Wrapper = { value: TValue; index: number; type: TType }; + function ignoreInnerAbortErrors(signal: AbortSignal) { return function ignoreInnerAbortError(e?: any) { if (signal.aborted && e instanceof AbortError) { @@ -26,12 +29,13 @@ function ignoreInnerAbortErrors(signal: AbortSignal) { }; } -async function* wrapIterator( - source: AsyncIterable, +async function* wrapIterator( + source: AsyncIterable, index: number, - type: Type, + type: TType, signal?: AbortSignal -) { +): AsyncGenerator> { + throwIfAborted(signal); for await (const value of wrapWithAbort(source, signal)) { throwIfAborted(signal); yield { type, index, value }; @@ -53,8 +57,8 @@ export class FlattenConcurrentAsyncIterable extends AsyncItera async *[Symbol.asyncIterator](outerSignal?: AbortSignal) { throwIfAborted(outerSignal); - type OuterWrapper = { value: TSource; index: number; type: Type.OUTER }; - type InnerWrapper = { value: TResult; index: number; type: Type.INNER }; + type OuterWrapper = Wrapper; + type InnerWrapper = Wrapper; let active = 0; let outerIndex = 0; @@ -70,13 +74,11 @@ export class FlattenConcurrentAsyncIterable extends AsyncItera const controllers = new Array(isFinite(concurrent) ? concurrent : 0); const inners = new Array>(isFinite(concurrent) ? concurrent : 0); - const outer = wrapIterator(this._source, 0, Type.OUTER, outerSignal) as AsyncGenerator< - OuterWrapper - >; + const outer = wrapIterator(this._source, 0, Type.OUTER, outerSignal); const results = [outer.next()] as Promise>[]; try { - while (1) { + do { const { done = false, value: { type, value, index }, @@ -98,7 +100,11 @@ export class FlattenConcurrentAsyncIterable extends AsyncItera } case Type.INNER: { yield value as TResult; - results[index] = pullNextInner(index); + const { [index - 1]: inner } = inners; + const { + [index - 1]: { signal }, + } = controllers; + results[index] = inner.next().catch(ignoreInnerAbortErrors(signal)); break; } } @@ -106,10 +112,11 @@ export class FlattenConcurrentAsyncIterable extends AsyncItera // ignore this result slot results[index] = NEVER_PROMISE; switch (type) { - case Type.OUTER: + case Type.OUTER: { outerComplete = true; break; - case Type.INNER: + } + case Type.INNER: { --active; // return the current slot to the pool innerIndices.push(index); @@ -119,22 +126,13 @@ export class FlattenConcurrentAsyncIterable extends AsyncItera pullNextOuter(outerValues.shift()!); } break; - } - if (outerComplete && active + outerValues.length === 0) { - return; + } } } - } + } while (!outerComplete || active + outerValues.length > 0); } finally { - controllers.forEach((controller) => { - controller?.abort(); - }); - } - - function pullNextInner(index: number) { - const result = inners[index - 1].next(); - const { [index - 1]: controller } = controllers; - return result.catch(ignoreInnerAbortErrors(controller.signal)); + controllers.forEach((controller) => controller?.abort()); + await Promise.all([outer as AsyncIterator, ...inners].map(returnAsyncIterator)); } function pullNextOuter(outerValue: TSource) { @@ -154,19 +152,30 @@ export class FlattenConcurrentAsyncIterable extends AsyncItera // `selector` is a sync or async function that returns AsyncIterableInput. const inner = selector.call(thisArg, outerValue, outerIndex++, innerSignal); - const wrapAndPullInner = (inner: AsyncIterableInput | TResult) => { - inners[index - 1] = wrapIterator( - AsyncIterableX.as(inner), - index, - Type.INNER, - innerSignal - ) as AsyncGenerator; - return pullNextInner(index); - }; - - results[index] = isPromise(inner) - ? (inner.then(wrapAndPullInner) as Promise>) - : wrapAndPullInner(inner); + results[index] = wrapAndPullInner(index, innerSignal, inner).catch( + ignoreInnerAbortErrors(innerSignal) + ); + } + + function wrapAndPullInner( + index: number, + signal: AbortSignal, + inner: + | PromiseLike | TResult> + | AsyncIterableInput + | TResult + ): Promise>> { + if (isPromise(inner)) { + return inner.then((inner) => wrapAndPullInner(index, signal, inner)) as Promise< + IteratorResult> + >; + } + return (inners[index - 1] = wrapIterator( + AsyncIterableX.as(inner), + index, + Type.INNER, + signal + )).next(); } } } diff --git a/src/asynciterable/operators/tap.ts b/src/asynciterable/operators/tap.ts index 3de16a70..0f1ba4b0 100644 --- a/src/asynciterable/operators/tap.ts +++ b/src/asynciterable/operators/tap.ts @@ -2,8 +2,8 @@ import { AsyncIterableX } from '../asynciterablex'; import { PartialAsyncObserver } from '../../observer'; import { MonoTypeOperatorAsyncFunction } from '../../interfaces'; import { toObserver } from '../../util/toobserver'; -import { wrapWithAbort } from './withabort'; import { AbortError, throwIfAborted } from '../../aborterror'; +import { returnAsyncIterator } from '../../util/returniterator'; export class TapAsyncIterable extends AsyncIterableX { private _source: AsyncIterable; @@ -17,34 +17,25 @@ export class TapAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); - const source = wrapWithAbort(this._source, signal); - const it = source[Symbol.asyncIterator](); - while (1) { - let next; - try { - next = await it.next(); - } catch (e) { - if (e instanceof AbortError) { - throw e; + const obs = this._observer; + const it = this._source[Symbol.asyncIterator](signal); + try { + for (let res: IteratorResult; !(res = await it.next()).done; ) { + if (obs.next) { + await obs.next(res.value); } - - if (this._observer.error) { - await this._observer.error(e); - } - throw e; + yield res.value; } - - if (next.done) { - if (this._observer.complete) { - await this._observer.complete(); - } - break; + if (obs.complete) { + await obs.complete(); } - - if (this._observer.next) { - await this._observer.next(next.value); + } catch (e) { + if (!(e instanceof AbortError) && obs.error) { + await obs.error(e); } - yield next.value; + throw e; + } finally { + await returnAsyncIterator(it); } } } diff --git a/src/asynciterable/operators/timeout.ts b/src/asynciterable/operators/timeout.ts index 53dc34bc..9fcb1024 100644 --- a/src/asynciterable/operators/timeout.ts +++ b/src/asynciterable/operators/timeout.ts @@ -5,6 +5,7 @@ import { wrapWithAbort } from './withabort'; import { throwIfAborted } from '../../aborterror'; import { isObject } from '../../util/isiterable'; import { safeRace } from '../../util/safeRace'; +import { returnAsyncIterator } from '../../util/returniterator'; export class TimeoutError extends Error { constructor(message = 'Timeout has occurred') { @@ -51,24 +52,28 @@ export class TimeoutAsyncIterable extends AsyncIterableX { async *[Symbol.asyncIterator](signal?: AbortSignal) { throwIfAborted(signal); const it = wrapWithAbort(this._source, signal)[Symbol.asyncIterator](); - while (1) { - const { type, value } = await safeRace>([ - it.next().then((val) => { - return { type: VALUE_TYPE, value: val }; - }), - sleep(this._dueTime, signal).then(() => { - return { type: ERROR_TYPE }; - }), - ]); + try { + while (1) { + const { type, value } = await safeRace>([ + it.next().then((val) => { + return { type: VALUE_TYPE, value: val }; + }), + sleep(this._dueTime, signal).then(() => { + return { type: ERROR_TYPE }; + }), + ]); - if (type === ERROR_TYPE) { - throw new TimeoutError(); - } + if (type === ERROR_TYPE) { + throw new TimeoutError(); + } - if (!value || value.done) { - break; + if (!value || value.done) { + break; + } + yield value.value; } - yield value.value; + } finally { + await returnAsyncIterator(it); } } }