Skip to content

Commit

Permalink
cancel execution on triggered abort signal despite hanging async reso…
Browse files Browse the repository at this point in the history
…lvers (#4267)

Prior to this pull request, cancellation worked by checking the abort signal status during execution, and throwing the reason if the abort signal has been triggered. This fails if an asynchronous resolver hangs.

This pull request changes the cancellation method to wrap promises returned by resolvers so that they immediately reject on cancellation.
  • Loading branch information
yaacovCR authored Oct 31, 2024
1 parent 0ffc1e1 commit e4d7e85
Show file tree
Hide file tree
Showing 5 changed files with 409 additions and 56 deletions.
6 changes: 6 additions & 0 deletions src/execution/IncrementalPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { pathToArray } from '../jsutils/Path.js';
import type { GraphQLError } from '../error/GraphQLError.js';

import { IncrementalGraph } from './IncrementalGraph.js';
import type { PromiseCanceller } from './PromiseCanceller.js';
import type {
CancellableStreamRecord,
CompletedExecutionGroup,
Expand Down Expand Up @@ -43,6 +44,7 @@ export function buildIncrementalResponse(
}

interface IncrementalPublisherContext {
promiseCanceller: PromiseCanceller | undefined;
cancellableStreams: Set<CancellableStreamRecord> | undefined;
}

Expand Down Expand Up @@ -125,6 +127,7 @@ class IncrementalPublisher {
IteratorResult<SubsequentIncrementalExecutionResult, void>
> => {
if (isDone) {
this._context.promiseCanceller?.disconnect();
await this._returnAsyncIteratorsIgnoringErrors();
return { value: undefined, done: true };
}
Expand Down Expand Up @@ -171,6 +174,9 @@ class IncrementalPublisher {
batch = await this._incrementalGraph.nextCompletedBatch();
} while (batch !== undefined);

// TODO: add test for this case
/* c8 ignore next */
this._context.promiseCanceller?.disconnect();
await this._returnAsyncIteratorsIgnoringErrors();
return { value: undefined, done: true };
};
Expand Down
53 changes: 53 additions & 0 deletions src/execution/PromiseCanceller.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { promiseWithResolvers } from '../jsutils/promiseWithResolvers.js';

/**
* A PromiseCanceller object can be used to cancel multiple promises
* using a single AbortSignal.
*
* @internal
*/
export class PromiseCanceller {
abortSignal: AbortSignal;
abort: () => void;

private _aborts: Set<() => void>;

constructor(abortSignal: AbortSignal) {
this.abortSignal = abortSignal;
this._aborts = new Set<() => void>();
this.abort = () => {
for (const abort of this._aborts) {
abort();
}
};

abortSignal.addEventListener('abort', this.abort);
}

disconnect(): void {
this.abortSignal.removeEventListener('abort', this.abort);
}

withCancellation<T>(originalPromise: Promise<T>): Promise<T> {
if (this.abortSignal.aborted) {
// eslint-disable-next-line @typescript-eslint/prefer-promise-reject-errors
return Promise.reject(this.abortSignal.reason);
}

const { promise, resolve, reject } = promiseWithResolvers<T>();
const abort = () => reject(this.abortSignal.reason);
this._aborts.add(abort);
originalPromise.then(
(resolved) => {
this._aborts.delete(abort);
resolve(resolved);
},
(error: unknown) => {
this._aborts.delete(abort);
reject(error);
},
);

return promise;
}
}
56 changes: 56 additions & 0 deletions src/execution/__tests__/PromiseCanceller-test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import { describe, it } from 'mocha';

import { expectPromise } from '../../__testUtils__/expectPromise.js';

import { PromiseCanceller } from '../PromiseCanceller.js';

describe('PromiseCanceller', () => {
it('works to cancel an already resolved promise', async () => {
const abortController = new AbortController();
const abortSignal = abortController.signal;

const promiseCanceller = new PromiseCanceller(abortSignal);

const promise = Promise.resolve(1);

const withCancellation = promiseCanceller.withCancellation(promise);

abortController.abort(new Error('Cancelled!'));

await expectPromise(withCancellation).toRejectWith('Cancelled!');
});

it('works to cancel a hanging promise', async () => {
const abortController = new AbortController();
const abortSignal = abortController.signal;

const promiseCanceller = new PromiseCanceller(abortSignal);

const promise = new Promise(() => {
/* never resolves */
});

const withCancellation = promiseCanceller.withCancellation(promise);

abortController.abort(new Error('Cancelled!'));

await expectPromise(withCancellation).toRejectWith('Cancelled!');
});

it('works to cancel a hanging promise created after abort signal triggered', async () => {
const abortController = new AbortController();
const abortSignal = abortController.signal;

abortController.abort(new Error('Cancelled!'));

const promiseCanceller = new PromiseCanceller(abortSignal);

const promise = new Promise(() => {
/* never resolves */
});

const withCancellation = promiseCanceller.withCancellation(promise);

await expectPromise(withCancellation).toRejectWith('Cancelled!');
});
});
Loading

0 comments on commit e4d7e85

Please sign in to comment.