Skip to content

Commit

Permalink
fix(from): make from forward or check abort signal. fixes #352 (#361)
Browse files Browse the repository at this point in the history
  • Loading branch information
trxcllnt authored Dec 30, 2023
1 parent d6591ee commit e2dd8d7
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 5 deletions.
40 changes: 38 additions & 2 deletions spec/asynciterable/from-spec.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import { hasNext, noNext, toObserver } from '../asynciterablehelpers';
import { setInterval, clearInterval } from 'timers';
import { PartialObserver } from '../../src/observer';
import { from } from 'ix/asynciterable';
import { defer, from } from 'ix/asynciterable';
import { AbortError } from 'ix/Ix';
import { withAbort } from 'ix/asynciterable/operators';

function throwIfAborted(signal?: AbortSignal) {
if (signal && signal.aborted) {
throw new AbortError();
}
}

test('AsyncIterable#from from promise list', async () => {
const xs: Iterable<Promise<number>> = [
Promise.resolve(1),
Expand All @@ -20,9 +26,12 @@ test('AsyncIterable#from from promise list', async () => {
await noNext(it);
});

async function* getData() {
async function* getData(signal?: AbortSignal) {
throwIfAborted(signal);
yield 1;
throwIfAborted(signal);
yield 2;
throwIfAborted(signal);
yield 3;
}

Expand Down Expand Up @@ -133,6 +142,33 @@ test('AsyncIterable#from from with non-iterable throws', () => {
expect(() => from({} as any)).toThrow();
});

test('AsyncIterable#from from async generator with AbortSignal', async () => {
const abortController = new AbortController();
const xs = getData();
const res = from(xs);

const it = res[Symbol.asyncIterator](abortController.signal);
await hasNext(it, 1);
await hasNext(it, 2);
abortController.abort();
await expect(it.next()).rejects.toBeInstanceOf(AbortError);
});

test('AsyncIterable#from from defer with AbortSignal', async () => {
const abortController = new AbortController();
const xs = defer((signal) => {
expect(signal).toBeInstanceOf(AbortSignal);
return getData(signal);
});
const res = from(xs);

const it = res[Symbol.asyncIterator](abortController.signal);
await hasNext(it, 1);
await hasNext(it, 2);
abortController.abort();
await expect(it.next()).rejects.toBeInstanceOf(AbortError);
});

interface Observer<T> {
next: (value: T) => void;
error: (err: any) => void;
Expand Down
16 changes: 13 additions & 3 deletions src/asynciterable/asynciterablex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,10 +257,20 @@ export class FromAsyncIterable<TSource, TResult = TSource> extends AsyncIterable
this._selector = selector;
}

async *[Symbol.asyncIterator]() {
async *[Symbol.asyncIterator](signal?: AbortSignal) {
let i = 0;
for await (const item of <AsyncIterable<TSource>>this._source) {
yield await this._selector(item, i++);
if (signal && this._source instanceof AsyncIterableX) {
for await (const item of new WithAbortAsyncIterable(this._source, signal)) {
yield await this._selector(item, i++);
}
} else {
throwIfAborted(signal);
for await (const item of this._source) {
throwIfAborted(signal);
const value = await this._selector(item, i++);
throwIfAborted(signal);
yield value;
}
}
}
}
Expand Down

0 comments on commit e2dd8d7

Please sign in to comment.