Skip to content

Commit

Permalink
feat: add reduced stream util
Browse files Browse the repository at this point in the history
chore: unit tests for reducedStream

chore: refactor reducedStream

fix: reducedStream unit test

fix: _reducedStream name
  • Loading branch information
rafaa2 committed Oct 5, 2022
1 parent ef97d6e commit 441718a
Show file tree
Hide file tree
Showing 5 changed files with 260 additions and 21 deletions.
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ export {
} from './routines';

export { derivedStream } from './derivedStream';
export { reducedStream } from './reducedStream';

export { persistentDerivedStream } from './persistentDerivedStream';
export { persistentReducedStream } from './persistentReducedStream';
Expand Down
40 changes: 40 additions & 0 deletions src/internal/reducedStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { RegisteredReducer, combineReducers } from '../reducer';
import { action$ as defaultAction$ } from '../action$';
import { Observable, Subject } from 'rxjs';
import { ActionStream } from '../types/helpers';
import { withNamespace } from '../operators';

export type ReducedStreamOptions = {
errorSubject?: Subject<any>;
namespace?: string;
action$?: ActionStream;
};

/**
* Creates a simple reduced state stream (without tagging, making sure to have initial state, nor replay)
*
* @param initialState The initial state of the stream
* @param reducers The reducers that build up the stream state
* @param {ReducedStreamOptions} options
* @returns
*/
export const reducedStream = <State>(
initialState: State,
reducers: RegisteredReducer<State, any>[],
{
errorSubject,
namespace,
action$ = defaultAction$,
}: ReducedStreamOptions = {}
): Observable<State> => {
const filteredAction$ =
namespace === undefined ? action$ : action$.pipe(withNamespace(namespace));

const source$ = filteredAction$.pipe(
combineReducers(initialState, reducers, {
errorSubject: errorSubject,
namespace: namespace,
})
);
return source$;
};
28 changes: 7 additions & 21 deletions src/persistentReducedStream.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,11 @@
import { Subject } from 'rxjs';
import { RegisteredReducer, combineReducers } from './reducer';
import { RegisteredReducer } from './reducer';
import { ObservableState } from './observableState';
import { stateStreamRegistry } from './stateStreamRegistry';
import { action$ as defaultAction$ } from './action$';
import { ActionStream } from './types/helpers';
import { withNamespace } from './operators';
import { tag } from 'rxjs-spy/operators';

type PersistentReducedStreamOptions = {
errorSubject?: Subject<any>;
namespace?: string;
action$?: ActionStream;
};
import {
ReducedStreamOptions,
reducedStream as reducedStreamInternal,
} from './internal/reducedStream';

