Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Feb 27, 2024
1 parent 079c084 commit 8bda5ee
Show file tree
Hide file tree
Showing 4 changed files with 52 additions and 53 deletions.
3 changes: 1 addition & 2 deletions lib/core/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,9 @@ module.exports = {
kHTTP2BuildRequest: Symbol('http2 build request'),
kHTTP1BuildRequest: Symbol('http1 build request'),
kHTTP2CopyHeaders: Symbol('http2 copy headers'),
kHTTPConnVersion: Symbol('http connection version'),
kRetryHandlerDefaultRetry: Symbol('retry agent default retry'),
kConstruct: Symbol('constructable'),
kListeners: Symbol('listeners'),
kHTTPWrite: Symbol('write'),
kHTTPContext: Symbol('http context'),
kMaxConcurrentStreams: Symbol('max concurrent streams')
}
25 changes: 14 additions & 11 deletions lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,9 @@ const {
kMaxRequests,
kCounter,
kMaxResponseSize,
kHTTPConnVersion,
kListeners,
kOnError,
kResume,
kHTTPWrite
kResume
} = require('../core/symbols.js')

const constants = require('../llhttp/constants.js')
Expand Down Expand Up @@ -645,9 +643,6 @@ function onParserTimeout (parser) {
}

async function connectH1 (client, socket) {
client[kHTTPConnVersion] = 'h1'
client[kHTTPWrite] = (...args) => writeH1(client, ...args)

if (!llhttpInstance) {
llhttpInstance = await llhttpPromise
llhttpPromise = null
Expand Down Expand Up @@ -737,6 +732,18 @@ async function connectH1 (client, socket) {

client[kResume]()
})

return {
version: 'h1',
write (...args) {
return writeH1(client, ...args)
},
resume () {
resumeH1(client)
},
destroy () {
}
}
}

function resumeH1 (client) {
Expand Down Expand Up @@ -1276,8 +1283,4 @@ class AsyncWriter {
}
}

module.exports = {
connectH1,
writeH1,
resumeH1
}
module.exports = connectH1
29 changes: 16 additions & 13 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,12 @@ const {
kError,
kSocket,
kStrictContentLength,
kHTTPConnVersion,
kOnError,
// HTTP2
kMaxConcurrentStreams,
kHTTP2Session,
kHTTP2CopyHeaders,
kResume,
kHTTPWrite
kResume
} = require('../core/symbols.js')

const kOpenStreams = Symbol('open streams')
Expand Down Expand Up @@ -59,12 +57,6 @@ const {
} = http2

