Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batch RTKQ "subscribe on reject" actions, and improve cache collection timer handling #2599

Merged
merged 4 commits into from
Aug 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions packages/toolkit/src/query/core/buildMiddleware/batchActions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import type { QueryThunk, RejectedAction } from '../buildThunks'
import type { SubMiddlewareBuilder } from './types'

// Copied from https://github.com/feross/queue-microtask
let promise: Promise<any>
const queueMicrotaskShim =
typeof queueMicrotask === 'function'
? queueMicrotask.bind(typeof window !== 'undefined' ? window : global)
: // reuse resolved promise, and allocate it lazily
(cb: () => void) =>
(promise || (promise = Promise.resolve())).then(cb).catch((err: any) =>
setTimeout(() => {
throw err
}, 0)
)

export const build: SubMiddlewareBuilder = ({
api,
context: { apiUid },
queryThunk,
reducerPath,
}) => {
return (mwApi) => {
let abortedQueryActionsQueue: RejectedAction<QueryThunk, any>[] = []
let dispatchQueued = false

return (next) => (action) => {
if (queryThunk.rejected.match(action)) {
const { condition, arg } = action.meta

if (condition && arg.subscribe) {
// request was aborted due to condition (another query already running)
// _Don't_ dispatch right away - queue it for a debounced grouped dispatch
abortedQueryActionsQueue.push(action)

if (!dispatchQueued) {
queueMicrotaskShim(() => {
mwApi.dispatch(
api.internalActions.subscriptionRequestsRejected(
abortedQueryActionsQueue
)
)
abortedQueryActionsQueue = []
})
dispatchQueued = true
}
// _Don't_ let the action reach the reducers now!
return
}
}

const result = next(action)

return result
}
}
}
46 changes: 35 additions & 11 deletions packages/toolkit/src/query/core/buildMiddleware/cacheCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ import type {

export type ReferenceCacheCollection = never

function isObjectEmpty(obj: Record<any, any>) {
// Apparently a for..in loop is faster than `Object.keys()` here:
// https://stackoverflow.com/a/59787784/62937
for (let k in obj) {
// If there is at least one key, it's not empty
return false
}
return true
}

declare module '../../endpointDefinitions' {
interface QueryExtraOptions<
TagTypes extends string,
Expand Down Expand Up @@ -38,6 +48,15 @@ export const THIRTY_TWO_BIT_MAX_TIMER_SECONDS = 2_147_483_647 / 1_000 - 1
export const build: SubMiddlewareBuilder = ({ reducerPath, api, context }) => {
const { removeQueryResult, unsubscribeQueryResult } = api.internalActions

function anySubscriptionsRemainingForKey(
queryCacheKey: string,
api: SubMiddlewareApi
) {
const subscriptions =
api.getState()[reducerPath].subscriptions[queryCacheKey]
return !!subscriptions && !isObjectEmpty(subscriptions)
}

return (mwApi) => {
const currentRemovalTimeouts: QueryStateMeta<TimeoutId> = {}

Expand Down Expand Up @@ -94,6 +113,11 @@ export const build: SubMiddlewareBuilder = ({ reducerPath, api, context }) => {
] as QueryDefinition<any, any, any, any>
const keepUnusedDataFor =
endpointDefinition?.keepUnusedDataFor ?? config.keepUnusedDataFor

if (keepUnusedDataFor === Infinity) {
// Hey, user said keep this forever!
return
}
// Prevent `setTimeout` timers from overflowing a 32-bit internal int, by
// clamping the max value to be at most 1000ms less than the 32-bit max.
// Look, a 24.8-day keepalive ought to be enough for anybody, right? :)
Expand All @@ -103,18 +127,18 @@ export const build: SubMiddlewareBuilder = ({ reducerPath, api, context }) => {
Math.min(keepUnusedDataFor, THIRTY_TWO_BIT_MAX_TIMER_SECONDS)
)

const currentTimeout = currentRemovalTimeouts[queryCacheKey]
if (currentTimeout) {
clearTimeout(currentTimeout)
}
currentRemovalTimeouts[queryCacheKey] = setTimeout(() => {
const subscriptions =
api.getState()[reducerPath].subscriptions[queryCacheKey]
if (!subscriptions || Object.keys(subscriptions).length === 0) {
api.dispatch(removeQueryResult({ queryCacheKey }))
if (!anySubscriptionsRemainingForKey(queryCacheKey, api)) {
const currentTimeout = currentRemovalTimeouts[queryCacheKey]
if (currentTimeout) {
clearTimeout(currentTimeout)
}
delete currentRemovalTimeouts![queryCacheKey]
}, finalKeepUnusedDataFor * 1000)
currentRemovalTimeouts[queryCacheKey] = setTimeout(() => {
if (!anySubscriptionsRemainingForKey(queryCacheKey, api)) {
api.dispatch(removeQueryResult({ queryCacheKey }))
}
delete currentRemovalTimeouts![queryCacheKey]
}, finalKeepUnusedDataFor * 1000)
}
}
}
}
2 changes: 2 additions & 0 deletions packages/toolkit/src/query/core/buildMiddleware/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import { build as buildWindowEventHandling } from './windowEventHandling'
import { build as buildCacheLifecycle } from './cacheLifecycle'
import { build as buildQueryLifecycle } from './queryLifecycle'
import { build as buildDevMiddleware } from './devMiddleware'
import { build as buildBatchActions } from './batchActions'

export function buildMiddleware<
Definitions extends EndpointDefinitions,
Expand All @@ -38,6 +39,7 @@ export function buildMiddleware<
buildWindowEventHandling,
buildCacheLifecycle,
buildQueryLifecycle,
buildBatchActions,
].map((build) =>
build({
...(input as any as BuildMiddlewareInput<
Expand Down
33 changes: 21 additions & 12 deletions packages/toolkit/src/query/core/buildSlice.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import type {
ConfigState,
} from './apiState'
import { QueryStatus } from './apiState'
import type { MutationThunk, QueryThunk } from './buildThunks'
import type { MutationThunk, QueryThunk, RejectedAction } from './buildThunks'
import { calculateProvidedByThunk } from './buildThunks'
import type {
AssertTagTypes,
Expand Down Expand Up @@ -387,6 +387,26 @@ export function buildSlice({
delete draft[queryCacheKey]![requestId]
}
},
subscriptionRequestsRejected(
draft,
action: PayloadAction<RejectedAction<QueryThunk, any>[]>
) {
// We need to process "rejected" actions caused by a component trying to start a subscription
// after there's already a cache entry. Since many components may mount at once and all want
// the same data, we use a middleware that intercepts those actions batches these together
// into a single larger action , and we'll process all of them at once.
for (let rejectedAction of action.payload) {
const {
meta: { condition, arg, requestId },
} = rejectedAction
// request was aborted due to condition (another query already running)
if (condition && arg.subscribe) {
const substate = (draft[arg.queryCacheKey] ??= {})
substate[requestId] =
arg.subscriptionOptions ?? substate[requestId] ?? {}
}
}
},
},
extraReducers: (builder) => {
builder
Expand All @@ -403,17 +423,6 @@ export function buildSlice({
arg.subscriptionOptions ?? substate[requestId] ?? {}
}
})
.addCase(
queryThunk.rejected,
(draft, { meta: { condition, arg, requestId }, error, payload }) => {
// request was aborted due to condition (another query already running)
if (condition && arg.subscribe) {
const substate = (draft[arg.queryCacheKey] ??= {})
substate[requestId] =
arg.subscriptionOptions ?? substate[requestId] ?? {}
}
}
)
// update the state to be a new object to be picked up as a "state change"
// by redux-persist's `autoMergeLevel2`
.addMatcher(hasRehydrationInfo, (draft) => ({ ...draft }))
Expand Down
37 changes: 32 additions & 5 deletions packages/toolkit/src/query/react/buildHooks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -696,16 +696,25 @@ export function buildHooks<Definitions extends EndpointDefinitions>({
pollingInterval,
})

const lastRenderHadSubscription = useRef(false)

const promiseRef = useRef<QueryActionCreatorResult<any>>()

let { queryCacheKey, requestId } = promiseRef.current || {}
const subscriptionRemoved = useSelector(
const currentRenderHasSubscription = useSelector(
(state: RootState<Definitions, string, string>) =>
!!queryCacheKey &&
!!requestId &&
!state[api.reducerPath].subscriptions[queryCacheKey]?.[requestId]
)

const subscriptionRemoved =
!currentRenderHasSubscription && lastRenderHadSubscription.current

usePossiblyImmediateEffect(() => {
lastRenderHadSubscription.current = currentRenderHasSubscription
})

usePossiblyImmediateEffect((): void | undefined => {
promiseRef.current = undefined
}, [subscriptionRemoved])
Expand Down Expand Up @@ -736,6 +745,7 @@ export function buildHooks<Definitions extends EndpointDefinitions>({
forceRefetch: refetchOnMountOrArgChange,
})
)

promiseRef.current = promise
} else if (stableSubscriptionOptions !== lastSubscriptionOptions) {
lastPromise.updateSubscriptionOptions(stableSubscriptionOptions)
Expand Down Expand Up @@ -923,8 +933,9 @@ export function buildHooks<Definitions extends EndpointDefinitions>({
...options,
})

const { data, status, isLoading, isSuccess, isError, error } = queryStateResults;
useDebugValue({ data, status, isLoading, isSuccess, isError, error });
const { data, status, isLoading, isSuccess, isError, error } =
queryStateResults
useDebugValue({ data, status, isLoading, isSuccess, isError, error })

return useMemo(
() => ({ ...queryStateResults, ...querySubscriptionResults }),
Expand Down Expand Up @@ -993,8 +1004,24 @@ export function buildHooks<Definitions extends EndpointDefinitions>({
})
}, [dispatch, fixedCacheKey, promise, requestId])

const { endpointName, data, status, isLoading, isSuccess, isError, error } = currentState;
useDebugValue({ endpointName, data, status, isLoading, isSuccess, isError, error });
const {
endpointName,
data,
status,
isLoading,
isSuccess,
isError,
error,
} = currentState
useDebugValue({
endpointName,
data,
status,
isLoading,
isSuccess,
isError,
error,
})

const finalState = useMemo(
() => ({ ...currentState, originalArgs, reset }),
Expand Down
15 changes: 9 additions & 6 deletions packages/toolkit/src/query/tests/buildHooks.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -1162,9 +1162,12 @@ describe('hooks tests', () => {
})

test('useMutation return value contains originalArgs', async () => {
const { result } = renderHook(() => api.endpoints.updateUser.useMutation(), {
wrapper: storeRef.wrapper,
})
const { result } = renderHook(
() => api.endpoints.updateUser.useMutation(),
{
wrapper: storeRef.wrapper,
}
)
const arg = { name: 'Foo' }

const firstRenderResult = result.current
Expand Down Expand Up @@ -1955,13 +1958,13 @@ describe('hooks with createApi defaults set', () => {

const addBtn = screen.getByTestId('addPost')

await waitFor(() => expect(getRenderCount()).toBe(3))
await waitFor(() => expect(getRenderCount()).toBe(4))

fireEvent.click(addBtn)
await waitFor(() => expect(getRenderCount()).toBe(5))
await waitFor(() => expect(getRenderCount()).toBe(6))
fireEvent.click(addBtn)
fireEvent.click(addBtn)
await waitFor(() => expect(getRenderCount()).toBe(7))
await waitFor(() => expect(getRenderCount()).toBe(8))
})

test('useQuery with selectFromResult option serves a deeply memoized value and does not rerender unnecessarily', async () => {
Expand Down
15 changes: 14 additions & 1 deletion packages/toolkit/src/query/tests/cacheCollection.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ describe(`query: await cleanup, keepUnusedDataFor set`, () => {
query: () => '/success',
keepUnusedDataFor: 0,
}),
query4: build.query<unknown, string>({
query: () => '/success',
keepUnusedDataFor: Infinity,
}),
}),
keepUnusedDataFor: 29,
})
Expand All @@ -126,9 +130,18 @@ describe(`query: await cleanup, keepUnusedDataFor set`, () => {
expect(onCleanup).not.toHaveBeenCalled()
store.dispatch(api.endpoints.query3.initiate('arg')).unsubscribe()
expect(onCleanup).not.toHaveBeenCalled()
jest.advanceTimersByTime(1), await waitMs()
jest.advanceTimersByTime(1)
await waitMs()
expect(onCleanup).toHaveBeenCalled()
})

test('endpoint keepUnusedDataFor: Infinity', async () => {
expect(onCleanup).not.toHaveBeenCalled()
store.dispatch(api.endpoints.query4.initiate('arg')).unsubscribe()
expect(onCleanup).not.toHaveBeenCalled()
jest.advanceTimersByTime(THIRTY_TWO_BIT_MAX_INT)
expect(onCleanup).not.toHaveBeenCalled()
})
})

function storeForApi<
Expand Down
Loading