Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dispatch compose #2795

Merged
merged 2 commits into from
Feb 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,5 @@ fuzz-results-*.json
# Bundle output
undici-fetch.js
/test/imports/undici-import.js

.tap
15 changes: 5 additions & 10 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,7 @@ const MockClient = require('./lib/mock/mock-client')
const MockAgent = require('./lib/mock/mock-agent')
const MockPool = require('./lib/mock/mock-pool')
const mockErrors = require('./lib/mock/mock-errors')
const ProxyAgent = require('./lib/proxy-agent')
const RetryAgent = require('./lib/retry-agent')
const RetryHandler = require('./lib/handler/RetryHandler')
const { getGlobalDispatcher, setGlobalDispatcher } = require('./lib/global')
const DecoratorHandler = require('./lib/handler/DecoratorHandler')
const RedirectHandler = require('./lib/handler/RedirectHandler')

Object.assign(Dispatcher.prototype, api)

Expand All @@ -28,12 +23,12 @@ module.exports.Client = Client
module.exports.Pool = Pool
module.exports.BalancedPool = BalancedPool
module.exports.Agent = Agent
module.exports.ProxyAgent = ProxyAgent
module.exports.RetryAgent = RetryAgent
module.exports.RetryHandler = RetryHandler

module.exports.DecoratorHandler = DecoratorHandler
module.exports.RedirectHandler = RedirectHandler
module.exports.interceptor = {
redirect: require('./lib/interceptor/redirect'),
retry: require('./lib/interceptor/retry'),
proxy: require('./lib/interceptor/proxy')
}

module.exports.buildConnector = buildConnector
module.exports.errors = errors
Expand Down
18 changes: 4 additions & 14 deletions lib/api/api-connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const { AsyncResource } = require('node:async_hooks')
const { InvalidArgumentError, RequestAbortedError, SocketError } = require('../core/errors')
const util = require('../core/util')
const RedirectHandler = require('../handler/RedirectHandler')
const redirect = require('../interceptor/redirect')
const { addSignal, removeSignal } = require('./abort-signal')

