Skip to content

Commit

Permalink
refactor: move setMaxListeners to @libp2p/interface (#2154)
Browse files Browse the repository at this point in the history
- Discussed in #2138
- Move `setMaxListeners` handling into a single wrapped function exported from `@libp2p/interface/events`
- Instead of setting `"browser": {"events": false}` as was discussed in #2138, an actual file is given. Without doing this, an annoying warning is produced in this package and every consumer package in the monorepo!
  • Loading branch information
wemeetagain authored Oct 31, 2023
1 parent 6958136 commit 50442d7
Show file tree
Hide file tree
Showing 16 changed files with 47 additions and 90 deletions.
3 changes: 3 additions & 0 deletions packages/interface/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@
"!dist/test",
"!**/*.tsbuildinfo"
],
"browser": {
"events": "./dist/src/events.browser.js"
},
"exports": {
".": {
"types": "./dist/src/index.d.ts",
Expand Down
2 changes: 2 additions & 0 deletions packages/interface/src/events.browser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/** Noop for browser compatibility */
export function setMaxListeners (): void {}
11 changes: 11 additions & 0 deletions packages/interface/src/events.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import { setMaxListeners as nodeSetMaxListeners } from 'events'

export interface EventCallback<EventType> { (evt: EventType): void }
export interface EventObject<EventType> { handleEvent: EventCallback<EventType> }
export type EventHandler<EventType> = EventCallback<EventType> | EventObject<EventType>
Expand Down Expand Up @@ -117,3 +119,12 @@ export const CustomEvent = globalThis.CustomEvent ?? CustomEventPolyfill

// TODO: remove this in v1
export { TypedEventEmitter as EventEmitter }

// create a setMaxListeners that doesn't break browser usage
export const setMaxListeners: typeof nodeSetMaxListeners = (n, ...eventTargets) => {
try {
nodeSetMaxListeners(n, ...eventTargets)
} catch {
// swallow error, gulp
}
}
3 changes: 1 addition & 2 deletions packages/kad-dht/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
"test:chrome-webworker": "aegir test -t webworker",
"test:firefox": "aegir test -t browser -- --browser firefox",
"test:firefox-webworker": "aegir test -t webworker -- --browser firefox",
"dep-check": "aegir dep-check -i events"
"dep-check": "aegir dep-check"
},
"dependencies": {
"@libp2p/crypto": "^2.0.6",
Expand All @@ -63,7 +63,6 @@
"abortable-iterator": "^5.0.1",
"any-signal": "^4.1.1",
"datastore-core": "^9.0.1",
"events": "^3.3.0",
"hashlru": "^2.3.0",
"interface-datastore": "^8.2.0",
"it-all": "^3.0.2",
Expand Down
8 changes: 2 additions & 6 deletions packages/kad-dht/src/query-self.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { setMaxListeners } from 'events'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger, type Logger } from '@libp2p/logger'
import { anySignal } from 'any-signal'
import length from 'it-length'
Expand Down Expand Up @@ -110,11 +110,7 @@ export class QuerySelf implements Startable {
const signal = anySignal([this.controller.signal, AbortSignal.timeout(this.queryTimeout)])

// this controller will get used for lots of dial attempts so make sure we don't cause warnings to be logged
try {
if (setMaxListeners != null) {
setMaxListeners(Infinity, signal)
}
} catch {} // fails on node < 15.4
setMaxListeners(Infinity, signal)

try {
if (this.routingTable.size === 0) {
Expand Down
21 changes: 4 additions & 17 deletions packages/kad-dht/src/query/manager.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { setMaxListeners } from 'events'
import { AbortError } from '@libp2p/interface/errors'
import { TypedEventEmitter, CustomEvent } from '@libp2p/interface/events'
import { TypedEventEmitter, CustomEvent, setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { PeerSet } from '@libp2p/peer-collections'
import { anySignal } from 'any-signal'
Expand Down Expand Up @@ -75,11 +74,7 @@ export class QueryManager implements Startable {
// allow us to stop queries on shut down
this.shutDownController = new AbortController()
// make sure we don't make a lot of noise in the logs
try {
if (setMaxListeners != null) {
setMaxListeners(Infinity, this.shutDownController.signal)
}
} catch {} // fails on node < 15.4
setMaxListeners(Infinity, this.shutDownController.signal)
}

isStarted (): boolean {
Expand Down Expand Up @@ -122,22 +117,14 @@ export class QueryManager implements Startable {

// this signal will get listened to for network requests, etc
// so make sure we don't make a lot of noise in the logs
try {
if (setMaxListeners != null) {
setMaxListeners(Infinity, options.signal)
}
} catch {} // fails on node < 15.4
setMaxListeners(Infinity, options.signal)
}

const signal = anySignal([this.shutDownController.signal, options.signal])

// this signal will get listened to for every invocation of queryFunc
// so make sure we don't make a lot of noise in the logs
try {
if (setMaxListeners != null) {
setMaxListeners(Infinity, signal)
}
} catch {} // fails on node < 15.4
setMaxListeners(Infinity, signal)

const log = logger(`libp2p:kad-dht:${this.lan ? 'lan' : 'wan'}:query:` + uint8ArrayToString(key, 'base58btc'))

Expand Down
3 changes: 1 addition & 2 deletions packages/libp2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
"scripts": {
"clean": "aegir clean",
"lint": "aegir lint",
"dep-check": "aegir dep-check -i events",
"dep-check": "aegir dep-check",
"prepublishOnly": "node scripts/update-version.js && npm run build",
"build": "aegir build",
"generate": "run-s generate:proto:*",
Expand Down Expand Up @@ -139,7 +139,6 @@
"any-signal": "^4.1.1",
"datastore-core": "^9.0.1",
"delay": "^6.0.0",
"events": "^3.3.0",
"interface-datastore": "^8.2.0",
"it-all": "^3.0.2",
"it-drain": "^3.0.2",
Expand Down
12 changes: 3 additions & 9 deletions packages/libp2p/src/autonat/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
* ```
*/

import { setMaxListeners } from 'events'
import { CodeError } from '@libp2p/interface/errors'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { peerIdFromBytes } from '@libp2p/peer-id'
import { createEd25519PeerId } from '@libp2p/peer-id-factory'
Expand Down Expand Up @@ -164,10 +164,7 @@ class DefaultAutoNATService implements Startable {

// this controller may be used while dialing lots of peers so prevent MaxListenersExceededWarning
// appearing in the console
try {
// fails on node < 15.4
setMaxListeners?.(Infinity, signal)
} catch {}
setMaxListeners(Infinity, signal)

const ourHosts = this.components.addressManager.getAddresses()
.map(ma => ma.toOptions().host)
Expand Down Expand Up @@ -432,10 +429,7 @@ class DefaultAutoNATService implements Startable {

// this controller may be used while dialing lots of peers so prevent MaxListenersExceededWarning
// appearing in the console
try {
// fails on node < 15.4
setMaxListeners?.(Infinity, signal)
} catch {}
setMaxListeners(Infinity, signal)

const self = this

Expand Down
8 changes: 2 additions & 6 deletions packages/libp2p/src/circuit-relay/server/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { setMaxListeners } from 'events'
import { TypedEventEmitter } from '@libp2p/interface/events'
import { TypedEventEmitter, setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { peerIdFromBytes } from '@libp2p/peer-id'
import { RecordEnvelope } from '@libp2p/peer-record'
Expand Down Expand Up @@ -134,10 +133,7 @@ class CircuitRelayServer extends TypedEventEmitter<RelayServerEvents> implements
this.maxOutboundHopStreams = init.maxOutboundHopStreams
this.maxOutboundStopStreams = init.maxOutboundStopStreams ?? defaults.maxOutboundStopStreams

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, this.shutdownController.signal)
} catch { }
setMaxListeners(Infinity, this.shutdownController.signal)

if (init.advertise != null && init.advertise !== false) {
this.advertService = new AdvertService(components, init.advertise === true ? undefined : init.advertise)
Expand Down
7 changes: 2 additions & 5 deletions packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { setMaxListeners } from 'events'
import { AbortError, CodeError } from '@libp2p/interface/errors'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { defaultAddressSort } from '@libp2p/utils/address-sort'
import { type Multiaddr, type Resolver, resolvers } from '@multiformats/multiaddr'
Expand Down Expand Up @@ -96,10 +96,7 @@ export class DialQueue {
this.transportManager = components.transportManager
this.shutDownController = new AbortController()

try {
// This emitter gets listened to a lot
setMaxListeners?.(Infinity, this.shutDownController.signal)
} catch {}
setMaxListeners(Infinity, this.shutDownController.signal)

this.pendingDialCount = components.metrics?.registerMetric('libp2p_dialler_pending_dials')
this.inProgressDialCount = components.metrics?.registerMetric('libp2p_dialler_in_progress_dials')
Expand Down
12 changes: 3 additions & 9 deletions packages/libp2p/src/connection-manager/utils.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { setMaxListeners } from 'events'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { type AbortOptions, multiaddr, type Multiaddr } from '@multiformats/multiaddr'
import { type ClearableSignal, anySignal } from 'any-signal'
Expand Down Expand Up @@ -55,21 +55,15 @@ export function combineSignals (...signals: Array<AbortSignal | undefined>): Cle

for (const sig of signals) {
if (sig != null) {
try {
// fails on node < 15.4
setMaxListeners?.(Infinity, sig)
} catch { }
setMaxListeners(Infinity, sig)
sigs.push(sig)
}
}

// let any signal abort the dial
const signal = anySignal(sigs)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, signal)
} catch {}
setMaxListeners(Infinity, signal)

return signal
}
7 changes: 2 additions & 5 deletions packages/libp2p/src/connection/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { setMaxListeners } from 'events'
import { symbol } from '@libp2p/interface/connection'
import { CodeError } from '@libp2p/interface/errors'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import type { AbortOptions } from '@libp2p/interface'
import type { Direction, Connection, Stream, ConnectionTimeline, ConnectionStatus, NewStreamOptions } from '@libp2p/interface/connection'
Expand Down Expand Up @@ -152,10 +152,7 @@ export class ConnectionImpl implements Connection {

options.signal = options?.signal ?? AbortSignal.timeout(CLOSE_TIMEOUT)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, options.signal)
} catch { }
setMaxListeners(Infinity, options.signal)

try {
log.trace('closing all streams')
Expand Down
7 changes: 2 additions & 5 deletions packages/libp2p/src/fetch/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { setMaxListeners } from 'events'
import { CodeError } from '@libp2p/interface/errors'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import first from 'it-first'
import * as lp from 'it-length-prefixed'
Expand Down Expand Up @@ -146,10 +146,7 @@ class DefaultFetchService implements Startable, FetchService {
log('using default timeout of %d ms', this.init.timeout)
signal = AbortSignal.timeout(this.init.timeout ?? DEFAULT_TIMEOUT)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, signal)
} catch {}
setMaxListeners(Infinity, signal)
}

try {
Expand Down
12 changes: 3 additions & 9 deletions packages/libp2p/src/identify/identify.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { setMaxListeners } from 'events'
import { CodeError } from '@libp2p/interface/errors'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import { peerIdFromKeys } from '@libp2p/peer-id'
import { RecordEnvelope, PeerRecord } from '@libp2p/peer-record'
Expand Down Expand Up @@ -187,10 +187,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService {

const signal = AbortSignal.timeout(this.timeout)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, signal)
} catch {}
setMaxListeners(Infinity, signal)

try {
stream = await connection.newStream([this.identifyPushProtocolStr], {
Expand Down Expand Up @@ -345,10 +342,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService {

const signal = AbortSignal.timeout(this.timeout)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, signal)
} catch {}
setMaxListeners(Infinity, signal)

try {
const publicKey = this.peerId.publicKey ?? new Uint8Array(0)
Expand Down
9 changes: 3 additions & 6 deletions packages/libp2p/src/libp2p.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { setMaxListeners } from 'events'
import { unmarshalPublicKey } from '@libp2p/crypto/keys'
import { type ContentRouting, contentRouting } from '@libp2p/interface/content-routing'
import { CodeError } from '@libp2p/interface/errors'
import { TypedEventEmitter, CustomEvent } from '@libp2p/interface/events'
import { TypedEventEmitter, CustomEvent, setMaxListeners } from '@libp2p/interface/events'
import { peerDiscovery } from '@libp2p/interface/peer-discovery'
import { type PeerRouting, peerRouting } from '@libp2p/interface/peer-routing'
import { DefaultKeyChain } from '@libp2p/keychain'
Expand Down Expand Up @@ -70,10 +69,8 @@ export class Libp2pNode<T extends ServiceMap = Record<string, unknown>> extends
return internalResult || externalResult
}

try {
// This emitter gets listened to a lot
setMaxListeners?.(Infinity, events)
} catch {}
// This emitter gets listened to a lot
setMaxListeners(Infinity, events)

this.#started = false
this.peerId = init.peerId
Expand Down
12 changes: 3 additions & 9 deletions packages/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { setMaxListeners } from 'events'
import { CodeError } from '@libp2p/interface/errors'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger } from '@libp2p/logger'
import * as mss from '@libp2p/multistream-select'
import { peerIdFromString } from '@libp2p/peer-id'
Expand Down Expand Up @@ -170,10 +170,7 @@ export class DefaultUpgrader implements Upgrader {

signal.addEventListener('abort', onAbort, { once: true })

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, signal)
} catch { }
setMaxListeners(Infinity, signal)

try {
if ((await this.components.connectionGater.denyInboundConnection?.(maConn)) === true) {
Expand Down Expand Up @@ -444,10 +441,7 @@ export class DefaultUpgrader implements Upgrader {

options.signal = AbortSignal.timeout(30000)

try {
// fails on node < 15.4
setMaxListeners?.(Infinity, options.signal)
} catch { }
setMaxListeners(Infinity, options.signal)
}

const { stream, protocol } = await mss.select(muxedStream, protocols, options)
Expand Down

0 comments on commit 50442d7

Please sign in to comment.