From 627349dd3347e934858f9d00df9cee41a97889a7 Mon Sep 17 00:00:00 2001 From: Tobias Laundal Date: Thu, 16 Apr 2020 14:34:29 +0200 Subject: [PATCH] feat: introduce high level state stream tooling Introduces the `persistentReducedStream` function, which allows for defining a reduced state stream without manually piping the action stream. It returns a `StateStream` object, which has the latest state available as a property, and which can be treated as an `Observable`. --- src/index.ts | 2 + src/persistentReducedStream.spec.ts | 193 ++++++++++++++++++++++++++++ src/persistentReducedStream.ts | 93 ++++++++++++++ 3 files changed, 288 insertions(+) create mode 100644 src/persistentReducedStream.spec.ts create mode 100644 src/persistentReducedStream.ts diff --git a/src/index.ts b/src/index.ts index af5cdef9..1388e243 100644 --- a/src/index.ts +++ b/src/index.ts @@ -35,3 +35,5 @@ export { export { derivedStream } from './derivedStream'; export { combineLatest, merge, zip } from './decoratedObservableCombiners'; + +export { persistentReducedStream } from './persistentReducedStream'; diff --git a/src/persistentReducedStream.spec.ts b/src/persistentReducedStream.spec.ts new file mode 100644 index 00000000..a8049631 --- /dev/null +++ b/src/persistentReducedStream.spec.ts @@ -0,0 +1,193 @@ +import test from 'ava'; +import { + StateStream, + persistentReducedStream, +} from './persistentReducedStream'; +import { never, of, Subject } from 'rxjs'; +import { marbles } from 'rxjs-marbles/ava'; +import { incrementMocks } from './internal/testing/mock'; +import { map } from 'rxjs/operators'; + +const { reducers, actionCreators, handlers } = incrementMocks; +const { actions, numbers, errors } = incrementMocks.marbles; +const reducerArray = Object.values(reducers); + +test('StateStream should expose its state immediately', (t) => { + const state = 'hello'; + const state$ = new StateStream(state, never()); + + t.deepEqual(state$.state, state); +}); + +test('StateStream should not initially be closed', (t) => { + const state$ = new StateStream(null, never()); + + t.false(state$.closed); +}); + +test( + 'StateStream should follow the underlying observable', + marbles((m, t) => { + const source$ = m.hot('-abc'); + const expected = ' 0abc'; + + const state$ = new StateStream('0', source$); + + m.expect(state$).toBeObservable(expected); + + m.flush(); + t.deepEqual(state$.state, 'c'); + }) +); + +test( + 'StateStream should error when the underlying observable errors', + marbles((m) => { + const source$ = m.hot('-a#'); + const expected = ' 0a#'; + + const state$ = new StateStream('0', source$); + + m.expect(state$).toBeObservable(expected); + }) +); + +test( + 'StateStream should complete when the underlying observable completes', + marbles((m) => { + const source$ = m.hot('-a|'); + const expected = ' 0a|'; + + const state$ = new StateStream('0', source$); + + m.expect(state$).toBeObservable(expected); + }) +); + +test( + 'StateStream should support piping', + marbles((m) => { + const source$ = m.hot('-12'); + const expected = ' 024'; + + const state$ = new StateStream(0, source$); + const actual$ = state$.pipe(map((a) => `${a * 2}`)); + + m.expect(actual$).toBeObservable(expected); + }) +); + +test( + 'StateStream should be possible to unsubscribe', + marbles((m, t) => { + const trigger = m.hot('--|'); + const source$ = m.hot('-a-c'); + const expected$ = ' 0a--'; + const subscription = ' ^-!'; + + const state$ = new StateStream('0', source$); + trigger.subscribe({ complete: () => state$.unsubscribe() }); + + m.expect(state$).toBeObservable(expected$); + m.expect(source$).toHaveSubscriptions(subscription); + + m.flush(); + t.true(state$.closed); + }) +); + +test('persistentReducedStream should expose the state immediately', (t) => { + const initialState = 5; + const state$ = persistentReducedStream('test', initialState, [], never()); + + t.deepEqual(state$.state, initialState); +}); + +test( + 'persistentReducedStream reduces state', + marbles((m, t) => { + const action$ = m.hot(' --1-', actions); + const expected$ = m.hot('1-2-', numbers); + const initialState = 1; + + const state$ = persistentReducedStream( + 'testStream', + initialState, + reducerArray, + action$ + ); + + m.expect(state$).toBeObservable(expected$); + + m.flush(); + t.deepEqual(state$.state, numbers[2]); + }) +); + +test('persistentReducedStream should call reducer once when there are multiple subs', (t) => { + const initialState = 1; + handlers.incrementOne.resetHistory(); + const action$ = of(actionCreators.incrementOne()); + const state$ = persistentReducedStream( + 'testStream', + initialState, + reducerArray, + action$ + ); + + const sub1 = state$.subscribe(); + const sub2 = state$.subscribe(); + t.assert(handlers.incrementOne.calledOnce); + sub1.unsubscribe(); + sub2.unsubscribe(); +}); + +test( + 'persistentReducedStream should never reset state', + marbles((m) => { + const initialState = 1; + const action$ = m.hot(' -1-1-1-', actions); + const sub1 = ' ^-!----'; + const sub1Expected$ = m.hot('12-----', numbers); + const sub2 = ' ----^-!'; + const sub2Expected$ = m.hot('----34-', numbers); + const state$ = persistentReducedStream( + 'testStream', + initialState, + reducerArray, + action$ + ); + + m.expect(state$, sub1).toBeObservable(sub1Expected$); + m.expect(state$, sub2).toBeObservable(sub2Expected$); + }) +); + +test('persistentReducedStream should always reduce', (t) => { + const initialState = 1; + const action$ = of(actions[1]); + + const state$ = persistentReducedStream( + 'testStream', + initialState, + reducerArray, + action$ + ); + + t.deepEqual(state$.state, 2); +}); + +test( + 'persistentReducedStream catches errors and emits them to error subject', + marbles((m) => { + const action$ = m.hot(' -d-1', actions); + const expected$ = m.hot('1--2', numbers); + const errorMarbles = ' -e-'; + const error$ = new Subject(); + + m.expect(error$).toBeObservable(errorMarbles, errors); + m.expect( + persistentReducedStream('testStream', 1, reducerArray, action$, error$) + ).toBeObservable(expected$); + }) +); diff --git a/src/persistentReducedStream.ts b/src/persistentReducedStream.ts new file mode 100644 index 00000000..4c7c61e9 --- /dev/null +++ b/src/persistentReducedStream.ts @@ -0,0 +1,93 @@ +import { + BehaviorSubject, + Observable, + Subscription, + Subject, + SubscriptionLike, + Operator, + Subscriber, + TeardownLogic, +} from 'rxjs'; +import { ActionStream } from './types/helpers'; +import { RegisteredReducer, combineReducers } from './reducer'; +import { defaultErrorSubject } from './internal/defaultErrorSubject'; +import { markName } from './internal/markers'; +import { tag } from 'rxjs-spy/operators'; + +/** + * A hot, persistent observable with a state field + */ +export class StateStream extends Observable + implements SubscriptionLike { + private subject: BehaviorSubject; + private subscription: Subscription; + + constructor(initialState: State, reducedState$: Observable) { + super(); + this.subject = new BehaviorSubject(initialState); + this.subscription = reducedState$.subscribe(this.subject); + } + + get state(): State { + return this.subject.getValue(); + } + + get closed() { + return this.subject.closed; + } + + lift(operator: Operator): Observable { + return this.subject.lift(operator); + } + + unsubscribe() { + this.subscription.unsubscribe(); + this.subject.unsubscribe(); + } + + _trySubscribe(subscriber: Subscriber): TeardownLogic { + return this.subject._trySubscribe(subscriber); + } + + _subscribe(subscriber: Subscriber): Subscription { + return this.subject._subscribe(subscriber); + } +} + +/** + * Create a reduced state stream + * + * A reduced state stream is a stream that scans over an action stream and other + * stream to build up a state. It is eternally subscribed and always exposes + * it latest value like a Behaviour subject. + * + * ``` + * const myState$ = persistentReducedStream( + * 'myState$', + * initialState, + * reducers, + * action$ + * ); + * + * myState$.value === initialState // Will be true + * ``` + * + * @param name The name of the stream, used for placing a marker and spy tag + * @param initialState The initial state of the stream + * @param reducers The reducers that build up the stream state + * @param action$ The action stream the action reducers should reduce over + */ +export const persistentReducedStream = ( + name: string, + initialState: State, + reducers: RegisteredReducer[], + action$: ActionStream, + errorSubject: Subject = defaultErrorSubject +): StateStream => { + const reducedState$ = action$.pipe( + combineReducers(initialState, reducers, errorSubject), + markName(name), + tag(name) + ); + return new StateStream(initialState, reducedState$); +};