Skip to content

Commit

Permalink
feat(from): support AbortSignal in from(observable) (#333)
Browse files Browse the repository at this point in the history
  • Loading branch information
aikoven authored Jul 22, 2021
1 parent 370ae91 commit 7897e85
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 1 deletion.
58 changes: 58 additions & 0 deletions spec/asynciterable/from-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import { hasNext, noNext, toObserver } from '../asynciterablehelpers';
import { setInterval, clearInterval } from 'timers';
import { PartialObserver } from '../../src/observer';
import { from } from 'ix/asynciterable';
import { AbortError } from 'ix/Ix';
import { withAbort } from 'ix/asynciterable/operators';

test('AsyncIterable#from from promise list', async () => {
const xs: Iterable<Promise<number>> = [
Expand Down Expand Up @@ -246,3 +248,59 @@ test('AsyncIterable#fromObservable with error', async () => {
const it = ys[Symbol.asyncIterator]();
await expect(it.next()).rejects.toThrow(err);
});

test('AsyncIterable#fromObservable with abort while waiting', async () => {
let unsubscribed = false;

const xs = new TestObservable<number>((obs) => {
obs.next(0);

return {
unsubscribe() {
unsubscribed = true;
},
};
});

const abortController = new AbortController();

const ys = from(xs).pipe(withAbort(abortController.signal));
const it = ys[Symbol.asyncIterator]();

await hasNext(it, 0);

setTimeout(() => {
abortController.abort();
}, 100);

await expect(it.next()).rejects.toBeInstanceOf(AbortError);
expect(unsubscribed).toBe(true);
});

test('AsyncIterable#fromObservable with abort while queueing', async () => {
let unsubscribed = false;

const xs = new TestObservable<number>((obs) => {
obs.next(0);
obs.next(1);
obs.next(2);

return {
unsubscribe() {
unsubscribed = true;
},
};
});

const abortController = new AbortController();

const ys = from(xs).pipe(withAbort(abortController.signal));
const it = ys[Symbol.asyncIterator]();

await hasNext(it, 0);

abortController.abort();

await expect(it.next()).rejects.toBeInstanceOf(AbortError);
expect(unsubscribed).toBe(true);
});
18 changes: 17 additions & 1 deletion src/asynciterable/from.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import {
import { Observable } from '../observer';
import { toLength } from '../util/tolength';
import { AsyncSink } from './asyncsink';
import { AbortError, throwIfAborted } from '../aborterror';

export let from: <TSource, TResult = TSource>(
source: AsyncIterableInput<TSource>,
Expand Down Expand Up @@ -149,7 +150,9 @@ export function _initialize(Ctor: typeof AsyncIterableX) {
this._selector = selector;
}

async *[Symbol.asyncIterator]() {
async *[Symbol.asyncIterator](signal?: AbortSignal) {
throwIfAborted(signal);

const sink: AsyncSink<TSource> = new AsyncSink<TSource>();
const subscription = this._observable.subscribe({
next(value: TSource) {
Expand All @@ -163,12 +166,25 @@ export function _initialize(Ctor: typeof AsyncIterableX) {
}
});

function onAbort() {
sink.error(new AbortError());
}

if (signal) {
signal.addEventListener('abort', onAbort);
}

let i = 0;
try {
for (let next; !(next = await sink.next()).done; ) {
throwIfAborted(signal);
yield await this._selector(next.value!, i++);
}
} finally {
if (signal) {
signal.removeEventListener('abort', onAbort);
}

subscription.unsubscribe();
}
}
Expand Down

0 comments on commit 7897e85

Please sign in to comment.