Skip to content

Commit

Permalink
feat: introduce high level state stream tooling
Browse files Browse the repository at this point in the history
Introduces the `persistentReducedStream` function, which allows for defining
a reduced state stream without manually piping the action stream. It returns
a `StateStream` object, which has the latest state available as a property,
and which can be treated as an `Observable`.
  • Loading branch information
tlaundal committed Apr 23, 2020
1 parent fb7d0ff commit 627349d
Show file tree
Hide file tree
Showing 3 changed files with 288 additions and 0 deletions.
2 changes: 2 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,5 @@ export {
export { derivedStream } from './derivedStream';

export { combineLatest, merge, zip } from './decoratedObservableCombiners';

export { persistentReducedStream } from './persistentReducedStream';
193 changes: 193 additions & 0 deletions src/persistentReducedStream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
import test from 'ava';
import {
StateStream,
persistentReducedStream,
} from './persistentReducedStream';
import { never, of, Subject } from 'rxjs';
import { marbles } from 'rxjs-marbles/ava';
import { incrementMocks } from './internal/testing/mock';
import { map } from 'rxjs/operators';

const { reducers, actionCreators, handlers } = incrementMocks;
const { actions, numbers, errors } = incrementMocks.marbles;
const reducerArray = Object.values(reducers);

test('StateStream should expose its state immediately', (t) => {
const state = 'hello';
const state$ = new StateStream(state, never());

t.deepEqual(state$.state, state);
});

test('StateStream should not initially be closed', (t) => {
const state$ = new StateStream(null, never());

t.false(state$.closed);
});

test(
'StateStream should follow the underlying observable',
marbles((m, t) => {
const source$ = m.hot('-abc');
const expected = ' 0abc';

const state$ = new StateStream('0', source$);

m.expect(state$).toBeObservable(expected);

m.flush();
t.deepEqual(state$.state, 'c');
})
);

test(
'StateStream should error when the underlying observable errors',
marbles((m) => {
const source$ = m.hot('-a#');
const expected = ' 0a#';

const state$ = new StateStream('0', source$);

m.expect(state$).toBeObservable(expected);
})
);

test(
'StateStream should complete when the underlying observable completes',
marbles((m) => {
const source$ = m.hot('-a|');
const expected = ' 0a|';

const state$ = new StateStream('0', source$);

m.expect(state$).toBeObservable(expected);
})
);

test(
'StateStream should support piping',
marbles((m) => {
const source$ = m.hot('-12');
const expected = ' 024';

const state$ = new StateStream(0, source$);
const actual$ = state$.pipe(map((a) => `${a * 2}`));

m.expect(actual$).toBeObservable(expected);
})
);

test(
'StateStream should be possible to unsubscribe',
marbles((m, t) => {
const trigger = m.hot('--|');
const source$ = m.hot('-a-c');
const expected$ = ' 0a--';
const subscription = ' ^-!';

const state$ = new StateStream('0', source$);
trigger.subscribe({ complete: () => state$.unsubscribe() });

m.expect(state$).toBeObservable(expected$);
m.expect(source$).toHaveSubscriptions(subscription);

m.flush();
t.true(state$.closed);
})
);

test('persistentReducedStream should expose the state immediately', (t) => {
const initialState = 5;
const state$ = persistentReducedStream('test', initialState, [], never());

t.deepEqual(state$.state, initialState);
});

test(
'persistentReducedStream reduces state',
marbles((m, t) => {
const action$ = m.hot(' --1-', actions);
const expected$ = m.hot('1-2-', numbers);
const initialState = 1;

const state$ = persistentReducedStream(
'testStream',
initialState,
reducerArray,
action$
);

m.expect(state$).toBeObservable(expected$);

m.flush();
t.deepEqual(state$.state, numbers[2]);
})
);

test('persistentReducedStream should call reducer once when there are multiple subs', (t) => {
const initialState = 1;
handlers.incrementOne.resetHistory();
const action$ = of(actionCreators.incrementOne());
const state$ = persistentReducedStream(
'testStream',
initialState,
reducerArray,
action$
);

const sub1 = state$.subscribe();
const sub2 = state$.subscribe();
t.assert(handlers.incrementOne.calledOnce);
sub1.unsubscribe();
sub2.unsubscribe();
});

test(
'persistentReducedStream should never reset state',
marbles((m) => {
const initialState = 1;
const action$ = m.hot(' -1-1-1-', actions);
const sub1 = ' ^-!----';
const sub1Expected$ = m.hot('12-----', numbers);
const sub2 = ' ----^-!';
const sub2Expected$ = m.hot('----34-', numbers);
const state$ = persistentReducedStream(
'testStream',
initialState,
reducerArray,
action$
);

m.expect(state$, sub1).toBeObservable(sub1Expected$);
m.expect(state$, sub2).toBeObservable(sub2Expected$);
})
);

test('persistentReducedStream should always reduce', (t) => {
const initialState = 1;
const action$ = of(actions[1]);

const state$ = persistentReducedStream(
'testStream',
initialState,
reducerArray,
action$
);

t.deepEqual(state$.state, 2);
});

test(
'persistentReducedStream catches errors and emits them to error subject',
marbles((m) => {
const action$ = m.hot(' -d-1', actions);
const expected$ = m.hot('1--2', numbers);
const errorMarbles = ' -e-';
const error$ = new Subject<any>();

m.expect(error$).toBeObservable(errorMarbles, errors);
m.expect(
persistentReducedStream('testStream', 1, reducerArray, action$, error$)
).toBeObservable(expected$);
})
);
93 changes: 93 additions & 0 deletions src/persistentReducedStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import {
BehaviorSubject,
Observable,
Subscription,
Subject,
SubscriptionLike,
Operator,
Subscriber,
TeardownLogic,
} from 'rxjs';
import { ActionStream } from './types/helpers';
import { RegisteredReducer, combineReducers } from './reducer';
import { defaultErrorSubject } from './internal/defaultErrorSubject';
import { markName } from './internal/markers';
import { tag } from 'rxjs-spy/operators';

/**
* A hot, persistent observable with a state field
*/
export class StateStream<State> extends Observable<State>
implements SubscriptionLike {
private subject: BehaviorSubject<State>;
private subscription: Subscription;

constructor(initialState: State, reducedState$: Observable<State>) {
super();
this.subject = new BehaviorSubject(initialState);
this.subscription = reducedState$.subscribe(this.subject);
}

get state(): State {
return this.subject.getValue();
}

get closed() {
return this.subject.closed;
}

lift<R>(operator: Operator<State, R>): Observable<R> {
return this.subject.lift(operator);
}

unsubscribe() {
this.subscription.unsubscribe();
this.subject.unsubscribe();
}

_trySubscribe(subscriber: Subscriber<State>): TeardownLogic {
return this.subject._trySubscribe(subscriber);
}

_subscribe(subscriber: Subscriber<State>): Subscription {
return this.subject._subscribe(subscriber);
}
}

/**
* Create a reduced state stream
*
* A reduced state stream is a stream that scans over an action stream and other
* stream to build up a state. It is eternally subscribed and always exposes
* it latest value like a Behaviour subject.
*
* ```
* const myState$ = persistentReducedStream(
* 'myState$',
* initialState,
* reducers,
* action$
* );
*
* myState$.value === initialState // Will be true
* ```
*
* @param name The name of the stream, used for placing a marker and spy tag
* @param initialState The initial state of the stream
* @param reducers The reducers that build up the stream state
* @param action$ The action stream the action reducers should reduce over
*/
export const persistentReducedStream = <State>(
name: string,
initialState: State,
reducers: RegisteredReducer<State, any>[],
action$: ActionStream,
errorSubject: Subject<any> = defaultErrorSubject
): StateStream<State> => {
const reducedState$ = action$.pipe(
combineReducers(initialState, reducers, errorSubject),
markName(name),
tag(name)
);
return new StateStream(initialState, reducedState$);
};

0 comments on commit 627349d

Please sign in to comment.