diff --git a/lib/client.js b/lib/client.js index 7a7260406ef..887805038cb 100644 --- a/lib/client.js +++ b/lib/client.js @@ -71,14 +71,14 @@ const { kInterceptors, kLocalAddress, kMaxResponseSize, + kListeners, kHTTPConnVersion, // HTTP2 - kHost, - kHTTP2Session, - kHTTP2SessionState, kHTTP2BuildRequest, kHTTP2CopyHeaders, - kHTTP1BuildRequest + kHTTP1BuildRequest, + kMaxConcurrentStreams, + kOpenStreams } = require('./core/symbols') /** @type {import('http2')} */ @@ -111,6 +111,20 @@ const FastBuffer = Buffer[Symbol.species] const kClosedResolve = Symbol('kClosedResolve') +function addListener (obj, name, cb) { + const listeners = (obj[kListeners] ??= []) + listeners.push([name, cb]) + obj.on(name, cb) + return obj +} + +function removeAllListeners (obj) { + for (const [name, listener] of obj[kListeners]?.splice(0) ?? []) { + obj.removeListener(name, listener) + } + obj[kListeners] = null +} + /** * @type {import('../types/client').default} */ @@ -277,17 +291,7 @@ class Client extends DispatcherBase { this[kClosedResolve] = null this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1 this[kHTTPConnVersion] = 'h1' - - // HTTP/2 - this[kHTTP2Session] = null - this[kHTTP2SessionState] = !allowH2 - ? null - : { - // streams: null, // Fixed queue of streams - For future support of `push` - openStreams: 0, // Keep track of them to decide whether or not unref the session - maxConcurrentStreams: maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server - } - this[kHost] = `${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}` + this[kMaxConcurrentStreams] = maxConcurrentStreams ?? 100 // Max peerConcurrentStreams for a Node h2 server // kQueue is built up of 3 sections separated by // the kRunningIdx and kPendingIdx indices. @@ -369,8 +373,6 @@ class Client extends DispatcherBase { } async [kClose] () { - // TODO: for H2 we need to gracefully flush the remaining enqueued - // request and close each stream. return new Promise((resolve) => { if (this[kSize]) { this[kClosedResolve] = resolve @@ -397,12 +399,6 @@ class Client extends DispatcherBase { resolve() } - if (this[kHTTP2Session] != null) { - util.destroy(this[kHTTP2Session], err) - this[kHTTP2Session] = null - this[kHTTP2SessionState] = null - } - if (this[kSocket]) { util.destroy(this[kSocket].on('close', callback), err) } else { @@ -414,64 +410,6 @@ class Client extends DispatcherBase { } } -function onHttp2SessionError (err) { - assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') - - this[kSocket][kError] = err - - onError(this[kClient], err) -} - -function onHttp2FrameError (type, code, id) { - const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`) - - if (id === 0) { - this[kSocket][kError] = err - onError(this[kClient], err) - } -} - -function onHttp2SessionEnd () { - util.destroy(this, new SocketError('other side closed')) - util.destroy(this[kSocket], new SocketError('other side closed')) -} - -function onHTTP2GoAway (code) { - const client = this[kClient] - const err = new InformationalError(`HTTP/2: "GOAWAY" frame received with code ${code}`) - client[kSocket] = null - client[kHTTP2Session] = null - - if (client.destroyed) { - assert(this[kPending] === 0) - - // Fail entire queue. - const requests = client[kQueue].splice(client[kRunningIdx]) - for (let i = 0; i < requests.length; i++) { - const request = requests[i] - errorRequest(this, request, err) - } - } else if (client[kRunning] > 0) { - // Fail head of pipeline. - const request = client[kQueue][client[kRunningIdx]] - client[kQueue][client[kRunningIdx]++] = null - - errorRequest(client, request, err) - } - - client[kPendingIdx] = client[kRunningIdx] - - assert(client[kRunning] === 0) - - client.emit('disconnect', - client[kUrl], - [client], - err - ) - - resume(client) -} - const constants = require('./llhttp/constants') const createRedirectInterceptor = require('./interceptor/redirectInterceptor') const EMPTY_BUF = Buffer.alloc(0) @@ -803,14 +741,10 @@ class Parser { socket[kClient] = null socket[kError] = null - socket - .removeListener('error', onSocketError) - .removeListener('readable', onSocketReadable) - .removeListener('end', onSocketEnd) - .removeListener('close', onSocketClose) + + removeAllListeners(socket) client[kSocket] = null - client[kHTTP2Session] = null client[kQueue][client[kRunningIdx]++] = null client.emit('disconnect', client[kUrl], [client], new InformationalError('upgrade')) @@ -1050,33 +984,6 @@ function onParserTimeout (parser) { } } -function onSocketReadable () { - const { [kParser]: parser } = this - if (parser) { - parser.readMore() - } -} - -function onSocketError (err) { - const { [kClient]: client, [kParser]: parser } = this - - assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') - - if (client[kHTTPConnVersion] !== 'h2') { - // On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded - // to the user. - if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) { - // We treat all incoming data so for as a valid response. - parser.onMessageComplete() - return - } - } - - this[kError] = err - - onError(this[kClient], err) -} - function onError (client, err) { if ( client[kRunning] === 0 && @@ -1097,32 +1004,8 @@ function onError (client, err) { } } -function onSocketEnd () { - const { [kParser]: parser, [kClient]: client } = this - - if (client[kHTTPConnVersion] !== 'h2') { - if (parser.statusCode && !parser.shouldKeepAlive) { - // We treat all incoming data so far as a valid response. - parser.onMessageComplete() - return - } - } - - util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) -} - function onSocketClose () { - const { [kClient]: client, [kParser]: parser } = this - - if (client[kHTTPConnVersion] === 'h1' && parser) { - if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) { - // We treat all incoming data so far as a valid response. - parser.onMessageComplete() - } - - this[kParser].destroy() - this[kParser] = null - } + const { [kClient]: client } = this const err = this[kError] || new SocketError('closed', util.getSocketInfo(this)) @@ -1215,8 +1098,7 @@ async function connect (client) { assert(socket) - const isH2 = socket.alpnProtocol === 'h2' - if (isH2) { + if (socket.alpnProtocol === 'h2') { if (!h2ExperimentalWarned) { h2ExperimentalWarned = true process.emitWarning('H2 support is experimental, expect them to change at any time.', { @@ -1226,21 +1108,40 @@ async function connect (client) { const session = http2.connect(client[kUrl], { createConnection: () => socket, - peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams + peerMaxConcurrentStreams: client[kMaxConcurrentStreams] }) client[kHTTPConnVersion] = 'h2' - session[kClient] = client - session[kSocket] = socket - session.on('error', onHttp2SessionError) - session.on('frameError', onHttp2FrameError) - session.on('end', onHttp2SessionEnd) - session.on('goaway', onHTTP2GoAway) + session.on('error', function (err) { + assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') + + this[kError] = err + + onError(this[kClient], err) + }) + session.on('frameError', function (type, code, id) { + if (id === 0) { + const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`) + util.destroy(this, err) + util.destroy(socket, err) + } + }) + session.on('end', function () { + const err = new SocketError('other side closed') + util.destroy(this, err) + util.destroy(socket, err) + }) + session.on('goaway', function (code) { + this[kError] = new InformationalError(`HTTP/2: "GOAWAY" frame received with code ${code}`) + onSocketClose.call(session) + }) session.on('close', onSocketClose) session.unref() - client[kHTTP2Session] = session - socket[kHTTP2Session] = session + session[kOpenStreams] = 0 + session[kClient] = client + + client[kSocket] = session } else { if (!llhttpInstance) { llhttpInstance = await llhttpPromise @@ -1252,6 +1153,59 @@ async function connect (client) { socket[kReset] = false socket[kBlocking] = false socket[kParser] = new Parser(client, socket, llhttpInstance) + + addListener(socket, 'error', function (err) { + const { [kParser]: parser } = this + + // On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded + // to the user. + if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) { + // We treat all incoming data so for as a valid response. + parser.onMessageComplete() + return + } + + assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID') + + this[kError] = err + + onError(this[kClient], err) + }) + + addListener(socket, 'readable', function () { + const { [kParser]: parser } = this + + if (parser) { + parser.readMore() + } + }) + + addListener(socket, 'end', function () { + const { [kParser]: parser } = this + + if (parser && parser.statusCode && !parser.shouldKeepAlive) { + // We treat all incoming data so far as a valid response. + parser.onMessageComplete() + } else { + util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) + } + }) + + addListener(socket, 'close', function () { + const { [kParser]: parser } = this + + if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) { + // We treat all incoming data so far as a valid response. + parser.onMessageComplete() + } + + this[kParser].destroy() + this[kParser] = null + + onSocketClose.call(this) + }) + + client[kSocket] = socket } socket[kCounter] = 0 @@ -1259,14 +1213,6 @@ async function connect (client) { socket[kClient] = client socket[kError] = null - socket - .on('error', onSocketError) - .on('readable', onSocketReadable) - .on('end', onSocketEnd) - .on('close', onSocketClose) - - client[kSocket] = socket - if (channels.connected.hasSubscribers) { channels.connected.publish({ connectParams: { @@ -1475,7 +1421,7 @@ function shouldSendContentLength (method) { function write (client, request) { if (client[kHTTPConnVersion] === 'h2') { - writeH2(client, client[kHTTP2Session], request) + writeH2(client, client[kSocket], request) return } @@ -1674,9 +1620,10 @@ function writeH2 (client, session, request) { /** @type {import('node:http2').ClientHttp2Stream} */ let stream - const h2State = client[kHTTP2SessionState] - headers[HTTP2_HEADER_AUTHORITY] = host || client[kHost] + const { hostname, port } = client[kUrl] + + headers[HTTP2_HEADER_AUTHORITY] = host || `${hostname}${port ? `:${port}` : ''}` headers[HTTP2_HEADER_METHOD] = method try { @@ -1692,8 +1639,8 @@ function writeH2 (client, session, request) { if (stream != null) { util.destroy(stream, err) - h2State.openStreams -= 1 - if (h2State.openStreams === 0) { + session[kOpenStreams] -= 1 + if (session[kOpenStreams] === 0) { session.unref() } } @@ -1714,18 +1661,18 @@ function writeH2 (client, session, request) { if (stream.id && !stream.pending) { request.onUpgrade(null, null, stream) - ++h2State.openStreams + ++session[kOpenStreams] } else { stream.once('ready', () => { request.onUpgrade(null, null, stream) - ++h2State.openStreams + ++session[kOpenStreams] }) } stream.once('close', () => { - h2State.openStreams -= 1 + session[kOpenStreams] -= 1 // TODO(HTTP/2): unref only if current streams count is 0 - if (h2State.openStreams === 0) session.unref() + if (session[kOpenStreams] === 0) session.unref() }) return true @@ -1805,7 +1752,7 @@ function writeH2 (client, session, request) { } // Increment counter as we have new several streams open - ++h2State.openStreams + ++session[kOpenStreams] stream.once('response', headers => { const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers @@ -1828,8 +1775,8 @@ function writeH2 (client, session, request) { // Stream is closed or half-closed-remote (6), decrement counter and cleanup // It does not have sense to continue working with the stream as we do not // have yet RST_STREAM support on client-side - h2State.openStreams -= 1 - if (h2State.openStreams === 0) { + session[kOpenStreams] -= 1 + if (session[kOpenStreams] === 0) { session.unref() } @@ -1845,16 +1792,16 @@ function writeH2 (client, session, request) { }) stream.once('close', () => { - h2State.openStreams -= 1 + session[kOpenStreams] -= 1 // TODO(HTTP/2): unref only if current streams count is 0 - if (h2State.openStreams === 0) { + if (session[kOpenStreams] === 0) { session.unref() } }) stream.once('error', function (err) { - if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) { - h2State.streams -= 1 + if (client[kSocket] && !client[kSocket].destroyed && !this.closed && !this.destroyed) { + session[kOpenStreams] -= 1 util.destroy(stream, err) } }) @@ -1863,8 +1810,8 @@ function writeH2 (client, session, request) { const err = new InformationalError(`HTTP/2: "frameError" received - type ${type}, code ${code}`) errorRequest(client, request, err) - if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) { - h2State.streams -= 1 + if (client[kSocket] && !client[kSocket].destroyed && !this.closed && !this.destroyed) { + session[kOpenStreams] -= 1 util.destroy(stream, err) } }) diff --git a/lib/core/symbols.js b/lib/core/symbols.js index 68d8566fac0..538f609b991 100644 --- a/lib/core/symbols.js +++ b/lib/core/symbols.js @@ -13,6 +13,8 @@ module.exports = { kKeepAliveMaxTimeout: Symbol('max keep alive timeout'), kKeepAliveTimeoutThreshold: Symbol('keep alive timeout threshold'), kKeepAliveTimeoutValue: Symbol('keep alive timeout'), + kMaxConcurrentStreams: Symbol('max concurrent streams'), + kOpenStreams: Symbol('open streams'), kKeepAlive: Symbol('keep alive'), kHeadersTimeout: Symbol('headers timeout'), kBodyTimeout: Symbol('body timeout'), @@ -24,6 +26,7 @@ module.exports = { kRunning: Symbol('running'), kBlocking: Symbol('blocking'), kPending: Symbol('pending'), + kListeners: Symbol('listeners'), kSize: Symbol('size'), kBusy: Symbol('busy'), kQueued: Symbol('queued'), @@ -53,7 +56,6 @@ module.exports = { kInterceptors: Symbol('dispatch interceptors'), kMaxResponseSize: Symbol('max response size'), kHTTP2Session: Symbol('http2Session'), - kHTTP2SessionState: Symbol('http2Session state'), kHTTP2BuildRequest: Symbol('http2 build request'), kHTTP1BuildRequest: Symbol('http1 build request'), kHTTP2CopyHeaders: Symbol('http2 copy headers'), diff --git a/lib/core/util.js b/lib/core/util.js index 96e76cc1355..577ac152d9a 100644 --- a/lib/core/util.js +++ b/lib/core/util.js @@ -12,6 +12,14 @@ const { stringify } = require('node:querystring') const { headerNameLowerCasedRecord } = require('./constants') const { tree } = require('./tree') +/** @type {import('http2')} */ +let http2 +try { + http2 = require('node:http2') +} catch { + // @ts-ignore + http2 = { constants: {} } +} const [nodeMajor, nodeMinor] = process.versions.node.split('.').map(v => Number(v)) function nop () {} @@ -20,6 +28,10 @@ function isStream (obj) { return obj && typeof obj === 'object' && typeof obj.pipe === 'function' && typeof obj.on === 'function' } +function isH2Session (obj) { + return obj != null && typeof obj === 'object' && obj.type === http2.constants.NGHTTP2_SESSION_CLIENT +} + // based on https://github.com/node-fetch/fetch-blob/blob/8ab587d34080de94140b54f07168451e7d0b655e/index.js#L229-L241 (MIT License) function isBlobLike (object) { return (Blob && object instanceof Blob) || ( @@ -192,7 +204,7 @@ function isReadableAborted (stream) { } function destroy (stream, err) { - if (stream == null || !isStream(stream) || isDestroyed(stream)) { + if (stream == null || (!isStream(stream) && !isH2Session(stream)) || isDestroyed(stream)) { return } diff --git a/test/http2.js b/test/http2.js index 3319ffddb1e..04aa252f366 100644 --- a/test/http2.js +++ b/test/http2.js @@ -15,7 +15,6 @@ const { Client, Agent } = require('..') const isGreaterThanv20 = process.versions.node.split('.').map(Number)[0] >= 20 test('Should support H2 connection', async t => { - const body = [] const server = createSecureServer(pem) server.on('stream', (stream, headers, _flags, rawHeaders) => { @@ -51,15 +50,12 @@ test('Should support H2 connection', async t => { } }) - response.body.on('data', chunk => { - body.push(chunk) - }) + const data = await response.body.text() - await once(response.body, 'end') t.strictEqual(response.statusCode, 200) t.strictEqual(response.headers['content-type'], 'text/plain; charset=utf-8') t.strictEqual(response.headers['x-custom-h2'], 'hello') - t.strictEqual(Buffer.concat(body).toString('utf8'), 'hello h2!') + t.strictEqual(data, 'hello h2!') }) test('Should support H2 connection(multiple requests)', async t => {