diff --git a/.changeset/lucky-dodos-chew.md b/.changeset/lucky-dodos-chew.md new file mode 100644 index 00000000000..1ea21c1aee8 --- /dev/null +++ b/.changeset/lucky-dodos-chew.md @@ -0,0 +1,5 @@ +--- +"@whatwg-node/server": patch +--- + +Fix UWS's behavior in case of request cancellation diff --git a/.changeset/metal-jars-wash.md b/.changeset/metal-jars-wash.md new file mode 100644 index 00000000000..99ce26cf9c4 --- /dev/null +++ b/.changeset/metal-jars-wash.md @@ -0,0 +1,5 @@ +--- +"@whatwg-node/node-fetch": patch +--- + +Do not throw when Curl request cancellation diff --git a/.changeset/young-trees-call.md b/.changeset/young-trees-call.md new file mode 100644 index 00000000000..57a731cc797 --- /dev/null +++ b/.changeset/young-trees-call.md @@ -0,0 +1,5 @@ +--- +"@whatwg-node/server": patch +--- + +Use ServerResponse's close event to catch request cancellation diff --git a/packages/node-fetch/src/fetchCurl.ts b/packages/node-fetch/src/fetchCurl.ts index ced35005868..322bc5c0882 100644 --- a/packages/node-fetch/src/fetchCurl.ts +++ b/packages/node-fetch/src/fetchCurl.ts @@ -75,7 +75,11 @@ export function fetchCurl( if (fetchRequest['_signal']) { fetchRequest['_signal'].onabort = () => { if (curlHandle.isOpen) { - curlHandle.pause(CurlPause.Recv); + try { + curlHandle.pause(CurlPause.Recv); + } catch (e) { + reject(e); + } } }; } @@ -86,6 +90,9 @@ export function fetchCurl( if (streamResolved && !streamResolved.closed && !streamResolved.destroyed) { streamResolved.destroy(error); } else { + if (error.message === 'Operation was aborted by an application callback') { + error.message = 'The operation was aborted.'; + } reject(error); } curlHandle.close(); diff --git a/packages/server/src/createServerAdapter.ts b/packages/server/src/createServerAdapter.ts index aea1f07d751..0e998a1b858 100644 --- a/packages/server/src/createServerAdapter.ts +++ b/packages/server/src/createServerAdapter.ts @@ -26,6 +26,7 @@ import { isServerResponse, iterateAsyncVoid, NodeRequest, + nodeRequestResponseMap, NodeResponse, normalizeNodeRequest, sendNodeResponse, @@ -198,6 +199,7 @@ function createServerAdapter< waitUntilPromises.push(cb.catch(err => console.error(err))); }, }; + nodeRequestResponseMap.set(nodeRequest, serverResponse); let response$: Response | Promise | undefined; try { response$ = handleNodeRequest(nodeRequest, defaultServerContext as any, ...ctx); @@ -233,15 +235,22 @@ function createServerAdapter< filteredCtxParts.length > 0 ? completeAssign(defaultServerContext, ...ctx) : defaultServerContext; + + const signal = new ServerAdapterRequestAbortSignal(); + const originalResEnd = res.end.bind(res); + let resEnded = false; + res.end = function (data: any) { + resEnded = true; + return originalResEnd(data); + }; + res.onAborted(() => { + signal.sendAbort(); + }); const request = getRequestFromUWSRequest({ req, res, fetchAPI, - }); - let resAborted = false; - res.onAborted(() => { - resAborted = true; - (request.signal as ServerAdapterRequestAbortSignal).sendAbort(); + signal, }); let response$: Response | Promise | undefined; try { @@ -253,18 +262,24 @@ function createServerAdapter< return response$ .catch((e: any) => handleErrorFromRequestHandler(e, fetchAPI.Response)) .then(response => { - if (!resAborted) { - return sendResponseToUwsOpts(res, response); + if (!signal.aborted && !resEnded) { + return sendResponseToUwsOpts(res, response, signal); } }) .catch(err => { - console.error(`Unexpected error while handling request: ${err.message || err}`); + console.error( + `Unexpected error while handling request: \n${err.stack || err.message || err}`, + ); }); } try { - return sendResponseToUwsOpts(res, response$); + if (!signal.aborted && !resEnded) { + return sendResponseToUwsOpts(res, response$, signal); + } } catch (err: any) { - console.error(`Unexpected error while handling request: ${err.message || err}`); + console.error( + `Unexpected error while handling request: \n${err.stack || err.message || err}`, + ); } } diff --git a/packages/server/src/plugins/useErrorHandling.ts b/packages/server/src/plugins/useErrorHandling.ts index cc710b88013..160d948bf17 100644 --- a/packages/server/src/plugins/useErrorHandling.ts +++ b/packages/server/src/plugins/useErrorHandling.ts @@ -5,7 +5,7 @@ import { ServerAdapterPlugin } from './types.js'; export function createDefaultErrorHandler( ResponseCtor: typeof Response = DefaultResponseCtor, ): ErrorHandler { - return function defaultErrorHandler(e: any): Response | Promise { + return function defaultErrorHandler(e: any): Response { if (e.details || e.status || e.headers || e.name === 'HTTPError') { return new ResponseCtor( typeof e.details === 'object' ? JSON.stringify(e.details) : e.message, @@ -16,13 +16,17 @@ export function createDefaultErrorHandler( ); } console.error(e); - if (ResponseCtor.error) { - return ResponseCtor.error(); - } - return new ResponseCtor(null, { status: 500 }); + return createDefaultErrorResponse(ResponseCtor); }; } +function createDefaultErrorResponse(ResponseCtor: typeof Response) { + if (ResponseCtor.error) { + return ResponseCtor.error(); + } + return new ResponseCtor(null, { status: 500 }); +} + export class HTTPError extends Error { name = 'HTTPError'; constructor( @@ -55,11 +59,17 @@ export function useErrorHandling( try { const response$ = requestHandler(request, serverContext); if (isPromise(response$)) { - return response$.catch(e => errorHandler(e, request, serverContext)); + return response$.catch( + e => + errorHandler(e, request, serverContext) || + createDefaultErrorResponse(fetchAPI.Response), + ); } return response$; } catch (e) { - return errorHandler(e, request, serverContext); + return ( + errorHandler(e, request, serverContext) || createDefaultErrorResponse(fetchAPI.Response) + ); } }); }, diff --git a/packages/server/src/utils.ts b/packages/server/src/utils.ts index 4e68e25a4a3..80dbf279def 100644 --- a/packages/server/src/utils.ts +++ b/packages/server/src/utils.ts @@ -116,6 +116,8 @@ export class ServerAdapterRequestAbortSignal extends EventTarget implements Abor let bunNodeCompatModeWarned = false; +export const nodeRequestResponseMap = new WeakMap(); + export function normalizeNodeRequest( nodeRequest: NodeRequest, RequestCtor: typeof Request, @@ -130,30 +132,34 @@ export function normalizeNodeRequest( fullUrl = url.toString(); } - let signal: AbortSignal; + let signal: AbortSignal | undefined; - // If ponyfilled - if (RequestCtor !== globalThis.Request) { - signal = new ServerAdapterRequestAbortSignal(); + const nodeResponse = nodeRequestResponseMap.get(nodeRequest); + nodeRequestResponseMap.delete(nodeRequest); + if (nodeResponse?.once) { + let sendAbortSignal: VoidFunction; - if (rawRequest?.once) { - rawRequest.once('close', () => { - if (rawRequest.aborted) { - (signal as ServerAdapterRequestAbortSignal).sendAbort(); - } - }); - } - } else { - const controller = new AbortController(); - signal = controller.signal; - - if (rawRequest.once) { - rawRequest.once('close', () => { - if (rawRequest.aborted) { - controller.abort(); - } - }); + // If ponyfilled + if (RequestCtor !== globalThis.Request) { + signal = new ServerAdapterRequestAbortSignal(); + sendAbortSignal = () => (signal as ServerAdapterRequestAbortSignal).sendAbort(); + } else { + const controller = new AbortController(); + signal = controller.signal; + sendAbortSignal = () => controller.abort(); } + + const closeEventListener: EventListener = () => { + if (signal && !signal.aborted) { + sendAbortSignal(); + } + }; + + nodeResponse.once('close', closeEventListener); + + nodeResponse.once('finish', () => { + nodeResponse.removeListener('close', closeEventListener); + }); } if (nodeRequest.method === 'GET' || nodeRequest.method === 'HEAD') { diff --git a/packages/server/src/uwebsockets.ts b/packages/server/src/uwebsockets.ts index 42267eb3520..61d17739dbe 100644 --- a/packages/server/src/uwebsockets.ts +++ b/packages/server/src/uwebsockets.ts @@ -1,6 +1,5 @@ import type { Readable } from 'stream'; import type { FetchAPI } from './types.js'; -import { ServerAdapterRequestAbortSignal } from './utils.js'; export interface UWSRequest { getMethod(): string; @@ -32,9 +31,10 @@ interface GetRequestFromUWSOpts { req: UWSRequest; res: UWSResponse; fetchAPI: FetchAPI; + signal: AbortSignal; } -export function getRequestFromUWSRequest({ req, res, fetchAPI }: GetRequestFromUWSOpts) { +export function getRequestFromUWSRequest({ req, res, fetchAPI, signal }: GetRequestFromUWSOpts) { let body: ReadableStream | undefined; const method = req.getMethod(); if (method !== 'get' && method !== 'head') { @@ -64,18 +64,17 @@ export function getRequestFromUWSRequest({ req, res, fetchAPI }: GetRequestFromU method, headers, body: body as any, - signal: new ServerAdapterRequestAbortSignal(), + signal, }); } -async function forwardResponseBodyToUWSResponse(uwsResponse: UWSResponse, fetchResponse: Response) { - let resAborted = false; - uwsResponse.onAborted(function () { - resAborted = true; - }); - +async function forwardResponseBodyToUWSResponse( + uwsResponse: UWSResponse, + fetchResponse: Response, + signal: AbortSignal, +) { for await (const chunk of fetchResponse.body as any as AsyncIterable) { - if (resAborted) { + if (signal.aborted) { return; } uwsResponse.cork(() => { @@ -87,13 +86,20 @@ async function forwardResponseBodyToUWSResponse(uwsResponse: UWSResponse, fetchR }); } -export function sendResponseToUwsOpts(uwsResponse: UWSResponse, fetchResponse: Response) { +export function sendResponseToUwsOpts( + uwsResponse: UWSResponse, + fetchResponse: Response, + signal: AbortSignal, +) { if (!fetchResponse) { uwsResponse.writeStatus('404 Not Found'); uwsResponse.end(); return; } const bufferOfRes: Uint8Array = (fetchResponse as any)._buffer; + if (signal.aborted) { + return; + } uwsResponse.cork(() => { uwsResponse.writeStatus(`${fetchResponse.status} ${fetchResponse.statusText}`); for (const [key, value] of fetchResponse.headers) { @@ -122,5 +128,5 @@ export function sendResponseToUwsOpts(uwsResponse: UWSResponse, fetchResponse: R uwsResponse.end(); return; } - return forwardResponseBodyToUWSResponse(uwsResponse, fetchResponse); + return forwardResponseBodyToUWSResponse(uwsResponse, fetchResponse, signal); } diff --git a/packages/server/test/adapter.fetch.spec.ts b/packages/server/test/adapter.fetch.spec.ts index 04fda5a031c..000b921a0ee 100644 --- a/packages/server/test/adapter.fetch.spec.ts +++ b/packages/server/test/adapter.fetch.spec.ts @@ -1,5 +1,6 @@ import { Request, Response, URL } from '@whatwg-node/fetch'; import { createServerAdapter } from '../src/index.js'; +import { createDeferred } from './test-utils.js'; describe('adapter.fetch', () => { // Request as first parameter @@ -209,16 +210,17 @@ describe('adapter.fetch', () => { expect(adapter.returnThis()).toBe(adapter); }); it('handles AbortSignal', async () => { - const adapter = createServerAdapter( - req => - new Promise(resolve => { - const timeout = setTimeout(() => resolve(Response.json({ foo: 'bar' })), 100_000); - req.signal.addEventListener('abort', () => { - clearTimeout(timeout); - resolve(Response.error()); - }); - }), - ); + const adapterResponseDeferred = createDeferred(); + const adapter = createServerAdapter(req => { + req.signal.addEventListener('abort', () => { + adapterResponseDeferred.resolve( + Response.json({ + message: "You're so late!", + }), + ); + }); + return adapterResponseDeferred.promise; + }); const controller = new AbortController(); const signal = controller.signal; const promise = adapter.fetch('http://localhost', { signal }); diff --git a/packages/server/test/node.spec.ts b/packages/server/test/node.spec.ts index 5f1bfafbdad..b60764cd031 100644 --- a/packages/server/test/node.spec.ts +++ b/packages/server/test/node.spec.ts @@ -4,6 +4,7 @@ import { fetch, ReadableStream, Response, URL } from '@whatwg-node/fetch'; import { createServerAdapter } from '@whatwg-node/server'; import { runTestsForEachFetchImpl } from './test-fetch.js'; import { runTestsForEachServerImpl } from './test-server.js'; +import { createDeferred, sleep } from './test-utils.js'; describe('Node Specific Cases', () => { runTestsForEachFetchImpl(() => { @@ -159,21 +160,29 @@ describe('Node Specific Cases', () => { it('handles AbortSignal correctly', async () => { const abortListener = jest.fn(); - const serverAdapter = createServerAdapter( - req => - new Promise(resolve => { - req.signal.onabort = () => { - abortListener(); - resolve(new Response('Hello World', { status: 200 })); - }; + const adapterResponseDeferred = createDeferred(); + function resolveAdapter() { + adapterResponseDeferred.resolve( + Response.json({ + message: "You're so late!", }), - ); + ); + } + const serverAdapter = createServerAdapter(req => { + req.signal.addEventListener('abort', () => { + abortListener(); + resolveAdapter(); + }); + return adapterResponseDeferred.promise; + }); testServer.addOnceHandler(serverAdapter); const controller = new AbortController(); - setTimeout(() => controller.abort(), 1000); - const error = await fetch(testServer.url, { signal: controller.signal }).catch(e => e); - expect(error.toString().toLowerCase()).toContain('abort'); - await new Promise(resolve => setTimeout(resolve, 300)); + const response$ = fetch(testServer.url, { signal: controller.signal }); + expect(abortListener).toHaveBeenCalledTimes(0); + await sleep(300); + controller.abort(); + await expect(response$).rejects.toThrow(); + await sleep(300); expect(abortListener).toHaveBeenCalledTimes(1); }); @@ -236,7 +245,3 @@ describe('Node Specific Cases', () => { }); }); }); - -function sleep(ms: number) { - return new Promise(resolve => setTimeout(resolve, ms)); -} diff --git a/packages/server/test/test-utils.ts b/packages/server/test/test-utils.ts new file mode 100644 index 00000000000..0925800bca8 --- /dev/null +++ b/packages/server/test/test-utils.ts @@ -0,0 +1,19 @@ +interface Deferred { + promise: Promise; + resolve(value: T): void; + reject(reason: any): void; +} + +export function createDeferred(): Deferred { + let resolve: (value: T) => void; + let reject: (reason: any) => void; + const promise = new Promise((_resolve, _reject) => { + resolve = _resolve; + reject = _reject; + }); + return { promise, resolve: resolve!, reject: reject! }; +} + +export function sleep(ms: number) { + return new Promise(resolve => setTimeout(resolve, ms)); +} diff --git a/packages/server/test/useErrorHandling.spec.ts b/packages/server/test/useErrorHandling.spec.ts index ce29dbd16ab..bc2c4789d17 100644 --- a/packages/server/test/useErrorHandling.spec.ts +++ b/packages/server/test/useErrorHandling.spec.ts @@ -1,14 +1,16 @@ +import { Request } from '@whatwg-node/fetch'; import { createServerAdapter } from '../src/createServerAdapter.js'; import { useErrorHandling } from '../src/plugins/useErrorHandling.js'; describe('useErrorHandling', () => { it('should return 500 when error is thrown', async () => { + const errorHandler = jest.fn(); const router = createServerAdapter( () => { throw new Error('Unexpected error'); }, { - plugins: [useErrorHandling()], + plugins: [useErrorHandling(errorHandler)], }, ); const response = await router.fetch('http://localhost/greetings/John'); @@ -16,5 +18,10 @@ describe('useErrorHandling', () => { expect(response.statusText).toBe('Internal Server Error'); const text = await response.text(); expect(text).toHaveLength(0); + expect(errorHandler).toHaveBeenCalledWith( + new Error('Unexpected error'), + expect.any(Request), + {}, + ); }); });