Skip to content

Commit

Permalink
feat(signals): Don't unsubscribe from response observable after first…
Browse files Browse the repository at this point in the history
… response

Fix #102
  • Loading branch information
Tyler Cooke authored and gabrielguerrero committed Jul 2, 2024
1 parent a02b7d5 commit 679c12e
Show file tree
Hide file tree
Showing 4 changed files with 257 additions and 11 deletions.
1 change: 1 addition & 0 deletions libs/ngrx-traits/signals/api-docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ The original call can only have zero or one parameter, use an object with multip
props as first param if you need more.</p>

**Kind**: global function
**Warning**: The default mapPipe is [exhaustMap](https://www.learnrxjs.io/learn-rxjs/operators/transformation/exhaustmap). If your call returns an observable that does not complete after the first value is emitted, any changes to the input params will be ignored. Either specify [switchMap](https://www.learnrxjs.io/learn-rxjs/operators/transformation/switchmap) as mapPipe, or use [take(1)](https://www.learnrxjs.io/learn-rxjs/operators/filtering/take) or [first()](https://www.learnrxjs.io/learn-rxjs/operators/filtering/first) as part of your call.

| Param | Type | Description |
| --- | --- | --- |
Expand Down
12 changes: 9 additions & 3 deletions libs/ngrx-traits/signals/src/lib/with-calls/with-calls.model.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
import { Signal } from '@angular/core';
import { Observable } from 'rxjs';

export type Call<Param extends any = any, Result = any> =
| (() => Observable<Result> | Promise<Result>)
| ((arg: Param) => Observable<Result> | Promise<Result>);
export type ObservableCall<Param = any, Result = any> =
| (() => Observable<Result>)
| ((arg: Param) => Observable<Result>);
export type PromiseCall<Param = any, Result = any> =
| (() => Promise<Result>)
| ((arg: Param) => Promise<Result>);
export type Call<Param = any, Result = any> =
| ObservableCall<Param, Result>
| PromiseCall<Param, Result>;
export type CallConfig<
Param = any,
Result = any,
Expand Down
174 changes: 173 additions & 1 deletion libs/ngrx-traits/signals/src/lib/with-calls/with-calls.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { BehaviorSubject, Subject, tap, throwError } from 'rxjs';
import { typedCallConfig, withCalls } from '../index';

describe('withCalls', () => {
const apiResponse = new Subject<string>();
let apiResponse = new Subject<string>();
const onSuccess = jest.fn();
const onError = jest.fn();
const Store = signalStore(
Expand Down Expand Up @@ -79,7 +79,9 @@ describe('withCalls', () => {
apiResponse.next('test');
expect(store.isTestCallLoaded()).toBeTruthy();
expect(store.testCallResult()).toBe('test');
apiResponse.complete();

apiResponse = new Subject<string>();
param.set({ ok: true });
TestBed.flushEffects();
expect(store.isTestCallLoading()).toBeTruthy();
Expand All @@ -100,7 +102,9 @@ describe('withCalls', () => {
apiResponse.next('test');
expect(store.isTestCallLoaded()).toBeTruthy();
expect(store.testCallResult()).toBe('test');
apiResponse.complete();

apiResponse = new Subject<string>();
param.next({ ok: true });
expect(store.isTestCallLoading()).toBeTruthy();
apiResponse.next('test2');
Expand Down Expand Up @@ -335,4 +339,172 @@ describe('withCalls', () => {
});
});
});

it('returning an observable should update when value changes ', async () => {
TestBed.runInInjectionContext(() => {
const store = new Store();
expect(store.isTestCallLoading()).toBeFalsy();
const param = new BehaviorSubject({ ok: true });

store.testCall(param);
expect(store.isTestCallLoading()).toBeTruthy();
apiResponse.next('test');
expect(store.isTestCallLoaded()).toBeTruthy();
expect(store.testCallResult()).toBe('test');

apiResponse.next('test2');
expect(store.isTestCallLoaded()).toBeTruthy();
expect(store.testCallResult()).toBe('test2');

expect(apiResponse.observed).toBe(true);
});
});

it('returning an observable should update to error state when it errors', async () => {
TestBed.runInInjectionContext(() => {
const store = new Store();
expect(store.isTestCallLoading()).toBeFalsy();
const param = new BehaviorSubject({ ok: true });

store.testCall(param);
expect(store.isTestCallLoading()).toBeTruthy();
apiResponse.next('test');
expect(store.isTestCallLoaded()).toBeTruthy();
expect(store.testCallResult()).toBe('test');

apiResponse.error(new Error('fail'));
expect(store.testCallError()).toEqual(new Error('fail'));
expect(store.testCallResult()).toBe('test');

expect(apiResponse.observed).toBe(false);
});
});

it('returning a promise should output when value returns', async () => {
let response: Promise<string>;
TestBed.runInInjectionContext(async () => {
const Store = signalStore(
withCalls(() => ({
testCall: () => {
response = Promise.resolve('test');
return response;
},
})),
);
const store = new Store();
expect(store.isTestCallLoading()).toBeFalsy();

store.testCall();
expect(store.isTestCallLoading()).toBeTruthy();
apiResponse.next('test');
apiResponse.complete();
await response;
expect(store.isTestCallLoaded()).toBeTruthy();
expect(store.testCallResult()).toBe('test');
});
});

it("should warn in dev mode if no callConfig is used when using an observable that doesn't complete within 100ms", () => {
const consoleWarn = jest.spyOn(console, 'warn').mockImplementation(() => {
/* Empty */
});
TestBed.runInInjectionContext(async () => {
const Store = signalStore(
withCalls(() => ({
testCall: () => apiResponse,
})),
);
const store = new Store();
expect(store.isTestCallLoading()).toBeFalsy();

store.testCall();
expect(store.isTestCallLoading()).toBeTruthy();
apiResponse.next('test');
expect(store.isTestCallLoaded()).toBeTruthy();
expect(store.testCallResult()).toBe('test');

await new Promise((resolve) => setTimeout(resolve, 100));
expect(consoleWarn).toHaveBeenCalledTimes(1);
});
});

it("should warn in dev mode if a mapPipe has not been set when using an observable that doesn't complete within 100ms", () => {
const consoleWarn = jest.spyOn(console, 'warn').mockImplementation(() => {
/* Empty */
});
TestBed.runInInjectionContext(async () => {
const Store = signalStore(
withCalls(() => ({
testCall: typedCallConfig({
call: () => apiResponse,
}),
})),
);
const store = new Store();
expect(store.isTestCallLoading()).toBeFalsy();

store.testCall();
expect(store.isTestCallLoading()).toBeTruthy();
apiResponse.next('test');
expect(store.isTestCallLoaded()).toBeTruthy();
expect(store.testCallResult()).toBe('test');

await new Promise((resolve) => setTimeout(resolve, 100));
expect(consoleWarn).toHaveBeenCalledTimes(1);
});
});

it("should not warn in dev mode if a mapPipe has been set when using an observable that doesn't complete within 100ms", () => {
const consoleWarn = jest.spyOn(console, 'warn').mockImplementation(() => {
/* Empty */
});
TestBed.runInInjectionContext(async () => {
const Store = signalStore(
withCalls(() => ({
testCall: typedCallConfig({
call: () => apiResponse,
mapPipe: 'switchMap',
}),
})),
);
const store = new Store();
expect(store.isTestCallLoading()).toBeFalsy();

store.testCall();
expect(store.isTestCallLoading()).toBeTruthy();
apiResponse.next('test');
expect(store.isTestCallLoaded()).toBeTruthy();
expect(store.testCallResult()).toBe('test');

await new Promise((resolve) => setTimeout(resolve, 100));
expect(consoleWarn).not.toHaveBeenCalledTimes(1);
});
});

it("should not warn in dev mode if a mapPipe has been explicitly set to exhaustMap when using an observable that doesn't complete within 100ms", () => {
const consoleWarn = jest.spyOn(console, 'warn').mockImplementation(() => {
/* Empty */
});
TestBed.runInInjectionContext(async () => {
const Store = signalStore(
withCalls(() => ({
testCall: typedCallConfig({
call: () => apiResponse,
mapPipe: 'exhaustMap',
}),
})),
);
const store = new Store();
expect(store.isTestCallLoading()).toBeFalsy();

store.testCall();
expect(store.isTestCallLoading()).toBeTruthy();
apiResponse.next('test');
expect(store.isTestCallLoaded()).toBeTruthy();
expect(store.testCallResult()).toBe('test');

await new Promise((resolve) => setTimeout(resolve, 100));
expect(consoleWarn).not.toHaveBeenCalledTimes(1);
});
});
});
81 changes: 74 additions & 7 deletions libs/ngrx-traits/signals/src/lib/with-calls/with-calls.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import {
computed,
DestroyRef,
EnvironmentInjector,
inject,
isDevMode,
runInInjectionContext,
Signal,
} from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import {
patchState,
signalStoreFeature,
Expand All @@ -24,13 +27,21 @@ import {
catchError,
concatMap,
exhaustMap,
first,
finalize,
from,
ignoreElements,
map,
merge,
Observable,
of,
pipe,
share,
Subject,
switchMap,
take,
takeUntil,
tap,
timer,
} from 'rxjs';

import {
Expand All @@ -44,6 +55,7 @@ import {
ExtractCallResultType,
NamedCallsStatusComputed,
NamedCallsStatusErrorComputed,
ObservableCall,
} from './with-calls.model';
import { getWithCallKeys } from './with-calls.util';

Expand Down Expand Up @@ -97,6 +109,7 @@ import { getWithCallKeys } from './with-calls.util';
* store.loadProductDetail // ({id: string} | Signal<{id: string}> | Observable<{id: string}>) => void
* store.checkout // () => void
*
* @warning The default mapPipe is {@link https://www.learnrxjs.io/learn-rxjs/operators/transformation/exhaustmap exhaustMap}. If your call returns an observable that does not complete after the first value is emitted, any changes to the input params will be ignored. Either specify {@link https://www.learnrxjs.io/learn-rxjs/operators/transformation/switchmap switchMap} as mapPipe, or use {@link https://www.learnrxjs.io/learn-rxjs/operators/filtering/take take(1)} or {@link https://www.learnrxjs.io/learn-rxjs/operators/filtering/first first()} as part of your call.
*/
export function withCalls<
Input extends SignalStoreFeatureResult,
Expand Down Expand Up @@ -233,14 +246,14 @@ export function withCalls<
[callStatusKey]: { error },
} as any);

const callFn = getCallFn(callName, call);

acc[callNameKey] = rxMethod<unknown[]>(
pipe(
mapPipe((params) => {
setLoading();
return runInInjectionContext(environmentInjector, () => {
return from(
isCallConfig(call) ? call.call(params) : call(params),
).pipe(
return callFn(params).pipe(
map((result) => {
if (
!isCallConfig(call) ||
Expand All @@ -255,7 +268,7 @@ export function withCalls<
call.onSuccess &&
call.onSuccess(result, params);
}),
first(),
takeUntilDestroyed(),
catchError((error: unknown) => {
const e =
(isCallConfig(call) &&
Expand Down Expand Up @@ -283,7 +296,9 @@ export function withCalls<
};
}

function isCallConfig(call: Call | CallConfig): call is CallConfig {
function isCallConfig<Param, Result>(
call: Call<Param, Result> | CallConfig<Param, Result>,
): call is CallConfig<Param, Result> {
return typeof call === 'object';
}
const mapPipes = {
Expand All @@ -293,7 +308,7 @@ const mapPipes = {
};

export function typedCallConfig<
Param extends any = undefined,
Param = undefined,
Result = any,
PropName extends string = '',
Error = unknown,
Expand All @@ -312,3 +327,55 @@ export function typedCallConfig<
// when CallConfig resultProp was defined
return { ...config, resultProp: config.resultProp ?? '' } as C;
}

function getCallFn<Param, Result>(
callName: string,
call: Call<Param, Result> | CallConfig<Param, Result>,
): ObservableCall<Param, Result> {
if (isCallConfig(call)) {
if (!call.mapPipe) {
return wrapMapPipeWarning(callName, call.call);
} else {
return (params) => from(call.call(params));
}
} else {
return wrapMapPipeWarning(callName, call);
}
}

/**
* @private
* Wraps a call with a warning mechanism in dev mode.
* If the call does not complete after the first value, it logs a warning
* indicating that the default "exhaustMap" pipe is being used.
*/
function wrapMapPipeWarning<Param, Result>(
callName: string,
call: Call<Param, Result>,
): ObservableCall<Param, Result> {
if (isDevMode()) {
return (params) => {
const source$ = from(call(params)).pipe(
finalize(() => completed$.next()),
share(),
);
const completed$ = new Subject<void>();

const monitor$ = source$.pipe(
take(1),
switchMap(() => timer(100)),
tap(() => {
console.warn(
`withCalls "${callName}" did not complete after the first value and is using the default "exhaustMap" pipe. This means that subsequent values from params will be ignored until the observable completes. Please specify the mapPipe operator or use an observable that completes to avoid this warning.`,
);
}),
takeUntil(completed$),
ignoreElements(),
);

return merge(monitor$, source$);
};
} else {
return (params) => from(call(params));
}
}

0 comments on commit 679c12e

Please sign in to comment.