/**
* Creates and registers a persistent reduced state stream
Expand Down Expand Up @@ -48,17 +42,9 @@ export const persistentReducedStream = <State>(
name: string,
initialState: State,
reducers: RegisteredReducer<State, any>[],
{ errorSubject, namespace, action$: a$ }: PersistentReducedStreamOptions = {}
options: ReducedStreamOptions = {}
): ObservableState<State> => {
const action$ = a$ ?? defaultAction$;
const filteredAction$ =
namespace === undefined ? action$ : action$.pipe(withNamespace(namespace));

const source$ = filteredAction$.pipe(
combineReducers(initialState, reducers, {
errorSubject: errorSubject,
namespace: namespace,
}),
const source$ = reducedStreamInternal(initialState, reducers, options).pipe(
tag(name)
);

Expand Down
166 changes: 166 additions & 0 deletions src/reducedStream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
import test from 'ava';
import { reducedStream } from './reducedStream';
import { Subject, from, map, of } from 'rxjs';
import { marbles } from 'rxjs-marbles/ava';
import { incrementMocks } from './internal/testing/mock';
import { reducer } from './reducer';

const { reducers, actionCreators, handlers } = incrementMocks;
const { actions, numbers, errors } = incrementMocks.marbles;
const reducerArray = Object.values(reducers);

let counter = 1;
const nextStreamName = () => `testStream-${counter++}`;

test('reducedStream should expose initial state immediately after subscribtion', (t) => {
const state = 'hello';
const state$ = reducedStream(nextStreamName(), state, []);
state$.subscribe((emittedState) => {
t.is(emittedState, state);
});
t.plan(1);
});

test(
'reducedStream reduces state',
marbles((m) => {
const action$ = m.hot(' --1-2', actions);
const expected$ = m.hot('1-2-4', numbers);
const initialState = 1;

const state$ = reducedStream(nextStreamName(), initialState, reducerArray, {
action$,
});

m.expect(state$).toBeObservable(expected$);
})
);

test(
'reducedStream should support piping',
marbles((m) => {
const action$ = m.hot('-1-1-2', actions);
const expected = ' 02-4-8';

const state$ = reducedStream(nextStreamName(), 0, reducerArray, {
action$,
});
const actual$ = state$.pipe(map((a) => `${a * 2}`));

m.expect(actual$).toBeObservable(expected);
})
);

test('reducedStream should call reducer once when there are multiple subs', (t) => {
const initialState = 1;
handlers.incrementOne.resetHistory();
const action$ = of(actionCreators.incrementOne());
const state$ = reducedStream(nextStreamName(), initialState, reducerArray, {
action$,
});

const sub1 = state$.subscribe();
const sub2 = state$.subscribe();
t.assert(handlers.incrementOne.calledOnce);
sub1.unsubscribe();
sub2.unsubscribe();
});

test(
'reducedStream do not reduce when it has no subscribers',
marbles((m, t) => {
const initialState = 5;
const action$ = m.hot('-1-2-1-', actions);
reducedStream(
nextStreamName(),
initialState,
[
reducer(actionCreators.incrementOne, (previous) => {
t.fail();
return previous + 1;
}),
],
{
action$,
}
);
t.plan(0);
})
);

test(
'reducedStream catches errors and emits them to error subject without losing subscription',
marbles((m) => {
const action$ = m.hot(' -d-1-d-2', actions);
const expected$ = m.hot('1--2---4', numbers);
const errorMarbles = ' -e---e--';
const error$ = new Subject<any>();

const state$ = reducedStream(nextStreamName(), 1, reducerArray, {
errorSubject: error$,
action$,
});
m.expect(error$).toBeObservable(errorMarbles, errors);
m.expect(state$).toBeObservable(expected$);
})
);

test(
'reducedStream only reduces action with correct namespace',
marbles((m) => {
const action$ = m.hot(' --mnnm-', actions);
const expected$ = m.hot('1--23--', numbers);
const initialState = 1;

const state$ = reducedStream(nextStreamName(), initialState, reducerArray, {
namespace: incrementMocks.namespace,
action$,
});

m.expect(from(state$)).toBeObservable(expected$);
})
);

test(
'reducedStream reduces namespaced actions when no namespace is set',
marbles((m) => {
const action$ = m.hot(' --1mn-', actions);
const expected$ = m.hot('1-234-', numbers);
const initialState = 1;

const state$ = reducedStream(nextStreamName(), initialState, reducerArray, {
action$,
});

m.expect(from(state$)).toBeObservable(expected$);
})
);

test(
'reducedStream forwards namespace to reducers',
marbles((m) => {
const action$ = m.hot(' --nm-2-n', actions);
const expected$ = m.hot('1-2----2', numbers);
const initialState = 1;

const verifyNamespaceReducer = reducer(
actionCreators.incrementOne,
(state, _, namespace) => {
if (namespace === incrementMocks.namespace) {
return 2;
}
return state;
}
);

const state$ = reducedStream(
nextStreamName(),
initialState,
[verifyNamespaceReducer],
{ namespace: incrementMocks.namespace, action$ }
);
state$.subscribe();

m.expect(from(state$)).toBeObservable(expected$);
})
);
46 changes: 46 additions & 0 deletions src/reducedStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { Observable, shareReplay, startWith } from 'rxjs';
import { RegisteredReducer } from './reducer';
import { tag } from 'rxjs-spy/operators';
import {
ReducedStreamOptions,
reducedStream as reducedStreamInternal,
} from './internal/reducedStream';

/**
* Creates reduced state stream
*
* This stream scans over an action stream and other streams to build up state.
*
* To start reducing state, you must first subscribe to the action$
* (connecting a component using the React HOC ```connect``` would be enough)
*
* The state stream is ref counted, which means that the stream
* will reset to its `defaultState` when there are no subscribers.
*
* If you wish to persist the state throughout the application lifecycle,
* you should use ```persistentReducedStream```.
*
* It is guaranteed that the state stream will always emit a value upon subscription.
*
* The values emitted from the stream are shared between the subscribers,
* and the reducers are only ran once per input action.
*
* @param name The name of the stream, used for placing a spy tag
* @param initialState The initial state of the stream
* @param reducers The reducers that build up the stream state
*/
export const reducedStream = <State>(
name: string,
initialState: State,
reducers: RegisteredReducer<State, any>[],
options: ReducedStreamOptions = {}
): Observable<State> => {
return reducedStreamInternal(initialState, reducers, options).pipe(
startWith(initialState),
shareReplay({
refCount: true,
bufferSize: 1,
}),
tag(name)
);
};

0 comments on commit 441718a

Please sign in to comment.