Skip to content

Commit

Permalink
feat(reducers): Support for stream reducers
Browse files Browse the repository at this point in the history
`reducers` and `combineReducers` now support reducing from other
streams.
  • Loading branch information
tlaundal committed Feb 27, 2020
1 parent 940ef95 commit eec3fb5
Show file tree
Hide file tree
Showing 2 changed files with 181 additions and 37 deletions.
94 changes: 80 additions & 14 deletions src/reducer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,22 @@ const handleMany = reducer(
const handleDecrement = reducer(decrement, throwErrorFn);
const alwaysReset = reducer([incrementOne, incrementMany, decrement], () => 5);

const inputs = {
const actions = {
'1': incrementOne(),
'2': incrementMany(2),
d: decrement(),
};
const outputs = {
const words = {
a: '1',
b: '12',
};
const numbers = {
'1': 1,
'2': 2,
'3': 3,
'4': 4,
'5': 5,
'6': 6,
};

test('reducer should store reducer function', t => {
Expand All @@ -39,35 +46,94 @@ test('reducer should store reducer function', t => {
test(
'combineReducers should reduce actions to state',
marbles(m => {
const source = m.hot(' 121', inputs);
const expected = m.hot('245', outputs);
const action$ = m.hot(' 121', actions);
const expected$ = m.hot('245', numbers);

m.expect(
action$.pipe(combineReducers(1, [handleOne, handleMany]))
).toBeObservable(expected$);
})
);

test(
'combineReducers should reduce state from other streams',
marbles(m => {
const action$ = m.hot(' --');
const word$ = m.hot(' ab', words);
const expected$ = m.hot('24', numbers);

const handleWord = reducer(
word$,
(state: number, word) => state + word.length
);

m.expect(action$.pipe(combineReducers(1, [handleWord]))).toBeObservable(
expected$
);
})
);

test(
'combineReducers should support both actions and other streams',
marbles(m => {
const action$ = m.hot(' -1-', actions);
const word$ = m.hot(' a-b', words);
const expected$ = m.hot('235', numbers);

const handleWord = reducer(
word$,
(state: number, word) => state + word.length
);

m.expect(
source.pipe(combineReducers(1, [handleOne, handleMany]))
).toBeObservable(expected);
action$.pipe(combineReducers(1, [handleOne, handleWord]))
).toBeObservable(expected$);
})
);

test(
'combineReducers should not silence errors',
marbles(m => {
const source = m.hot(' 1d1', inputs);
const expected = m.hot('2# ', outputs);
const action$ = m.hot(' 1d1', actions);
const expected$ = m.hot('2# ', numbers);

m.expect(
source.pipe(combineReducers(1, [handleOne, handleDecrement]))
).toBeObservable(expected);
action$.pipe(combineReducers(1, [handleOne, handleDecrement]))
).toBeObservable(expected$);
})
);

test(
'combineReducers should handle reducers for multiple actions',
marbles(m => {
const source = m.hot(' 1', inputs);
const expected = m.hot('5', outputs);
const action$ = m.hot(' 12', actions);
const expected$ = m.hot('55', numbers);

m.expect(action$.pipe(combineReducers(1, [alwaysReset]))).toBeObservable(
expected$
);
})
);

test(
'combineReducers should reduce in predictable order',
marbles(m => {
// The "predictable order" seems to be the order in which the streams emit.
// Eg. The ordering of these seed definitions matter, but not the order of
// the reducers.
const action$ = m.hot(' 1', actions);
const reset$ = m.hot(' 6', numbers);
const divide$ = m.hot(' 2', numbers);
const expected$ = m.hot('(263)', numbers);

m.expect(source.pipe(combineReducers(1, [alwaysReset]))).toBeObservable(
expected
const resetReducer = reducer(reset$, (_: number, state) => state);
const divideReducer = reducer(
divide$,
(state: number, divisor) => state / divisor
);

m.expect(
action$.pipe(combineReducers(1, [divideReducer, resetReducer, handleOne]))
).toBeObservable(expected$);
})
);
124 changes: 101 additions & 23 deletions src/reducer.ts
Original file line number Diff line number Diff line change
@@ -1,27 +1,55 @@
import { OperatorFunction, pipe } from 'rxjs';
import { scan } from 'rxjs/operators';
import { OperatorFunction, pipe, Observable } from 'rxjs';
import { scan, map } from 'rxjs/operators';
import {
VoidPayload,
UnknownAction,
UnknownActionCreator,
UnknownActionCreatorWithPayload,
} from 'rxbeach/internal';
import { ofType } from 'rxbeach/operators';
import { ofType, merge } from 'rxbeach/operators';

const wrapInArray = <T>(val: T | T[]): T[] =>
Array.isArray(val) ? val : [val];

export type Reducer<State, Payload = VoidPayload> = (
previousState: State,
payload: Payload
) => State;

type RegisteredActionReducer<State, Payload = any> = Reducer<State, Payload> & {
trigger: {
actions: UnknownActionCreator[];
};
};
type RegisteredStreamReducer<State, Payload = any> = Reducer<State, Payload> & {
trigger: {
source$: Observable<Payload>;
};
};

export type RegisteredReducer<State, Payload = any> = Reducer<
State,
Payload
> & {
trigger: {
actions: UnknownActionCreator[];
};
trigger:
| {
actions: UnknownActionCreator[];
}
| {
source$: Observable<Payload>;
};
};

const isActionReducer = <State, Payload>(
reducerFn: RegisteredReducer<State, Payload>
): reducerFn is RegisteredActionReducer<State, Payload> =>
'actions' in reducerFn.trigger;

const isStreamReducer = <State, Payload>(
reducerFn: RegisteredReducer<State, Payload>
): reducerFn is RegisteredStreamReducer<State, Payload> =>
'source$' in reducerFn.trigger;

type ReducerCreator = {
/**
* Define a reducer for an action with payload
Expand Down Expand Up @@ -108,50 +136,100 @@ type ReducerCreator = {
actionCreator: UnknownActionCreator[],
reducer: Reducer<State, VoidPayload>
): RegisteredReducer<State, VoidPayload>;

/**
* Define a reducer for a stream
*
* @see combineReducers
* @param source$ The stream which will trigger this reducer
* @param reducer The reducer function
* @template `State` - The state the reducer reduces to
* @template `Payload` - The type of values `source$` emits
* @returns A registered reducer that can be passed into `combineReducers`, or
* called directly as if it was the `reducer` parameter itself.
*/
<State, Payload>(
source$: Observable<Payload>,
reducer: Reducer<State, Payload>
): RegisteredReducer<State, Payload>;
};

export const reducer: ReducerCreator = <State>(
actionCreator: UnknownActionCreator | UnknownActionCreator[],
actionCreator:
| UnknownActionCreator
| UnknownActionCreator[]
| Observable<any>,
reducerFn: Reducer<State, any>
): RegisteredReducer<State, unknown> => {
) => {
const wrapper = (state: State, payload: any) => reducerFn(state, payload);
wrapper.trigger = {
actions: Array.isArray(actionCreator) ? actionCreator : [actionCreator],
};
if (actionCreator instanceof Observable) {
wrapper.trigger = {
source$: actionCreator,
};
} else {
wrapper.trigger = {
actions: wrapInArray(actionCreator),
};
}
return wrapper;
};

const ACTION_ORIGIN = Symbol('Action origin');

/**
* Combine reducer entries into a stream operator
* Combine registered reducers into a stream operator
*
* Each reducer will receive the previous state (or the seed if it's the first
* invocation) together with the payloads of the actions of the given reducer,
* or the emitted values from the stream of the given reducer.
*
* The payload of each incoming action is applied to the matching reducers
* together with the previous state (or the seed if it's the first invocation),
* and the returned state is emitted.
* The behaviour is undefined if multiple reducers are registered for the same
* actions.
*
* This operator does not change whether the stream is hot or cold.
*
* The order of invocation for the reducers is controlled by the rxjs operator
* `merge`, which is called with all the actions first and then the source
* streams in the order their reducers are defined in the `reducers` argument.
*
* @param seed The initial input to the first reducer call
* @param reducers The reducer entries that should be combined
* @see rxjs.merge
*/
export const combineReducers = <State>(
seed: State,
reducers: RegisteredReducer<State, any>[]
): OperatorFunction<UnknownAction, State> => {
const actionReducers = reducers.filter(isActionReducer);
const streamReducers = reducers.filter(isStreamReducer);
const reducersByActionType = new Map(
reducers.flatMap(reducerFn =>
actionReducers.flatMap(reducerFn =>
reducerFn.trigger.actions.map(action => [action.type, reducerFn])
)
);

type Packet =
| { origin: typeof ACTION_ORIGIN; value: UnknownAction }
| { origin: number; value: any };

const source$s = streamReducers.map((reducerFn, i) =>
reducerFn.trigger.source$.pipe(
map((payload): Packet => ({ origin: i, value: payload }))
)
);

return pipe(
ofType(...reducers.flatMap(reducerFn => reducerFn.trigger.actions)),
scan((state, { type, payload }: UnknownAction) => {
const RegisteredReducer = reducersByActionType.get(type);
if (RegisteredReducer === undefined) {
// This shouldn't be possible
return state;
ofType(...actionReducers.flatMap(reducerFn => reducerFn.trigger.actions)),
map((action): Packet => ({ origin: ACTION_ORIGIN, value: action })),
merge(...source$s),
scan((state, packet) => {
if (packet.origin === ACTION_ORIGIN) {
const reducerFn = reducersByActionType.get(packet.value.type)!;
return reducerFn(state, packet.value.payload);
}

return RegisteredReducer(state, payload);
const reducerFn = streamReducers[packet.origin];
return reducerFn(state, packet.value);
}, seed)
);
};

0 comments on commit eec3fb5

Please sign in to comment.