diff --git a/packages/interface/src/index.ts b/packages/interface/src/index.ts index d50e335d69..5b1463e4c2 100644 --- a/packages/interface/src/index.ts +++ b/packages/interface/src/index.ts @@ -99,6 +99,11 @@ export interface IdentifyResult { * If sent by the remote peer this is the deserialized signed peer record */ signedPeerRecord?: SignedPeerRecord + + /** + * The connection that the identify protocol ran over + */ + connection: Connection } /** diff --git a/packages/interface/src/topology/index.ts b/packages/interface/src/topology/index.ts index 0772df3b38..25c321201b 100644 --- a/packages/interface/src/topology/index.ts +++ b/packages/interface/src/topology/index.ts @@ -5,6 +5,21 @@ export interface Topology { min?: number max?: number + /** + * If true, invoke `onConnect` for this topology on transient (e.g. short-lived + * and/or data-limited) connections. (default: false) + */ + notifyOnTransient?: boolean + + /** + * Invoked when a new connection is opened to a peer that supports the + * registered protocol + */ onConnect?(peerId: PeerId, conn: Connection): void + + /** + * Invoked when the last connection to a peer that supports the registered + * protocol closes + */ onDisconnect?(peerId: PeerId): void } diff --git a/packages/libp2p/src/circuit-relay/transport/discovery.ts b/packages/libp2p/src/circuit-relay/transport/discovery.ts index 77a4af1208..80bfa1f4d2 100644 --- a/packages/libp2p/src/circuit-relay/transport/discovery.ts +++ b/packages/libp2p/src/circuit-relay/transport/discovery.ts @@ -57,6 +57,7 @@ export class RelayDiscovery extends TypedEventEmitter impl // register a topology listener for when new peers are encountered // that support the hop protocol this.topologyId = await this.registrar.register(RELAY_V2_HOP_CODEC, { + notifyOnTransient: true, onConnect: (peerId) => { this.safeDispatchEvent('relay:discover', { detail: peerId }) } diff --git a/packages/libp2p/src/dcutr/dcutr.ts b/packages/libp2p/src/dcutr/dcutr.ts index e6852b6d03..39aeecd7ae 100644 --- a/packages/libp2p/src/dcutr/dcutr.ts +++ b/packages/libp2p/src/dcutr/dcutr.ts @@ -72,6 +72,7 @@ export class DefaultDCUtRService implements Startable { // register for notifications of when peers that support DCUtR connect // nb. requires the identify service to be enabled this.topologyId = await this.registrar.register(multicodec, { + notifyOnTransient: true, onConnect: (peerId, connection) => { if (!connection.transient) { // the connection is already direct, no upgrade is required diff --git a/packages/libp2p/src/identify/identify.ts b/packages/libp2p/src/identify/identify.ts index 940533ef50..7735122162 100644 --- a/packages/libp2p/src/identify/identify.ts +++ b/packages/libp2p/src/identify/identify.ts @@ -318,22 +318,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService { this.addressManager.addObservedAddr(cleanObservedAddr) } - const signedPeerRecord = await this.#consumeIdentifyMessage(connection.remotePeer, message) - - const result: IdentifyResult = { - peerId: id, - protocolVersion: message.protocolVersion, - agentVersion: message.agentVersion, - publicKey: message.publicKey, - listenAddrs: message.listenAddrs.map(buf => multiaddr(buf)), - observedAddr: message.observedAddr == null ? undefined : multiaddr(message.observedAddr), - protocols: message.protocols, - signedPeerRecord - } - - this.events.safeDispatchEvent('peer:identify', { detail: result }) - - return result + return this.#consumeIdentifyMessage(connection, message) } /** @@ -411,7 +396,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService { const message = await pb.read(options) await stream.close(options) - await this.#consumeIdentifyMessage(connection.remotePeer, message) + await this.#consumeIdentifyMessage(connection, message) } catch (err: any) { log.error('received invalid message', err) stream.abort(err) @@ -421,8 +406,8 @@ export class DefaultIdentifyService implements Startable, IdentifyService { log('handled push from %p', connection.remotePeer) } - async #consumeIdentifyMessage (remotePeer: PeerId, message: Identify): Promise { - log('received identify from %p', remotePeer) + async #consumeIdentifyMessage (connection: Connection, message: Identify): Promise { + log('received identify from %p', connection.remotePeer) if (message == null) { throw new Error('Message was null or undefined') @@ -442,7 +427,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService { // if the peer record has been sent, prefer the addresses in the record as they are signed by the remote peer if (message.signedPeerRecord != null) { - log('received signedPeerRecord in push from %p', remotePeer) + log('received signedPeerRecord in push from %p', connection.remotePeer) let peerRecordEnvelope = message.signedPeerRecord const envelope = await RecordEnvelope.openAndCertify(peerRecordEnvelope, PeerRecord.DOMAIN) @@ -454,7 +439,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService { } // Make sure remote peer is the one sending the record - if (!remotePeer.equals(peerRecord.peerId)) { + if (!connection.remotePeer.equals(peerRecord.peerId)) { throw new Error('signing key does not match remote PeerId') } @@ -500,7 +485,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService { addresses: peerRecord.multiaddrs } } else { - log('%p did not send a signed peer record', remotePeer) + log('%p did not send a signed peer record', connection.remotePeer) } if (message.agentVersion != null) { @@ -511,9 +496,23 @@ export class DefaultIdentifyService implements Startable, IdentifyService { peer.metadata.set('ProtocolVersion', uint8ArrayFromString(message.protocolVersion)) } - await this.peerStore.patch(remotePeer, peer) + await this.peerStore.patch(connection.remotePeer, peer) - return output + const result: IdentifyResult = { + peerId: connection.remotePeer, + protocolVersion: message.protocolVersion, + agentVersion: message.agentVersion, + publicKey: message.publicKey, + listenAddrs: message.listenAddrs.map(buf => multiaddr(buf)), + observedAddr: message.observedAddr == null ? undefined : multiaddr(message.observedAddr), + protocols: message.protocols, + signedPeerRecord: output, + connection + } + + this.events.safeDispatchEvent('peer:identify', { detail: result }) + + return result } } diff --git a/packages/libp2p/src/registrar.ts b/packages/libp2p/src/registrar.ts index 268a35c364..c2998583a2 100644 --- a/packages/libp2p/src/registrar.ts +++ b/packages/libp2p/src/registrar.ts @@ -2,7 +2,7 @@ import { CodeError } from '@libp2p/interface/errors' import { logger } from '@libp2p/logger' import merge from 'merge-options' import { codes } from './errors.js' -import type { Libp2pEvents, PeerUpdate } from '@libp2p/interface' +import type { IdentifyResult, Libp2pEvents, PeerUpdate } from '@libp2p/interface' import type { TypedEventTarget } from '@libp2p/interface/events' import type { PeerId } from '@libp2p/interface/peer-id' import type { PeerStore } from '@libp2p/interface/peer-store' @@ -37,11 +37,11 @@ export class DefaultRegistrar implements Registrar { this._onDisconnect = this._onDisconnect.bind(this) this._onPeerUpdate = this._onPeerUpdate.bind(this) - this._onConnect = this._onConnect.bind(this) + this._onPeerIdentify = this._onPeerIdentify.bind(this) this.components.events.addEventListener('peer:disconnect', this._onDisconnect) - this.components.events.addEventListener('peer:connect', this._onConnect) this.components.events.addEventListener('peer:update', this._onPeerUpdate) + this.components.events.addEventListener('peer:identify', this._onPeerIdentify) } getProtocols (): string[] { @@ -183,52 +183,12 @@ export class DefaultRegistrar implements Registrar { } /** - * On peer connected if we already have their protocols. Usually used for reconnects - * as change:protocols event won't be emitted due to identical protocols. - */ - _onConnect (evt: CustomEvent): void { - const remotePeer = evt.detail - - void this.components.peerStore.get(remotePeer) - .then(peer => { - const connection = this.components.connectionManager.getConnections(peer.id)[0] - - if (connection == null) { - log('peer %p connected but the connection manager did not have a connection', peer) - // peer disconnected while we were loading their details from the peer store - return - } - - for (const protocol of peer.protocols) { - const topologies = this.topologies.get(protocol) - - if (topologies == null) { - // no topologies are interested in this protocol - continue - } - - for (const topology of topologies.values()) { - topology.onConnect?.(remotePeer, connection) - } - } - }) - .catch(err => { - if (err.code === codes.ERR_NOT_FOUND) { - // peer has not completed identify so they are not in the peer store - return - } - - log.error('could not inform topologies of connecting peer %p', remotePeer, err) - }) - } - - /** - * Check if a new peer support the multicodecs for this topology + * When a peer is updated, if they have removed supported protocols notify any + * topologies interested in the removed protocols. */ _onPeerUpdate (evt: CustomEvent): void { const { peer, previous } = evt.detail const removed = (previous?.protocols ?? []).filter(protocol => !peer.protocols.includes(protocol)) - const added = peer.protocols.filter(protocol => !(previous?.protocols ?? []).includes(protocol)) for (const protocol of removed) { const topologies = this.topologies.get(protocol) @@ -242,8 +202,18 @@ export class DefaultRegistrar implements Registrar { topology.onDisconnect?.(peer.id) } } + } - for (const protocol of added) { + /** + * After identify has completed and we have received the list of supported + * protocols, notify any topologies interested in those protocols. + */ + _onPeerIdentify (evt: CustomEvent): void { + const protocols = evt.detail.protocols + const connection = evt.detail.connection + const peerId = evt.detail.peerId + + for (const protocol of protocols) { const topologies = this.topologies.get(protocol) if (topologies == null) { @@ -252,12 +222,11 @@ export class DefaultRegistrar implements Registrar { } for (const topology of topologies.values()) { - const connection = this.components.connectionManager.getConnections(peer.id)[0] - - if (connection == null) { + if (connection.transient && topology.notifyOnTransient !== true) { continue } - topology.onConnect?.(peer.id, connection) + + topology.onConnect?.(peerId, connection) } } } diff --git a/packages/libp2p/test/identify/service.node.ts b/packages/libp2p/test/identify/service.node.ts index e7ce86b685..bae39bc2d6 100644 --- a/packages/libp2p/test/identify/service.node.ts +++ b/packages/libp2p/test/identify/service.node.ts @@ -65,13 +65,15 @@ describe('identify', () => { expect(connection).to.exist() // wait for identify to run on the new connection - await eventPromise + const identifyResult = await eventPromise + + // should have run on the new connection + expect(identifyResult).to.have.nested.property('detail.connection', connection) // assert we have received certified announce addresses - const peer = await libp2p.peerStore.get(remoteLibp2p.peerId) - expect(peer.addresses).to.have.lengthOf(1) - expect(peer.addresses[0].isCertified).to.be.true('did not receive certified address via identify') - expect(peer.addresses[0].multiaddr.toString()).to.startWith('/dns4/localhost/', 'did not receive announce address via identify') + expect(identifyResult).to.have.deep.nested.property('detail.signedPeerRecord.addresses', [ + multiaddr(`/dns4/localhost/tcp/${REMOTE_PORT}`) + ], 'did not receive announce address via identify') }) it('should run identify automatically for inbound connections', async () => { @@ -88,14 +90,16 @@ describe('identify', () => { const connection = await remoteLibp2p.dial(multiaddr(`/ip4/127.0.0.1/tcp/${LOCAL_PORT}/p2p/${libp2p.peerId.toString()}`)) expect(connection).to.exist() - // wait for identify to run on the new connection - await eventPromise + // wait for identify to run + const identifyResult = await eventPromise + + // should have run on the new connection + expect(identifyResult).to.have.nested.property('detail.connection', connection) // assert we have received certified announce addresses - const peer = await libp2p.peerStore.get(remoteLibp2p.peerId) - expect(peer.addresses).to.have.lengthOf(1) - expect(peer.addresses[0].isCertified).to.be.true('did not receive certified address via identify') - expect(peer.addresses[0].multiaddr.toString()).to.startWith('/dns4/localhost/', 'did not receive announce address via identify') + expect(identifyResult).to.have.deep.nested.property('detail.signedPeerRecord.addresses', [ + multiaddr(`/dns4/localhost/tcp/${LOCAL_PORT}`) + ], 'did not receive announce address via identify') }) it('should identify connection on dial and get proper announce addresses', async () => { diff --git a/packages/libp2p/test/registrar/registrar.spec.ts b/packages/libp2p/test/registrar/registrar.spec.ts index 138b45b30e..302e773121 100644 --- a/packages/libp2p/test/registrar/registrar.spec.ts +++ b/packages/libp2p/test/registrar/registrar.spec.ts @@ -1,7 +1,6 @@ /* eslint-env mocha */ import { yamux } from '@chainsafe/libp2p-yamux' -import { CodeError } from '@libp2p/interface/errors' import { TypedEventEmitter, type TypedEventTarget } from '@libp2p/interface/events' import { mockDuplex, mockMultiaddrConnection, mockUpgrader, mockConnection } from '@libp2p/interface-compliance-tests/mocks' import { mplex } from '@libp2p/mplex' @@ -14,7 +13,6 @@ import pDefer from 'p-defer' import { type StubbedInstance, stubInterface } from 'sinon-ts' import { type Components, defaultComponents } from '../../src/components.js' import { DefaultConnectionManager } from '../../src/connection-manager/index.js' -import { codes } from '../../src/errors.js' import { plaintext } from '../../src/insecure/index.js' import { createLibp2pNode, type Libp2pNode } from '../../src/libp2p.js' import { DefaultRegistrar } from '../../src/registrar.js' @@ -141,7 +139,7 @@ describe('registrar', () => { const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) // return connection from connection manager - connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn]) + connectionManager.getConnections.withArgs(remotePeerId).returns([conn]) const topology: Topology = { onConnect: (peerId, connection) => { @@ -170,8 +168,12 @@ describe('registrar', () => { }) // remote peer connects - events.safeDispatchEvent('peer:connect', { - detail: remotePeerId + events.safeDispatchEvent('peer:identify', { + detail: { + peerId: remotePeerId, + protocols: [protocol], + connection: conn + } }) await onConnectDefer.promise @@ -206,12 +208,13 @@ describe('registrar', () => { // Register protocol await registrar.register(protocol, topology) - // No details before identify - peerStore.get.withArgs(matchPeerId(conn.remotePeer)).rejects(new CodeError('Not found', codes.ERR_NOT_FOUND)) - // remote peer connects - events.safeDispatchEvent('peer:connect', { - detail: remotePeerId + events.safeDispatchEvent('peer:identify', { + detail: { + peerId: remotePeerId, + protocols: [protocol], + connection: conn + } }) // Can get details after identify @@ -261,6 +264,140 @@ describe('registrar', () => { await onDisconnectDefer.promise }) + it('should not call topology handlers for transient connection', async () => { + const onConnectDefer = pDefer() + const onDisconnectDefer = pDefer() + + // Setup connections before registrar + const remotePeerId = await createEd25519PeerId() + const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) + + // connection is transient + conn.transient = true + + // return connection from connection manager + connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn]) + + const topology: Topology = { + onConnect: () => { + onConnectDefer.reject(new Error('Topolgy onConnect called for transient connection')) + }, + onDisconnect: () => { + onDisconnectDefer.reject(new Error('Topolgy onDisconnect called for transient connection')) + } + } + + // Register topology for protocol + await registrar.register(protocol, topology) + + // remote peer connects + events.safeDispatchEvent('peer:identify', { + detail: { + peerId: remotePeerId, + protocols: [protocol], + connection: conn + } + }) + + await expect(Promise.any([ + onConnectDefer.promise, + onDisconnectDefer.promise, + new Promise((resolve) => { + setTimeout(() => { + resolve() + }, 1000) + }) + ])).to.eventually.not.be.rejected() + }) + + it('should call topology onConnect handler for transient connection when explicitly requested', async () => { + const onConnectDefer = pDefer() + + // Setup connections before registrar + const remotePeerId = await createEd25519PeerId() + const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) + + // connection is transient + conn.transient = true + + // return connection from connection manager + connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn]) + + const topology: Topology = { + notifyOnTransient: true, + onConnect: () => { + onConnectDefer.resolve() + } + } + + // Register topology for protocol + await registrar.register(protocol, topology) + + // remote peer connects + events.safeDispatchEvent('peer:identify', { + detail: { + peerId: remotePeerId, + protocols: [protocol], + connection: conn + } + }) + + await expect(onConnectDefer.promise).to.eventually.be.undefined() + }) + + it('should call topology handlers for non-transient connection opened after transient connection', async () => { + const onConnectDefer = pDefer() + let callCount = 0 + + const topology: Topology = { + notifyOnTransient: true, + onConnect: () => { + callCount++ + + if (callCount === 2) { + onConnectDefer.resolve() + } + } + } + + // Register topology for protocol + await registrar.register(protocol, topology) + + // Setup connections before registrar + const remotePeerId = await createEd25519PeerId() + const transientConnection = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) + transientConnection.transient = true + + const nonTransientConnection = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) + nonTransientConnection.transient = false + + // return connection from connection manager + connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([ + transientConnection, + nonTransientConnection + ]) + + // remote peer connects over transient connection + events.safeDispatchEvent('peer:identify', { + detail: { + peerId: remotePeerId, + protocols: [protocol], + connection: transientConnection + } + }) + + // remote peer opens non-transient connection + events.safeDispatchEvent('peer:identify', { + detail: { + peerId: remotePeerId, + protocols: [protocol], + connection: nonTransientConnection + } + }) + + await expect(onConnectDefer.promise).to.eventually.be.undefined() + }) + it('should be able to register and unregister a handler', async () => { const deferred = pDefer() diff --git a/packages/pubsub/src/index.ts b/packages/pubsub/src/index.ts index af8fa223de..ba467f6e88 100644 --- a/packages/pubsub/src/index.ts +++ b/packages/pubsub/src/index.ts @@ -213,6 +213,11 @@ export abstract class PubSubBaseProtocol = Pu protected _onPeerConnected (peerId: PeerId, conn: Connection): void { log('connected %p', peerId) + // if this connection is already in use for pubsub, ignore it + if (conn.streams.find(stream => stream.protocol != null && this.multicodecs.includes(stream.protocol)) != null) { + return + } + void Promise.resolve().then(async () => { try { const stream = await conn.newStream(this.multicodecs) diff --git a/packages/pubsub/test/utils/index.ts b/packages/pubsub/test/utils/index.ts index b98c78b76f..fdd7e5b535 100644 --- a/packages/pubsub/test/utils/index.ts +++ b/packages/pubsub/test/utils/index.ts @@ -136,7 +136,8 @@ export const ConnectionPair = (): [Connection, Connection] => { ...d0, protocol: protocol[0], closeWrite: async () => {} - }) + }), + streams: [] }, { // @ts-expect-error incomplete implementation @@ -144,7 +145,8 @@ export const ConnectionPair = (): [Connection, Connection] => { ...d1, protocol: protocol[0], closeWrite: async () => {} - }) + }), + streams: [] } ] }