Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Observable): now implements Symbol.asyncIterator #7189

Merged
merged 3 commits into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
250 changes: 249 additions & 1 deletion spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Observable, config, Subscription, Subscriber, Operator, NEVER, Subject,
import { map, filter, count, tap, combineLatestWith, concatWith, mergeWith, raceWith, zipWith, catchError, share} from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from './helpers/observableMatcher';
import { result } from 'lodash';
benlesh marked this conversation as resolved.
Show resolved Hide resolved

function expectFullObserver(val: any) {
expect(val).to.be.a('object');
Expand Down Expand Up @@ -690,5 +691,252 @@ describe('Observable', () => {
expect(thrownError).to.be.an.instanceOf(RangeError);
expect(thrownError.message).to.equal('Maximum call stack size exceeded');
});
});


describe('As an async iterable', () => {
it('should be able to be used with for-await-of', async () => {
const source = new Observable<number>((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});

const results: number[] = [];
for await (const value of source) {
results.push(value);
}

expect(results).to.deep.equal([1, 2, 3]);
});

it('should unsubscribe if the for-await-of loop is broken', async () => {
let activeSubscriptions = 0;

const source = new Observable<number>((subscriber) => {
activeSubscriptions++;

subscriber.next(1);
subscriber.next(2);

// NOTE that we are NOT calling `subscriber.complete()` here.
// therefore the teardown below would never be called naturally
// by the observable unless it was unsubscribed.
return () => {
activeSubscriptions--;
}
});

const results: number[] = [];
for await (const value of source) {
results.push(value);
break;
}

expect(results).to.deep.equal([1]);
expect(activeSubscriptions).to.equal(0);
});

it('should unsubscribe if the for-await-of loop is broken with a thrown error', async () => {
const source = new Observable<number>((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});

const results: number[] = [];

try {
for await (const value of source) {
results.push(value);
throw new Error('wee')
}
} catch {
// Ignore
}

expect(results).to.deep.equal([1]);
});

it('should cause the async iterator to throw if the observable errors', async () => {
const source = new Observable<number>((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.error(new Error('wee'));
});

const results: number[] = [];
let thrownError: any;

try {
for await (const value of source) {
results.push(value);
}
} catch (err: any) {
thrownError = err;
}

expect(thrownError?.message).to.equal('wee')
expect(results).to.deep.equal([1, 2]);
});

it('should handle situations where many promises are nexted out of the async iterator, but not awaited', async () => {
const subject = new Subject<number>();

const results: any[] = [];

const asyncIterator = subject[Symbol.asyncIterator]();

// Queue up three promises, but don't await them.
const first = asyncIterator.next().then((result) => {
results.push(result.value);
});

const second = asyncIterator.next().then((result) => {
results.push(result.value);
});

const third = asyncIterator.next().then((result) => {
results.push(result.value);
});

// Now let's progressively supply values to the promises.
expect(results).to.deep.equal([]);

subject.next(1);
await first;
expect(results).to.deep.equal([1]);

subject.next(2);
await second;
expect(results).to.deep.equal([1, 2]);

subject.next(3);
await third;
expect(results).to.deep.equal([1, 2, 3]);
});

it ('should handle situations where values from the observable are arriving faster than the are being consumed by the async iterator', async () => {
const subject = new Subject<number>();

const results: any[] = [];

const asyncIterator = subject[Symbol.asyncIterator]();

// start the subscription
const first = asyncIterator.next().then((result) => {
results.push(result.value);
});
subject.next(1);
await first
expect(results).to.deep.equal([1]);

// push values through the observable that aren't yet consumed by the async iterator
subject.next(2);
subject.next(3);

// now consume the values that were pushed through the observable
results.push((await asyncIterator.next()).value);
expect(results).to.deep.equal([1, 2]);

results.push((await asyncIterator.next()).value);
expect(results).to.deep.equal([1, 2, 3]);
});

it('should resolve all pending promises from the async iterable if the observable completes', async () => {
const subject = new Subject<number>();

const results: any[] = [];

const asyncIterator = subject[Symbol.asyncIterator]();

// Queue up three promises, but don't await them.
const allPending = Promise.all([
asyncIterator.next(),
asyncIterator.next(),
asyncIterator.next(),
]).then((allResults) => {
results.push(...allResults)
})

expect(results).to.deep.equal([]);

// Complete and make sure those promises are resolved.
subject.complete();
await allPending;
expect(results).to.deep.equal([
{ value: undefined, done: true },
{ value: undefined, done: true },
{ value: undefined, done: true },
]);
});

it('should reject all pending promises from the async iterable if the observable errors', async () => {
const subject = new Subject<number>();

const results: any[] = [];

const asyncIterator = subject[Symbol.asyncIterator]();

// Queue up three promises, but don't await them.
const allPending = Promise.all([
asyncIterator.next().catch((err: any) => results.push(err)),
asyncIterator.next().catch((err: any) => results.push(err)),
asyncIterator.next().catch((err: any) => results.push(err)),
])

expect(results).to.deep.equal([]);

// Complete and make sure those promises are resolved.
subject.error(new Error('wee'));
await allPending;
expect(results.length).to.equal(3);
expect(results[0]).to.be.an.instanceof(Error);
expect(results[0].message).to.equal('wee');
expect(results[1]).to.be.an.instanceOf(Error);
expect(results[1].message).to.equal('wee');
expect(results[2]).to.be.an.instanceOf(Error);
expect(results[2].message).to.equal('wee');
});

it('should unsubscribe from the source observable if `return` is called on the generator returned by Symbol.asyncIterator', async () => {
let state = 'idle';
const source = new Observable<number>((subscriber) => {
state = 'subscribed';
return () => {
state = 'unsubscribed';
}
});

const asyncIterator = source[Symbol.asyncIterator]();
expect(state).to.equal('idle');
asyncIterator.next();
expect(state).to.equal('subscribed');
asyncIterator.return();
expect(state).to.equal('unsubscribed');
});

it('should unsubscribe from the source observable if `throw` is called on the generator returned by Symbol.asyncIterator', async () => {
let state = 'idle';
const source = new Observable<number>((subscriber) => {
state = 'subscribed';
subscriber.next(0)
return () => {
state = 'unsubscribed';
}
});

const asyncIterator = source[Symbol.asyncIterator]();
expect(state).to.equal('idle');
await asyncIterator.next();
expect(state).to.equal('subscribed');
try {
await asyncIterator.throw(new Error('wee!'));
} catch (err: any) {
expect(err.message).to.equal('wee!');
}
expect(state).to.equal('unsubscribed');
});
});
});
122 changes: 122 additions & 0 deletions src/internal/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,126 @@ export class Observable<T> implements Subscribable<T> {
pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
return pipeFromArray(operations)(this);
}

