Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

fix: remove timeout-abort-controller dependency #454

Merged
merged 2 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@
"p-queue": "^7.3.4",
"private-ip": "^3.0.0",
"protons-runtime": "^5.0.0",
"timeout-abort-controller": "^3.0.0",
"uint8arraylist": "^2.0.0",
"uint8arrays": "^4.0.2",
"varint": "^6.0.0"
Expand Down
5 changes: 1 addition & 4 deletions src/query-self.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { setMaxListeners } from 'events'
import take from 'it-take'
import length from 'it-length'
import { QUERY_SELF_INTERVAL, QUERY_SELF_TIMEOUT, K } from './constants.js'
import { TimeoutController } from 'timeout-abort-controller'
import { anySignal } from 'any-signal'
import { logger, Logger } from '@libp2p/logger'
import type { PeerRouting } from './peer-routing/index.js'
Expand Down Expand Up @@ -71,9 +70,8 @@ export class QuerySelf implements Startable {

_querySelf (): void {
Promise.resolve().then(async () => {
const timeoutController = new TimeoutController(this.queryTimeout)
this.controller = new AbortController()
const signal = anySignal([this.controller.signal, timeoutController.signal])
const signal = anySignal([this.controller.signal, AbortSignal.timeout(this.queryTimeout)])

// this controller will get used for lots of dial attempts so make sure we don't cause warnings to be logged
try {
Expand All @@ -96,7 +94,6 @@ export class QuerySelf implements Startable {
this.log('query error', err)
} finally {
this.timeoutId = setTimeout(this._querySelf.bind(this), this.interval)
timeoutController.clear()
signal.clear()
}
}).catch(err => {
Expand Down
42 changes: 14 additions & 28 deletions src/query/manager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { TimeoutController } from 'timeout-abort-controller'
import { anySignal } from 'any-signal'
import {
ALPHA, K, DEFAULT_QUERY_TIMEOUT
Expand Down Expand Up @@ -39,7 +38,7 @@ export class QueryManager implements Startable {
private readonly lan: boolean
public disjointPaths: number
private readonly alpha: number
private readonly controllers: Set<AbortController>
private readonly shutDownController: AbortController
private running: boolean
private queries: number
private metrics?: {
Expand All @@ -52,11 +51,19 @@ export class QueryManager implements Startable {

this.components = components
this.disjointPaths = disjointPaths ?? K
this.controllers = new Set()
this.running = false
this.alpha = alpha ?? ALPHA
this.lan = lan
this.queries = 0

// allow us to stop queries on shut down
this.shutDownController = new AbortController()
// make sure we don't make a lot of noise in the logs
try {
if (setMaxListeners != null) {
setMaxListeners(Infinity, this.shutDownController.signal)
}
} catch {} // fails on node < 15.4
}

isStarted (): boolean {
Expand All @@ -83,11 +90,7 @@ export class QueryManager implements Startable {
async stop (): Promise<void> {
this.running = false

for (const controller of this.controllers) {
controller.abort()
}

this.controllers.clear()
this.shutDownController.abort()
}

async * run (key: Uint8Array, peers: PeerId[], queryFunc: QueryFunc, options: QueryOptions = {}): AsyncGenerator<QueryEvent> {
Expand All @@ -96,32 +99,21 @@ export class QueryManager implements Startable {
}

const stopQueryTimer = this.metrics?.queryTime.timer()
let timeoutController

if (options.signal == null) {
// don't let queries run forever
timeoutController = new TimeoutController(DEFAULT_QUERY_TIMEOUT)
options.signal = timeoutController.signal
options.signal = AbortSignal.timeout(DEFAULT_QUERY_TIMEOUT)

// this signal will get listened to for network requests, etc
// so make sure we don't make a lot of noise in the logs
try {
if (setMaxListeners != null) {
setMaxListeners(Infinity, timeoutController.signal)
setMaxListeners(Infinity, options.signal)
}
} catch {} // fails on node < 15.4
}

// allow us to stop queries on shut down
const abortController = new AbortController()
this.controllers.add(abortController)
const signals = [abortController.signal]

if (options.signal != null) {
signals.push(options.signal)
}

const signal = anySignal(signals)
const signal = anySignal([this.shutDownController.signal, options.signal])

// this signal will get listened to for every invocation of queryFunc
// so make sure we don't make a lot of noise in the logs
Expand Down Expand Up @@ -186,12 +178,6 @@ export class QueryManager implements Startable {
} finally {
signal.clear()

this.controllers.delete(abortController)

if (timeoutController != null) {
timeoutController.clear()
}

this.queries--
this.metrics?.runningQueries.update(this.queries)

Expand Down
8 changes: 1 addition & 7 deletions src/query/query-path.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { toString } from 'uint8arrays/to-string'
import defer from 'p-defer'
import { CodeError } from '@libp2p/interfaces/errors'
import { convertPeerId, convertBuffer } from '../utils.js'
import { TimeoutController } from 'timeout-abort-controller'
import { anySignal } from 'any-signal'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { EventEmitter } from '@libp2p/interfaces/events'
Expand Down Expand Up @@ -108,12 +107,10 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator<Qu
const peerXor = BigInt('0x' + toString(xor(peerKadId, kadId), 'base16'))

queue.add(async () => {
let timeout
const signals = [signal]

if (queryFuncTimeout != null) {
timeout = new TimeoutController(queryFuncTimeout)
signals.push(timeout.signal)
signals.push(AbortSignal.timeout(queryFuncTimeout))
}

const compoundSignal = anySignal(signals)
Expand Down Expand Up @@ -158,8 +155,6 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator<Qu
}
queue.emit('completed', event)
}

timeout?.clear()
} catch (err: any) {
if (!signal.aborted) {
return queryErrorEvent({
Expand All @@ -169,7 +164,6 @@ export async function * queryPath (options: QueryPathOptions): AsyncGenerator<Qu
}
} finally {
compoundSignal.clear()
timeout?.clear()
}
}, {
// use xor value as the queue priority - closer peers should execute first
Expand Down
11 changes: 1 addition & 10 deletions src/routing-table/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import KBuck from 'k-bucket'
import * as utils from '../utils.js'
import Queue from 'p-queue'
import { TimeoutController } from 'timeout-abort-controller'
import { logger } from '@libp2p/logger'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { Startable } from '@libp2p/interfaces/startable'
Expand Down Expand Up @@ -222,13 +221,9 @@ export class RoutingTable implements Startable {
try {
await Promise.all(
oldContacts.map(async oldContact => {
let timeoutController

try {
timeoutController = new TimeoutController(this.pingTimeout)

const options = {
signal: timeoutController.signal
signal: AbortSignal.timeout(this.pingTimeout)
}

this.log('pinging old contact %p', oldContact.peer)
Expand All @@ -245,10 +240,6 @@ export class RoutingTable implements Startable {
this.kb.remove(oldContact.id)
}
} finally {
if (timeoutController != null) {
timeoutController.clear()
}

this.metrics?.routingTableSize.update(this.size)
}
})
Expand Down
13 changes: 3 additions & 10 deletions src/routing-table/refresh.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { randomBytes } from '@libp2p/crypto'
import { peerIdFromBytes } from '@libp2p/peer-id'
import { logger } from '@libp2p/logger'
import length from 'it-length'
import { TimeoutController } from 'timeout-abort-controller'
import { TABLE_REFRESH_INTERVAL, TABLE_REFRESH_QUERY_TIMEOUT } from '../constants.js'
import type { RoutingTable } from './index.js'
import type { Logger } from '@libp2p/logger'
Expand Down Expand Up @@ -135,16 +134,10 @@ export class RoutingTableRefresh {

this.log('starting refreshing cpl %s with key %p (routing table size was %s)', cpl, peerId, this.routingTable.size)

const controller = new TimeoutController(this.refreshQueryTimeout)
const peers = await length(this.peerRouting.getClosestPeers(peerId.toBytes(), { signal: AbortSignal.timeout(this.refreshQueryTimeout) }))

try {
const peers = await length(this.peerRouting.getClosestPeers(peerId.toBytes(), { signal: controller.signal }))

this.log(`found ${peers} peers that were close to imaginary peer %p`, peerId)
this.log('finished refreshing cpl %s with key %p (routing table size is now %s)', cpl, peerId, this.routingTable.size)
} finally {
controller.clear()
}
this.log(`found ${peers} peers that were close to imaginary peer %p`, peerId)
this.log('finished refreshing cpl %s with key %p (routing table size is now %s)', cpl, peerId, this.routingTable.size)
}

_getTrackedCommonPrefixLengthsForRefresh (maxCommonPrefix: number): Date[] {
Expand Down