Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: tryDispatch #3379

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/core/symbols.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module.exports = {
kClose: Symbol('close'),
kDestroy: Symbol('destroy'),
kDispatch: Symbol('dispatch'),
kTryDispatch: Symbol('tryDispatch'),
kUrl: Symbol('url'),
kWriting: Symbol('writing'),
kResuming: Symbol('resuming'),
Expand Down
14 changes: 11 additions & 3 deletions lib/dispatcher/agent.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

const { InvalidArgumentError } = require('../core/errors')
const { kClients, kRunning, kClose, kDestroy, kDispatch, kInterceptors } = require('../core/symbols')
const { kClients, kRunning, kClose, kDestroy, kDispatch, kTryDispatch, kInterceptors } = require('../core/symbols')
const DispatcherBase = require('./dispatcher-base')
const Pool = require('./pool')
const Client = require('./client')
Expand Down Expand Up @@ -79,7 +79,7 @@ class Agent extends DispatcherBase {
return ret
}

[kDispatch] (opts, handler) {
#getDispatcher (opts) {
let key
if (opts.origin && (typeof opts.origin === 'string' || opts.origin instanceof URL)) {
key = String(opts.origin)
Expand All @@ -102,7 +102,15 @@ class Agent extends DispatcherBase {
this[kClients].set(key, dispatcher)
}

return dispatcher.dispatch(opts, handler)
return dispatcher
}

[kDispatch] (opts, handler) {
return this.#getDispatcher(opts).dispatch(opts, handler)
}

[kTryDispatch] (opts, handler) {
return this.#getDispatcher(opts).tryDispatch?.(opts, handler) ?? false
}

async [kClose] () {
Expand Down
11 changes: 11 additions & 0 deletions lib/dispatcher/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const {
kClose,
kDestroy,
kDispatch,
kTryDispatch,
kInterceptors,
kLocalAddress,
kMaxResponseSize,
Expand Down Expand Up @@ -299,6 +300,16 @@ class Client extends DispatcherBase {
this.once('connect', cb)
}

[kTryDispatch] (opts, handler) {
if (this[kBusy]) {
return false
}

this[kDispatch](opts, handler)

return true
}

[kDispatch] (opts, handler) {
const origin = opts.origin || this[kUrl].origin
const request = new Request(origin, opts, handler)
Expand Down
32 changes: 31 additions & 1 deletion lib/dispatcher/dispatcher-base.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const {
ClientClosedError,
InvalidArgumentError
} = require('../core/errors')
const { kDestroy, kClose, kClosed, kDestroyed, kDispatch, kInterceptors } = require('../core/symbols')
const { kDestroy, kClose, kClosed, kDestroyed, kDispatch, kTryDispatch, kInterceptors } = require('../core/symbols')

const kOnDestroyed = Symbol('onDestroyed')
const kOnClosed = Symbol('onClosed')
Expand Down Expand Up @@ -185,6 +185,36 @@ class DispatcherBase extends Dispatcher {
return false
}
}

tryDispatch (opts, handler) {
if (!handler || typeof handler !== 'object') {
throw new InvalidArgumentError('handler must be an object')
}

try {
if (!opts || typeof opts !== 'object') {
throw new InvalidArgumentError('opts must be an object.')
}

if (this[kDestroyed] || this[kOnDestroyed]) {
throw new ClientDestroyedError()
}

if (this[kClosed]) {
throw new ClientClosedError()
}

return this[kTryDispatch]?.(opts, handler) ?? false
} catch (err) {
if (typeof handler.onError !== 'function') {
throw new InvalidArgumentError('invalid onError method')
}

handler.onError(err)

return false
}
}
}

module.exports = DispatcherBase
25 changes: 23 additions & 2 deletions lib/dispatcher/pool-base.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

const DispatcherBase = require('./dispatcher-base')
const FixedQueue = require('./fixed-queue')
const { kConnected, kSize, kRunning, kPending, kQueued, kBusy, kFree, kUrl, kClose, kDestroy, kDispatch } = require('../core/symbols')
const { kConnected, kSize, kRunning, kPending, kQueued, kBusy, kFree, kUrl, kClose, kDestroy, kDispatch, kTryDispatch } = require('../core/symbols')
const PoolStats = require('./pool-stats')

const kClients = Symbol('clients')
Expand All @@ -14,6 +14,7 @@ const kOnConnect = Symbol('onConnect')
const kOnDisconnect = Symbol('onDisconnect')
const kOnConnectionError = Symbol('onConnectionError')
const kGetDispatcher = Symbol('get dispatcher')
const kGetDispatchers = Symbol('get dispatchers')
const kAddClient = Symbol('add client')
const kRemoveClient = Symbol('remove client')
const kStats = Symbol('stats')
Expand Down Expand Up @@ -134,6 +135,14 @@ class PoolBase extends DispatcherBase {
}

[kDispatch] (opts, handler) {
if (this[kGetDispatchers]) {
for (const dispatcher of this[kGetDispatchers]()) {
if (dispatcher.tryDispatch?.(opts, handler)) {
return true
}
}
}

const dispatcher = this[kGetDispatcher]()

if (!dispatcher) {
Expand All @@ -148,6 +157,17 @@ class PoolBase extends DispatcherBase {
return !this[kNeedDrain]
}

[kTryDispatch] (opts, handler) {
if (this[kGetDispatchers]) {
for (const dispatcher of this[kGetDispatchers]()) {
if (dispatcher[kTryDispatch]?.(opts, handler)) {
return true
}
}
}
return false
}

[kAddClient] (client) {
client
.on('drain', this[kOnDrain])
Expand Down Expand Up @@ -190,5 +210,6 @@ module.exports = {
kNeedDrain,
kAddClient,
kRemoveClient,
kGetDispatcher
kGetDispatcher,
kGetDispatchers
}
13 changes: 12 additions & 1 deletion lib/dispatcher/pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ const {
kClients,
kNeedDrain,
kAddClient,
kGetDispatcher
kGetDispatcher,
kGetDispatchers
} = require('./pool-base')
const Client = require('./client')
const {
Expand Down Expand Up @@ -88,6 +89,16 @@ class Pool extends PoolBase {
return dispatcher
}
}

* [kGetDispatchers] () {
yield * this[kClients]

if (!this[kConnections] || this[kClients].length < this[kConnections]) {
const dispatcher = this[kFactory](this[kUrl], this[kOptions])
this[kAddClient](dispatcher)
yield dispatcher
}
}
}

module.exports = Pool
198 changes: 0 additions & 198 deletions test/interceptor.js

This file was deleted.

Loading