From 441718aae0cfb622e27b80c2ad2b183093209a0e Mon Sep 17 00:00:00 2001 From: Rafaa Ferid Date: Fri, 23 Sep 2022 14:43:50 +0200 Subject: [PATCH] feat: add reduced stream util chore: unit tests for reducedStream chore: refactor reducedStream fix: reducedStream unit test fix: _reducedStream name --- src/index.ts | 1 + src/internal/reducedStream.ts | 40 ++++++++ src/persistentReducedStream.ts | 28 ++---- src/reducedStream.spec.ts | 166 +++++++++++++++++++++++++++++++++ src/reducedStream.ts | 46 +++++++++ 5 files changed, 260 insertions(+), 21 deletions(-) create mode 100644 src/internal/reducedStream.ts create mode 100644 src/reducedStream.spec.ts create mode 100644 src/reducedStream.ts diff --git a/src/index.ts b/src/index.ts index ef394231..3e5b700a 100644 --- a/src/index.ts +++ b/src/index.ts @@ -36,6 +36,7 @@ export { } from './routines'; export { derivedStream } from './derivedStream'; +export { reducedStream } from './reducedStream'; export { persistentDerivedStream } from './persistentDerivedStream'; export { persistentReducedStream } from './persistentReducedStream'; diff --git a/src/internal/reducedStream.ts b/src/internal/reducedStream.ts new file mode 100644 index 00000000..226dd520 --- /dev/null +++ b/src/internal/reducedStream.ts @@ -0,0 +1,40 @@ +import { RegisteredReducer, combineReducers } from '../reducer'; +import { action$ as defaultAction$ } from '../action$'; +import { Observable, Subject } from 'rxjs'; +import { ActionStream } from '../types/helpers'; +import { withNamespace } from '../operators'; + +export type ReducedStreamOptions = { + errorSubject?: Subject; + namespace?: string; + action$?: ActionStream; +}; + +/** + * Creates a simple reduced state stream (without tagging, making sure to have initial state, nor replay) + * + * @param initialState The initial state of the stream + * @param reducers The reducers that build up the stream state + * @param {ReducedStreamOptions} options + * @returns + */ +export const reducedStream = ( + initialState: State, + reducers: RegisteredReducer[], + { + errorSubject, + namespace, + action$ = defaultAction$, + }: ReducedStreamOptions = {} +): Observable => { + const filteredAction$ = + namespace === undefined ? action$ : action$.pipe(withNamespace(namespace)); + + const source$ = filteredAction$.pipe( + combineReducers(initialState, reducers, { + errorSubject: errorSubject, + namespace: namespace, + }) + ); + return source$; +}; diff --git a/src/persistentReducedStream.ts b/src/persistentReducedStream.ts index 83a36d82..0f3fd2d2 100644 --- a/src/persistentReducedStream.ts +++ b/src/persistentReducedStream.ts @@ -1,17 +1,11 @@ -import { Subject } from 'rxjs'; -import { RegisteredReducer, combineReducers } from './reducer'; +import { RegisteredReducer } from './reducer'; import { ObservableState } from './observableState'; import { stateStreamRegistry } from './stateStreamRegistry'; -import { action$ as defaultAction$ } from './action$'; -import { ActionStream } from './types/helpers'; -import { withNamespace } from './operators'; import { tag } from 'rxjs-spy/operators'; - -type PersistentReducedStreamOptions = { - errorSubject?: Subject; - namespace?: string; - action$?: ActionStream; -}; +import { + ReducedStreamOptions, + reducedStream as reducedStreamInternal, +} from './internal/reducedStream'; /** * Creates and registers a persistent reduced state stream @@ -48,17 +42,9 @@ export const persistentReducedStream = ( name: string, initialState: State, reducers: RegisteredReducer[], - { errorSubject, namespace, action$: a$ }: PersistentReducedStreamOptions = {} + options: ReducedStreamOptions = {} ): ObservableState => { - const action$ = a$ ?? defaultAction$; - const filteredAction$ = - namespace === undefined ? action$ : action$.pipe(withNamespace(namespace)); - - const source$ = filteredAction$.pipe( - combineReducers(initialState, reducers, { - errorSubject: errorSubject, - namespace: namespace, - }), + const source$ = reducedStreamInternal(initialState, reducers, options).pipe( tag(name) ); diff --git a/src/reducedStream.spec.ts b/src/reducedStream.spec.ts new file mode 100644 index 00000000..e8269957 --- /dev/null +++ b/src/reducedStream.spec.ts @@ -0,0 +1,166 @@ +import test from 'ava'; +import { reducedStream } from './reducedStream'; +import { Subject, from, map, of } from 'rxjs'; +import { marbles } from 'rxjs-marbles/ava'; +import { incrementMocks } from './internal/testing/mock'; +import { reducer } from './reducer'; + +const { reducers, actionCreators, handlers } = incrementMocks; +const { actions, numbers, errors } = incrementMocks.marbles; +const reducerArray = Object.values(reducers); + +let counter = 1; +const nextStreamName = () => `testStream-${counter++}`; + +test('reducedStream should expose initial state immediately after subscribtion', (t) => { + const state = 'hello'; + const state$ = reducedStream(nextStreamName(), state, []); + state$.subscribe((emittedState) => { + t.is(emittedState, state); + }); + t.plan(1); +}); + +test( + 'reducedStream reduces state', + marbles((m) => { + const action$ = m.hot(' --1-2', actions); + const expected$ = m.hot('1-2-4', numbers); + const initialState = 1; + + const state$ = reducedStream(nextStreamName(), initialState, reducerArray, { + action$, + }); + + m.expect(state$).toBeObservable(expected$); + }) +); + +test( + 'reducedStream should support piping', + marbles((m) => { + const action$ = m.hot('-1-1-2', actions); + const expected = ' 02-4-8'; + + const state$ = reducedStream(nextStreamName(), 0, reducerArray, { + action$, + }); + const actual$ = state$.pipe(map((a) => `${a * 2}`)); + + m.expect(actual$).toBeObservable(expected); + }) +); + +test('reducedStream should call reducer once when there are multiple subs', (t) => { + const initialState = 1; + handlers.incrementOne.resetHistory(); + const action$ = of(actionCreators.incrementOne()); + const state$ = reducedStream(nextStreamName(), initialState, reducerArray, { + action$, + }); + + const sub1 = state$.subscribe(); + const sub2 = state$.subscribe(); + t.assert(handlers.incrementOne.calledOnce); + sub1.unsubscribe(); + sub2.unsubscribe(); +}); + +test( + 'reducedStream do not reduce when it has no subscribers', + marbles((m, t) => { + const initialState = 5; + const action$ = m.hot('-1-2-1-', actions); + reducedStream( + nextStreamName(), + initialState, + [ + reducer(actionCreators.incrementOne, (previous) => { + t.fail(); + return previous + 1; + }), + ], + { + action$, + } + ); + t.plan(0); + }) +); + +test( + 'reducedStream catches errors and emits them to error subject without losing subscription', + marbles((m) => { + const action$ = m.hot(' -d-1-d-2', actions); + const expected$ = m.hot('1--2---4', numbers); + const errorMarbles = ' -e---e--'; + const error$ = new Subject(); + + const state$ = reducedStream(nextStreamName(), 1, reducerArray, { + errorSubject: error$, + action$, + }); + m.expect(error$).toBeObservable(errorMarbles, errors); + m.expect(state$).toBeObservable(expected$); + }) +); + +test( + 'reducedStream only reduces action with correct namespace', + marbles((m) => { + const action$ = m.hot(' --mnnm-', actions); + const expected$ = m.hot('1--23--', numbers); + const initialState = 1; + + const state$ = reducedStream(nextStreamName(), initialState, reducerArray, { + namespace: incrementMocks.namespace, + action$, + }); + + m.expect(from(state$)).toBeObservable(expected$); + }) +); + +test( + 'reducedStream reduces namespaced actions when no namespace is set', + marbles((m) => { + const action$ = m.hot(' --1mn-', actions); + const expected$ = m.hot('1-234-', numbers); + const initialState = 1; + + const state$ = reducedStream(nextStreamName(), initialState, reducerArray, { + action$, + }); + + m.expect(from(state$)).toBeObservable(expected$); + }) +); + +test( + 'reducedStream forwards namespace to reducers', + marbles((m) => { + const action$ = m.hot(' --nm-2-n', actions); + const expected$ = m.hot('1-2----2', numbers); + const initialState = 1; + + const verifyNamespaceReducer = reducer( + actionCreators.incrementOne, + (state, _, namespace) => { + if (namespace === incrementMocks.namespace) { + return 2; + } + return state; + } + ); + + const state$ = reducedStream( + nextStreamName(), + initialState, + [verifyNamespaceReducer], + { namespace: incrementMocks.namespace, action$ } + ); + state$.subscribe(); + + m.expect(from(state$)).toBeObservable(expected$); + }) +); diff --git a/src/reducedStream.ts b/src/reducedStream.ts new file mode 100644 index 00000000..1d750345 --- /dev/null +++ b/src/reducedStream.ts @@ -0,0 +1,46 @@ +import { Observable, shareReplay, startWith } from 'rxjs'; +import { RegisteredReducer } from './reducer'; +import { tag } from 'rxjs-spy/operators'; +import { + ReducedStreamOptions, + reducedStream as reducedStreamInternal, +} from './internal/reducedStream'; + +/** + * Creates reduced state stream + * + * This stream scans over an action stream and other streams to build up state. + * + * To start reducing state, you must first subscribe to the action$ + * (connecting a component using the React HOC ```connect``` would be enough) + * + * The state stream is ref counted, which means that the stream + * will reset to its `defaultState` when there are no subscribers. + * + * If you wish to persist the state throughout the application lifecycle, + * you should use ```persistentReducedStream```. + * + * It is guaranteed that the state stream will always emit a value upon subscription. + * + * The values emitted from the stream are shared between the subscribers, + * and the reducers are only ran once per input action. + * + * @param name The name of the stream, used for placing a spy tag + * @param initialState The initial state of the stream + * @param reducers The reducers that build up the stream state + */ +export const reducedStream = ( + name: string, + initialState: State, + reducers: RegisteredReducer[], + options: ReducedStreamOptions = {} +): Observable => { + return reducedStreamInternal(initialState, reducers, options).pipe( + startWith(initialState), + shareReplay({ + refCount: true, + bufferSize: 1, + }), + tag(name) + ); +};