diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js deleted file mode 100644 index 0a53b75e501..00000000000 --- a/lib/dispatcher/client-h2.js +++ /dev/null @@ -1,653 +0,0 @@ -'use strict' - -const assert = require('node:assert') -const { pipeline } = require('node:stream') -const util = require('../core/util.js') -const { - RequestContentLengthMismatchError, - RequestAbortedError, - SocketError, - InformationalError -} = require('../core/errors.js') -const { - kUrl, - kReset, - kClient, - kRunning, - kPending, - kQueue, - kPendingIdx, - kRunningIdx, - kError, - kSocket, - kStrictContentLength, - kOnError, - // HTTP2 - kMaxConcurrentStreams, - kHTTP2Session, - kResume -} = require('../core/symbols.js') - -const kOpenStreams = Symbol('open streams') - -// Experimental -let h2ExperimentalWarned = false - -/** @type {import('http2')} */ -let http2 -try { - http2 = require('node:http2') -} catch { - // @ts-ignore - http2 = { constants: {} } -} - -const { - constants: { - HTTP2_HEADER_AUTHORITY, - HTTP2_HEADER_METHOD, - HTTP2_HEADER_PATH, - HTTP2_HEADER_SCHEME, - HTTP2_HEADER_CONTENT_LENGTH, - HTTP2_HEADER_EXPECT, - HTTP2_HEADER_STATUS - } -} = http2 - -function parseH2Headers (headers) { - // set-cookie is always an array. Duplicates are added to the array. - // For duplicate cookie headers, the values are joined together with '; '. - headers = Object.entries(headers).flat(2) - - const result = [] - - for (const header of headers) { - result.push(Buffer.from(header)) - } - - return result -} - -async function connectH2 (client, socket) { - client[kSocket] = socket - - if (!h2ExperimentalWarned) { - h2ExperimentalWarned = true - process.emitWarning('H2 support is experimental, expect them to change at any time.', { - code: 'UNDICI-H2' - }) - } - - const session = http2.connect(client[kUrl], { - createConnection: () => socket, - peerMaxConcurrentStreams: client[kMaxConcurrentStreams] - }) - - session[kOpenStreams] = 0 - session[kClient] = client - session[kSocket] = socket - session.on('error', onHttp2SessionError) - session.on('frameError', onHttp2FrameError) - session.on('end', onHttp2SessionEnd) - session.on('goaway', onHTTP2GoAway) - session.on('close', function () { - const { [kClient]: client } = this - - const err = this[kError] || new SocketError('closed', util.getSocketInfo(this)) - - client[kSocket] = null - - assert(client[kPending] === 0) - - // Fail entire queue. - const requests = client[kQueue].splice(client[kRunningIdx]) - for (let i = 0; i < requests.length; i++) { - const request = requests[i] - errorRequest(client, request, err) - } - - client[kPendingIdx] = client[kRunningIdx] - - assert(client[kRunning] === 0) - - client.emit('disconnect', client[kUrl], [client], err) - - client[kResume]() - }) - session.unref() - - client[kHTTP2Session] = session - socket[kHTTP2Session] = session - - socket.on('error', function (err) { - assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') - - this[kError] = err - - this[kClient][kOnError](err) - }) - socket.on('end', function () { - util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) - }) - - let closed = false - socket.on('close', () => { - closed = true - }) - - return { - version: 'h2', - defaultPipelining: Infinity, - write (...args) { - // TODO (fix): return - writeH2(client, ...args) - }, - resume () { - - }, - destroy (err, callback) { - session.destroy(err) - if (closed) { - queueMicrotask(callback) - } else { - socket.destroy(err).on('close', callback) - } - }, - get destroyed () { - return socket.destroyed - }, - busy () { - return false - } - } -} - -function onHttp2SessionError (err) { - assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') - - this[kSocket][kError] = err - - this[kClient][kOnError](err) -} - -function onHttp2FrameError (type, code, id) { - const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`) - - if (id === 0) { - this[kSocket][kError] = err - this[kClient][kOnError](err) - } -} - -function onHttp2SessionEnd () { - this.destroy(new SocketError('other side closed')) - util.destroy(this[kSocket], new SocketError('other side closed')) -} - -function onHTTP2GoAway (code) { - const client = this[kClient] - const err = new InformationalError(`HTTP/2: "GOAWAY" frame received with code ${code}`) - client[kSocket] = null - client[kHTTP2Session] = null - - if (client.destroyed) { - assert(this[kPending] === 0) - - // Fail entire queue. - const requests = client[kQueue].splice(client[kRunningIdx]) - for (let i = 0; i < requests.length; i++) { - const request = requests[i] - errorRequest(this, request, err) - } - } else if (client[kRunning] > 0) { - // Fail head of pipeline. - const request = client[kQueue][client[kRunningIdx]] - client[kQueue][client[kRunningIdx]++] = null - - errorRequest(client, request, err) - } - - client[kPendingIdx] = client[kRunningIdx] - - assert(client[kRunning] === 0) - - client.emit('disconnect', - client[kUrl], - [client], - err - ) - - client[kResume]() -} - -function errorRequest (client, request, err) { - try { - request.onError(err) - assert(request.aborted) - } catch (err) { - client.emit('error', err) - } -} - -// https://www.rfc-editor.org/rfc/rfc7230#section-3.3.2 -function shouldSendContentLength (method) { - return method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT' -} - -function writeH2 (client, request) { - const session = client[kHTTP2Session] - const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request - - if (upgrade) { - errorRequest(client, request, new Error('Upgrade not supported for H2')) - return false - } - - if (request.aborted) { - return false - } - - const headers = {} - for (let n = 0; n < reqHeaders.length; n += 2) { - const key = reqHeaders[n + 0] - const val = reqHeaders[n + 1] - - if (Array.isArray(val)) { - for (let i = 0; i < val.length; i++) { - if (headers[key]) { - headers[key] += `,${val[i]}` - } else { - headers[key] = val[i] - } - } - } else { - headers[key] = val - } - } - - /** @type {import('node:http2').ClientHttp2Stream} */ - let stream - - const { hostname, port } = client[kUrl] - - headers[HTTP2_HEADER_AUTHORITY] = host || `${hostname}${port ? `:${port}` : ''}` - headers[HTTP2_HEADER_METHOD] = method - - try { - // We are already connected, streams are pending. - // We can call on connect, and wait for abort - request.onConnect((err) => { - if (request.aborted || request.completed) { - return - } - - err = err || new RequestAbortedError() - - if (stream != null) { - util.destroy(stream, err) - - session[kOpenStreams] -= 1 - if (session[kOpenStreams] === 0) { - session.unref() - } - } - - errorRequest(client, request, err) - }) - } catch (err) { - errorRequest(client, request, err) - } - - if (method === 'CONNECT') { - session.ref() - // We are already connected, streams are pending, first request - // will create a new stream. We trigger a request to create the stream and wait until - // `ready` event is triggered - // We disabled endStream to allow the user to write to the stream - stream = session.request(headers, { endStream: false, signal }) - - if (stream.id && !stream.pending) { - request.onUpgrade(null, null, stream) - ++session[kOpenStreams] - } else { - stream.once('ready', () => { - request.onUpgrade(null, null, stream) - ++session[kOpenStreams] - }) - } - - stream.once('close', () => { - session[kOpenStreams] -= 1 - // TODO(HTTP/2): unref only if current streams count is 0 - if (session[kOpenStreams] === 0) session.unref() - }) - - return true - } - - // https://tools.ietf.org/html/rfc7540#section-8.3 - // :path and :scheme headers must be omitted when sending CONNECT - - headers[HTTP2_HEADER_PATH] = path - headers[HTTP2_HEADER_SCHEME] = 'https' - - // https://tools.ietf.org/html/rfc7231#section-4.3.1 - // https://tools.ietf.org/html/rfc7231#section-4.3.2 - // https://tools.ietf.org/html/rfc7231#section-4.3.5 - - // Sending a payload body on a request that does not - // expect it can cause undefined behavior on some - // servers and corrupt connection state. Do not - // re-use the connection for further requests. - - const expectsPayload = ( - method === 'PUT' || - method === 'POST' || - method === 'PATCH' - ) - - if (body && typeof body.read === 'function') { - // Try to read EOF in order to get length. - body.read(0) - } - - let contentLength = util.bodyLength(body) - - if (contentLength == null) { - contentLength = request.contentLength - } - - if (contentLength === 0 || !expectsPayload) { - // https://tools.ietf.org/html/rfc7230#section-3.3.2 - // A user agent SHOULD NOT send a Content-Length header field when - // the request message does not contain a payload body and the method - // semantics do not anticipate such a body. - - contentLength = null - } - - // https://github.com/nodejs/undici/issues/2046 - // A user agent may send a Content-Length header with 0 value, this should be allowed. - if (shouldSendContentLength(method) && contentLength > 0 && request.contentLength != null && request.contentLength !== contentLength) { - if (client[kStrictContentLength]) { - errorRequest(client, request, new RequestContentLengthMismatchError()) - return false - } - - process.emitWarning(new RequestContentLengthMismatchError()) - } - - if (contentLength != null) { - assert(body, 'no body must not have content length') - headers[HTTP2_HEADER_CONTENT_LENGTH] = `${contentLength}` - } - - session.ref() - - const shouldEndStream = method === 'GET' || method === 'HEAD' || body === null - if (expectContinue) { - headers[HTTP2_HEADER_EXPECT] = '100-continue' - stream = session.request(headers, { endStream: shouldEndStream, signal }) - - stream.once('continue', writeBodyH2) - } else { - stream = session.request(headers, { - endStream: shouldEndStream, - signal - }) - writeBodyH2() - } - - // Increment counter as we have new several streams open - ++session[kOpenStreams] - - stream.once('response', headers => { - const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers - request.onResponseStarted() - - if (request.onHeaders(Number(statusCode), parseH2Headers(realHeaders), stream.resume.bind(stream), '') === false) { - stream.pause() - } - - stream.on('data', (chunk) => { - if (request.onData(chunk) === false) { - stream.pause() - } - }) - }) - - stream.once('end', () => { - // When state is null, it means we haven't consumed body and the stream still do not have - // a state. - // Present specially when using pipeline or stream - if (stream.state?.state == null || stream.state.state < 6) { - request.onComplete([]) - return - } - - // Stream is closed or half-closed-remote (6), decrement counter and cleanup - // It does not have sense to continue working with the stream as we do not - // have yet RST_STREAM support on client-side - session[kOpenStreams] -= 1 - if (session[kOpenStreams] === 0) { - session.unref() - } - - const err = new InformationalError('HTTP/2: stream half-closed (remote)') - errorRequest(client, request, err) - util.destroy(stream, err) - }) - - stream.once('close', () => { - session[kOpenStreams] -= 1 - // TODO(HTTP/2): unref only if current streams count is 0 - if (session[kOpenStreams] === 0) { - session.unref() - } - }) - - stream.once('error', function (err) { - if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) { - session[kOpenStreams] -= 1 - util.destroy(stream, err) - } - }) - - stream.once('frameError', (type, code) => { - const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`) - errorRequest(client, request, err) - - if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) { - session[kOpenStreams] -= 1 - util.destroy(stream, err) - } - }) - - // stream.on('aborted', () => { - // // TODO(HTTP/2): Support aborted - // }) - - // stream.on('timeout', () => { - // // TODO(HTTP/2): Support timeout - // }) - - // stream.on('push', headers => { - // // TODO(HTTP/2): Support push - // }) - - // stream.on('trailers', headers => { - // // TODO(HTTP/2): Support trailers - // }) - - return true - - function writeBodyH2 () { - /* istanbul ignore else: assertion */ - if (!body) { - request.onRequestSent() - } else if (util.isBuffer(body)) { - assert(contentLength === body.byteLength, 'buffer body must have content length') - stream.cork() - stream.write(body) - stream.uncork() - stream.end() - request.onBodySent(body) - request.onRequestSent() - } else if (util.isBlobLike(body)) { - if (typeof body.stream === 'function') { - writeIterable({ - client, - request, - contentLength, - h2stream: stream, - expectsPayload, - body: body.stream(), - socket: client[kSocket], - header: '' - }) - } else { - writeBlob({ - body, - client, - request, - contentLength, - expectsPayload, - h2stream: stream, - header: '', - socket: client[kSocket] - }) - } - } else if (util.isStream(body)) { - writeStream({ - body, - client, - request, - contentLength, - expectsPayload, - socket: client[kSocket], - h2stream: stream, - header: '' - }) - } else if (util.isIterable(body)) { - writeIterable({ - body, - client, - request, - contentLength, - expectsPayload, - header: '', - h2stream: stream, - socket: client[kSocket] - }) - } else { - assert(false) - } - } -} - -function writeStream ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) { - assert(contentLength !== 0 || client[kRunning] === 0, 'stream body cannot be pipelined') - - // For HTTP/2, is enough to pipe the stream - const pipe = pipeline( - body, - h2stream, - (err) => { - if (err) { - util.destroy(body, err) - util.destroy(h2stream, err) - } else { - request.onRequestSent() - } - } - ) - - pipe.on('data', onPipeData) - pipe.once('end', () => { - pipe.removeListener('data', onPipeData) - util.destroy(pipe) - }) - - function onPipeData (chunk) { - request.onBodySent(chunk) - } -} - -async function writeBlob ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) { - assert(contentLength === body.size, 'blob body must have content length') - - try { - if (contentLength != null && contentLength !== body.size) { - throw new RequestContentLengthMismatchError() - } - - const buffer = Buffer.from(await body.arrayBuffer()) - - h2stream.cork() - h2stream.write(buffer) - h2stream.uncork() - - request.onBodySent(buffer) - request.onRequestSent() - - if (!expectsPayload) { - socket[kReset] = true - } - - client[kResume]() - } catch (err) { - util.destroy(h2stream) - } -} - -async function writeIterable ({ h2stream, body, client, request, socket, contentLength, header, expectsPayload }) { - assert(contentLength !== 0 || client[kRunning] === 0, 'iterator body cannot be pipelined') - - let callback = null - function onDrain () { - if (callback) { - const cb = callback - callback = null - cb() - } - } - - const waitForDrain = () => new Promise((resolve, reject) => { - assert(callback === null) - - if (socket[kError]) { - reject(socket[kError]) - } else { - callback = resolve - } - }) - - h2stream - .on('close', onDrain) - .on('drain', onDrain) - - try { - // It's up to the user to somehow abort the async iterable. - for await (const chunk of body) { - if (socket[kError]) { - throw socket[kError] - } - - const res = h2stream.write(chunk) - request.onBodySent(chunk) - if (!res) { - await waitForDrain() - } - } - } catch (err) { - h2stream.destroy(err) - } finally { - request.onRequestSent() - h2stream.end() - h2stream - .off('close', onDrain) - .off('drain', onDrain) - } -} - -module.exports = connectH2 diff --git a/lib/dispatcher/client-h2.js.rej b/lib/dispatcher/client-h2.js.rej new file mode 100644 index 00000000000..4e49510ed4a --- /dev/null +++ b/lib/dispatcher/client-h2.js.rej @@ -0,0 +1,20 @@ +diff a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js (rejected hunks) +@@ -391,6 +391,18 @@ function writeH2 (client, request) { + const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers + request.onResponseStarted() + ++ // Due to the stream nature, it is possible we face a race condition ++ // where the stream has been assigned, but the request has been aborted ++ // the request remains in-flight and headers hasn't been received yet ++ // for those scenarios, best effort is to destroy the stream immediately ++ // as there's no value to keep it open. ++ if (request.aborted || request.completed) { ++ const err = new RequestAbortedError() ++ errorRequest(client, request, err) ++ util.destroy(stream, err) ++ return ++ } ++ + if (request.onHeaders(Number(statusCode), realHeaders, stream.resume.bind(stream), '') === false) { + stream.pause() + } diff --git a/test/http2.js b/test/http2.js index 849a0cc43ba..a4ecc1ac3e8 100644 --- a/test/http2.js +++ b/test/http2.js @@ -1296,3 +1296,88 @@ test('Should throw informational error on half-closed streams (remote)', async t t.strictEqual(err.code, 'UND_ERR_INFO') }) }) + +test('#2364 - Concurrent aborts', async t => { + const server = createSecureServer(pem) + + server.on('stream', (stream, headers, _flags, rawHeaders) => { + t.strictEqual(headers['x-my-header'], 'foo') + t.strictEqual(headers[':method'], 'GET') + setTimeout(() => { + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': 'hello', + ':status': 200 + }) + stream.end('hello h2!') + }, 100) + }) + + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t = tspl(t, { plan: 18 }) + after(() => server.close()) + after(() => client.close()) + const controller = new AbortController() + + client.request({ + path: '/', + method: 'GET', + headers: { + 'x-my-header': 'foo' + } + }, (err, response) => { + t.ifError(err) + t.strictEqual(response.headers['content-type'], 'text/plain; charset=utf-8') + t.strictEqual(response.headers['x-custom-h2'], 'hello') + t.strictEqual(response.statusCode, 200) + response.body.dump() + }) + + client.request({ + path: '/', + method: 'GET', + headers: { + 'x-my-header': 'foo' + }, + signal: controller.signal + }, (err, response) => { + t.strictEqual(err.name, 'AbortError') + }) + + client.request({ + path: '/', + method: 'GET', + headers: { + 'x-my-header': 'foo' + } + }, (err, response) => { + t.ifError(err) + t.strictEqual(response.headers['content-type'], 'text/plain; charset=utf-8') + t.strictEqual(response.headers['x-custom-h2'], 'hello') + t.strictEqual(response.statusCode, 200) + }) + + client.request({ + path: '/', + method: 'GET', + headers: { + 'x-my-header': 'foo' + }, + signal: controller.signal + }, (err, response) => { + t.strictEqual(err.name, 'AbortError') + }) + + controller.abort() + + await t.completed +})