Skip to content

Commit

Permalink
feat: support namespacing persistentReducedStream
Browse files Browse the repository at this point in the history
Makes `PersistentReducedStateStream` accept a namespace which
makes the stream filter actions by that namespace.

BREAKING CHANGE: `persistentReducedStream` has been refactored to accept
the error subject in an options object instead of a standalone argument.
  • Loading branch information
tlaundal committed Jul 30, 2020
1 parent c5340fc commit cfb1a95
Show file tree
Hide file tree
Showing 4 changed files with 65 additions and 12 deletions.
5 changes: 4 additions & 1 deletion src/internal/testing/mock.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const throwErrorFn = (): number => {
};

const ERROR = 'error';
const namespace = 'namespace';

const handleOne = reducer(incrementOne, incrementOneHandler);
const handleMany = reducer(
Expand All @@ -23,6 +24,7 @@ const handleDecrementWithError = reducer(decrement, throwErrorFn);

export const incrementMocks = {
error: ERROR,
namespace,
actionCreators: {
incrementOne,
incrementMany,
Expand All @@ -40,7 +42,8 @@ export const incrementMocks = {
'1': incrementOne(),
'2': incrementMany(2),
d: decrement(),
n: _namespaceAction('namespace', incrementOne()),
n: _namespaceAction(namespace, incrementOne()),
m: _namespaceAction('memespace', incrementOne()),
},
words: {
a: '1',
Expand Down
15 changes: 13 additions & 2 deletions src/persistentReducedStateStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { RegisteredReducer, combineReducers } from './reducer';
import { defaultErrorSubject } from './internal/defaultErrorSubject';
import { markName } from './internal/markers';
import { tag } from 'rxjs-spy/operators';
import { withNamespace } from './operators';

/**
* PersistentReducedStateStream is an Observable interface for state streams.
Expand All @@ -29,6 +30,7 @@ export class PersistentReducedStateStream<State> extends Observable<State> {
private subject: BehaviorSubject<State>;
private reducerSubscription?: Subscription;
private reducers: RegisteredReducer<State, any>[];
private namespace?: string;

public name: string;

Expand All @@ -55,7 +57,12 @@ export class PersistentReducedStateStream<State> extends Observable<State> {

const seed = initialState === undefined ? this.subject.value : initialState;

this.reducerSubscription = action$
const filteredAction$ =
this.namespace === undefined
? action$
: action$.pipe(withNamespace(this.namespace));

this.reducerSubscription = filteredAction$
.pipe(
combineReducers(seed, this.reducers, {
errorSubject: this.errorSubject,
Expand All @@ -73,6 +80,7 @@ export class PersistentReducedStateStream<State> extends Observable<State> {
});
return this;
};

/**
* Stop reducing this state stream and unsubscribe from the action$.
*
Expand All @@ -89,17 +97,20 @@ export class PersistentReducedStateStream<State> extends Observable<State> {
}
this.subject.unsubscribe();
};

constructor(
name: string,
initialState: State,
reducers: RegisteredReducer<State, any>[],
errorSubject: Subject<any> = defaultErrorSubject
errorSubject: Subject<any> = defaultErrorSubject,
namespace?: string
) {
super();
this.name = name;
this.subject = new BehaviorSubject(initialState);
this.reducers = reducers;
this.errorSubject = errorSubject;
this.namespace = namespace;
}

get state(): State {
Expand Down
46 changes: 40 additions & 6 deletions src/persistentReducedStream.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -225,12 +225,9 @@ test(
const errorMarbles = ' -e-';
const error$ = new Subject<any>();

const state$ = persistentReducedStream(
nextStreamName(),
1,
reducerArray,
error$
);
const state$ = persistentReducedStream(nextStreamName(), 1, reducerArray, {
errorSubject: error$,
});
m.expect(error$).toBeObservable(errorMarbles, errors);
m.expect(state$).toBeObservable(expected$);
state$.startReducing(action$);
Expand All @@ -243,3 +240,40 @@ test('persistentReducedStream should throw exception when accessing state after

t.throws(() => state$.state);
});

test(
'persistentReducedStream only reduces action with correct namespace',
marbles((m) => {
const action$ = m.hot(' --mn-', actions);
const expected$ = m.hot('1--2-', numbers);
const initialState = 1;

const state$ = persistentReducedStream(
nextStreamName(),
initialState,
reducerArray,
{ namespace: incrementMocks.namespace }
);
state$.startReducing(action$);

m.expect(state$).toBeObservable(expected$);
})
);

test(
'persistentReducedStream reduces namespaced actions when no namespace is set',
marbles((m) => {
const action$ = m.hot(' --1mn-', actions);
const expected$ = m.hot('1-234-', numbers);
const initialState = 1;

const state$ = persistentReducedStream(
nextStreamName(),
initialState,
reducerArray
);
state$.startReducing(action$);

m.expect(state$).toBeObservable(expected$);
})
);
11 changes: 8 additions & 3 deletions src/persistentReducedStream.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
import { Subject } from 'rxjs';
import { RegisteredReducer } from './reducer';
import { defaultErrorSubject } from './internal/defaultErrorSubject';
import { PersistentReducedStateStream } from './persistentReducedStateStream';
import { stateStreamRegistry } from './stateStreamRegistry';

type PersistentReducedStreamOptions = {
errorSubject?: Subject<any>;
namespace?: string;
};

/**
* Creates and registers a persistent reduced state stream
*
Expand Down Expand Up @@ -39,13 +43,14 @@ export const persistentReducedStream = <State>(
name: string,
initialState: State,
reducers: RegisteredReducer<State, any>[],
errorSubject: Subject<any> = defaultErrorSubject
{ errorSubject, namespace }: PersistentReducedStreamOptions = {}
): PersistentReducedStateStream<State> => {
const stream = new PersistentReducedStateStream(
name,
initialState,
reducers,
errorSubject
errorSubject,
namespace
);

stateStreamRegistry.register(stream);
Expand Down

0 comments on commit cfb1a95

Please sign in to comment.