class ConnectHandler extends AsyncResource {
Expand Down Expand Up @@ -91,19 +91,9 @@ function connect (opts, callback) {
}

try {
const connectHandler = new ConnectHandler(opts, callback)
const connectOptions = { ...opts, method: 'CONNECT' }

if (opts?.maxRedirections != null && (!Number.isInteger(opts?.maxRedirections) || opts?.maxRedirections < 0)) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

if (opts?.maxRedirections > 0) {
RedirectHandler.buildDispatch(this, opts.maxRedirections)(connectOptions, connectHandler)
return
}

this.dispatch(connectOptions, connectHandler)
this
.compose(redirect(opts))
.dispatch({ ...opts, method: opts?.method || 'CONNECT' }, new ConnectHandler(opts, callback))
} catch (err) {
if (typeof callback !== 'function') {
throw err
Expand Down
14 changes: 4 additions & 10 deletions lib/api/api-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const {
RequestAbortedError
} = require('../core/errors')
const util = require('../core/util')
const RedirectHandler = require('../handler/RedirectHandler')
const redirect = require('../interceptor/redirect')
const { addSignal, removeSignal } = require('./abort-signal')

const kResume = Symbol('resume')
Expand Down Expand Up @@ -241,15 +241,9 @@ function pipeline (opts, handler) {
try {
const pipelineHandler = new PipelineHandler(opts, handler)

if (opts?.maxRedirections != null && (!Number.isInteger(opts?.maxRedirections) || opts?.maxRedirections < 0)) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

if (opts?.maxRedirections > 0) {
RedirectHandler.buildDispatch(this, opts.maxRedirections)({ ...opts, body: pipelineHandler.req }, pipelineHandler)
} else {
this.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler)
}
this
.compose(redirect(opts))
.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler)

return pipelineHandler.ret
} catch (err) {
Expand Down
17 changes: 4 additions & 13 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const {
RequestAbortedError
} = require('../core/errors')
const util = require('../core/util')
const RedirectHandler = require('../handler/RedirectHandler')
const redirect = require('../interceptor/redirect')
const { getResolveErrorBodyCallback } = require('./util')
const { addSignal, removeSignal } = require('./abort-signal')

Expand Down Expand Up @@ -167,18 +167,9 @@ function request (opts, callback) {
}

try {
const handler = new RequestHandler(opts, callback)

if (opts?.maxRedirections != null && (!Number.isInteger(opts?.maxRedirections) || opts?.maxRedirections < 0)) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

if (opts?.maxRedirections > 0) {
RedirectHandler.buildDispatch(this, opts.maxRedirections)(opts, handler)
return
}

this.dispatch(opts, handler)
this
.compose(redirect(opts))
.dispatch(opts, new RequestHandler(opts, callback))
} catch (err) {
if (typeof callback !== 'function') {
throw err
Expand Down
17 changes: 4 additions & 13 deletions lib/api/api-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const {
RequestAbortedError
} = require('../core/errors')
const util = require('../core/util')
const RedirectHandler = require('../handler/RedirectHandler')
const redirect = require('../interceptor/redirect')
const { getResolveErrorBodyCallback } = require('./util')
const { AsyncResource } = require('node:async_hooks')
const { addSignal, removeSignal } = require('./abort-signal')
Expand Down Expand Up @@ -208,18 +208,9 @@ function stream (opts, factory, callback) {
}

try {
const handler = new StreamHandler(opts, factory, callback)

if (opts?.maxRedirections != null && (!Number.isInteger(opts?.maxRedirections) || opts?.maxRedirections < 0)) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

if (opts?.maxRedirections > 0) {
RedirectHandler.buildDispatch(this, opts.maxRedirections)(opts, handler)
return
}

this.dispatch(opts, handler)
this
.compose(redirect(opts))
.dispatch(opts, new StreamHandler(opts, factory, callback))
} catch (err) {
if (typeof callback !== 'function') {
throw err
Expand Down
22 changes: 4 additions & 18 deletions lib/api/api-upgrade.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const assert = require('node:assert')
const { AsyncResource } = require('node:async_hooks')
const { InvalidArgumentError, RequestAbortedError, SocketError } = require('../core/errors')
const util = require('../core/util')
const RedirectHandler = require('../handler/RedirectHandler')
const redirect = require('../interceptor/redirect')
const { addSignal, removeSignal } = require('./abort-signal')

class UpgradeHandler extends AsyncResource {
Expand Down Expand Up @@ -88,23 +88,9 @@ function upgrade (opts, callback) {
}

try {
const upgradeHandler = new UpgradeHandler(opts, callback)
const upgradeOpts = {
...opts,
method: opts.method || 'GET',
upgrade: opts.protocol || 'Websocket'
}

if (opts?.maxRedirections != null && (!Number.isInteger(opts?.maxRedirections) || opts?.maxRedirections < 0)) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

if (opts?.maxRedirections > 0) {
RedirectHandler.buildDispatch(this, opts.maxRedirections)(upgradeOpts, upgradeHandler)
return
}

this.dispatch(upgradeOpts, upgradeHandler)
this
.compose(redirect(opts))
.dispatch({ ...opts, method: opts?.method || 'GET', upgrade: opts?.protocol || 'Websocket' }, new UpgradeHandler(opts, callback))
} catch (err) {
if (typeof callback !== 'function') {
throw err
Expand Down
24 changes: 24 additions & 0 deletions lib/dispatcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

const EventEmitter = require('node:events')

const kDispatcherVersion = Symbol.for('undici.dispatcher.version')

class Dispatcher extends EventEmitter {
[kDispatcherVersion] = 1

dispatch () {
throw new Error('not implemented')
}
Expand All @@ -14,6 +18,26 @@ class Dispatcher extends EventEmitter {
destroy () {
throw new Error('not implemented')
}

compose (...interceptors) {
let dispatcher = this
for (const interceptor of interceptors) {
if (interceptor == null) {
continue
}

if (typeof interceptor !== 'function') {
throw new Error('invalid interceptor')
}

dispatcher = interceptor(dispatcher) ?? dispatcher
ronag marked this conversation as resolved.
Show resolved Hide resolved

if (dispatcher[kDispatcherVersion] !== 1) {
throw new Error('invalid dispatcher')
}
ronag marked this conversation as resolved.
Show resolved Hide resolved
}
return dispatcher
}
}

module.exports = Dispatcher
31 changes: 0 additions & 31 deletions lib/handler/DecoratorHandler.js

This file was deleted.

38 changes: 29 additions & 9 deletions lib/proxy-agent.js → lib/interceptor/proxy.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
'use strict'

const { kProxy, kClose, kDestroy } = require('./core/symbols')
const { kProxy, kClose, kDestroy } = require('../core/symbols')
const { URL } = require('node:url')
const Agent = require('./agent')
const Pool = require('./pool')
const DispatcherBase = require('./dispatcher-base')
const { InvalidArgumentError, RequestAbortedError } = require('./core/errors')
const buildConnector = require('./core/connect')
const Agent = require('../agent')
const Pool = require('../pool')
const DispatcherBase = require('../dispatcher-base')
const { InvalidArgumentError, RequestAbortedError } = require('../core/errors')
const buildConnector = require('../core/connect')

const kAgent = Symbol('proxy agent')
const kClient = Symbol('proxy client')
Expand Down Expand Up @@ -39,10 +39,10 @@ function defaultFactory (origin, opts) {
}

class ProxyAgent extends DispatcherBase {
constructor (opts) {
constructor (dispatcher, opts) {
super(opts)
this[kProxy] = buildProxyOptions(opts)
this[kAgent] = new Agent(opts)
this[kAgent] = dispatcher

if (typeof opts === 'string') {
opts = { uri: opts }
Expand Down Expand Up @@ -183,4 +183,24 @@ function throwIfProxyAuthIsSent (headers) {
}
}

module.exports = ProxyAgent
module.exports = (opts) => {
if (typeof opts === 'string') {
opts = { uri: opts }
}

if (!opts || !opts.uri) {
throw new InvalidArgumentError('Proxy opts.uri is mandatory')
}

const { clientFactory = defaultFactory } = opts

if (typeof clientFactory !== 'function') {
throw new InvalidArgumentError('Proxy opts.clientFactory must be a function.')
}

if (opts.auth && opts.token) {
throw new InvalidArgumentError('opts.auth cannot be used in combination with opts.token')
}

return (dispatcher) => new ProxyAgent(dispatcher, opts)
}
Loading
Loading