Skip to content

Commit

Permalink
fix: exhaustAll use exhaustMap to handle synchronous completion of in…
Browse files Browse the repository at this point in the history
…ner Observable (#6911)

Fixes: #6910
  • Loading branch information
ajafff authored Mar 28, 2022
1 parent 15ca4a7 commit 3c1c6b8
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 27 deletions.
13 changes: 13 additions & 0 deletions spec/operators/exhaustAll-spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,4 +287,17 @@ describe('exhaust', () => {

expect(sideEffects).to.deep.equal([0, 1, 2]);
});

it('should handle synchronously completing inner observables', (done) => {
let i = 1;
of(of(1), of(2))
.pipe(exhaustAll())
.subscribe({
next: (v) => expect(v).to.equal(i++),
complete: () => {
expect(i).to.equal(3);
done();
},
});
});
});
30 changes: 3 additions & 27 deletions src/internal/operators/exhaustAll.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
import { Subscription } from '../Subscription';
import { OperatorFunction, ObservableInput, ObservedValueOf } from '../types';
import { operate } from '../util/lift';
import { innerFrom } from '../observable/innerFrom';
import { createOperatorSubscriber } from './OperatorSubscriber';
import { exhaustMap } from './exhaustMap';
import { identity } from '../util/identity';

/**
* Converts a higher-order Observable into a first-order Observable by dropping
Expand Down Expand Up @@ -49,27 +47,5 @@ import { createOperatorSubscriber } from './OperatorSubscriber';
* completes before subscribing to the next.
*/
export function exhaustAll<O extends ObservableInput<any>>(): OperatorFunction<O, ObservedValueOf<O>> {
return operate((source, subscriber) => {
let isComplete = false;
let innerSub: Subscription | null = null;
source.subscribe(
createOperatorSubscriber(
subscriber,
(inner) => {
if (!innerSub) {
innerSub = innerFrom(inner).subscribe(
createOperatorSubscriber(subscriber, undefined, () => {
innerSub = null;
isComplete && subscriber.complete();
})
);
}
},
() => {
isComplete = true;
!innerSub && subscriber.complete();
}
)
);
});
return exhaustMap(identity);
}

0 comments on commit 3c1c6b8

Please sign in to comment.