Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(Observable): now implements Symbol.asyncIterator #7189

Merged
merged 3 commits into from
Apr 24, 2023

Conversation

benlesh
Copy link
Member

@benlesh benlesh commented Feb 17, 2023

Adds async iterator support to Observable!

#6779
#6857

@benlesh benlesh requested review from jpark011 and demensky and removed request for jpark011 and demensky March 1, 2023 01:58
@demensky
Copy link
Contributor

demensky commented Mar 1, 2023

I suggest to play it safe and do it first as a regular method, and after a while do it with Symbol.asyncIterator. So there will be additional time to decide whether this was the optimal solution.

For example, the current implementation will trigger a microtask on each event, even if we already have events stored in the queue.

In streams, we usually work not with individual items, but with their chunks AsyncGenerator<T[], void, void>. Here, too, a similar approach suggests itself, returning an array of accumulated events.

for await (const events of source$) {
   console.log(events); // event accumulated between iterations
}

At the same time, nothing will interfere with us to work with each element separately. At the same time, it will not be unnecessary to provoke microtasks (and parasite Promise).

for await (const events of source$) {
  for (const event of events) {
     console.log(event);
  }
}

I hope that I was able to convey my thought correctly. Sorry if I wrote inaudibly somewhere. English is not my bearing language.

@benlesh
Copy link
Member Author

benlesh commented Mar 7, 2023

@demensky The library https://github.com/benlesh/rxjs-for-await as existed for quite some time. Every single use of that library I've found uses eachValueFrom. There's probably an argument to be made for pulling the rest of that library in here... however, given that the behavior here is nearly identical to concatMap, and people don't often trip over that, the core team has decided that this is the proper route.

@benlesh benlesh added AGENDA ITEM Flagged for discussion at core team meetings 8.x Issues and PRs for version 8.x labels Mar 7, 2023
Copy link

@ktrz ktrz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! I just left one question regarding the tests

Comment on lines 994 to 728
it('should unsubscribe if the for-await-of loop is broken', async () => {
const source = new Observable<number>((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});

const results: number[] = [];
for await (const value of source) {
results.push(value);
break;
}

expect(results).to.deep.equal([1]);
});
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Either a test name is misleading here or the unsubscription is not tested here. Are we expecting the subscription to be unsubscribed earlier if we break out of the loop?

Based on the test name I would imagine something like this:

    it('should unsubscribe if the for-await-of loop is broken', async () => {
      let state = 'idle';
      const source = new Observable<number>((subscriber) => {
        subscriber.next(1);
        setTimeout(() => subscriber.next(2), 100);
        setTimeout(() => subscriber.next(3), 200);
        setTimeout(() => subscriber.complete(), 500);

        return () => {
          state = 'unsubscribed'
        }
      });

      expect(state).to.equal('idle')

      const results: number[] = [];
      for await (const value of source) {
        results.push(value);
        break;
      }

      expect(state).to.equal('unsubscribed')
      expect(results).to.deep.equal([1]);
    });

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump, as I see what they're saying. Same comment for the other unsubscribe test.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll change this one around to test an active subscription count.

Comment on lines +491 to +463
throw: (err): Promise<IteratorResult<T>> => {
subscription?.unsubscribe();
// NOTE: I did some research on this, and as of Feb 2023, Chrome doesn't seem to do
// anything with pending promises returned from `next()` when `throw()` is called.
// However, for consumption of observables, I don't want RxJS taking the heat for that
// quirk/leak of the type. So we're going to reject all pending promises we've nexted out here.
handleError(err);
return Promise.reject(err);
},
return: (): Promise<IteratorResult<T>> => {
subscription?.unsubscribe();
// NOTE: I did some research on this, and as of Feb 2023, Chrome doesn't seem to do
// anything with pending promises returned from `next()` when `throw()` is called.
// However, for consumption of observables, I don't want RxJS taking the heat for that
// quirk/leak of the type. So we're going to resolve all pending promises we've nexted out here.
handleComplete();
return Promise.resolve({ value: undefined, done: true });
},
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIL
I didn't know about this part of the iteration protocol. This is great!

@benlesh
Copy link
Member Author

benlesh commented Mar 22, 2023

CORE TEAM: Approval.

@benlesh benlesh removed the AGENDA ITEM Flagged for discussion at core team meetings label Mar 22, 2023
Copy link
Member

@jayphelps jayphelps left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally good, but some points that I at least wanted to run by you.

Comment on lines 994 to 728
it('should unsubscribe if the for-await-of loop is broken', async () => {
const source = new Observable<number>((subscriber) => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
});

const results: number[] = [];
for await (const value of source) {
results.push(value);
break;
}

expect(results).to.deep.equal([1]);
});
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump, as I see what they're saying. Same comment for the other unsubscribe test.

results.push(value);
}
} catch (err: any) {
expect(err.message).to.equal('wee');
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since chai doesn't have a way to assert the number of expected assertions (like jest does) I think it's generally a good idea to try to avoid assertions that rely on a condition that the test itself is trying to verify. e.g. if it stops throwing any error at all, this test would still pass. It would only fail if it does throw an error but that error doesn't have { message: 'wee' }.

Something like:

      let expectedError: any;
      try {
        for await (const value of source) {
          results.push(value);
        }
      } catch (err: any) {
        expectedError = err;
      }
      
      expect(expectedError.message).to.equal('wee');

subscription = this.subscribe({
next: (value) => {
if (deferreds.length) {
deferreds.shift()![0]({ value, done: false });
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO this code is "fine" but I think it would be much clearer for someone who isn't super-super familiar with how all this works to use an intermediate variable for the resolve, and maybe even an object with field names instead of an array tuple.

e.g.

const { resolve } = deferreds.shift()!;
resolve({ value, done: false });

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A solid suggestion. Implemented.

@benlesh benlesh requested review from ktrz and jayphelps April 5, 2023 20:00
Copy link
Member

@jakovljevic-mladen jakovljevic-mladen left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, only minor types issues that could be considered fixing.

src/internal/Observable.ts Outdated Show resolved Hide resolved
src/internal/Observable.ts Outdated Show resolved Hide resolved
src/internal/Observable.ts Outdated Show resolved Hide resolved
@benlesh benlesh merged commit 256ab5c into ReactiveX:master Apr 24, 2023
@benlesh benlesh deleted the Observable-asyncIterator branch April 24, 2023 21:04
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
8.x Issues and PRs for version 8.x
Projects
None yet
Development

Successfully merging this pull request may close these issues.

7 participants