From 079c08453fd1a66777161bb378f829d9249f5120 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 27 Feb 2024 11:25:04 +0100 Subject: [PATCH 1/4] refactor: move out more h2 from core client --- lib/core/symbols.js | 4 +++- lib/dispatcher/client-h1.js | 4 +++- lib/dispatcher/client-h2.js | 46 ++++++++++++++++++++++--------------- lib/dispatcher/client.js | 39 +++++++++---------------------- 4 files changed, 44 insertions(+), 49 deletions(-) diff --git a/lib/core/symbols.js b/lib/core/symbols.js index 1eb4858c378..0a8ffac3e57 100644 --- a/lib/core/symbols.js +++ b/lib/core/symbols.js @@ -62,5 +62,7 @@ module.exports = { kHTTPConnVersion: Symbol('http connection version'), kRetryHandlerDefaultRetry: Symbol('retry agent default retry'), kConstruct: Symbol('constructable'), - kListeners: Symbol('listeners') + kListeners: Symbol('listeners'), + kHTTPWrite: Symbol('write'), + kMaxConcurrentStreams: Symbol('max concurrent streams') } diff --git a/lib/dispatcher/client-h1.js b/lib/dispatcher/client-h1.js index 9ee1686efd9..ebcd21a3ae9 100644 --- a/lib/dispatcher/client-h1.js +++ b/lib/dispatcher/client-h1.js @@ -50,7 +50,8 @@ const { kHTTPConnVersion, kListeners, kOnError, - kResume + kResume, + kHTTPWrite } = require('../core/symbols.js') const constants = require('../llhttp/constants.js') @@ -645,6 +646,7 @@ function onParserTimeout (parser) { async function connectH1 (client, socket) { client[kHTTPConnVersion] = 'h1' + client[kHTTPWrite] = (...args) => writeH1(client, ...args) if (!llhttpInstance) { llhttpInstance = await llhttpPromise diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index 89a42ed273f..cc725884406 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -25,13 +25,15 @@ const { kHTTPConnVersion, kOnError, // HTTP2 - kHost, + kMaxConcurrentStreams, kHTTP2Session, - kHTTP2SessionState, kHTTP2CopyHeaders, - kResume + kResume, + kHTTPWrite } = require('../core/symbols.js') +const kOpenStreams = Symbol('open streams') + // Experimental let h2ExperimentalWarned = false @@ -58,6 +60,10 @@ const { async function connectH2 (client, socket) { client[kHTTPConnVersion] = 'h2' + client[kHTTPWrite] = (...args) => { + // TODO (fix): return + writeH2(client, ...args) + } if (!h2ExperimentalWarned) { h2ExperimentalWarned = true @@ -68,9 +74,10 @@ async function connectH2 (client, socket) { const session = http2.connect(client[kUrl], { createConnection: () => socket, - peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams + peerMaxConcurrentStreams: client[kMaxConcurrentStreams] }) + session[kOpenStreams] = 0 session[kClient] = client session[kSocket] = socket session.on('error', onHttp2SessionError) @@ -217,9 +224,10 @@ function writeH2 (client, request) { /** @type {import('node:http2').ClientHttp2Stream} */ let stream - const h2State = client[kHTTP2SessionState] - headers[HTTP2_HEADER_AUTHORITY] = host || client[kHost] + const { hostname, port } = client[kUrl] + + headers[HTTP2_HEADER_AUTHORITY] = host || `${hostname}${port ? `:${port}` : ''}` headers[HTTP2_HEADER_METHOD] = method try { @@ -235,8 +243,8 @@ function writeH2 (client, request) { if (stream != null) { util.destroy(stream, err) - h2State.openStreams -= 1 - if (h2State.openStreams === 0) { + session[kOpenStreams] -= 1 + if (session[kOpenStreams] === 0) { session.unref() } } @@ -257,18 +265,18 @@ function writeH2 (client, request) { if (stream.id && !stream.pending) { request.onUpgrade(null, null, stream) - ++h2State.openStreams + ++session[kOpenStreams] } else { stream.once('ready', () => { request.onUpgrade(null, null, stream) - ++h2State.openStreams + ++session[kOpenStreams] }) } stream.once('close', () => { - h2State.openStreams -= 1 + session[kOpenStreams] -= 1 // TODO(HTTP/2): unref only if current streams count is 0 - if (h2State.openStreams === 0) session.unref() + if (session[kOpenStreams] === 0) session.unref() }) return true @@ -348,7 +356,7 @@ function writeH2 (client, request) { } // Increment counter as we have new several streams open - ++h2State.openStreams + ++session[kOpenStreams] stream.once('response', headers => { const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers @@ -371,8 +379,8 @@ function writeH2 (client, request) { // Stream is closed or half-closed-remote (6), decrement counter and cleanup // It does not have sense to continue working with the stream as we do not // have yet RST_STREAM support on client-side - h2State.openStreams -= 1 - if (h2State.openStreams === 0) { + session[kOpenStreams] -= 1 + if (session[kOpenStreams] === 0) { session.unref() } @@ -388,16 +396,16 @@ function writeH2 (client, request) { }) stream.once('close', () => { - h2State.openStreams -= 1 + session[kOpenStreams] -= 1 // TODO(HTTP/2): unref only if current streams count is 0 - if (h2State.openStreams === 0) { + if (session[kOpenStreams] === 0) { session.unref() } }) stream.once('error', function (err) { if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) { - h2State.streams -= 1 + session[kOpenStreams] -= 1 util.destroy(stream, err) } }) @@ -407,7 +415,7 @@ function writeH2 (client, request) { errorRequest(client, request, err) if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) { - h2State.streams -= 1 + session[kOpenStreams] -= 1 util.destroy(stream, err) } }) diff --git a/lib/dispatcher/client.js b/lib/dispatcher/client.js index 577dbb7ab56..99a565f2ae3 100644 --- a/lib/dispatcher/client.js +++ b/lib/dispatcher/client.js @@ -58,16 +58,16 @@ const { kMaxResponseSize, kHTTPConnVersion, kOnError, + kHTTPWrite, // HTTP2 - kHost, + kMaxConcurrentStreams, kHTTP2Session, - kHTTP2SessionState, kHTTP2BuildRequest, kHTTP1BuildRequest, kResume } = require('../core/symbols.js') -const { connectH1, writeH1, resumeH1 } = require('./client-h1.js') -const { connectH2, writeH2 } = require('./client-h2.js') +const { connectH1, resumeH1 } = require('./client-h1.js') +const { connectH2 } = require('./client-h2.js') const kClosedResolve = Symbol('kClosedResolve') @@ -107,8 +107,8 @@ class Client extends DispatcherBase { autoSelectFamily, autoSelectFamilyAttemptTimeout, // h2 - allowH2, - maxConcurrentStreams + maxConcurrentStreams, + allowH2 } = {}) { super() @@ -237,17 +237,10 @@ class Client extends DispatcherBase { this[kClosedResolve] = null this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1 this[kHTTPConnVersion] = null + this[kMaxConcurrentStreams] = maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server // HTTP/2 this[kHTTP2Session] = null - this[kHTTP2SessionState] = !allowH2 - ? null - : { - // streams: null, // Fixed queue of streams - For future support of `push` - openStreams: 0, // Keep track of them to decide whether or not unref the session - maxConcurrentStreams: maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server - } - this[kHost] = `${this[kUrl].hostname}${this[kUrl].port ? `:${this[kUrl].port}` : ''}` // kQueue is built up of 3 sections separated by // the kRunningIdx and kPendingIdx indices. @@ -309,6 +302,7 @@ class Client extends DispatcherBase { [kDispatch] (opts, handler) { const origin = opts.origin || this[kUrl].origin + // TODO (fix): Why do these need tp be const request = this[kHTTPConnVersion] === 'h2' ? Request[kHTTP2BuildRequest](origin, opts, handler) : Request[kHTTP1BuildRequest](origin, opts, handler) @@ -361,13 +355,12 @@ class Client extends DispatcherBase { } if (this[kHTTP2Session] != null) { - util.destroy(this[kHTTP2Session], err) + this[kHTTP2Session].destroy(err) this[kHTTP2Session] = null - this[kHTTP2SessionState] = null } if (this[kSocket]) { - util.destroy(this[kSocket].on('close', callback), err) + this[kSocket].destroy(err).on('close', callback) } else { queueMicrotask(callback) } @@ -652,7 +645,7 @@ function _resume (client, sync) { } } - if (!request.aborted && write(client, request)) { + if (!request.aborted && client[kHTTPWrite](request)) { client[kPendingIdx]++ } else { client[kQueue].splice(client[kPendingIdx], 1) @@ -660,16 +653,6 @@ function _resume (client, sync) { } } -function write (client, request) { - if (client[kHTTPConnVersion] === 'h2') { - // TODO (fix): Why does this not return the value - // from writeH2. - writeH2(client, request) - } else { - return writeH1(client, request) - } -} - function errorRequest (client, request, err) { try { request.onError(err) From 8bda5ee3f433477acfa6db93ed798caa0255e93a Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 27 Feb 2024 15:16:49 +0100 Subject: [PATCH 2/4] WIP --- lib/core/symbols.js | 3 +-- lib/dispatcher/client-h1.js | 25 ++++++++++--------- lib/dispatcher/client-h2.js | 29 ++++++++++++---------- lib/dispatcher/client.js | 48 ++++++++++++++++--------------------- 4 files changed, 52 insertions(+), 53 deletions(-) diff --git a/lib/core/symbols.js b/lib/core/symbols.js index 0a8ffac3e57..f3c6e1d5339 100644 --- a/lib/core/symbols.js +++ b/lib/core/symbols.js @@ -59,10 +59,9 @@ module.exports = { kHTTP2BuildRequest: Symbol('http2 build request'), kHTTP1BuildRequest: Symbol('http1 build request'), kHTTP2CopyHeaders: Symbol('http2 copy headers'), - kHTTPConnVersion: Symbol('http connection version'), kRetryHandlerDefaultRetry: Symbol('retry agent default retry'), kConstruct: Symbol('constructable'), kListeners: Symbol('listeners'), - kHTTPWrite: Symbol('write'), + kHTTPContext: Symbol('http context'), kMaxConcurrentStreams: Symbol('max concurrent streams') } diff --git a/lib/dispatcher/client-h1.js b/lib/dispatcher/client-h1.js index ebcd21a3ae9..3cc9b3123b8 100644 --- a/lib/dispatcher/client-h1.js +++ b/lib/dispatcher/client-h1.js @@ -47,11 +47,9 @@ const { kMaxRequests, kCounter, kMaxResponseSize, - kHTTPConnVersion, kListeners, kOnError, - kResume, - kHTTPWrite + kResume } = require('../core/symbols.js') const constants = require('../llhttp/constants.js') @@ -645,9 +643,6 @@ function onParserTimeout (parser) { } async function connectH1 (client, socket) { - client[kHTTPConnVersion] = 'h1' - client[kHTTPWrite] = (...args) => writeH1(client, ...args) - if (!llhttpInstance) { llhttpInstance = await llhttpPromise llhttpPromise = null @@ -737,6 +732,18 @@ async function connectH1 (client, socket) { client[kResume]() }) + + return { + version: 'h1', + write (...args) { + return writeH1(client, ...args) + }, + resume () { + resumeH1(client) + }, + destroy () { + } + } } function resumeH1 (client) { @@ -1276,8 +1283,4 @@ class AsyncWriter { } } -module.exports = { - connectH1, - writeH1, - resumeH1 -} +module.exports = connectH1 diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index cc725884406..04d24b741b3 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -22,14 +22,12 @@ const { kError, kSocket, kStrictContentLength, - kHTTPConnVersion, kOnError, // HTTP2 kMaxConcurrentStreams, kHTTP2Session, kHTTP2CopyHeaders, - kResume, - kHTTPWrite + kResume } = require('../core/symbols.js') const kOpenStreams = Symbol('open streams') @@ -59,12 +57,6 @@ const { } = http2 async function connectH2 (client, socket) { - client[kHTTPConnVersion] = 'h2' - client[kHTTPWrite] = (...args) => { - // TODO (fix): return - writeH2(client, ...args) - } - if (!h2ExperimentalWarned) { h2ExperimentalWarned = true process.emitWarning('H2 support is experimental, expect them to change at any time.', { @@ -131,6 +123,20 @@ async function connectH2 (client, socket) { socket.on('end', function () { util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this))) }) + + return { + version: 'h2', + write (...args) { + // TODO (fix): return + writeH2(client, ...args) + }, + resume () { + + }, + destroy (err) { + session.destroy(err) + } + } } function onHttp2SessionError (err) { @@ -607,7 +613,4 @@ async function writeIterable ({ h2stream, body, client, request, socket, content } } -module.exports = { - connectH2, - writeH2 -} +module.exports = connectH2 diff --git a/lib/dispatcher/client.js b/lib/dispatcher/client.js index 99a565f2ae3..d3b0d32c3a6 100644 --- a/lib/dispatcher/client.js +++ b/lib/dispatcher/client.js @@ -56,18 +56,16 @@ const { kInterceptors, kLocalAddress, kMaxResponseSize, - kHTTPConnVersion, kOnError, - kHTTPWrite, + kHTTPContext, // HTTP2 kMaxConcurrentStreams, - kHTTP2Session, kHTTP2BuildRequest, kHTTP1BuildRequest, kResume } = require('../core/symbols.js') -const { connectH1, resumeH1 } = require('./client-h1.js') -const { connectH2 } = require('./client-h2.js') +const connectH1 = require('./client-h1.js') +const connectH2 = require('./client-h2.js') const kClosedResolve = Symbol('kClosedResolve') @@ -236,11 +234,8 @@ class Client extends DispatcherBase { this[kMaxRequests] = maxRequestsPerClient this[kClosedResolve] = null this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1 - this[kHTTPConnVersion] = null this[kMaxConcurrentStreams] = maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server - - // HTTP/2 - this[kHTTP2Session] = null + this[kHTTPContext] = null // kQueue is built up of 3 sections separated by // the kRunningIdx and kPendingIdx indices. @@ -302,8 +297,9 @@ class Client extends DispatcherBase { [kDispatch] (opts, handler) { const origin = opts.origin || this[kUrl].origin - // TODO (fix): Why do these need tp be - const request = this[kHTTPConnVersion] === 'h2' + // TODO (fix): Why do these need to be + // TODO (fix): This can happen before connect... + const request = this[kHTTPContext]?.version === 'h2' ? Request[kHTTP2BuildRequest](origin, opts, handler) : Request[kHTTP1BuildRequest](origin, opts, handler) @@ -354,9 +350,9 @@ class Client extends DispatcherBase { resolve(null) } - if (this[kHTTP2Session] != null) { - this[kHTTP2Session].destroy(err) - this[kHTTP2Session] = null + if (this[kHTTPContext] != null) { + this[kHTTPContext].destroy(err) + this[kHTTPContext] = null } if (this[kSocket]) { @@ -418,7 +414,7 @@ async function connect (client) { hostname, protocol, port, - version: client[kHTTPConnVersion], + version: client[kHTTPContext].version, servername: client[kServerName], localAddress: client[kLocalAddress] }, @@ -453,11 +449,9 @@ async function connect (client) { assert(socket) - if (socket.alpnProtocol === 'h2') { - await connectH2(client, socket) - } else { - await connectH1(client, socket) - } + client[kHTTPContext] = socket.alpnProtocol === 'h2' + ? await connectH2(client, socket) + : await connectH1(client, socket) socket[kCounter] = 0 socket[kMaxRequests] = client[kMaxRequests] @@ -473,7 +467,7 @@ async function connect (client) { hostname, protocol, port, - version: client[kHTTPConnVersion], + version: client[kHTTPContext].version, servername: client[kServerName], localAddress: client[kLocalAddress] }, @@ -496,7 +490,7 @@ async function connect (client) { hostname, protocol, port, - version: client[kHTTPConnVersion], + version: client[kHTTPContext].version, servername: client[kServerName], localAddress: client[kLocalAddress] }, @@ -558,8 +552,8 @@ function _resume (client, sync) { const socket = client[kSocket] - if (socket && socket.alpnProtocol !== 'h2') { - resumeH1(client) + if (client[kHTTPContext]) { + client[kHTTPContext].resume() } if (client[kBusy]) { @@ -578,7 +572,7 @@ function _resume (client, sync) { return } - if (client[kHTTPConnVersion] === 'h1') { + if (client[kHTTPContext]?.version === 'h1') { if (client[kRunning] >= (client[kPipelining] || 1)) { return } @@ -612,7 +606,7 @@ function _resume (client, sync) { return } - if (client[kHTTPConnVersion] === 'h1') { + if (client[kHTTPContext].version === 'h1') { if (socket[kWriting] || socket[kReset] || socket[kBlocking]) { return } @@ -645,7 +639,7 @@ function _resume (client, sync) { } } - if (!request.aborted && client[kHTTPWrite](request)) { + if (!request.aborted && client[kHTTPContext].write(request)) { client[kPendingIdx]++ } else { client[kQueue].splice(client[kPendingIdx], 1) From 5ff14e65889d1a061d920ff12f49a785a791976c Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Tue, 27 Feb 2024 15:17:19 +0100 Subject: [PATCH 3/4] WIP --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 0d8d333f83a..60aa663c838 100644 --- a/.gitignore +++ b/.gitignore @@ -82,3 +82,5 @@ undici-fetch.js # .npmrc has platform specific value for windows .npmrc + +.tap From f55fc28c304cfe8f7399ed93060f4d114353b928 Mon Sep 17 00:00:00 2001 From: Robert Nagy Date: Wed, 28 Feb 2024 09:28:26 +0100 Subject: [PATCH 4/4] WIP --- lib/dispatcher/client.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/dispatcher/client.js b/lib/dispatcher/client.js index d3b0d32c3a6..2081a4bcd79 100644 --- a/lib/dispatcher/client.js +++ b/lib/dispatcher/client.js @@ -414,7 +414,7 @@ async function connect (client) { hostname, protocol, port, - version: client[kHTTPContext].version, + version: client[kHTTPContext]?.version, servername: client[kServerName], localAddress: client[kLocalAddress] }, @@ -467,7 +467,7 @@ async function connect (client) { hostname, protocol, port, - version: client[kHTTPContext].version, + version: client[kHTTPContext]?.version, servername: client[kServerName], localAddress: client[kLocalAddress] }, @@ -490,7 +490,7 @@ async function connect (client) { hostname, protocol, port, - version: client[kHTTPContext].version, + version: client[kHTTPContext]?.version, servername: client[kServerName], localAddress: client[kLocalAddress] },