Skip to content

Commit

Permalink
feat(Observable): now implements Symbol.asyncIterator
Browse files Browse the repository at this point in the history
  • Loading branch information
benlesh committed Mar 1, 2023
1 parent 414a692 commit e34aa5e
Show file tree
Hide file tree
Showing 2 changed files with 355 additions and 0 deletions.
236 changes: 236 additions & 0 deletions spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Observable, config, Subscription, Subscriber, Operator, NEVER, Subject,
import { map, filter, count, tap, combineLatestWith, concatWith, mergeWith, raceWith, zipWith, catchError, share} from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from './helpers/observableMatcher';
import { result } from 'lodash';

function expectFullObserver(val: any) {
expect(val).to.be.a('object');
Expand Down Expand Up @@ -972,4 +973,239 @@ describe('Observable.lift', () => {
} }
);
});

describe('As an async iterable', () => {
it('should be able to be used with for-await-of', async () => {
const source = new Observable<number>((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});

const results: number[] = [];
for await (const value of source) {
results.push(value);
}

expect(results).to.deep.equal([1, 2, 3]);
});

it('should unsubscribe if the for-await-of loop is broken', async () => {
const source = new Observable<number>((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});

const results: number[] = [];
for await (const value of source) {
results.push(value);
break;
}

expect(results).to.deep.equal([1]);
});

it('should unsubscribe if the for-await-of loop is broken with a thrown error', async () => {
const source = new Observable<number>((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});

const results: number[] = [];

try {
for await (const value of source) {
results.push(value);
throw new Error('wee')
}
} catch {
// Ignore
}

expect(results).to.deep.equal([1]);
});

it('should cause the async iterator to throw if the observable errors', async () => {
const source = new Observable<number>((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.error(new Error('wee'));
});

const results: number[] = [];

try {
for await (const value of source) {
results.push(value);
}
} catch (err: any) {
expect(err.message).to.equal('wee');
}

expect(results).to.deep.equal([1, 2]);
});

it('should handle situations where many promises are nexted out of the async iterator, but not awaited', async () => {
const subject = new Subject<number>();

const results: any[] = [];

const asyncIterator = subject[Symbol.asyncIterator]();

// Queue up three promises, but don't await them.
const first = asyncIterator.next().then((result) => {
results.push(result.value);
});

const second = asyncIterator.next().then((result) => {
results.push(result.value);
});

const third = asyncIterator.next().then((result) => {
results.push(result.value);
});

// Now let's progressively supply values to the promises.
expect(results).to.deep.equal([]);

subject.next(1);
await first;
expect(results).to.deep.equal([1]);

subject.next(2);
await second;
expect(results).to.deep.equal([1, 2]);

subject.next(3);
await third;
expect(results).to.deep.equal([1, 2, 3]);
});

it ('should handle situations where values from the observable are arriving faster than the are being consumed by the async iterator', async () => {
const subject = new Subject<number>();

const results: any[] = [];

const asyncIterator = subject[Symbol.asyncIterator]();

// start the subscription
const first = asyncIterator.next().then((result) => {
results.push(result.value);
});
subject.next(1);
await first
expect(results).to.deep.equal([1]);

// push values through the observable that aren't yet consumed by the async iterator
subject.next(2);
subject.next(3);

// now consume the values that were pushed through the observable
results.push((await asyncIterator.next()).value);
expect(results).to.deep.equal([1, 2]);

results.push((await asyncIterator.next()).value);
expect(results).to.deep.equal([1, 2, 3]);
});

it('should resolve all pending promises from the async iterable if the observable completes', async () => {
const subject = new Subject<number>();

const results: any[] = [];

const asyncIterator = subject[Symbol.asyncIterator]();

// Queue up three promises, but don't await them.
const allPending = Promise.all([
asyncIterator.next(),
asyncIterator.next(),
asyncIterator.next(),
]).then((allResults) => {
results.push(...allResults)
})

expect(results).to.deep.equal([]);

// Complete and make sure those promises are resolved.
subject.complete();
await allPending;
expect(results).to.deep.equal([
{ value: undefined, done: true },
{ value: undefined, done: true },
{ value: undefined, done: true },
]);
});

it('should reject all pending promises from the async iterable if the observable errors', async () => {
const subject = new Subject<number>();

const results: any[] = [];

const asyncIterator = subject[Symbol.asyncIterator]();

// Queue up three promises, but don't await them.
const allPending = Promise.all([
asyncIterator.next().catch((err: any) => results.push(err)),
asyncIterator.next().catch((err: any) => results.push(err)),
asyncIterator.next().catch((err: any) => results.push(err)),
])

expect(results).to.deep.equal([]);

// Complete and make sure those promises are resolved.
subject.error(new Error('wee'));
await allPending;
expect(results.length).to.equal(3);
expect(results[0]).to.be.an.instanceof(Error);
expect(results[0].message).to.equal('wee');
expect(results[1]).to.be.an.instanceOf(Error);
expect(results[1].message).to.equal('wee');
expect(results[2]).to.be.an.instanceOf(Error);
expect(results[2].message).to.equal('wee');
});

it('should unsubscribe from the source observable if `return` is called on the generator returned by Symbol.asyncIterator', async () => {
let state = 'idle';
const source = new Observable<number>((subscriber) => {
state = 'subscribed';
return () => {
state = 'unsubscribed';
}
});

const asyncIterator = source[Symbol.asyncIterator]();
expect(state).to.equal('idle');
asyncIterator.next();
expect(state).to.equal('subscribed');
asyncIterator.return();
expect(state).to.equal('unsubscribed');
});

it('should unsubscribe from the source observable if `throw` is called on the generator returned by Symbol.asyncIterator', async () => {
let state = 'idle';
const source = new Observable<number>((subscriber) => {
state = 'subscribed';
subscriber.next(0)
return () => {
state = 'unsubscribed';
}
});

const asyncIterator = source[Symbol.asyncIterator]();
expect(state).to.equal('idle');
await asyncIterator.next();
expect(state).to.equal('subscribed');
try {
await asyncIterator.throw(new Error('wee!'));
} catch (err: any) {
expect(err.message).to.be('wee!');
}
expect(state).to.equal('unsubscribed');
});
});
});
119 changes: 119 additions & 0 deletions src/internal/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -392,4 +392,123 @@ export class Observable<T> implements Subscribable<T> {
pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
return pipeFromArray(operations)(this);
}

