Skip to content

Commit

Permalink
Dispatch compose (nodejs#2795)
Browse files Browse the repository at this point in the history
* feat: new interceptors API

* WIP: compose
  • Loading branch information
ronag authored and KhafraDev committed Jun 24, 2024
1 parent c3831d1 commit 5398e14
Show file tree
Hide file tree
Showing 20 changed files with 987 additions and 328 deletions.
18 changes: 4 additions & 14 deletions lib/api/api-connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const assert = require('node:assert')
const { AsyncResource } = require('node:async_hooks')
const { InvalidArgumentError, SocketError } = require('../core/errors')
const util = require('../core/util')
const RedirectHandler = require('../handler/RedirectHandler')
const redirect = require('../interceptor/redirect')
const { addSignal, removeSignal } = require('./abort-signal')

class ConnectHandler extends AsyncResource {
Expand Down Expand Up @@ -95,19 +95,9 @@ function connect (opts, callback) {
}

try {
const connectHandler = new ConnectHandler(opts, callback)
const connectOptions = { ...opts, method: 'CONNECT' }

if (opts?.maxRedirections != null && (!Number.isInteger(opts?.maxRedirections) || opts?.maxRedirections < 0)) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

if (opts?.maxRedirections > 0) {
RedirectHandler.buildDispatch(this, opts.maxRedirections)(connectOptions, connectHandler)
return
}

this.dispatch(connectOptions, connectHandler)
this
.compose(redirect(opts))
.dispatch({ ...opts, method: opts?.method || 'CONNECT' }, new ConnectHandler(opts, callback))
} catch (err) {
if (typeof callback !== 'function') {
throw err
Expand Down
14 changes: 4 additions & 10 deletions lib/api/api-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const {
RequestAbortedError
} = require('../core/errors')
const util = require('../core/util')
const RedirectHandler = require('../handler/RedirectHandler')
const redirect = require('../interceptor/redirect')
const { addSignal, removeSignal } = require('./abort-signal')

const kResume = Symbol('resume')
Expand Down Expand Up @@ -243,15 +243,9 @@ function pipeline (opts, handler) {
try {
const pipelineHandler = new PipelineHandler(opts, handler)

if (opts?.maxRedirections != null && (!Number.isInteger(opts?.maxRedirections) || opts?.maxRedirections < 0)) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

if (opts?.maxRedirections > 0) {
RedirectHandler.buildDispatch(this, opts.maxRedirections)({ ...opts, body: pipelineHandler.req }, pipelineHandler)
} else {
this.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler)
}
this
.compose(redirect(opts))
.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler)

return pipelineHandler.ret
} catch (err) {
Expand Down
30 changes: 4 additions & 26 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,12 @@
'use strict'

<<<<<<< HEAD
const assert = require('node:assert')
const { Readable } = require('./readable')
const { InvalidArgumentError, RequestAbortedError } = require('../core/errors')
=======
const { AsyncResource } = require('node:async_hooks')
const Readable = require('./readable')
const {
InvalidArgumentError,
RequestAbortedError
} = require('../core/errors')
>>>>>>> 6c405130... refactor(#2722)!: Drop Interceptors support (#2754)
const util = require('../core/util')
const RedirectHandler = require('../handler/RedirectHandler')
const redirect = require('../interceptor/redirect')
const { getResolveErrorBodyCallback } = require('./util')
<<<<<<< HEAD
const { AsyncResource } = require('node:async_hooks')
=======
const { addSignal, removeSignal } = require('./abort-signal')
>>>>>>> 6c405130... refactor(#2722)!: Drop Interceptors support (#2754)

class RequestHandler extends AsyncResource {
constructor (opts, callback) {
Expand Down Expand Up @@ -208,18 +195,9 @@ function request (opts, callback) {
}

try {
const handler = new RequestHandler(opts, callback)

if (opts?.maxRedirections != null && (!Number.isInteger(opts?.maxRedirections) || opts?.maxRedirections < 0)) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

if (opts?.maxRedirections > 0) {
RedirectHandler.buildDispatch(this, opts.maxRedirections)(opts, handler)
return
}

this.dispatch(opts, handler)
this
.compose(redirect(opts))
.dispatch(opts, new RequestHandler(opts, callback))
} catch (err) {
if (typeof callback !== 'function') {
throw err
Expand Down
17 changes: 4 additions & 13 deletions lib/api/api-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const assert = require('node:assert')
const { finished, PassThrough } = require('node:stream')
const { InvalidArgumentError, InvalidReturnValueError } = require('../core/errors')
const util = require('../core/util')
const RedirectHandler = require('../handler/RedirectHandler')
const redirect = require('../interceptor/redirect')
const { getResolveErrorBodyCallback } = require('./util')
const { AsyncResource } = require('node:async_hooks')
const { addSignal, removeSignal } = require('./abort-signal')
Expand Down Expand Up @@ -208,18 +208,9 @@ function stream (opts, factory, callback) {
}

try {
const handler = new StreamHandler(opts, factory, callback)

if (opts?.maxRedirections != null && (!Number.isInteger(opts?.maxRedirections) || opts?.maxRedirections < 0)) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

if (opts?.maxRedirections > 0) {
RedirectHandler.buildDispatch(this, opts.maxRedirections)(opts, handler)
return
}

this.dispatch(opts, handler)
this
.compose(redirect(opts))
.dispatch(opts, new StreamHandler(opts, factory, callback))
} catch (err) {
if (typeof callback !== 'function') {
throw err
Expand Down
28 changes: 5 additions & 23 deletions lib/api/api-upgrade.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
'use strict'

<<<<<<< HEAD
const { InvalidArgumentError, SocketError } = require('../core/errors')
=======
const assert = require('node:assert')
>>>>>>> 6c405130... refactor(#2722)!: Drop Interceptors support (#2754)
const { AsyncResource } = require('node:async_hooks')
const { InvalidArgumentError, RequestAbortedError, SocketError } = require('../core/errors')
const { InvalidArgumentError, SocketError } = require('../core/errors')
const util = require('../core/util')
const RedirectHandler = require('../handler/RedirectHandler')
const redirect = require('../interceptor/redirect')
const { addSignal, removeSignal } = require('./abort-signal')

class UpgradeHandler extends AsyncResource {
Expand Down Expand Up @@ -95,23 +91,9 @@ function upgrade (opts, callback) {
}

try {
const upgradeHandler = new UpgradeHandler(opts, callback)
const upgradeOpts = {
...opts,
method: opts.method || 'GET',
upgrade: opts.protocol || 'Websocket'
}

if (opts?.maxRedirections != null && (!Number.isInteger(opts?.maxRedirections) || opts?.maxRedirections < 0)) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

if (opts?.maxRedirections > 0) {
RedirectHandler.buildDispatch(this, opts.maxRedirections)(upgradeOpts, upgradeHandler)
return
}

this.dispatch(upgradeOpts, upgradeHandler)
this
.compose(redirect(opts))
.dispatch({ ...opts, method: opts?.method || 'GET', upgrade: opts?.protocol || 'Websocket' }, new UpgradeHandler(opts, callback))
} catch (err) {
if (typeof callback !== 'function') {
throw err
Expand Down
File renamed without changes.
206 changes: 206 additions & 0 deletions lib/interceptor/proxy.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
'use strict'

const { kProxy, kClose, kDestroy } = require('../core/symbols')
const { URL } = require('node:url')
const Agent = require('../agent')
const Pool = require('../pool')
const DispatcherBase = require('../dispatcher-base')
const { InvalidArgumentError, RequestAbortedError } = require('../core/errors')
const buildConnector = require('../core/connect')

const kAgent = Symbol('proxy agent')
const kClient = Symbol('proxy client')
const kProxyHeaders = Symbol('proxy headers')
const kRequestTls = Symbol('request tls settings')
const kProxyTls = Symbol('proxy tls settings')
const kConnectEndpoint = Symbol('connect endpoint function')

function defaultProtocolPort (protocol) {
return protocol === 'https:' ? 443 : 80
}

function buildProxyOptions (opts) {
if (typeof opts === 'string') {
opts = { uri: opts }
}

if (!opts || !opts.uri) {
throw new InvalidArgumentError('Proxy opts.uri is mandatory')
}

return {
uri: opts.uri,
protocol: opts.protocol || 'https'
}
}

function defaultFactory (origin, opts) {
return new Pool(origin, opts)
}

class ProxyAgent extends DispatcherBase {
constructor (dispatcher, opts) {
super(opts)
this[kProxy] = buildProxyOptions(opts)
this[kAgent] = dispatcher

if (typeof opts === 'string') {
opts = { uri: opts }
}

if (!opts || !opts.uri) {
throw new InvalidArgumentError('Proxy opts.uri is mandatory')
}

const { clientFactory = defaultFactory } = opts

if (typeof clientFactory !== 'function') {
throw new InvalidArgumentError('Proxy opts.clientFactory must be a function.')
}

this[kRequestTls] = opts.requestTls
this[kProxyTls] = opts.proxyTls
this[kProxyHeaders] = opts.headers || {}

const resolvedUrl = new URL(opts.uri)
const { origin, port, username, password } = resolvedUrl

if (opts.auth && opts.token) {
throw new InvalidArgumentError('opts.auth cannot be used in combination with opts.token')
} else if (opts.auth) {
/* @deprecated in favour of opts.token */
this[kProxyHeaders]['proxy-authorization'] = `Basic ${opts.auth}`
} else if (opts.token) {
this[kProxyHeaders]['proxy-authorization'] = opts.token
} else if (username && password) {
this[kProxyHeaders]['proxy-authorization'] = `Basic ${Buffer.from(`${decodeURIComponent(username)}:${decodeURIComponent(password)}`).toString('base64')}`
}

const connect = buildConnector({ ...opts.proxyTls })
this[kConnectEndpoint] = buildConnector({ ...opts.requestTls })
this[kClient] = clientFactory(resolvedUrl, { connect })
this[kAgent] = new Agent({
...opts,
connect: async (opts, callback) => {
let requestedHost = opts.host
if (!opts.port) {
requestedHost += `:${defaultProtocolPort(opts.protocol)}`
}
try {
const { socket, statusCode } = await this[kClient].connect({
origin,
port,
path: requestedHost,
signal: opts.signal,
headers: {
...this[kProxyHeaders],
host: requestedHost
}
})
if (statusCode !== 200) {
socket.on('error', () => {}).destroy()
callback(new RequestAbortedError(`Proxy response (${statusCode}) !== 200 when HTTP Tunneling`))
}
if (opts.protocol !== 'https:') {
callback(null, socket)
return
}
let servername
if (this[kRequestTls]) {
servername = this[kRequestTls].servername
} else {
servername = opts.servername
}
this[kConnectEndpoint]({ ...opts, servername, httpSocket: socket }, callback)
} catch (err) {
callback(err)
}
}
})
}

dispatch (opts, handler) {
const { host } = new URL(opts.origin)
const headers = buildHeaders(opts.headers)
throwIfProxyAuthIsSent(headers)
return this[kAgent].dispatch(
{
...opts,
headers: {
...headers,
host
}
},
handler
)
}

async [kClose] () {
await this[kAgent].close()
await this[kClient].close()
}

async [kDestroy] () {
await this[kAgent].destroy()
await this[kClient].destroy()
}
}

/**
* @param {string[] | Record<string, string>} headers
* @returns {Record<string, string>}
*/
function buildHeaders (headers) {
// When using undici.fetch, the headers list is stored
// as an array.
if (Array.isArray(headers)) {
/** @type {Record<string, string>} */
const headersPair = {}

for (let i = 0; i < headers.length; i += 2) {
headersPair[headers[i]] = headers[i + 1]
}

return headersPair
}

return headers
}

/**
* @param {Record<string, string>} headers
*
* Previous versions of ProxyAgent suggests the Proxy-Authorization in request headers
* Nevertheless, it was changed and to avoid a security vulnerability by end users
* this check was created.
* It should be removed in the next major version for performance reasons
*/
function throwIfProxyAuthIsSent (headers) {
const existProxyAuth = headers && Object.keys(headers)
.find((key) => key.toLowerCase() === 'proxy-authorization')
if (existProxyAuth) {
throw new InvalidArgumentError('Proxy-Authorization should be sent in ProxyAgent constructor')
}
}

module.exports = (opts) => {
if (typeof opts === 'string') {
opts = { uri: opts }
}

if (!opts || !opts.uri) {
throw new InvalidArgumentError('Proxy opts.uri is mandatory')
}

const { clientFactory = defaultFactory } = opts

if (typeof clientFactory !== 'function') {
throw new InvalidArgumentError('Proxy opts.clientFactory must be a function.')
}

if (opts.auth && opts.token) {
throw new InvalidArgumentError('opts.auth cannot be used in combination with opts.token')
}

return (dispatcher) => new ProxyAgent(dispatcher, opts)
}
File renamed without changes.
Loading

0 comments on commit 5398e14

Please sign in to comment.