Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Observable): now implements Symbol.asyncIterator #7189

Merged
merged 3 commits into from
Apr 24, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
238 changes: 237 additions & 1 deletion spec/Observable-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Observable, config, Subscription, Subscriber, Operator, NEVER, Subject,
import { map, filter, count, tap, combineLatestWith, concatWith, mergeWith, raceWith, zipWith, catchError, share} from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';
import { observableMatcher } from './helpers/observableMatcher';
import { result } from 'lodash';
benlesh marked this conversation as resolved.
Show resolved Hide resolved

function expectFullObserver(val: any) {
expect(val).to.be.a('object');
Expand Down Expand Up @@ -690,5 +691,240 @@ describe('Observable', () => {
expect(thrownError).to.be.an.instanceOf(RangeError);
expect(thrownError.message).to.equal('Maximum call stack size exceeded');
});
});


describe('As an async iterable', () => {
it('should be able to be used with for-await-of', async () => {
const source = new Observable<number>((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});

const results: number[] = [];
for await (const value of source) {
results.push(value);
}

expect(results).to.deep.equal([1, 2, 3]);
});

it('should unsubscribe if the for-await-of loop is broken', async () => {
const source = new Observable<number>((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});

const results: number[] = [];
for await (const value of source) {
results.push(value);
break;
}

expect(results).to.deep.equal([1]);
});
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either a test name is misleading here or the unsubscription is not tested here. Are we expecting the subscription to be unsubscribed earlier if we break out of the loop?

Based on the test name I would imagine something like this:

    it('should unsubscribe if the for-await-of loop is broken', async () => {
      let state = 'idle';
      const source = new Observable<number>((subscriber) => {
        subscriber.next(1);
        setTimeout(() => subscriber.next(2), 100);
        setTimeout(() => subscriber.next(3), 200);
        setTimeout(() => subscriber.complete(), 500);

        return () => {
          state = 'unsubscribed'
        }
      });

      expect(state).to.equal('idle')

      const results: number[] = [];
      for await (const value of source) {
        results.push(value);
        break;
      }

      expect(state).to.equal('unsubscribed')
      expect(results).to.deep.equal([1]);
    });

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump, as I see what they're saying. Same comment for the other unsubscribe test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll change this one around to test an active subscription count.


it('should unsubscribe if the for-await-of loop is broken with a thrown error', async () => {
const source = new Observable<number>((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});

const results: number[] = [];

try {
for await (const value of source) {
results.push(value);
throw new Error('wee')
}
} catch {
// Ignore
}

expect(results).to.deep.equal([1]);
});

it('should cause the async iterator to throw if the observable errors', async () => {
const source = new Observable<number>((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.error(new Error('wee'));
});

const results: number[] = [];

try {
for await (const value of source) {
results.push(value);
}
} catch (err: any) {
expect(err.message).to.equal('wee');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since chai doesn't have a way to assert the number of expected assertions (like jest does) I think it's generally a good idea to try to avoid assertions that rely on a condition that the test itself is trying to verify. e.g. if it stops throwing any error at all, this test would still pass. It would only fail if it does throw an error but that error doesn't have { message: 'wee' }.

Something like:

      let expectedError: any;
      try {
        for await (const value of source) {
          results.push(value);
        }
      } catch (err: any) {
        expectedError = err;
      }
      
      expect(expectedError.message).to.equal('wee');

}

expect(results).to.deep.equal([1, 2]);
});

it('should handle situations where many promises are nexted out of the async iterator, but not awaited', async () => {
const subject = new Subject<number>();

const results: any[] = [];

const asyncIterator = subject[Symbol.asyncIterator]();

// Queue up three promises, but don't await them.
const first = asyncIterator.next().then((result) => {
results.push(result.value);
});

const second = asyncIterator.next().then((result) => {
results.push(result.value);
});

const third = asyncIterator.next().then((result) => {
results.push(result.value);
});

// Now let's progressively supply values to the promises.
expect(results).to.deep.equal([]);

subject.next(1);
await first;
expect(results).to.deep.equal([1]);

subject.next(2);
await second;
expect(results).to.deep.equal([1, 2]);

subject.next(3);
await third;
expect(results).to.deep.equal([1, 2, 3]);
});

it ('should handle situations where values from the observable are arriving faster than the are being consumed by the async iterator', async () => {
const subject = new Subject<number>();

const results: any[] = [];

const asyncIterator = subject[Symbol.asyncIterator]();

// start the subscription
const first = asyncIterator.next().then((result) => {
results.push(result.value);
});
subject.next(1);
await first
expect(results).to.deep.equal([1]);

// push values through the observable that aren't yet consumed by the async iterator
subject.next(2);
subject.next(3);

// now consume the values that were pushed through the observable
results.push((await asyncIterator.next()).value);
expect(results).to.deep.equal([1, 2]);

results.push((await asyncIterator.next()).value);
expect(results).to.deep.equal([1, 2, 3]);
});

it('should resolve all pending promises from the async iterable if the observable completes', async () => {
const subject = new Subject<number>();

const results: any[] = [];

const asyncIterator = subject[Symbol.asyncIterator]();

// Queue up three promises, but don't await them.
const allPending = Promise.all([
asyncIterator.next(),
asyncIterator.next(),
asyncIterator.next(),
]).then((allResults) => {
results.push(...allResults)
})

expect(results).to.deep.equal([]);

// Complete and make sure those promises are resolved.
subject.complete();
await allPending;
expect(results).to.deep.equal([
{ value: undefined, done: true },
{ value: undefined, done: true },
{ value: undefined, done: true },
]);
});

it('should reject all pending promises from the async iterable if the observable errors', async () => {
const subject = new Subject<number>();

const results: any[] = [];

const asyncIterator = subject[Symbol.asyncIterator]();

// Queue up three promises, but don't await them.
const allPending = Promise.all([
asyncIterator.next().catch((err: any) => results.push(err)),
asyncIterator.next().catch((err: any) => results.push(err)),
asyncIterator.next().catch((err: any) => results.push(err)),
])

expect(results).to.deep.equal([]);

// Complete and make sure those promises are resolved.
subject.error(new Error('wee'));
await allPending;
expect(results.length).to.equal(3);
expect(results[0]).to.be.an.instanceof(Error);
expect(results[0].message).to.equal('wee');
expect(results[1]).to.be.an.instanceOf(Error);
expect(results[1].message).to.equal('wee');
expect(results[2]).to.be.an.instanceOf(Error);
expect(results[2].message).to.equal('wee');
});

it('should unsubscribe from the source observable if `return` is called on the generator returned by Symbol.asyncIterator', async () => {
let state = 'idle';
const source = new Observable<number>((subscriber) => {
state = 'subscribed';
return () => {
state = 'unsubscribed';
}
});

const asyncIterator = source[Symbol.asyncIterator]();
expect(state).to.equal('idle');
asyncIterator.next();
expect(state).to.equal('subscribed');
asyncIterator.return();
expect(state).to.equal('unsubscribed');
});

it('should unsubscribe from the source observable if `throw` is called on the generator returned by Symbol.asyncIterator', async () => {
let state = 'idle';
const source = new Observable<number>((subscriber) => {
state = 'subscribed';
subscriber.next(0)
return () => {
state = 'unsubscribed';
}
});

const asyncIterator = source[Symbol.asyncIterator]();
expect(state).to.equal('idle');
await asyncIterator.next();
expect(state).to.equal('subscribed');
try {
await asyncIterator.throw(new Error('wee!'));
} catch (err: any) {
expect(err.message).to.equal('wee!');
}
expect(state).to.equal('unsubscribed');
});
});
});
119 changes: 119 additions & 0 deletions src/internal/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,123 @@ export class Observable<T> implements Subscribable<T> {
pipe(...operations: OperatorFunction<any, any>[]): Observable<any> {
return pipeFromArray(operations)(this);
}

/**
* Observable is async iterable, so it can be used in `for await` loop. This method
* of subscription is cancellable by breaking the for await loop. Although it's not
* recommended to use Observable's AsyncIterable contract outside of `for await`, if
* you're consuming the Observable as an AsyncIterable, and you're _not_ using `for await`,
* you can use the `throw` or `return` methods on the `AsyncGenerator` we return to
* cancel the subscription. Note that the subscription to the observable does not start
* until the first value is requested from the AsyncIterable.
*
* Functionally, this is equivalent to using a {@link concatMap} with an `async` function.
* That means that while the body of the `for await` loop is executing, any values that arrive
* from the observable source will be queued up, so they can be processed by the `for await`
* loop in order. So, like {@link concatMap} it's important to understand the speed your
* source emits at, and the speed of the body of your `for await` loop.
*
* ## Example
*
* ```ts
* import { interval } from 'rxjs';
*
* async function main() {
* // Subscribe to the observable using for await.
* for await (const value of interval(1000)) {
* console.log(value);
*
* if (value > 5) {
* // Unsubscribe from the interval if we get a value greater than 5
* break;
* }
* }
* }
*
* main();
* ```
*/
[Symbol.asyncIterator](): AsyncGenerator<T, void, void> {
let subscription: Subscription | undefined;
let hasError = false;
let error: any;
benlesh marked this conversation as resolved.
Show resolved Hide resolved
let completed = false;
const values: T[] = [];
const deferreds: [(value: any) => void, (reason: any) => void][] = [];
benlesh marked this conversation as resolved.
Show resolved Hide resolved

const handleError = (err: any) => {
benlesh marked this conversation as resolved.
Show resolved Hide resolved
hasError = true;
error = err;
while (deferreds.length) {
deferreds.shift()![1](err);
}
};

const handleComplete = () => {
completed = true;
while (deferreds.length) {
deferreds.shift()![0]({ value: undefined, done: true });
}
};

return {
benlesh marked this conversation as resolved.
Show resolved Hide resolved
next: (): Promise<IteratorResult<T>> => {
if (!subscription) {
// We only want to start the subscription when the user starts iterating.
subscription = this.subscribe({
next: (value) => {
if (deferreds.length) {
deferreds.shift()![0]({ value, done: false });
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO this code is "fine" but I think it would be much clearer for someone who isn't super-super familiar with how all this works to use an intermediate variable for the resolve, and maybe even an object with field names instead of an array tuple.

e.g.

const { resolve } = deferreds.shift()!;
resolve({ value, done: false });

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A solid suggestion. Implemented.

} else {
values.push(value);
}
},
error: handleError,
complete: handleComplete,
});
}

// If we already have some values in our buffer, we'll return the next one.
if (values.length) {
return Promise.resolve({ value: values.shift()!, done: false });
benlesh marked this conversation as resolved.
Show resolved Hide resolved
}

// This was already completed, so we're just going to return a done result.
if (completed) {
return Promise.resolve({ value: undefined, done: true });
}

// There was an error, so we're going to return an error result.
if (hasError) {
return Promise.reject(error);
}

// Otherwise, we need to make them wait for a value.
return new Promise((resolve, reject) => {
deferreds.push([resolve, reject]);
});
},
throw: (err): Promise<IteratorResult<T>> => {
subscription?.unsubscribe();
// NOTE: I did some research on this, and as of Feb 2023, Chrome doesn't seem to do
// anything with pending promises returned from `next()` when `throw()` is called.
// However, for consumption of observables, I don't want RxJS taking the heat for that
// quirk/leak of the type. So we're going to reject all pending promises we've nexted out here.
handleError(err);
return Promise.reject(err);
},
return: (): Promise<IteratorResult<T>> => {
subscription?.unsubscribe();
// NOTE: I did some research on this, and as of Feb 2023, Chrome doesn't seem to do
// anything with pending promises returned from `next()` when `throw()` is called.
// However, for consumption of observables, I don't want RxJS taking the heat for that
// quirk/leak of the type. So we're going to resolve all pending promises we've nexted out here.
handleComplete();
return Promise.resolve({ value: undefined, done: true });
},
Comment on lines +449 to +466
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL
I didn't know about this part of the iteration protocol. This is great!

[Symbol.asyncIterator]() {
return this;
},
};
}
}