/**
* Observable is async iterable, so it can be used in `for await` loop. This method
* of subscription is cancellable by breaking the for await loop. Although it's not
* recommended to use Observable's AsyncIterable contract outside of `for await`, if
* you're consuming the Observable as an AsyncIterable, and you're _not_ using `for await`,
* you can use the `throw` or `return` methods on the `AsyncGenerator` we return to
* cancel the subscription. Note that the subscription to the observable does not start
* until the first value is requested from the AsyncIterable.
*
* Functionally, this is equivalent to using a {@link concatMap} with an `async` function.
* That means that while the body of the `for await` loop is executing, any values that arrive
* from the observable source will be queued up, so they can be processed by the `for await`
* loop in order. So, like {@link concatMap} it's important to understand the speed your
* source emits at, and the speed of the body of your `for await` loop.
*
* ## Example
*
* ```ts
* import { interval } from 'rxjs';
*
* async function main() {
* // Subscribe to the observable using for await.
* for await (const value of interval(1000)) {
* console.log(value);
*
* if (value > 5) {
* // Unsubscribe from the interval if we get a value greater than 5
* break;
* }
* }
* }
*
* main();
* ```
*/
[Symbol.asyncIterator](): AsyncGenerator<T, void, void> {
let subscription: Subscription | undefined;
let hasError = false;
let error: any;
let completed = false;
const values: T[] = [];
const deferreds: [(value: any) => void, (reason: any) => void][] = [];

const handleError = (err: any) => {
hasError = true;
error = err;
while (deferreds.length) {
deferreds.shift()![1](err);
}
};

const handleComplete = () => {
completed = true;
while (deferreds.length) {
deferreds.shift()![0]({ value: undefined, done: true });
}
};

return {
next: (): Promise<IteratorResult<T>> => {
if (!subscription) {
// We only want to start the subscription when the user starts iterating.
subscription = this.subscribe({
next: (value) => {
if (deferreds.length) {
deferreds.shift()![0]({ value, done: false });
} else {
values.push(value);
}
},
error: handleError,
complete: handleComplete,
});
}

// If we already have some values in our buffer, we'll return the next one.
if (values.length) {
return Promise.resolve({ value: values.shift()!, done: false });
}

// This was already completed, so we're just going to return a done result.
if (completed) {
return Promise.resolve({ value: undefined, done: true });
}

// There was an error, so we're going to return an error result.
if (hasError) {
return Promise.reject(error);
}

// Otherwise, we need to make them wait for a value.
return new Promise((resolve, reject) => {
deferreds.push([resolve, reject]);
});
},
throw: (err): Promise<IteratorResult<T>> => {
subscription?.unsubscribe();
// NOTE: I did some research on this, and as of Feb 2023, Chrome doesn't seem to do
// anything with pending promises returned from `next()` when `throw()` is called.
// However, for consumption of observables, I don't want RxJS taking the heat for that
// quirk/leak of the type. So we're going to reject all pending promises we've nexted out here.
handleError(err);
return Promise.resolve({ value: undefined, done: true });
},
return: (): Promise<IteratorResult<T>> => {
subscription?.unsubscribe();
// NOTE: I did some research on this, and as of Feb 2023, Chrome doesn't seem to do
// anything with pending promises returned from `next()` when `throw()` is called.
// However, for consumption of observables, I don't want RxJS taking the heat for that
// quirk/leak of the type. So we're going to resolve all pending promises we've nexted out here.
handleComplete();
return Promise.resolve({ value: undefined, done: true });
},
[Symbol.asyncIterator]() {
return this;
},
};
}
}

0 comments on commit e34aa5e

Please sign in to comment.