/**
* Observable is async iterable, so it can be used in `for await` loop. This method
* of subscription is cancellable by breaking the for await loop. Although it's not
* recommended to use Observable's AsyncIterable contract outside of `for await`, if
* you're consuming the Observable as an AsyncIterable, and you're _not_ using `for await`,
* you can use the `throw` or `return` methods on the `AsyncGenerator` we return to
* cancel the subscription. Note that the subscription to the observable does not start
* until the first value is requested from the AsyncIterable.
*
* Functionally, this is equivalent to using a {@link concatMap} with an `async` function.
* That means that while the body of the `for await` loop is executing, any values that arrive
* from the observable source will be queued up, so they can be processed by the `for await`
* loop in order. So, like {@link concatMap} it's important to understand the speed your
* source emits at, and the speed of the body of your `for await` loop.
*
* ## Example
*
* ```ts
* import { interval } from 'rxjs';
*
* async function main() {
* // Subscribe to the observable using for await.
* for await (const value of interval(1000)) {
* console.log(value);
*
* if (value > 5) {
* // Unsubscribe from the interval if we get a value greater than 5
* break;
* }
* }
* }
*
* main();
* ```
*/
[Symbol.asyncIterator](): AsyncGenerator<T, void, void> {
let subscription: Subscription | undefined;
let hasError = false;
let error: unknown;
let completed = false;
const values: T[] = [];
const deferreds: [(value: IteratorResult<T>) => void, (reason: unknown) => void][] = [];

const handleError = (err: unknown) => {
hasError = true;
error = err;
while (deferreds.length) {
const [_, reject] = deferreds.shift()!;
reject(err);
}
};

const handleComplete = () => {
completed = true;
while (deferreds.length) {
const [resolve] = deferreds.shift()!;
resolve({ value: undefined, done: true });
}
};

return {
benlesh marked this conversation as resolved.
Show resolved Hide resolved
next: (): Promise<IteratorResult<T>> => {
if (!subscription) {
// We only want to start the subscription when the user starts iterating.
subscription = this.subscribe({
next: (value) => {
if (deferreds.length) {
const [resolve] = deferreds.shift()!;
resolve({ value, done: false });
} else {
values.push(value);
}
},
error: handleError,
complete: handleComplete,
});
}

// If we already have some values in our buffer, we'll return the next one.
if (values.length) {
return Promise.resolve({ value: values.shift()!, done: false });
benlesh marked this conversation as resolved.
Show resolved Hide resolved
}

// This was already completed, so we're just going to return a done result.
if (completed) {
return Promise.resolve({ value: undefined, done: true });
}

// There was an error, so we're going to return an error result.
if (hasError) {
return Promise.reject(error);
}

// Otherwise, we need to make them wait for a value.
return new Promise((resolve, reject) => {
deferreds.push([resolve, reject]);
});
},
throw: (err): Promise<IteratorResult<T>> => {
subscription?.unsubscribe();
// NOTE: I did some research on this, and as of Feb 2023, Chrome doesn't seem to do
// anything with pending promises returned from `next()` when `throw()` is called.
// However, for consumption of observables, I don't want RxJS taking the heat for that
// quirk/leak of the type. So we're going to reject all pending promises we've nexted out here.
handleError(err);
return Promise.reject(err);
},
return: (): Promise<IteratorResult<T>> => {
subscription?.unsubscribe();
// NOTE: I did some research on this, and as of Feb 2023, Chrome doesn't seem to do
// anything with pending promises returned from `next()` when `throw()` is called.
// However, for consumption of observables, I don't want RxJS taking the heat for that
// quirk/leak of the type. So we're going to resolve all pending promises we've nexted out here.
handleComplete();
return Promise.resolve({ value: undefined, done: true });
},
Comment on lines +449 to +466
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL
I didn't know about this part of the iteration protocol. This is great!

[Symbol.asyncIterator]() {
return this;
},
};
}
}