Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Feb 22, 2024
1 parent fd0e02c commit d1fb5b2
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 75 deletions.
5 changes: 4 additions & 1 deletion index.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ module.exports.BalancedPool = BalancedPool
module.exports.Agent = Agent
module.exports.ProxyAgent = ProxyAgent

// XXX: Re-export redirect and retry handlers...
module.exports.interceptors = {
redirect: require('./lib/interceptors/redirect'),
retry: require('./lib/interceptors/retry')
}

module.exports.buildConnector = buildConnector
module.exports.errors = errors
Expand Down
11 changes: 7 additions & 4 deletions lib/api/api-connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const { AsyncResource } = require('node:async_hooks')
const { InvalidArgumentError, RequestAbortedError, SocketError } = require('../core/errors')
const util = require('../core/util')
const redirect = require('../handler/RedirectHandler')
const redirect = require('../interceptors/redirect')
const { addSignal, removeSignal } = require('./abort-signal')

class ConnectHandler extends AsyncResource {
Expand Down Expand Up @@ -91,9 +91,12 @@ function connect (opts, callback) {
}

try {
let dispatch = (opts, handler) => this.dispatch(opts, handler)
dispatch = redirect(dispatch)
dispatch({ ...opts, method: 'CONNECT' }, new ConnectHandler(opts, callback))
this
.intercept(dispatcher => opts.maxRedirections ? redirect(dispatcher, null) : dispatcher)
.dispatch({
...opts,
method: 'CONNECT'
}, new ConnectHandler(opts, callback))
} catch (err) {
if (typeof callback !== 'function') {
throw err
Expand Down
8 changes: 4 additions & 4 deletions lib/api/api-pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const {
RequestAbortedError
} = require('../core/errors')
const util = require('../core/util')
const redirect = require('../handler/RedirectHandler')
const redirect = require('../interceptors/redirect')
const { addSignal, removeSignal } = require('./abort-signal')

const kResume = Symbol('resume')
Expand Down Expand Up @@ -241,9 +241,9 @@ function pipeline (opts, handler) {
try {
const pipelineHandler = new PipelineHandler(opts, handler)

let dispatch = (opts, handler) => this.dispatch(opts, handler)
dispatch = redirect(dispatch)
dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler)
this
.intercept(dispatcher => opts.maxRedirections ? redirect(dispatcher, null) : dispatcher)
.dispatch({ ...opts, body: pipelineHandler.req }, pipelineHandler)

return pipelineHandler.ret
} catch (err) {
Expand Down
8 changes: 4 additions & 4 deletions lib/api/api-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const {
RequestAbortedError
} = require('../core/errors')
const util = require('../core/util')
const redirect = require('../handler/RedirectHandler')
const redirect = require('../interceptors/redirect')
const { getResolveErrorBodyCallback } = require('./util')
const { addSignal, removeSignal } = require('./abort-signal')

Expand Down Expand Up @@ -167,9 +167,9 @@ function request (opts, callback) {
}

try {
let dispatch = (opts, handler) => this.dispatch(opts, handler)
dispatch = redirect(dispatch)
dispatch(opts, new RequestHandler(opts, callback))
this
.intercept(dispatcher => opts.maxRedirections ? redirect(dispatcher, null) : dispatcher)
.dispatch(opts, new RequestHandler(opts, callback))
} catch (err) {
if (typeof callback !== 'function') {
throw err
Expand Down
8 changes: 4 additions & 4 deletions lib/api/api-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ const {
RequestAbortedError
} = require('../core/errors')
const util = require('../core/util')
const redirect = require('../handler/RedirectHandler')
const redirect = require('../interceptors/redirect')
const { getResolveErrorBodyCallback } = require('./util')
const { AsyncResource } = require('node:async_hooks')
const { addSignal, removeSignal } = require('./abort-signal')
Expand Down Expand Up @@ -208,9 +208,9 @@ function stream (opts, factory, callback) {
}

try {
let dispatch = (opts, handler) => this.dispatch(opts, handler)
dispatch = redirect(dispatch)
dispatch(opts, new StreamHandler(opts, factory, callback))
this
.intercept(dispatcher => opts.maxRedirections ? redirect(dispatcher, null) : dispatcher)
.dispatch(opts, new StreamHandler(opts, factory, callback))
} catch (err) {
if (typeof callback !== 'function') {
throw err
Expand Down
16 changes: 8 additions & 8 deletions lib/api/api-upgrade.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ const assert = require('node:assert')
const { AsyncResource } = require('node:async_hooks')
const { InvalidArgumentError, RequestAbortedError, SocketError } = require('../core/errors')
const util = require('../core/util')
const redirect = require('../handler/RedirectHandler')
const redirect = require('../interceptors/redirect')
const { addSignal, removeSignal } = require('./abort-signal')

class UpgradeHandler extends AsyncResource {
Expand Down Expand Up @@ -88,13 +88,13 @@ function upgrade (opts, callback) {
}

try {
let dispatch = (opts, handler) => this.dispatch(opts, handler)
dispatch = redirect(dispatch)
dispatch({
...opts,
method: opts.method || 'GET',
upgrade: opts.protocol || 'Websocket'
}, new UpgradeHandler(opts, callback))
this
.intercept(dispatcher => opts.maxRedirections ? redirect(dispatcher, null) : dispatcher)
.dispatch({
...opts,
method: opts.method || 'GET',
upgrade: opts.protocol || 'Websocket'
}, new UpgradeHandler(opts, callback))
} catch (err) {
if (typeof callback !== 'function') {
throw err
Expand Down
18 changes: 18 additions & 0 deletions lib/dispatcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,11 @@

const EventEmitter = require('node:events')

const kDispatcherVersion = Symbol.for('undici.dispatcher.version')

class Dispatcher extends EventEmitter {
[kDispatcherVersion] = 1

dispatch () {
throw new Error('not implemented')
}
Expand All @@ -14,6 +18,20 @@ class Dispatcher extends EventEmitter {
destroy () {
throw new Error('not implemented')
}

intercept (...interceptors) {
let dispatcher = this
for (const interceptor of interceptors) {
if (typeof interceptor !== 'function') {
throw new Error('invalid interceptor')
}
dispatcher = interceptor(dispatcher) ?? dispatcher
if (dispatcher[kDispatcherVersion] !== 1) {
throw new Error('invalid dispatcher')
}
}
return dispatcher
}
}

module.exports = Dispatcher
31 changes: 0 additions & 31 deletions lib/handler/DecoratorHandler.js

This file was deleted.

56 changes: 38 additions & 18 deletions lib/handler/RedirectHandler.js → lib/interceptors/redirect.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,28 +24,14 @@ class BodyAsyncIterable {
}

class RedirectHandler {
static decorate (dispatch) {
return (opts, handler) => {
if (opts?.maxRedirections == null || opts.maxRedirections === 0) {
return dispatch(opts, handler)
}

if (!Number.isInteger(opts.maxRedirections) || opts.maxRedirections < 0) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

return dispatch(opts, new RedirectHandler(dispatch, opts.maxRedirections, opts, handler))
}
}

constructor (dispatch, maxRedirections, opts, handler) {
constructor (dispatcher, maxRedirections, opts, handler) {
if (maxRedirections != null && (!Number.isInteger(maxRedirections) || maxRedirections < 0)) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

util.validateHandler(handler, opts.method, opts.upgrade)

this.dispatch = dispatch
this.dispatcher = dispatcher
this.location = null
this.abort = null
this.opts = { ...opts, maxRedirections: 0 } // opts must be a copy
Expand Down Expand Up @@ -182,7 +168,7 @@ class RedirectHandler {
this.location = null
this.abort = null

this.dispatch(this.opts, this)
this.dispatcher.dispatch(this.opts, this)
} else {
this.handler.onComplete(trailers)
}
Expand Down Expand Up @@ -237,4 +223,38 @@ function cleanRequestHeaders (headers, removeContent, unknownOrigin) {
return ret
}

module.exports = RedirectHandler.decorate
class RedirectDispatcher {
#maxRedirections
#dispatcher

constructor (dispatcher, opts) {
if (opts?.maxRedirections != null && (!Number.isInteger(opts.maxRedirections) || opts.maxRedirections < 0)) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

this.#maxRedirections = opts?.maxRedirections
this.#dispatcher = dispatcher
}

dispatch (opts, handler) {
const maxRedirections = opts.maxRedirections ?? this.#maxRedirections

if (maxRedirections != null && (!Number.isInteger(maxRedirections) || maxRedirections < 0)) {
throw new InvalidArgumentError('maxRedirections must be a positive number')
}

return maxRedirections
? this.#dispatcher.dispatch(opts, new RedirectHandler(this.#dispatcher, opts.maxRedirections, opts, handler))
: this.#dispatcher.dispatch(opts, handler)
}

close (...args) {
return this.#dispatcher.close(...args)
}

destroy (...args) {
return this.#dispatcher.destroy(...args)
}
}

module.exports = (dispatcher, opts) => new RedirectDispatcher(dispatcher, opts)
22 changes: 21 additions & 1 deletion lib/handler/RetryHandler.js → lib/interceptors/retry.js
Original file line number Diff line number Diff line change
Expand Up @@ -332,4 +332,24 @@ class RetryHandler {
}
}

module.exports = RetryHandler
class RetryDispatcher {
#dispatcher

constructor (dispatcher, opts) {
this.#dispatcher = dispatcher
}

dispatch (opts, handler) {
return this.#dispatcher.dispatch(opts, new RetryHandler(opts, handler))
}

close (...args) {
return this.#dispatcher.close(...args)
}

destroy (...args) {
return this.#dispatcher.destroy(...args)
}
}

module.exports = (dispatcher, opts) => new RetryDispatcher(dispatcher, opts)

0 comments on commit d1fb5b2

Please sign in to comment.