Skip to content

Commit

Permalink
feat: persistentDerivedStream
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaa2 committed Sep 9, 2022
1 parent 994f3f4 commit 1bff4db
Show file tree
Hide file tree
Showing 3 changed files with 86 additions and 0 deletions.
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export {

export { derivedStream } from './derivedStream';

export { persistentDerivedStream } from './persistentDerivedStream';
export { persistentReducedStream } from './persistentReducedStream';
export { ObservableState } from './observableState';

Expand Down
66 changes: 66 additions & 0 deletions src/persistentDerivedStream.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import test from 'ava';
import { combineLatest, from, map } from 'rxjs';
import { marbles } from 'rxjs-marbles/ava';
import { incrementMocks } from './internal/testing/mock';
import { persistentDerivedStream } from './persistentDerivedStream';
import { persistentReducedStream } from './persistentReducedStream';
import { ActionStream } from './types/helpers';

const { reducers } = incrementMocks;
const { actions, numbers } = incrementMocks.marbles;
const reducerArray = Object.values(reducers);

let counter = 1;
const nextStreamName = () => `testStream-${counter++}`;

const getTestObservableState = (action$: ActionStream) => {
const state$ = persistentReducedStream<number>(
nextStreamName(),
1,
reducerArray,
{
action$,
}
);
state$.connect();
return state$;
};

test(
'persistentDerivedStream should subscribe to its source.',
marbles((m) => {
const action$ = m.hot(' ----1--2-----1', actions);
const number$ = m.hot(' --2----1------', numbers);
const expected$ = m.hot('2-3-4--(65)--6', numbers);

const state$ = getTestObservableState(action$);

const derivedState$ = persistentDerivedStream<number>(
nextStreamName(),
combineLatest([state$, number$]).pipe(map(([n, n2]) => n + n2)),
2
);
derivedState$.connect();
m.expect(from(derivedState$)).toBeObservable(expected$);
})
);

test(
'persistentDerivedStream should expose the state of its source.',
marbles((m, t) => {
const action$ = m.hot(' --1---2-----1', actions);
const number$ = m.hot(' -1-------7---', numbers);

const initialState: [number, number] = [0, 0];
const state$ = getTestObservableState(action$);

const derivedState$ = persistentDerivedStream<[number, number]>(
nextStreamName(),
combineLatest([number$, state$]),
initialState
);
derivedState$.connect();
m.flush();
t.deepEqual(derivedState$.state, [7, 5]);
})
);
19 changes: 19 additions & 0 deletions src/persistentDerivedStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import { ObservableInput } from 'rxjs';
import { ObservableState } from './observableState';
import { stateStreamRegistry } from './stateStreamRegistry';

/**
* Make this stream a ObservableState stream of its source.
*
* @param name The unique name of this stream
* @param source$ The source of this stream
*/
export const persistentDerivedStream = <State>(
name: string,
source$: ObservableInput<State>,
initialState: State
): ObservableState<any> => {
const stream = new ObservableState(name, source$, initialState);
stateStreamRegistry.register(stream);
return stream;
};

0 comments on commit 1bff4db

Please sign in to comment.