Skip to content

Commit

Permalink
refactor: move out more h2 from core client (#2860)
Browse files Browse the repository at this point in the history
* refactor: move out more h2 from core client

* WIP

* WIP

* WIP
  • Loading branch information
ronag committed Feb 28, 2024
1 parent c10c310 commit 19b4d39
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 84 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,5 @@ undici-fetch.js

# .npmrc has platform specific value for windows
.npmrc

.tap
5 changes: 3 additions & 2 deletions lib/core/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ module.exports = {
kHTTP2BuildRequest: Symbol('http2 build request'),
kHTTP1BuildRequest: Symbol('http1 build request'),
kHTTP2CopyHeaders: Symbol('http2 copy headers'),
kHTTPConnVersion: Symbol('http connection version'),
kRetryHandlerDefaultRetry: Symbol('retry agent default retry'),
kConstruct: Symbol('constructable'),
kListeners: Symbol('listeners')
kListeners: Symbol('listeners'),
kHTTPContext: Symbol('http context'),
kMaxConcurrentStreams: Symbol('max concurrent streams')
}
21 changes: 13 additions & 8 deletions lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ const {
kMaxRequests,
kCounter,
kMaxResponseSize,
kHTTPConnVersion,
kListeners,
kOnError,
kResume
Expand Down Expand Up @@ -644,8 +643,6 @@ function onParserTimeout (parser) {
}

async function connectH1 (client, socket) {
client[kHTTPConnVersion] = 'h1'

if (!llhttpInstance) {
llhttpInstance = await llhttpPromise
llhttpPromise = null
Expand Down Expand Up @@ -735,6 +732,18 @@ async function connectH1 (client, socket) {

client[kResume]()
})

return {
version: 'h1',
write (...args) {
return writeH1(client, ...args)
},
resume () {
resumeH1(client)
},
destroy () {
}
}
}

function resumeH1 (client) {
Expand Down Expand Up @@ -1274,8 +1283,4 @@ class AsyncWriter {
}
}

module.exports = {
connectH1,
writeH1,
resumeH1
}
module.exports = connectH1
61 changes: 36 additions & 25 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,16 @@ const {
kError,
kSocket,
kStrictContentLength,
kHTTPConnVersion,
kOnError,
// HTTP2
kHost,
kMaxConcurrentStreams,
kHTTP2Session,
kHTTP2SessionState,
kHTTP2CopyHeaders,
kResume
} = require('../core/symbols.js')

const kOpenStreams = Symbol('open streams')

// Experimental
let h2ExperimentalWarned = false

Expand All @@ -57,8 +57,6 @@ const {
} = http2

async function connectH2 (client, socket) {
client[kHTTPConnVersion] = 'h2'

if (!h2ExperimentalWarned) {
h2ExperimentalWarned = true
process.emitWarning('H2 support is experimental, expect them to change at any time.', {
Expand All @@ -68,9 +66,10 @@ async function connectH2 (client, socket) {

const session = http2.connect(client[kUrl], {
createConnection: () => socket,
peerMaxConcurrentStreams: client[kHTTP2SessionState].maxConcurrentStreams
peerMaxConcurrentStreams: client[kMaxConcurrentStreams]
})

session[kOpenStreams] = 0
session[kClient] = client
session[kSocket] = socket
session.on('error', onHttp2SessionError)
Expand Down Expand Up @@ -124,6 +123,20 @@ async function connectH2 (client, socket) {
socket.on('end', function () {
util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
})

return {
version: 'h2',
write (...args) {
// TODO (fix): return
writeH2(client, ...args)
},
resume () {

},
destroy (err) {
session.destroy(err)
}
}
}

function onHttp2SessionError (err) {
Expand Down Expand Up @@ -217,9 +230,10 @@ function writeH2 (client, request) {

/** @type {import('node:http2').ClientHttp2Stream} */
let stream
const h2State = client[kHTTP2SessionState]

headers[HTTP2_HEADER_AUTHORITY] = host || client[kHost]
const { hostname, port } = client[kUrl]

headers[HTTP2_HEADER_AUTHORITY] = host || `${hostname}${port ? `:${port}` : ''}`
headers[HTTP2_HEADER_METHOD] = method

try {
Expand All @@ -235,8 +249,8 @@ function writeH2 (client, request) {
if (stream != null) {
util.destroy(stream, err)

h2State.openStreams -= 1
if (h2State.openStreams === 0) {
session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) {
session.unref()
}
}
Expand All @@ -257,18 +271,18 @@ function writeH2 (client, request) {

if (stream.id && !stream.pending) {
request.onUpgrade(null, null, stream)
++h2State.openStreams
++session[kOpenStreams]
} else {
stream.once('ready', () => {
request.onUpgrade(null, null, stream)
++h2State.openStreams
++session[kOpenStreams]
})
}

stream.once('close', () => {
h2State.openStreams -= 1
session[kOpenStreams] -= 1
// TODO(HTTP/2): unref only if current streams count is 0
if (h2State.openStreams === 0) session.unref()
if (session[kOpenStreams] === 0) session.unref()
})

return true
Expand Down Expand Up @@ -348,7 +362,7 @@ function writeH2 (client, request) {
}

// Increment counter as we have new several streams open
++h2State.openStreams
++session[kOpenStreams]

stream.once('response', headers => {
const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
Expand All @@ -371,8 +385,8 @@ function writeH2 (client, request) {
// Stream is closed or half-closed-remote (6), decrement counter and cleanup
// It does not have sense to continue working with the stream as we do not
// have yet RST_STREAM support on client-side
h2State.openStreams -= 1
if (h2State.openStreams === 0) {
session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) {
session.unref()
}

Expand All @@ -388,16 +402,16 @@ function writeH2 (client, request) {
})

stream.once('close', () => {
h2State.openStreams -= 1
session[kOpenStreams] -= 1
// TODO(HTTP/2): unref only if current streams count is 0
if (h2State.openStreams === 0) {
if (session[kOpenStreams] === 0) {
session.unref()
}
})

stream.once('error', function (err) {
if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) {
h2State.streams -= 1
session[kOpenStreams] -= 1
util.destroy(stream, err)
}
})
Expand All @@ -407,7 +421,7 @@ function writeH2 (client, request) {
errorRequest(client, request, err)

if (client[kHTTP2Session] && !client[kHTTP2Session].destroyed && !this.closed && !this.destroyed) {
h2State.streams -= 1
session[kOpenStreams] -= 1
util.destroy(stream, err)
}
})
Expand Down Expand Up @@ -599,7 +613,4 @@ async function writeIterable ({ h2stream, body, client, request, socket, content
}
}

module.exports = {
connectH2,
writeH2
}
module.exports = connectH2
Loading

0 comments on commit 19b4d39

Please sign in to comment.