Skip to content

Commit

Permalink
Merge pull request #2023 from FaberVitale/feat/alm-improve-task-abort…
Browse files Browse the repository at this point in the history
…-error
  • Loading branch information
markerikson authored Feb 12, 2022
2 parents 5c398cd + 75e8275 commit 1635f8c
Show file tree
Hide file tree
Showing 7 changed files with 152 additions and 37 deletions.
19 changes: 16 additions & 3 deletions packages/action-listener-middleware/src/exceptions.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
export class TaskAbortError implements Error {
import type { SerializedError } from '@reduxjs/toolkit'

const task = 'task'
const listener = 'listener'
const completed = 'completed'
const cancelled = 'cancelled'

/* TaskAbortError error codes */
export const taskCancelled = `${task}-${cancelled}` as const
export const taskCompleted = `${task}-${completed}` as const
export const listenerCancelled = `${listener}-${cancelled}` as const
export const listenerCompleted = `${listener}-${completed}` as const

export class TaskAbortError implements SerializedError {
name = 'TaskAbortError'
message = ''
constructor(public reason = 'unknown') {
this.message = `task cancelled (reason: ${reason})`
constructor(public code = 'unknown') {
this.message = `task cancelled (reason: ${code})`
}
}
49 changes: 31 additions & 18 deletions packages/action-listener-middleware/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,18 @@ import type {
TypedRemoveListener,
TypedStopListening,
} from './types'
import { assertFunction, catchRejection } from './utils'
import { TaskAbortError } from './exceptions'
import {
abortControllerWithReason,
assertFunction,
catchRejection,
} from './utils'
import {
listenerCancelled,
listenerCompleted,
TaskAbortError,
taskCancelled,
taskCompleted,
} from './exceptions'
import {
runTask,
promisifyAbortSignal,
Expand Down Expand Up @@ -75,21 +85,24 @@ const createFork = (parentAbortSignal: AbortSignal) => {
assertFunction(taskExecutor, 'taskExecutor')
const childAbortController = new AbortController()
const cancel = () => {
childAbortController.abort()
abortControllerWithReason(childAbortController, taskCancelled)
}

const result = runTask<T>(async (): Promise<T> => {
validateActive(parentAbortSignal)
validateActive(childAbortController.signal)
const result = (await taskExecutor({
pause: createPause(childAbortController.signal),
delay: createDelay(childAbortController.signal),
signal: childAbortController.signal,
})) as T
validateActive(parentAbortSignal)
validateActive(childAbortController.signal)
return result
}, cancel)
const result = runTask<T>(
async (): Promise<T> => {
validateActive(parentAbortSignal)
validateActive(childAbortController.signal)
const result = (await taskExecutor({
pause: createPause(childAbortController.signal),
delay: createDelay(childAbortController.signal),
signal: childAbortController.signal,
})) as T
validateActive(parentAbortSignal)
validateActive(childAbortController.signal)
return result
},
() => abortControllerWithReason(childAbortController, taskCompleted)
)

return {
result,
Expand Down Expand Up @@ -211,7 +224,7 @@ const createClearListenerMiddleware = (
return () => {
listenerMap.forEach((entry) => {
entry.pending.forEach((controller) => {
controller.abort()
abortControllerWithReason(controller, listenerCancelled)
})
})

Expand Down Expand Up @@ -363,7 +376,7 @@ export function createListenerMiddleware<
cancelActiveListeners: () => {
entry.pending.forEach((controller, _, set) => {
if (controller !== internalTaskController) {
controller.abort()
abortControllerWithReason(controller, listenerCancelled)
set.delete(controller)
}
})
Expand All @@ -378,7 +391,7 @@ export function createListenerMiddleware<
})
}
} finally {
internalTaskController.abort() // Notify that the task has completed
abortControllerWithReason(internalTaskController, listenerCompleted) // Notify that the task has completed
entry.pending.delete(internalTaskController)
}
}
Expand Down
13 changes: 6 additions & 7 deletions packages/action-listener-middleware/src/task.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
import { TaskAbortError } from './exceptions'
import type { TaskResult } from './types'
import { noop, catchRejection } from './utils'
import type { AbortSignalWithReason, TaskResult } from './types'
import { catchRejection } from './utils'

/**
* Synchronously raises {@link TaskAbortError} if the task tied to the input `signal` has been cancelled.
* @param signal
* @param reason
* @see {TaskAbortError}
*/
export const validateActive = (signal: AbortSignal, reason?: string): void => {
export const validateActive = (signal: AbortSignal): void => {
if (signal.aborted) {
throw new TaskAbortError(reason)
throw new TaskAbortError((signal as AbortSignalWithReason<string>).reason)
}
}

Expand All @@ -20,12 +20,11 @@ export const validateActive = (signal: AbortSignal, reason?: string): void => {
* @returns
*/
export const promisifyAbortSignal = (
signal: AbortSignal,
reason?: string
signal: AbortSignalWithReason<string>
): Promise<never> => {
return catchRejection(
new Promise<never>((_, reject) => {
const notifyRejection = () => reject(new TaskAbortError(reason))
const notifyRejection = () => reject(new TaskAbortError(signal.reason))

if (signal.aborted) {
notifyRejection()
Expand Down
25 changes: 17 additions & 8 deletions packages/action-listener-middleware/src/tests/fork.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { configureStore, createSlice } from '@reduxjs/toolkit'
import type { PayloadAction } from '@reduxjs/toolkit'
import type { ForkedTaskExecutor, TaskResult } from '../types'
import { createListenerMiddleware, TaskAbortError } from '../index'
import { listenerCancelled, taskCancelled } from '../exceptions'

function delay(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms))
Expand Down Expand Up @@ -122,7 +123,9 @@ describe('fork', () => {
store.dispatch(increment())
store.dispatch(increment())

expect(await deferredForkedTaskError).toEqual(new TaskAbortError())
expect(await deferredForkedTaskError).toEqual(
new TaskAbortError(listenerCancelled)
)
})

it('synchronously throws TypeError error if the provided executor is not a function', () => {
Expand Down Expand Up @@ -193,7 +196,10 @@ describe('fork', () => {
desc: 'sync exec - sync cancel',
executor: () => 42,
cancelAfterMs: -1,
expected: { status: 'cancelled', error: new TaskAbortError() },
expected: {
status: 'cancelled',
error: new TaskAbortError(taskCancelled),
},
},
{
desc: 'sync exec - async cancel',
Expand All @@ -208,7 +214,10 @@ describe('fork', () => {
throw new Error('2020')
},
cancelAfterMs: 10,
expected: { status: 'cancelled', error: new TaskAbortError() },
expected: {
status: 'cancelled',
error: new TaskAbortError(taskCancelled),
},
},
{
desc: 'async exec - success',
Expand Down Expand Up @@ -300,7 +309,7 @@ describe('fork', () => {

expect(await deferredResult).toEqual({
status: 'cancelled',
error: new TaskAbortError(),
error: new TaskAbortError(taskCancelled),
})
})

Expand Down Expand Up @@ -357,12 +366,12 @@ describe('fork', () => {
actionCreator: increment,
effect: async (_, listenerApi) => {
const forkedTask = listenerApi.fork(async (forkApi) => {
await forkApi.pause(delay(30))
await forkApi.pause(delay(1_000))

return 4
})

await listenerApi.delay(10)
await Promise.resolve()
forkedTask.cancel()
deferredResult.resolve(await forkedTask.result)
},
Expand All @@ -372,7 +381,7 @@ describe('fork', () => {

expect(await deferredResult).toEqual({
status: 'cancelled',
error: new TaskAbortError(),
error: new TaskAbortError(taskCancelled),
})
})

Expand All @@ -396,7 +405,7 @@ describe('fork', () => {

expect(await deferredResult).toEqual({
status: 'cancelled',
error: new TaskAbortError(),
error: new TaskAbortError(listenerCancelled),
})
})
})
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,12 @@ import type {
Unsubscribe,
ListenerMiddleware,
} from '../index'
import type { AddListenerOverloads, TypedRemoveListener } from '../types'
import type {
AbortSignalWithReason,
AddListenerOverloads,
TypedRemoveListener,
} from '../types'
import { listenerCancelled, listenerCompleted } from '../exceptions'

const middlewareApi = {
getState: expect.any(Function),
Expand Down Expand Up @@ -537,6 +542,36 @@ describe('createListenerMiddleware', () => {
}
)

test('listenerApi.signal has correct reason when listener is cancelled or completes', async () => {
const notifyDeferred = createAction<Deferred<string>>('notify-deferred')

startListening({
actionCreator: notifyDeferred,
async effect({ payload }, { signal, cancelActiveListeners, delay }) {
signal.addEventListener(
'abort',
() => {
payload.resolve((signal as AbortSignalWithReason<string>).reason)
},
{ once: true }
)

cancelActiveListeners()
delay(10)
},
})

const deferredCancelledSignalReason = store.dispatch(
notifyDeferred(deferred<string>())
).payload
const deferredCompletedSignalReason = store.dispatch(
notifyDeferred(deferred<string>())
).payload

expect(await deferredCancelledSignalReason).toBe(listenerCancelled)
expect(await deferredCompletedSignalReason).toBe(listenerCompleted)
})

test('"can unsubscribe via middleware api', () => {
const effect = jest.fn(
(action: TestAction1, api: ListenerEffectAPI<any, any>) => {
Expand Down
6 changes: 6 additions & 0 deletions packages/action-listener-middleware/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ import type {
} from '@reduxjs/toolkit'
import type { TaskAbortError } from './exceptions'

/**
* @internal
* At the time of writing `lib.dom.ts` does not provide `abortSignal.reason`.
*/
export type AbortSignalWithReason<T> = AbortSignal & { reason?: T }

/**
* Types copied from RTK
*/
Expand Down
40 changes: 40 additions & 0 deletions packages/action-listener-middleware/src/utils.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import type { AbortSignalWithReason } from './types'

export const assertFunction: (
func: unknown,
expected: string
Expand All @@ -20,3 +22,41 @@ export const catchRejection = <T>(

return promise
}

/**
* Calls `abortController.abort(reason)` and patches `signal.reason`.
* if it is not supported.
*
* At the time of writing `signal.reason` is available in FF chrome, edge node 17 and deno.
* @param abortController
* @param reason
* @returns
* @see https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal/reason
*/
export const abortControllerWithReason = <T>(
abortController: AbortController,
reason: T
): void => {
type Consumer<T> = (val: T) => void

const signal = abortController.signal as AbortSignalWithReason<T>

if (signal.aborted) {
return
}

// Patch `reason` if necessary.
// - We use defineProperty here because reason is a getter of `AbortSignal.__proto__`.
// - We need to patch 'reason' before calling `.abort()` because listeners to the 'abort'
// event are are notified immediately.
if (!('reason' in signal)) {
Object.defineProperty(signal, 'reason', {
enumerable: true,
value: reason,
configurable: true,
writable: true,
})
}

;(abortController.abort as Consumer<typeof reason>)(reason)
}

0 comments on commit 1635f8c

Please sign in to comment.