diff --git a/packages/rxjs/spec/operators/every-spec.ts b/packages/rxjs/spec/operators/every-spec.ts index 5c4bacdca1..a758f53e46 100644 --- a/packages/rxjs/spec/operators/every-spec.ts +++ b/packages/rxjs/spec/operators/every-spec.ts @@ -2,7 +2,7 @@ import { expect } from 'chai'; import { every, mergeMap } from 'rxjs/operators'; import { TestScheduler } from 'rxjs/testing'; import type { Observer } from 'rxjs'; -import { of, Observable } from 'rxjs'; +import { of, Observable, Subject } from 'rxjs'; import { observableMatcher } from '../helpers/observableMatcher'; /** @test {every} */ @@ -301,4 +301,24 @@ describe('every', () => { expectSubscriptions(e1.subscriptions).toBe(e1subs); }); }); + + it('should handle reentrancy properly', () => { + const subject = new Subject(); + const results: any[] = []; + let n = 0; + + subject.pipe(every(() => false)).subscribe({ + next: (result) => { + results.push(result); + if (n < 3) { + subject.next(n++); + } + }, + complete: () => results.push('done'), + }); + + subject.next(n); + + expect(results).to.deep.equal([false, 'done']); + }); }); diff --git a/packages/rxjs/src/internal/operators/every.ts b/packages/rxjs/src/internal/operators/every.ts index b7a01728f6..311c215097 100644 --- a/packages/rxjs/src/internal/operators/every.ts +++ b/packages/rxjs/src/internal/operators/every.ts @@ -33,20 +33,25 @@ export function every(predicate: (value: T, index: number) => boolean): Opera return (source) => new Observable((destination) => { let index = 0; - source.subscribe( - operate({ - destination, - next: (value) => { - if (!predicate(value, index++)) { - destination.next(false); - destination.complete(); - } - }, - complete: () => { - destination.next(true); + + const subscriber = operate({ + destination, + next: (value: T) => { + if (!predicate(value, index++)) { + // To prevent re-entrancy issues, we unsubscribe from the + // source as soon as possible. Because the `next` right below it + // could cause us to re-enter before we get to `complete()`. + subscriber.unsubscribe(); + destination.next(false); destination.complete(); - }, - }) - ); + } + }, + complete: () => { + destination.next(true); + destination.complete(); + }, + }); + + source.subscribe(subscriber); }); }