Skip to content

Commit

Permalink
fix: ard-9960 fixed carry bug
Browse files Browse the repository at this point in the history
  • Loading branch information
tomoboy committed Sep 9, 2021
1 parent bca7f60 commit 2f3073e
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 6 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@ node_modules
dist
.nyc_output
coverage
.idea
14 changes: 13 additions & 1 deletion src/operators/operators.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,19 @@ test(
).toBeObservable(expected);
})
);

test(
'carry only subscribes to the original observable once',
marbles((m) => {
const source$ = m.cold(' f', { f });
const expected = m.hot('1', {
'1': [f.payload, f.payload.foo] as [FooPayload, number],
});
m.expect(
source$.pipe(extractPayload(), carry(map((e) => e.foo)))
).toBeObservable(expected);
m.expect(source$).toHaveSubscriptions([' ^']);
})
);
test(
'apply should provide context to operators',
marbles((m) => {
Expand Down
12 changes: 7 additions & 5 deletions src/operators/operators.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { MonoTypeOperatorFunction, OperatorFunction, of, pipe } from 'rxjs';
import { filter, flatMap, map, withLatestFrom } from 'rxjs/operators';
import { filter, flatMap, map, share, withLatestFrom } from 'rxjs/operators';
import { ActionWithPayload, ActionWithoutPayload } from '../types/Action';
import {
UnknownAction,
Expand Down Expand Up @@ -124,7 +124,7 @@ export const withoutNamespace = (

/**
* Stream operator that carries the initial payload alongside the results
* from the operator parameter
* from the operator parameter. Using this makes the observable/stream hot.
*
* ```
* routine(
Expand All @@ -142,11 +142,13 @@ export const carry =
<Carried, Emitted>(
operator: OperatorFunction<Carried, Emitted>
): OperatorFunction<Carried, [Carried, Emitted]> =>
(observable) =>
observable.pipe(
(observable) => {
const hot = observable.pipe(share());
return hot.pipe(
operator,
withLatestFrom(observable, (emitted, carried) => [carried, emitted])
withLatestFrom(hot, (emitted, carried) => [carried, emitted])
);
};

/**
* A utility operator for using pipes which need a value to be present
Expand Down

0 comments on commit 2f3073e

Please sign in to comment.