Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(component-store): add effect #2544

Merged
merged 1 commit into from
Jun 1, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 191 additions & 2 deletions modules/component-store/spec/component-store.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,21 @@
import { ComponentStore } from '@ngrx/component-store';
import { fakeSchedulers, marbles } from 'rxjs-marbles/jest';
import { of, Subscription, ConnectableObservable, interval, timer } from 'rxjs';
import { delayWhen, publishReplay, take, map } from 'rxjs/operators';
import {
of,
Subscription,
ConnectableObservable,
interval,
timer,
Observable,
} from 'rxjs';
import {
delayWhen,
publishReplay,
take,
map,
tap,
finalize,
} from 'rxjs/operators';

describe('Component Store', () => {
describe('initialization', () => {
@@ -627,4 +641,179 @@ describe('Component Store', () => {
componentStore.ngOnDestroy();
});
});

describe('effect', () => {
let componentStore: ComponentStore<object>;

beforeEach(() => {
componentStore = new ComponentStore<object>();
});

it(
'is run when value is provided',
marbles(m => {
const results: string[] = [];
const mockGenerator = jest.fn((origin$: Observable<string>) =>
origin$.pipe(tap(v => results.push(v)))
);
const effect = componentStore.effect(mockGenerator);
effect('value 1');
effect('value 2');

expect(results).toEqual(['value 1', 'value 2']);
})
);

it(
'is run when undefined value is provided',
marbles(m => {
const results: string[] = [];
const mockGenerator = jest.fn((origin$: Observable<undefined>) =>
origin$.pipe(tap(v => results.push(typeof v)))
);
const effect = componentStore.effect(mockGenerator);
effect(undefined);
effect();

expect(results).toEqual(['undefined', 'undefined']);
})
);

it(
'is run when observable is provided',
marbles(m => {
const mockGenerator = jest.fn(origin$ => origin$);
const effect = componentStore.effect(mockGenerator);

effect(m.cold('-a-b-c|'));

m.expect(mockGenerator.mock.calls[0][0]).toBeObservable(
m.hot(' -a-b-c-')
);
})
);
it(
'is run with multiple Observables',
marbles(m => {
const mockGenerator = jest.fn(origin$ => origin$);
const effect = componentStore.effect(mockGenerator);

effect(m.cold('-a-b-c|'));
effect(m.hot(' --d--e----f-'));

m.expect(mockGenerator.mock.calls[0][0]).toBeObservable(
m.hot(' -adb-(ce)-f-')
);
})
);

describe('cancels effect Observable', () => {
beforeEach(() => jest.useFakeTimers());
it(
'by unsubscribing with returned Subscription',
fakeSchedulers(advance => {
const results: string[] = [];
const effect = componentStore.effect((origin$: Observable<string>) =>
origin$.pipe(tap(v => results.push(v)))
);

const observable$ = interval(10).pipe(
map(v => String(v)),
take(10) // just in case
);

// Update with Observable.
const subsription = effect(observable$);

// Advance for 40 fake milliseconds and unsubscribe - should capture
// from '0' to '3'
advance(40);
subsription.unsubscribe();

// Advance for 20 more fake milliseconds, to check if anything else
// is captured
advance(20);

expect(results).toEqual(['0', '1', '2', '3']);
})
);
it(
'could be unsubscribed from the specific Observable when multiple' +
' are provided',
fakeSchedulers(advance => {
// Record all the values that go through state$ into an array
const results: Array<{ value: string }> = [];
const effect = componentStore.effect(
(origin$: Observable<{ value: string }>) =>
origin$.pipe(tap(v => results.push(v)))
);

// Pass the first Observable to the effect.
const subsription = effect(
interval(10).pipe(
map(v => ({ value: 'a' + v })),
take(10) // just in case
)
);

// Pass the second Observable that pushes values to effect
effect(
timer(15, 10).pipe(
map(v => ({ value: 'b' + v })),
take(10)
)
);

// Advance for 40 fake milliseconds and unsubscribe - should capture
// from '0' to '3'
advance(40);
subsription.unsubscribe();

// Advance for 30 more fake milliseconds, to make sure that second
// Observable still emits
advance(30);

expect(results).toEqual([
{ value: 'a0' },
{ value: 'b0' },
{ value: 'a1' },
{ value: 'b1' },
{ value: 'a2' },
{ value: 'b2' },
{ value: 'a3' },
{ value: 'b3' },
{ value: 'b4' },
{ value: 'b5' }, // second Observable continues to emit values
]);
})
);

it('completes when componentStore is destroyed', (doneFn: jest.DoneCallback) => {
componentStore.effect(origin$ =>
origin$.pipe(
finalize(() => {
doneFn();
})
)
)(interval(10));

setTimeout(() => componentStore.ngOnDestroy(), 20);
jest.advanceTimersByTime(20);
});

it('observable argument completes when componentStore is destroyed', (doneFn: jest.DoneCallback) => {
componentStore.effect(origin$ => origin$)(
interval(10).pipe(
finalize(() => {
doneFn();
})
)
);

setTimeout(() => componentStore.ngOnDestroy(), 20);

jest.advanceTimersByTime(20);
});
});
});
});
39 changes: 39 additions & 0 deletions modules/component-store/src/component-store.ts
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@ import {
Subscription,
throwError,
combineLatest,
Subject,
} from 'rxjs';
import {
concatMap,
@@ -17,6 +18,15 @@ import {
} from 'rxjs/operators';
import { debounceSync } from './debounceSync';

/**
* Return type of the effect, that behaves differently based on whether the
* argument is passed to the callback.
*/
interface EffectReturnFn<T> {
(): void;
(t: T | Observable<T>): Subscription;
}

export class ComponentStore<T extends object> {
// Should be used only in ngOnDestroy.
private readonly destroySubject$ = new ReplaySubject<void>(1);
@@ -178,4 +188,33 @@ export class ComponentStore<T extends object> {
);
return distinctSharedObservable$;
}

/**
* Creates an effect.
*
* This effect is subscribed to for the life of the @Component.
* @param generator A function that takes an origin Observable input and
* returns an Observable. The Observable that is returned will be
* subscribed to for the life of the component.
* @return A function that, when called, will trigger the origin Observable.
*/
effect<V, R = unknown>(
generator: (origin$: Observable<V>) => Observable<R>
): EffectReturnFn<V> {
const origin$ = new Subject<V>();
generator(origin$)
// tied to the lifecycle 👇 of ComponentStore
.pipe(takeUntil(this.destroy$))
.subscribe();

return (observableOrValue?: V | Observable<V>): Subscription => {
const observable$ = isObservable(observableOrValue)
? observableOrValue
: of(observableOrValue);
return observable$.pipe(takeUntil(this.destroy$)).subscribe(value => {
// any new 👇 value is pushed into a stream
origin$.next(value);
});
};
}
}