Skip to content

Commit

Permalink
refactor: simplify DialRequest logic per feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
jacobheun committed Jan 24, 2020
1 parent b7432bd commit e1e3be8
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 304 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
"multistream-select": "^0.15.0",
"once": "^1.4.0",
"p-any": "^2.1.0",
"p-fifo": "^1.0.0",
"p-map": "^3.0.0",
"p-queue": "^6.1.1",
"p-settle": "^3.1.0",
Expand Down
7 changes: 4 additions & 3 deletions src/dialer.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@ class Dialer {
this.timeout = timeout
this.perPeerLimit = perPeerLimit
this.tokens = [...new Array(concurrency)].map((_, index) => index)

this.releaseToken = this.releaseToken.bind(this)
}

/**
Expand Down Expand Up @@ -68,7 +66,10 @@ class Dialer {
* @returns {Promise<Connection>}
*/
async connectToMultiaddrs (addrs, options = {}) {
const dialAction = (addr, options) => this.transportManager.dial(addr, options)
const dialAction = (addr, options) => {
if (options.signal.aborted) throw errCode(new Error('already aborted'), 'ERR_ALREADY_ABORTED')
return this.transportManager.dial(addr, options)
}
const dialRequest = new DialRequest({
addrs,
dialAction,
Expand Down
174 changes: 24 additions & 150 deletions src/dialer/dial-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@

const AbortController = require('abort-controller')
const AggregateError = require('aggregate-error')
const pDefer = require('p-defer')
const anySignal = require('any-signal')
const debug = require('debug')
const errCode = require('err-code')
const log = debug('libp2p:dialer:request')
log.error = debug('libp2p:dialer:request:error')
const { AbortError } = require('libp2p-interfaces/src/transport/errors')

const { TokenHolder } = require('./token-holder')
const FIFO = require('p-fifo')
const pAny = require('p-any')

class DialRequest {
/**
Expand Down Expand Up @@ -37,161 +36,36 @@ class DialRequest {
* @returns {Connection}
*/
async run (options) {
// Determine how many tokens we need
const tokensWanted = Math.min(this.addrs.length, this.dialer.perPeerLimit)
// Get the tokens
const tokens = this.dialer.getTokens(tokensWanted)
const tokens = this.dialer.getTokens(this.addrs.length)
// If no tokens are available, throw
if (tokens.length < 1) {
throw Object.assign(new Error('No dial tokens available'), { code: 'ERR_NO_DIAL_TOKENS' })
}

// For every token, run a multiaddr dial
// If there are tokens left, release them
// If there are multiaddrs left, wait for tokens to finish
const th = new TokenHolder(tokens, t => this.dialer.releaseToken(t))

// Create the dial functions
const dials = this.addrs.map(addr => {
return () => this._abortableDial(addr, options)
})

const dialResolver = new DialResolver()
while (dials.length > 0) {
if (dialResolver.finished) break
// Wait for the next available token
const token = await th.getToken()
const dial = dials.shift()
dialResolver.add(dial, () => th.releaseToken(token))
}

// Start giving back the tokens
th.drain()
// Flush all the dials to get the final response
return dialResolver.flush()
}

/**
* @private
* @param {Multiaddr} addr
* @param {object} options
* @param {AbortSignal} options.signal An AbortController signal
* @param {number} options.timeout The max dial time for each request in ms
* @returns {{abort: function(), promise: Promise<Connection>}} An AbortableDial
*/
_abortableDial (addr, options) {
log('starting dial to %s', addr)
const controller = new AbortController()
const signals = [controller.signal]
options.signal && signals.push(options.signal)
const signal = anySignal([controller.signal, options.signal])

const promise = this.dialAction(addr, { signal, timeout: options.timeout })
return {
abort: () => controller.abort(),
promise
throw errCode(new Error('No dial tokens available'), 'ERR_NO_DIAL_TOKENS')
}
}
}

class DialResolver {
constructor () {
this.dials = new Set()
this.errors = []
this.finished = false
this.didFlush = false
this._waiting = null
}
const th = new FIFO()
tokens.forEach(t => th.push(t))
const dialAbortControllers = this.addrs.map(() => new AbortController())

/**
* Adds a dial function to the resolver. The function will be immediately
* executed and its resolution tracked.
* @async
* @param {function()} dial A function that returns an AbortableDial
* @param {function()} [finallyHandler] Called when the dial resolves or rejects
*/
async add (dial, finallyHandler) {
if (this.finished) return
const abortableDial = dial()
this.dials.add(abortableDial)
try {
this._onResolve(await abortableDial.promise)
} catch (err) {
this._onReject(err)
return await pAny(this.addrs.map(async (addr, i) => {
const token = await th.shift() // get token
let conn
try {
const signal = dialAbortControllers[i].signal
conn = await this.dialAction(addr, { ...options, signal: anySignal([signal, options.signal]) })
// Remove the successful AbortController so it is no aborted
dialAbortControllers.splice(i, 1)
} catch (err) {
th.push(token) // return to token holder on error so another ma can be attempted
throw err
}
return conn
}))
} finally {
this._onFinally(abortableDial)
finallyHandler && finallyHandler()
}
}

/**
* Called when a dial resolves
* @param {Connection} result
*/
_onResolve (result) {
this.result = result
}

/**
* Called when a dial rejects
* @param {Error} err
*/
_onReject (err) {
if (err.code === AbortError.code) return
this.errors.push(err)
}

_onFinally (dial) {
this.dials.delete(dial)
// If we have a result, or all dials have finished
if (this.result || (this._waiting && this.dials.size === 0)) {
this._onFinish()
}
}

/**
* Called when dialing is completed, which means one of:
* 1. One dial succeeded
* 2. All dials failed
* 3. All dials were aborted
* @private
*/
_onFinish () {
this.finished = true
// Abort all remaining dials
for (const abortableDial of this.dials) {
abortableDial.abort()
}
this.dials.clear()

// Flush must be called
if (!this._waiting) return
// If we have a result, or an abort occurred (no errors and no result)
if (this.result || this.errors.length === 0) {
this._waiting.resolve(this.result)
} else {
this._waiting.reject(new AggregateError(this.errors))
}
}

/**
* Flushes any remaining dials and resolves the first
* successful `Connection`. Flush should be called after all
* dials have been added.
* @returns {Promise<Connection>}
*/
flush () {
if (this.finished) {
if (this.result) {
return Promise.resolve(this.result)
} else {
return Promise.reject(new AggregateError(this.errors))
}
dialAbortControllers.map(c => c.abort()) // success/failure happened, abort everything else
tokens.forEach(t => this.dialer.releaseToken(t)) // release tokens back to the dialer
}
this._waiting = pDefer()
return this._waiting.promise
}
}

module.exports.DialResolver = DialResolver
module.exports.DialRequest = DialRequest
63 changes: 0 additions & 63 deletions src/dialer/token-holder.js

This file was deleted.

83 changes: 0 additions & 83 deletions test/dialing/dial-resolver.spec.js

This file was deleted.

5 changes: 3 additions & 2 deletions test/dialing/direct.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,9 @@ describe('Dialing (direct, TCP)', () => {

// Let the call stack run
await delay(0)
// All dials should have executed
expect(localTM.dial.callCount).to.equal(3)

// Only two dials should be executed, as the first dial will succeed
expect(localTM.dial.callCount).to.equal(2)
expect(dialer.tokens).to.have.length(2)
})

Expand Down
Loading

0 comments on commit e1e3be8

Please sign in to comment.