diff --git a/packages/libp2p/src/connection-monitor.ts b/packages/libp2p/src/connection-monitor.ts index d557a1f2f1..d756ad6a3f 100644 --- a/packages/libp2p/src/connection-monitor.ts +++ b/packages/libp2p/src/connection-monitor.ts @@ -1,5 +1,5 @@ import { randomBytes } from '@libp2p/crypto' -import { serviceCapabilities } from '@libp2p/interface' +import { serviceCapabilities, setMaxListeners } from '@libp2p/interface' import { AdaptiveTimeout } from '@libp2p/utils/adaptive-timeout' import { byteStream } from 'it-byte-stream' import type { ComponentLogger, Logger, Metrics, Startable } from '@libp2p/interface' @@ -11,6 +11,7 @@ const PROTOCOL_VERSION = '1.0.0' const PROTOCOL_NAME = 'ping' const PROTOCOL_PREFIX = 'ipfs' const PING_LENGTH = 32 +const DEFAULT_ABORT_CONNECTION_ON_PING_FAILURE = true export interface ConnectionMonitorInit { /** @@ -65,6 +66,7 @@ export class ConnectionMonitor implements Startable { private readonly pingIntervalMs: number private abortController?: AbortController private readonly timeout: AdaptiveTimeout + private readonly abortConnectionOnPingFailure: boolean constructor (components: ConnectionMonitorComponents, init: ConnectionMonitorInit = {}) { this.components = components @@ -72,7 +74,7 @@ export class ConnectionMonitor implements Startable { this.log = components.logger.forComponent('libp2p:connection-monitor') this.pingIntervalMs = init.pingInterval ?? DEFAULT_PING_INTERVAL_MS - + this.abortConnectionOnPingFailure = init.abortConnectionOnPingFailure ?? DEFAULT_ABORT_CONNECTION_ON_PING_FAILURE this.timeout = new AdaptiveTimeout({ ...(init.pingTimeout ?? {}), metrics: components.metrics, @@ -88,6 +90,7 @@ export class ConnectionMonitor implements Startable { start (): void { this.abortController = new AbortController() + setMaxListeners(Infinity, this.abortController.signal) this.heartbeatInterval = setInterval(() => { this.components.connectionManager.getConnections().forEach(conn => { @@ -131,8 +134,14 @@ export class ConnectionMonitor implements Startable { } }) .catch(err => { - this.log.error('error during heartbeat, aborting connection', err) - conn.abort(err) + this.log.error('error during heartbeat', err) + + if (this.abortConnectionOnPingFailure) { + this.log.error('aborting connection due to ping failure') + conn.abort(err) + } else { + this.log('connection ping failed, but not aborting due to abortConnectionOnPingFailure flag') + } }) }) }, this.pingIntervalMs) diff --git a/packages/libp2p/test/connection-monitor/index.spec.ts b/packages/libp2p/test/connection-monitor/index.spec.ts index 808c137c7c..ce798cf078 100644 --- a/packages/libp2p/test/connection-monitor/index.spec.ts +++ b/packages/libp2p/test/connection-monitor/index.spec.ts @@ -129,6 +129,46 @@ describe('connection monitor', () => { components.connectionManager.getConnections.returns([connection]) + await delay(500) + + expect(connection.abort).to.have.property('called', true) + }) + + it('should not abort a connection that fails when abortConnectionOnPingFailure is false', async () => { + monitor = new ConnectionMonitor(components, { + pingInterval: 10, + abortConnectionOnPingFailure: false + }) + + await start(monitor) + + const connection = stubInterface() + connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async (protocols, opts) => { + throw new ConnectionClosedError('Connection closed') + }) + + components.connectionManager.getConnections.returns([connection]) + + await delay(500) + + expect(connection.abort).to.have.property('called', false) + }) + + it('should abort a connection that fails when abortConnectionOnPingFailure is true', async () => { + monitor = new ConnectionMonitor(components, { + pingInterval: 10, + abortConnectionOnPingFailure: true + }) + + await start(monitor) + + const connection = stubInterface() + connection.newStream.withArgs('/ipfs/ping/1.0.0').callsFake(async (protocols, opts) => { + throw new ConnectionClosedError('Connection closed') + }) + + components.connectionManager.getConnections.returns([connection]) + await delay(100) expect(connection.abort).to.have.property('called', true)