diff --git a/spec/asynciterable/from-spec.ts b/spec/asynciterable/from-spec.ts index 870b52a5..065c2e2a 100644 --- a/spec/asynciterable/from-spec.ts +++ b/spec/asynciterable/from-spec.ts @@ -1,10 +1,16 @@ import { hasNext, noNext, toObserver } from '../asynciterablehelpers'; import { setInterval, clearInterval } from 'timers'; import { PartialObserver } from '../../src/observer'; -import { from } from 'ix/asynciterable'; +import { defer, from } from 'ix/asynciterable'; import { AbortError } from 'ix/Ix'; import { withAbort } from 'ix/asynciterable/operators'; +function throwIfAborted(signal?: AbortSignal) { + if (signal && signal.aborted) { + throw new AbortError(); + } +} + test('AsyncIterable#from from promise list', async () => { const xs: Iterable> = [ Promise.resolve(1), @@ -20,9 +26,12 @@ test('AsyncIterable#from from promise list', async () => { await noNext(it); }); -async function* getData() { +async function* getData(signal?: AbortSignal) { + throwIfAborted(signal); yield 1; + throwIfAborted(signal); yield 2; + throwIfAborted(signal); yield 3; } @@ -133,6 +142,33 @@ test('AsyncIterable#from from with non-iterable throws', () => { expect(() => from({} as any)).toThrow(); }); +test('AsyncIterable#from from async generator with AbortSignal', async () => { + const abortController = new AbortController(); + const xs = getData(); + const res = from(xs); + + const it = res[Symbol.asyncIterator](abortController.signal); + await hasNext(it, 1); + await hasNext(it, 2); + abortController.abort(); + await expect(it.next()).rejects.toBeInstanceOf(AbortError); +}); + +test('AsyncIterable#from from defer with AbortSignal', async () => { + const abortController = new AbortController(); + const xs = defer((signal) => { + expect(signal).toBeInstanceOf(AbortSignal); + return getData(signal); + }); + const res = from(xs); + + const it = res[Symbol.asyncIterator](abortController.signal); + await hasNext(it, 1); + await hasNext(it, 2); + abortController.abort(); + await expect(it.next()).rejects.toBeInstanceOf(AbortError); +}); + interface Observer { next: (value: T) => void; error: (err: any) => void; diff --git a/src/asynciterable/asynciterablex.ts b/src/asynciterable/asynciterablex.ts index 38236fbb..d5cb0ac4 100644 --- a/src/asynciterable/asynciterablex.ts +++ b/src/asynciterable/asynciterablex.ts @@ -257,10 +257,20 @@ export class FromAsyncIterable extends AsyncIterable this._selector = selector; } - async *[Symbol.asyncIterator]() { + async *[Symbol.asyncIterator](signal?: AbortSignal) { let i = 0; - for await (const item of >this._source) { - yield await this._selector(item, i++); + if (signal && this._source instanceof AsyncIterableX) { + for await (const item of new WithAbortAsyncIterable(this._source, signal)) { + yield await this._selector(item, i++); + } + } else { + throwIfAborted(signal); + for await (const item of this._source) { + throwIfAborted(signal); + const value = await this._selector(item, i++); + throwIfAborted(signal); + yield value; + } } } }