async function connectH2 (client, socket) {
client[kHTTPConnVersion] = 'h2'
client[kHTTPWrite] = (...args) => {
// TODO (fix): return
writeH2(client, ...args)
}

if (!h2ExperimentalWarned) {
h2ExperimentalWarned = true
process.emitWarning('H2 support is experimental, expect them to change at any time.', {
Expand Down Expand Up @@ -131,6 +123,20 @@ async function connectH2 (client, socket) {
socket.on('end', function () {
util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
})

return {
version: 'h2',
write (...args) {
// TODO (fix): return
writeH2(client, ...args)
},
resume () {

},
destroy (err) {
session.destroy(err)
}
}
}

function onHttp2SessionError (err) {
Expand Down Expand Up @@ -607,7 +613,4 @@ async function writeIterable ({ h2stream, body, client, request, socket, content
}
}

module.exports = {
connectH2,
writeH2
}
module.exports = connectH2
48 changes: 21 additions & 27 deletions lib/dispatcher/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -56,18 +56,16 @@ const {
kInterceptors,
kLocalAddress,
kMaxResponseSize,
kHTTPConnVersion,
kOnError,
kHTTPWrite,
kHTTPContext,
// HTTP2
kMaxConcurrentStreams,
kHTTP2Session,
kHTTP2BuildRequest,
kHTTP1BuildRequest,
kResume
} = require('../core/symbols.js')
const { connectH1, resumeH1 } = require('./client-h1.js')
const { connectH2 } = require('./client-h2.js')
const connectH1 = require('./client-h1.js')
const connectH2 = require('./client-h2.js')

const kClosedResolve = Symbol('kClosedResolve')

Expand Down Expand Up @@ -236,11 +234,8 @@ class Client extends DispatcherBase {
this[kMaxRequests] = maxRequestsPerClient
this[kClosedResolve] = null
this[kMaxResponseSize] = maxResponseSize > -1 ? maxResponseSize : -1
this[kHTTPConnVersion] = null
this[kMaxConcurrentStreams] = maxConcurrentStreams != null ? maxConcurrentStreams : 100 // Max peerConcurrentStreams for a Node h2 server

// HTTP/2
this[kHTTP2Session] = null
this[kHTTPContext] = null

// kQueue is built up of 3 sections separated by
// the kRunningIdx and kPendingIdx indices.
Expand Down Expand Up @@ -302,8 +297,9 @@ class Client extends DispatcherBase {
[kDispatch] (opts, handler) {
const origin = opts.origin || this[kUrl].origin

// TODO (fix): Why do these need tp be
const request = this[kHTTPConnVersion] === 'h2'
// TODO (fix): Why do these need to be
// TODO (fix): This can happen before connect...
const request = this[kHTTPContext]?.version === 'h2'
? Request[kHTTP2BuildRequest](origin, opts, handler)
: Request[kHTTP1BuildRequest](origin, opts, handler)

Expand Down Expand Up @@ -354,9 +350,9 @@ class Client extends DispatcherBase {
resolve(null)
}

if (this[kHTTP2Session] != null) {
this[kHTTP2Session].destroy(err)
this[kHTTP2Session] = null
if (this[kHTTPContext] != null) {
this[kHTTPContext].destroy(err)
this[kHTTPContext] = null
}

if (this[kSocket]) {
Expand Down Expand Up @@ -418,7 +414,7 @@ async function connect (client) {
hostname,
protocol,
port,
version: client[kHTTPConnVersion],
version: client[kHTTPContext].version,
servername: client[kServerName],
localAddress: client[kLocalAddress]
},
Expand Down Expand Up @@ -453,11 +449,9 @@ async function connect (client) {

assert(socket)

if (socket.alpnProtocol === 'h2') {
await connectH2(client, socket)
} else {
await connectH1(client, socket)
}
client[kHTTPContext] = socket.alpnProtocol === 'h2'
? await connectH2(client, socket)
: await connectH1(client, socket)

socket[kCounter] = 0
socket[kMaxRequests] = client[kMaxRequests]
Expand All @@ -473,7 +467,7 @@ async function connect (client) {
hostname,
protocol,
port,
version: client[kHTTPConnVersion],
version: client[kHTTPContext].version,
servername: client[kServerName],
localAddress: client[kLocalAddress]
},
Expand All @@ -496,7 +490,7 @@ async function connect (client) {
hostname,
protocol,
port,
version: client[kHTTPConnVersion],
version: client[kHTTPContext].version,
servername: client[kServerName],
localAddress: client[kLocalAddress]
},
Expand Down Expand Up @@ -558,8 +552,8 @@ function _resume (client, sync) {

const socket = client[kSocket]

if (socket && socket.alpnProtocol !== 'h2') {
resumeH1(client)
if (client[kHTTPContext]) {
client[kHTTPContext].resume()
}

if (client[kBusy]) {
Expand All @@ -578,7 +572,7 @@ function _resume (client, sync) {
return
}

if (client[kHTTPConnVersion] === 'h1') {
if (client[kHTTPContext]?.version === 'h1') {
if (client[kRunning] >= (client[kPipelining] || 1)) {
return
}
Expand Down Expand Up @@ -612,7 +606,7 @@ function _resume (client, sync) {
return
}

if (client[kHTTPConnVersion] === 'h1') {
if (client[kHTTPContext].version === 'h1') {
if (socket[kWriting] || socket[kReset] || socket[kBlocking]) {
return
}
Expand Down Expand Up @@ -645,7 +639,7 @@ function _resume (client, sync) {
}
}

if (!request.aborted && client[kHTTPWrite](request)) {
if (!request.aborted && client[kHTTPContext].write(request)) {
client[kPendingIdx]++
} else {
client[kQueue].splice(client[kPendingIdx], 1)
Expand Down

0 comments on commit 8bda5ee

Please sign in to comment.