Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
fix: remove timeout-abort-controller dependency (#454)
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain authored Apr 17, 2023
1 parent 08dc6f1 commit 7f3245e
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 60 deletions.
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@
"p-queue": "^7.3.4",
"private-ip": "^3.0.0",
"protons-runtime": "^5.0.0",
"timeout-abort-controller": "^3.0.0",
"uint8arraylist": "^2.0.0",
"uint8arrays": "^4.0.2",
"varint": "^6.0.0"
Expand Down
5 changes: 1 addition & 4 deletions src/query-self.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { setMaxListeners } from 'events'
import take from 'it-take'
import length from 'it-length'
import { QUERY_SELF_INTERVAL, QUERY_SELF_TIMEOUT, K } from './constants.js'
import { TimeoutController } from 'timeout-abort-controller'
import { anySignal } from 'any-signal'
import { logger, Logger } from '@libp2p/logger'
import type { PeerRouting } from './peer-routing/index.js'
Expand Down Expand Up @@ -71,9 +70,8 @@ export class QuerySelf implements Startable {

_querySelf (): void {
Promise.resolve().then(async () => {
const timeoutController = new TimeoutController(this.queryTimeout)
this.controller = new AbortController()
const signal = anySignal([this.controller.signal, timeoutController.signal])
const signal = anySignal([this.controller.signal, AbortSignal.timeout(this.queryTimeout)])

// this controller will get used for lots of dial attempts so make sure we don't cause warnings to be logged
try {
Expand All @@ -96,7 +94,6 @@ export class QuerySelf implements Startable {
this.log('query error', err)
} finally {
this.timeoutId = setTimeout(this._querySelf.bind(this), this.interval)
timeoutController.clear()
signal.clear()
}
}).catch(err => {
Expand Down
42 changes: 14 additions & 28 deletions src/query/manager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { TimeoutController } from 'timeout-abort-controller'
import { anySignal } from 'any-signal'
import {
ALPHA, K, DEFAULT_QUERY_TIMEOUT
Expand Down Expand Up @@ -39,7 +38,7 @@ export class QueryManager implements Startable {
private readonly lan: boolean
public disjointPaths: number
private readonly alpha: number
private readonly controllers: Set<AbortController>
private readonly shutDownController: AbortController
private running: boolean
private queries: number
private metrics?: {
Expand All @@ -52,11 +51,19 @@ export class QueryManager implements Startable {

this.components = components
this.disjointPaths = disjointPaths ?? K
this.controllers = new Set()
this.running = false
this.alpha = alpha ?? ALPHA
this.lan = lan
this.queries = 0

// allow us to stop queries on shut down
this.shutDownController = new AbortController()
// make sure we don't make a lot of noise in the logs
try {
if (setMaxListeners != null) {
setMaxListeners(Infinity, this.shutDownController.signal)
}
} catch {} // fails on node < 15.4
}

isStarted (): boolean {
Expand All @@ -83,11 +90,7 @@ export class QueryManager implements Startable {
async stop (): Promise<void> {
this.running = false

for (const controller of this.controllers) {
controller.abort()
}

this.controllers.clear()
this.shutDownController.abort()
}

async * run (key: Uint8Array, peers: PeerId[], queryFunc: QueryFunc, options: QueryOptions = {}): AsyncGenerator<QueryEvent> {
Expand All @@ -96,32 +99,21 @@ export class QueryManager implements Startable {
}

const stopQueryTimer = this.metrics?.queryTime.timer()
let timeoutController

if (options.signal == null) {
// don't let queries run forever
timeoutController = new TimeoutController(DEFAULT_QUERY_TIMEOUT)
options.signal = timeoutController.signal
options.signal = AbortSignal.timeout(DEFAULT_QUERY_TIMEOUT)

// this signal will get listened to for network requests, etc
// so make sure we don't make a lot of noise in the logs
try {
if (setMaxListeners != null) {
setMaxListeners(Infinity, timeoutController.signal)
setMaxListeners(Infinity, options.signal)
}
} catch {} // fails on node < 15.4
}

// allow us to stop queries on shut down
const abortController = new AbortController()
this.controllers.add(abortController)
const signals = [abortController.signal]

if (options.signal != null) {
signals.push(options.signal)
}

const signal = anySignal(signals)
const signal = anySignal([this.shutDownController.signal, options.signal])

// this signal will get listened to for every invocation of queryFunc
// so make sure we don't make a lot of noise in the logs
Expand Down Expand Up @@ -186,12 +178,6 @@ export class QueryManager implements Startable {
} finally {
signal.clear()

this.controllers.delete(abortController)

if (timeoutController != null) {
timeoutController.clear()
}

this.queries--
this.metrics?.runningQueries.update(this.queries)

Expand Down
8 changes: 1 addition & 7 deletions src/query/query-path.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { toString } from 'uint8arrays/to-string'
import defer from 'p-defer'
import { CodeError } from '@libp2p/interfaces/errors'
import { convertPeerId, convertBuffer } from '../utils.js'
import { TimeoutController } from 'timeout-abort-controller'
import { anySignal } from 'any-signal'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { EventEmitter } from '@libp2p/interfaces/events'
Expand Down Expand Up @@ -108,12 +107,10 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator<Qu
const peerXor = BigInt('0x' + toString(xor(peerKadId, kadId), 'base16'))

queue.add(async () => {
let timeout
const signals = [signal]

if (queryFuncTimeout != null) {
timeout = new TimeoutController(queryFuncTimeout)
signals.push(timeout.signal)
signals.push(AbortSignal.timeout(queryFuncTimeout))
}

const compoundSignal = anySignal(signals)
Expand Down Expand Up @@ -158,8 +155,6 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator<Qu
}
queue.emit('completed', event)
}

timeout?.clear()
} catch (err: any) {
if (!signal.aborted) {
return queryErrorEvent({
Expand All @@ -169,7 +164,6 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator<Qu
}
} finally {
compoundSignal.clear()
timeout?.clear()
}
}, {
// use xor value as the queue priority - closer peers should execute first
Expand Down
11 changes: 1 addition & 10 deletions src/routing-table/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import KBuck from 'k-bucket'
import * as utils from '../utils.js'
import Queue from 'p-queue'
import { TimeoutController } from 'timeout-abort-controller'
import { logger } from '@libp2p/logger'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Startable } from '@libp2p/interfaces/startable'
Expand Down Expand Up @@ -222,13 +221,9 @@ export class RoutingTable implements Startable {
try {
await Promise.all(
oldContacts.map(async oldContact => {
let timeoutController

try {
timeoutController = new TimeoutController(this.pingTimeout)

const options = {
signal: timeoutController.signal
signal: AbortSignal.timeout(this.pingTimeout)
}

this.log('pinging old contact %p', oldContact.peer)
Expand All @@ -245,10 +240,6 @@ export class RoutingTable implements Startable {
this.kb.remove(oldContact.id)
}
} finally {
if (timeoutController != null) {
timeoutController.clear()
}

this.metrics?.routingTableSize.update(this.size)
}
})
Expand Down
13 changes: 3 additions & 10 deletions src/routing-table/refresh.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { randomBytes } from '@libp2p/crypto'
import { peerIdFromBytes } from '@libp2p/peer-id'
import { logger } from '@libp2p/logger'
import length from 'it-length'
import { TimeoutController } from 'timeout-abort-controller'
import { TABLE_REFRESH_INTERVAL, TABLE_REFRESH_QUERY_TIMEOUT } from '../constants.js'
import type { RoutingTable } from './index.js'
import type { Logger } from '@libp2p/logger'
Expand Down Expand Up @@ -135,16 +134,10 @@ export class RoutingTableRefresh {

this.log('starting refreshing cpl %s with key %p (routing table size was %s)', cpl, peerId, this.routingTable.size)

const controller = new TimeoutController(this.refreshQueryTimeout)
const peers = await length(this.peerRouting.getClosestPeers(peerId.toBytes(), { signal: AbortSignal.timeout(this.refreshQueryTimeout) }))

try {
const peers = await length(this.peerRouting.getClosestPeers(peerId.toBytes(), { signal: controller.signal }))

this.log(`found ${peers} peers that were close to imaginary peer %p`, peerId)
this.log('finished refreshing cpl %s with key %p (routing table size is now %s)', cpl, peerId, this.routingTable.size)
} finally {
controller.clear()
}
this.log(`found ${peers} peers that were close to imaginary peer %p`, peerId)
this.log('finished refreshing cpl %s with key %p (routing table size is now %s)', cpl, peerId, this.routingTable.size)
}

_getTrackedCommonPrefixLengthsForRefresh (maxCommonPrefix: number): Date[] {
Expand Down

0 comments on commit 7f3245e

Please sign in to comment.