diff --git a/modules/effects/spec/effect_sources.spec.ts b/modules/effects/spec/effect_sources.spec.ts index 1f9627f445..1661a6e91c 100644 --- a/modules/effects/spec/effect_sources.spec.ts +++ b/modules/effects/spec/effect_sources.spec.ts @@ -1,8 +1,10 @@ import 'rxjs/add/operator/concat'; import 'rxjs/add/operator/catch'; -import { cold } from 'jasmine-marbles'; +import 'rxjs/add/operator/map'; +import { cold, getTestScheduler } from 'jasmine-marbles'; import { Observable } from 'rxjs/Observable'; import { of } from 'rxjs/observable/of'; +import { timer } from 'rxjs/observable/timer'; import { _throw } from 'rxjs/observable/throw'; import { never } from 'rxjs/observable/never'; import { empty } from 'rxjs/observable/empty'; @@ -69,6 +71,11 @@ describe('EffectSources', () => { @Effect() e$ = _throw(error); } + class SourceG { + @Effect() empty = of('value'); + @Effect() never = timer(50, getTestScheduler()).map(() => 'update'); + } + it('should resolve effects from instances', () => { const sources$ = cold('--a--', { a: new SourceA() }); const expected = cold('--a--', { a }); @@ -99,6 +106,17 @@ describe('EffectSources', () => { expect(mockErrorReporter.report).toHaveBeenCalled(); }); + it('should not complete the group if just one effect completes', () => { + const sources$ = cold('g', { + g: new SourceG(), + }); + const expected = cold('a----b-----', { a: 'value', b: 'update' }); + + const output = toActions(sources$); + + expect(output).toBeObservable(expected); + }); + function toActions(source: any): Observable { source['errorReporter'] = mockErrorReporter; return effectSources.toActions.call(source); diff --git a/modules/effects/src/effect_sources.ts b/modules/effects/src/effect_sources.ts index 3991bbb22e..0ec6037db8 100644 --- a/modules/effects/src/effect_sources.ts +++ b/modules/effects/src/effect_sources.ts @@ -3,9 +3,11 @@ import { mergeMap } from 'rxjs/operator/mergeMap'; import { exhaustMap } from 'rxjs/operator/exhaustMap'; import { map } from 'rxjs/operator/map'; import { dematerialize } from 'rxjs/operator/dematerialize'; +import { filter } from 'rxjs/operator/filter'; import { concat } from 'rxjs/observable/concat'; import { Observable } from 'rxjs/Observable'; import { Subject } from 'rxjs/Subject'; +import { Notification } from 'rxjs/Notification'; import { Injectable } from '@angular/core'; import { Action } from '@ngrx/store'; import { EffectNotification, verifyOutput } from './effect_notification'; @@ -31,13 +33,16 @@ export class EffectSources extends Subject { groupBy.call(this, getSourceForInstance), (source$: GroupedObservable) => dematerialize.call( - map.call( - exhaustMap.call(source$, resolveEffectSource), - (output: EffectNotification) => { - verifyOutput(output, this.errorReporter); + filter.call( + map.call( + exhaustMap.call(source$, resolveEffectSource), + (output: EffectNotification) => { + verifyOutput(output, this.errorReporter); - return output.notification; - } + return output.notification; + } + ), + (notification: Notification) => notification.kind === 'N' ) ) );