From 0cbd823c887997a31d24d42697df54ab8637bb26 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Wed, 13 Sep 2023 18:13:16 +0100 Subject: [PATCH 1/7] fix(libp2p): opt-in to toplogy notifications on transient connections Adds a `notifyOnTransient` option when registering a network topology to opt-in to being notified when peers that support the registered protocol connect over transient connections. False by default. The logic has been switched to notify after identify has completed rather than after the first connection for a peer has been opened. The side effect here is that if `notifyOnTransient` is true, and the peer ends up opening a direct connection (for example they dial us via circuit relay, open a stream to do the WebRTC SDP exchange, then open a WebRTC connection), identify will run on the second connection so the topology will receive two notificiations. This is not a breaking change since the previous behaviour would have been to only notify on the transient connection, which is probably a bug as you can't do data-heavy things like bitswap over transient connections so is undesirable. Fixes #2036 --- packages/interface/src/index.ts | 5 + packages/interface/src/topology/index.ts | 15 +++ .../src/circuit-relay/transport/discovery.ts | 1 + packages/libp2p/src/dcutr/dcutr.ts | 1 + packages/libp2p/src/identify/identify.ts | 3 +- packages/libp2p/src/registrar.ts | 53 ++++----- .../libp2p/test/registrar/registrar.spec.ts | 105 ++++++++++++++++-- 7 files changed, 136 insertions(+), 47 deletions(-) diff --git a/packages/interface/src/index.ts b/packages/interface/src/index.ts index 198493a297..a943713f1b 100644 --- a/packages/interface/src/index.ts +++ b/packages/interface/src/index.ts @@ -99,6 +99,11 @@ export interface IdentifyResult { * If sent by the remote peer this is the deserialized signed peer record */ signedPeerRecord?: SignedPeerRecord + + /** + * The connection that the identify protocol ran over + */ + connection: Connection } /** diff --git a/packages/interface/src/topology/index.ts b/packages/interface/src/topology/index.ts index 697c598494..c34ff13953 100644 --- a/packages/interface/src/topology/index.ts +++ b/packages/interface/src/topology/index.ts @@ -5,6 +5,21 @@ export interface Topology { min?: number max?: number + /** + * If true, invoke `onConnect` for this topology on transient (e.g. short-lived + * and/or data-limited) connections. (default: false) + */ + notifyOnTransient?: boolean + + /** + * Invoked when a new connection is opened to a peer that supports the + * registered protocol + */ onConnect?: (peerId: PeerId, conn: Connection) => void + + /** + * Invoked when the last connection to a peer that supports the registered + * protocol closes + */ onDisconnect?: (peerId: PeerId) => void } diff --git a/packages/libp2p/src/circuit-relay/transport/discovery.ts b/packages/libp2p/src/circuit-relay/transport/discovery.ts index 58ddc80cd9..ea22352175 100644 --- a/packages/libp2p/src/circuit-relay/transport/discovery.ts +++ b/packages/libp2p/src/circuit-relay/transport/discovery.ts @@ -57,6 +57,7 @@ export class RelayDiscovery extends EventEmitter implement // register a topology listener for when new peers are encountered // that support the hop protocol this.topologyId = await this.registrar.register(RELAY_V2_HOP_CODEC, { + notifyOnTransient: true, onConnect: (peerId) => { this.safeDispatchEvent('relay:discover', { detail: peerId }) } diff --git a/packages/libp2p/src/dcutr/dcutr.ts b/packages/libp2p/src/dcutr/dcutr.ts index 190766bfa2..ab618ef80b 100644 --- a/packages/libp2p/src/dcutr/dcutr.ts +++ b/packages/libp2p/src/dcutr/dcutr.ts @@ -72,6 +72,7 @@ export class DefaultDCUtRService implements Startable { // register for notifications of when peers that support DCUtR connect // nb. requires the identify service to be enabled this.topologyId = await this.registrar.register(multicodec, { + notifyOnTransient: true, onConnect: (peerId, connection) => { if (!connection.transient) { // the connection is already direct, no upgrade is required diff --git a/packages/libp2p/src/identify/identify.ts b/packages/libp2p/src/identify/identify.ts index caa5248b45..0375317a26 100644 --- a/packages/libp2p/src/identify/identify.ts +++ b/packages/libp2p/src/identify/identify.ts @@ -328,7 +328,8 @@ export class DefaultIdentifyService implements Startable, IdentifyService { listenAddrs: message.listenAddrs.map(buf => multiaddr(buf)), observedAddr: message.observedAddr == null ? undefined : multiaddr(message.observedAddr), protocols: message.protocols, - signedPeerRecord + signedPeerRecord, + connection } this.events.safeDispatchEvent('peer:identify', { detail: result }) diff --git a/packages/libp2p/src/registrar.ts b/packages/libp2p/src/registrar.ts index c679888f4f..7521506ded 100644 --- a/packages/libp2p/src/registrar.ts +++ b/packages/libp2p/src/registrar.ts @@ -2,7 +2,7 @@ import { CodeError } from '@libp2p/interface/errors' import { logger } from '@libp2p/logger' import merge from 'merge-options' import { codes } from './errors.js' -import type { Libp2pEvents, PeerUpdate } from '@libp2p/interface' +import type { IdentifyResult, Libp2pEvents, PeerUpdate } from '@libp2p/interface' import type { EventEmitter } from '@libp2p/interface/events' import type { PeerId } from '@libp2p/interface/peer-id' import type { PeerStore } from '@libp2p/interface/peer-store' @@ -37,10 +37,10 @@ export class DefaultRegistrar implements Registrar { this._onDisconnect = this._onDisconnect.bind(this) this._onPeerUpdate = this._onPeerUpdate.bind(this) - this._onConnect = this._onConnect.bind(this) + this._onIdentify = this._onIdentify.bind(this) this.components.events.addEventListener('peer:disconnect', this._onDisconnect) - this.components.events.addEventListener('peer:connect', this._onConnect) + this.components.events.addEventListener('peer:identify', this._onIdentify) this.components.events.addEventListener('peer:update', this._onPeerUpdate) } @@ -183,43 +183,28 @@ export class DefaultRegistrar implements Registrar { } /** - * On peer connected if we already have their protocols. Usually used for reconnects - * as change:protocols event won't be emitted due to identical protocols. + * When identify runs over a new connection, notify any topologies interested + * in the protocols the peer supports */ - _onConnect (evt: CustomEvent): void { - const remotePeer = evt.detail - - void this.components.peerStore.get(remotePeer) - .then(peer => { - const connection = this.components.connectionManager.getConnections(peer.id)[0] - - if (connection == null) { - log('peer %p connected but the connection manager did not have a connection', peer) - // peer disconnected while we were loading their details from the peer store - return - } + _onIdentify (evt: CustomEvent): void { + const result = evt.detail - for (const protocol of peer.protocols) { - const topologies = this.topologies.get(protocol) + for (const protocol of result.protocols) { + const topologies = this.topologies.get(protocol) - if (topologies == null) { - // no topologies are interested in this protocol - continue - } + if (topologies == null) { + // no topologies are interested in this protocol + continue + } - for (const topology of topologies.values()) { - topology.onConnect?.(remotePeer, connection) - } - } - }) - .catch(err => { - if (err.code === codes.ERR_NOT_FOUND) { - // peer has not completed identify so they are not in the peer store - return + for (const topology of topologies.values()) { + if (result.connection.transient && topology.notifyOnTransient !== true) { + continue } - log.error('could not inform topologies of connecting peer %p', remotePeer, err) - }) + topology.onConnect?.(result.peerId, result.connection) + } + } } /** diff --git a/packages/libp2p/test/registrar/registrar.spec.ts b/packages/libp2p/test/registrar/registrar.spec.ts index 0e5b585acf..3c009765da 100644 --- a/packages/libp2p/test/registrar/registrar.spec.ts +++ b/packages/libp2p/test/registrar/registrar.spec.ts @@ -1,7 +1,6 @@ /* eslint-env mocha */ import { yamux } from '@chainsafe/libp2p-yamux' -import { CodeError } from '@libp2p/interface/errors' import { EventEmitter } from '@libp2p/interface/events' import { mockDuplex, mockMultiaddrConnection, mockUpgrader, mockConnection } from '@libp2p/interface-compliance-tests/mocks' import { mplex } from '@libp2p/mplex' @@ -14,7 +13,6 @@ import pDefer from 'p-defer' import { type StubbedInstance, stubInterface } from 'sinon-ts' import { type Components, defaultComponents } from '../../src/components.js' import { DefaultConnectionManager } from '../../src/connection-manager/index.js' -import { codes } from '../../src/errors.js' import { plaintext } from '../../src/insecure/index.js' import { createLibp2pNode, type Libp2pNode } from '../../src/libp2p.js' import { DefaultRegistrar } from '../../src/registrar.js' @@ -140,9 +138,6 @@ describe('registrar', () => { const remotePeerId = await createEd25519PeerId() const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) - // return connection from connection manager - connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn]) - const topology: Topology = { onConnect: (peerId, connection) => { expect(peerId.equals(remotePeerId)).to.be.true() @@ -170,8 +165,12 @@ describe('registrar', () => { }) // remote peer connects - events.safeDispatchEvent('peer:connect', { - detail: remotePeerId + events.safeDispatchEvent('peer:identify', { + detail: { + peerId: remotePeerId, + protocols: [protocol], + connection: conn + } }) await onConnectDefer.promise @@ -206,12 +205,13 @@ describe('registrar', () => { // Register protocol await registrar.register(protocol, topology) - // No details before identify - peerStore.get.withArgs(matchPeerId(conn.remotePeer)).rejects(new CodeError('Not found', codes.ERR_NOT_FOUND)) - // remote peer connects - events.safeDispatchEvent('peer:connect', { - detail: remotePeerId + events.safeDispatchEvent('peer:identify', { + detail: { + peerId: remotePeerId, + protocols: [protocol], + connection: conn + } }) // Can get details after identify @@ -261,6 +261,87 @@ describe('registrar', () => { await onDisconnectDefer.promise }) + it('should not call topology handlers for transient connection', async () => { + const onConnectDefer = pDefer() + const onDisconnectDefer = pDefer() + + // Setup connections before registrar + const remotePeerId = await createEd25519PeerId() + const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) + + // connection is transient + conn.transient = true + + // return connection from connection manager + connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn]) + + const topology: Topology = { + onConnect: () => { + onConnectDefer.reject(new Error('Topolgy onConnect called for transient connection')) + }, + onDisconnect: () => { + onDisconnectDefer.reject(new Error('Topolgy onDisconnect called for transient connection')) + } + } + + // Register topology for protocol + await registrar.register(protocol, topology) + + // remote peer connects + events.safeDispatchEvent('peer:identify', { + detail: { + peerId: remotePeerId, + protocols: [protocol], + connection: conn + } + }) + + await expect(Promise.any([ + onConnectDefer.promise, + onDisconnectDefer.promise, + new Promise((resolve) => { + setTimeout(() => { + resolve() + }, 1000) + }) + ])).to.eventually.not.be.rejected() + }) + + it('should call topology onConnect handler for transient connection when explicitly requested', async () => { + const onConnectDefer = pDefer() + + // Setup connections before registrar + const remotePeerId = await createEd25519PeerId() + const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) + + // connection is transient + conn.transient = true + + // return connection from connection manager + connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn]) + + const topology: Topology = { + notifyOnTransient: true, + onConnect: () => { + onConnectDefer.resolve() + } + } + + // Register topology for protocol + await registrar.register(protocol, topology) + + // remote peer connects + events.safeDispatchEvent('peer:identify', { + detail: { + peerId: remotePeerId, + protocols: [protocol], + connection: conn + } + }) + + await expect(onConnectDefer.promise).to.eventually.be.undefined() + }) + it('should be able to register and unregister a handler', async () => { const deferred = pDefer() From 2322dce37fa96d28ab21482bf5b65dc1ea2525b8 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 15 Sep 2023 15:40:32 +0100 Subject: [PATCH 2/7] chore: fix tests --- packages/libp2p/src/identify/identify.ts | 48 +++++++++---------- packages/libp2p/src/registrar.ts | 42 ++++------------ packages/libp2p/test/identify/service.node.ts | 26 +++++----- .../libp2p/test/registrar/registrar.spec.ts | 39 ++++++++------- 4 files changed, 69 insertions(+), 86 deletions(-) diff --git a/packages/libp2p/src/identify/identify.ts b/packages/libp2p/src/identify/identify.ts index 0375317a26..93db06b1bd 100644 --- a/packages/libp2p/src/identify/identify.ts +++ b/packages/libp2p/src/identify/identify.ts @@ -318,23 +318,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService { this.addressManager.addObservedAddr(cleanObservedAddr) } - const signedPeerRecord = await this.#consumeIdentifyMessage(connection.remotePeer, message) - - const result: IdentifyResult = { - peerId: id, - protocolVersion: message.protocolVersion, - agentVersion: message.agentVersion, - publicKey: message.publicKey, - listenAddrs: message.listenAddrs.map(buf => multiaddr(buf)), - observedAddr: message.observedAddr == null ? undefined : multiaddr(message.observedAddr), - protocols: message.protocols, - signedPeerRecord, - connection - } - - this.events.safeDispatchEvent('peer:identify', { detail: result }) - - return result + return this.#consumeIdentifyMessage(connection, message) } /** @@ -412,7 +396,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService { const message = await pb.read(options) await stream.close(options) - await this.#consumeIdentifyMessage(connection.remotePeer, message) + await this.#consumeIdentifyMessage(connection, message) } catch (err: any) { log.error('received invalid message', err) stream.abort(err) @@ -422,8 +406,8 @@ export class DefaultIdentifyService implements Startable, IdentifyService { log('handled push from %p', connection.remotePeer) } - async #consumeIdentifyMessage (remotePeer: PeerId, message: Identify): Promise { - log('received identify from %p', remotePeer) + async #consumeIdentifyMessage (connection: Connection, message: Identify): Promise { + log('received identify from %p', connection.remotePeer) if (message == null) { throw new Error('Message was null or undefined') @@ -443,7 +427,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService { // if the peer record has been sent, prefer the addresses in the record as they are signed by the remote peer if (message.signedPeerRecord != null) { - log('received signedPeerRecord in push from %p', remotePeer) + log('received signedPeerRecord in push from %p', connection.remotePeer) let peerRecordEnvelope = message.signedPeerRecord const envelope = await RecordEnvelope.openAndCertify(peerRecordEnvelope, PeerRecord.DOMAIN) @@ -455,7 +439,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService { } // Make sure remote peer is the one sending the record - if (!remotePeer.equals(peerRecord.peerId)) { + if (!connection.remotePeer.equals(peerRecord.peerId)) { throw new Error('signing key does not match remote PeerId') } @@ -501,7 +485,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService { addresses: peerRecord.multiaddrs } } else { - log('%p did not send a signed peer record', remotePeer) + log('%p did not send a signed peer record', connection.remotePeer) } if (message.agentVersion != null) { @@ -512,9 +496,23 @@ export class DefaultIdentifyService implements Startable, IdentifyService { peer.metadata.set('ProtocolVersion', uint8ArrayFromString(message.protocolVersion)) } - await this.peerStore.patch(remotePeer, peer) + await this.peerStore.patch(connection.remotePeer, peer) - return output + const result: IdentifyResult = { + peerId: connection.remotePeer, + protocolVersion: message.protocolVersion, + agentVersion: message.agentVersion, + publicKey: message.publicKey, + listenAddrs: message.listenAddrs.map(buf => multiaddr(buf)), + observedAddr: message.observedAddr == null ? undefined : multiaddr(message.observedAddr), + protocols: message.protocols, + signedPeerRecord: output, + connection + } + + this.events.safeDispatchEvent('peer:identify', { detail: result }) + + return result } } diff --git a/packages/libp2p/src/registrar.ts b/packages/libp2p/src/registrar.ts index 7521506ded..3b6b3aff4f 100644 --- a/packages/libp2p/src/registrar.ts +++ b/packages/libp2p/src/registrar.ts @@ -2,7 +2,7 @@ import { CodeError } from '@libp2p/interface/errors' import { logger } from '@libp2p/logger' import merge from 'merge-options' import { codes } from './errors.js' -import type { IdentifyResult, Libp2pEvents, PeerUpdate } from '@libp2p/interface' +import type { Libp2pEvents, PeerUpdate } from '@libp2p/interface' import type { EventEmitter } from '@libp2p/interface/events' import type { PeerId } from '@libp2p/interface/peer-id' import type { PeerStore } from '@libp2p/interface/peer-store' @@ -37,10 +37,8 @@ export class DefaultRegistrar implements Registrar { this._onDisconnect = this._onDisconnect.bind(this) this._onPeerUpdate = this._onPeerUpdate.bind(this) - this._onIdentify = this._onIdentify.bind(this) this.components.events.addEventListener('peer:disconnect', this._onDisconnect) - this.components.events.addEventListener('peer:identify', this._onIdentify) this.components.events.addEventListener('peer:update', this._onPeerUpdate) } @@ -183,32 +181,7 @@ export class DefaultRegistrar implements Registrar { } /** - * When identify runs over a new connection, notify any topologies interested - * in the protocols the peer supports - */ - _onIdentify (evt: CustomEvent): void { - const result = evt.detail - - for (const protocol of result.protocols) { - const topologies = this.topologies.get(protocol) - - if (topologies == null) { - // no topologies are interested in this protocol - continue - } - - for (const topology of topologies.values()) { - if (result.connection.transient && topology.notifyOnTransient !== true) { - continue - } - - topology.onConnect?.(result.peerId, result.connection) - } - } - } - - /** - * Check if a new peer support the multicodecs for this topology + * Check if a new peer supports the multicodecs for this topology */ _onPeerUpdate (evt: CustomEvent): void { const { peer, previous } = evt.detail @@ -236,13 +209,14 @@ export class DefaultRegistrar implements Registrar { continue } - for (const topology of topologies.values()) { - const connection = this.components.connectionManager.getConnections(peer.id)[0] + for (const connection of this.components.connectionManager.getConnections(peer.id)) { + for (const topology of topologies.values()) { + if (connection.transient && topology.notifyOnTransient !== true) { + continue + } - if (connection == null) { - continue + topology.onConnect?.(peer.id, connection) } - topology.onConnect?.(peer.id, connection) } } } diff --git a/packages/libp2p/test/identify/service.node.ts b/packages/libp2p/test/identify/service.node.ts index e7ce86b685..bae39bc2d6 100644 --- a/packages/libp2p/test/identify/service.node.ts +++ b/packages/libp2p/test/identify/service.node.ts @@ -65,13 +65,15 @@ describe('identify', () => { expect(connection).to.exist() // wait for identify to run on the new connection - await eventPromise + const identifyResult = await eventPromise + + // should have run on the new connection + expect(identifyResult).to.have.nested.property('detail.connection', connection) // assert we have received certified announce addresses - const peer = await libp2p.peerStore.get(remoteLibp2p.peerId) - expect(peer.addresses).to.have.lengthOf(1) - expect(peer.addresses[0].isCertified).to.be.true('did not receive certified address via identify') - expect(peer.addresses[0].multiaddr.toString()).to.startWith('/dns4/localhost/', 'did not receive announce address via identify') + expect(identifyResult).to.have.deep.nested.property('detail.signedPeerRecord.addresses', [ + multiaddr(`/dns4/localhost/tcp/${REMOTE_PORT}`) + ], 'did not receive announce address via identify') }) it('should run identify automatically for inbound connections', async () => { @@ -88,14 +90,16 @@ describe('identify', () => { const connection = await remoteLibp2p.dial(multiaddr(`/ip4/127.0.0.1/tcp/${LOCAL_PORT}/p2p/${libp2p.peerId.toString()}`)) expect(connection).to.exist() - // wait for identify to run on the new connection - await eventPromise + // wait for identify to run + const identifyResult = await eventPromise + + // should have run on the new connection + expect(identifyResult).to.have.nested.property('detail.connection', connection) // assert we have received certified announce addresses - const peer = await libp2p.peerStore.get(remoteLibp2p.peerId) - expect(peer.addresses).to.have.lengthOf(1) - expect(peer.addresses[0].isCertified).to.be.true('did not receive certified address via identify') - expect(peer.addresses[0].multiaddr.toString()).to.startWith('/dns4/localhost/', 'did not receive announce address via identify') + expect(identifyResult).to.have.deep.nested.property('detail.signedPeerRecord.addresses', [ + multiaddr(`/dns4/localhost/tcp/${LOCAL_PORT}`) + ], 'did not receive announce address via identify') }) it('should identify connection on dial and get proper announce addresses', async () => { diff --git a/packages/libp2p/test/registrar/registrar.spec.ts b/packages/libp2p/test/registrar/registrar.spec.ts index 3c009765da..be13158719 100644 --- a/packages/libp2p/test/registrar/registrar.spec.ts +++ b/packages/libp2p/test/registrar/registrar.spec.ts @@ -138,6 +138,9 @@ describe('registrar', () => { const remotePeerId = await createEd25519PeerId() const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) + // return connection from connection manager + connectionManager.getConnections.withArgs(remotePeerId).returns([conn]) + const topology: Topology = { onConnect: (peerId, connection) => { expect(peerId.equals(remotePeerId)).to.be.true() @@ -165,11 +168,12 @@ describe('registrar', () => { }) // remote peer connects - events.safeDispatchEvent('peer:identify', { + events.safeDispatchEvent('peer:update', { detail: { - peerId: remotePeerId, - protocols: [protocol], - connection: conn + peer: { + id: remotePeerId, + protocols: [protocol] + } } }) await onConnectDefer.promise @@ -206,11 +210,12 @@ describe('registrar', () => { await registrar.register(protocol, topology) // remote peer connects - events.safeDispatchEvent('peer:identify', { + events.safeDispatchEvent('peer:update', { detail: { - peerId: remotePeerId, - protocols: [protocol], - connection: conn + peer: { + id: remotePeerId, + protocols: [protocol] + } } }) @@ -288,11 +293,12 @@ describe('registrar', () => { await registrar.register(protocol, topology) // remote peer connects - events.safeDispatchEvent('peer:identify', { + events.safeDispatchEvent('peer:update', { detail: { - peerId: remotePeerId, - protocols: [protocol], - connection: conn + peer: { + id: remotePeerId, + protocols: [protocol] + } } }) @@ -331,11 +337,12 @@ describe('registrar', () => { await registrar.register(protocol, topology) // remote peer connects - events.safeDispatchEvent('peer:identify', { + events.safeDispatchEvent('peer:update', { detail: { - peerId: remotePeerId, - protocols: [protocol], - connection: conn + peer: { + id: remotePeerId, + protocols: [protocol] + } } }) From cfee9907a06dd371fdfdce925a47b15567cb6732 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 13 Oct 2023 19:31:26 +0300 Subject: [PATCH 3/7] chore: add test for two connections --- .../libp2p/test/registrar/registrar.spec.ts | 47 ++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/packages/libp2p/test/registrar/registrar.spec.ts b/packages/libp2p/test/registrar/registrar.spec.ts index be13158719..5820b70b79 100644 --- a/packages/libp2p/test/registrar/registrar.spec.ts +++ b/packages/libp2p/test/registrar/registrar.spec.ts @@ -29,7 +29,7 @@ import type { TransportManager } from '@libp2p/interface-internal/transport-mana const protocol = '/test/1.0.0' -describe('registrar', () => { +describe.only('registrar', () => { let components: Components let registrar: Registrar let peerId: PeerId @@ -349,6 +349,51 @@ describe('registrar', () => { await expect(onConnectDefer.promise).to.eventually.be.undefined() }) + it('should call topology handlers for non-transient connection opened after transient connection', async () => { + const onConnectDefer = pDefer() + let callCount = 0 + + const topology: Topology = { + notifyOnTransient: true, + onConnect: () => { + callCount++ + + if (callCount === 2) { + onConnectDefer.resolve() + } + } + } + + // Register topology for protocol + await registrar.register(protocol, topology) + + // Setup connections before registrar + const remotePeerId = await createEd25519PeerId() + const transientConnection = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) + transientConnection.transient = true + + const nonTransientConnection = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId)) + nonTransientConnection.transient = false + + // return connection from connection manager + connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([ + transientConnection, + nonTransientConnection + ]) + + // remote peer connects + events.safeDispatchEvent('peer:update', { + detail: { + peer: { + id: remotePeerId, + protocols: [protocol] + } + } + }) + + await expect(onConnectDefer.promise).to.eventually.be.undefined() + }) + it('should be able to register and unregister a handler', async () => { const deferred = pDefer() From 539fdc2a12008f3bec85a0b1913519e912c33bfd Mon Sep 17 00:00:00 2001 From: achingbrain Date: Fri, 13 Oct 2023 19:45:14 +0300 Subject: [PATCH 4/7] chore: fix linting --- packages/libp2p/test/registrar/registrar.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/libp2p/test/registrar/registrar.spec.ts b/packages/libp2p/test/registrar/registrar.spec.ts index 5820b70b79..9a114bd03c 100644 --- a/packages/libp2p/test/registrar/registrar.spec.ts +++ b/packages/libp2p/test/registrar/registrar.spec.ts @@ -29,7 +29,7 @@ import type { TransportManager } from '@libp2p/interface-internal/transport-mana const protocol = '/test/1.0.0' -describe.only('registrar', () => { +describe('registrar', () => { let components: Components let registrar: Registrar let peerId: PeerId From b2f8d0dc439975b2505e8ed06ce5597ddb21add1 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 31 Oct 2023 12:09:04 +0000 Subject: [PATCH 5/7] fix: notify topology listeners after identify --- packages/libp2p/src/registrar.ts | 32 +++++++---- .../libp2p/test/registrar/registrar.spec.ts | 56 ++++++++++--------- 2 files changed, 51 insertions(+), 37 deletions(-) diff --git a/packages/libp2p/src/registrar.ts b/packages/libp2p/src/registrar.ts index e43e62da8a..c2998583a2 100644 --- a/packages/libp2p/src/registrar.ts +++ b/packages/libp2p/src/registrar.ts @@ -2,7 +2,7 @@ import { CodeError } from '@libp2p/interface/errors' import { logger } from '@libp2p/logger' import merge from 'merge-options' import { codes } from './errors.js' -import type { Libp2pEvents, PeerUpdate } from '@libp2p/interface' +import type { IdentifyResult, Libp2pEvents, PeerUpdate } from '@libp2p/interface' import type { TypedEventTarget } from '@libp2p/interface/events' import type { PeerId } from '@libp2p/interface/peer-id' import type { PeerStore } from '@libp2p/interface/peer-store' @@ -37,9 +37,11 @@ export class DefaultRegistrar implements Registrar { this._onDisconnect = this._onDisconnect.bind(this) this._onPeerUpdate = this._onPeerUpdate.bind(this) + this._onPeerIdentify = this._onPeerIdentify.bind(this) this.components.events.addEventListener('peer:disconnect', this._onDisconnect) this.components.events.addEventListener('peer:update', this._onPeerUpdate) + this.components.events.addEventListener('peer:identify', this._onPeerIdentify) } getProtocols (): string[] { @@ -181,12 +183,12 @@ export class DefaultRegistrar implements Registrar { } /** - * Check if a new peer supports the multicodecs for this topology + * When a peer is updated, if they have removed supported protocols notify any + * topologies interested in the removed protocols. */ _onPeerUpdate (evt: CustomEvent): void { const { peer, previous } = evt.detail const removed = (previous?.protocols ?? []).filter(protocol => !peer.protocols.includes(protocol)) - const added = peer.protocols.filter(protocol => !(previous?.protocols ?? []).includes(protocol)) for (const protocol of removed) { const topologies = this.topologies.get(protocol) @@ -200,8 +202,18 @@ export class DefaultRegistrar implements Registrar { topology.onDisconnect?.(peer.id) } } + } - for (const protocol of added) { + /** + * After identify has completed and we have received the list of supported + * protocols, notify any topologies interested in those protocols. + */ + _onPeerIdentify (evt: CustomEvent): void { + const protocols = evt.detail.protocols + const connection = evt.detail.connection + const peerId = evt.detail.peerId + + for (const protocol of protocols) { const topologies = this.topologies.get(protocol) if (topologies == null) { @@ -209,14 +221,12 @@ export class DefaultRegistrar implements Registrar { continue } - for (const connection of this.components.connectionManager.getConnections(peer.id)) { - for (const topology of topologies.values()) { - if (connection.transient && topology.notifyOnTransient !== true) { - continue - } - - topology.onConnect?.(peer.id, connection) + for (const topology of topologies.values()) { + if (connection.transient && topology.notifyOnTransient !== true) { + continue } + + topology.onConnect?.(peerId, connection) } } } diff --git a/packages/libp2p/test/registrar/registrar.spec.ts b/packages/libp2p/test/registrar/registrar.spec.ts index 19c79e54a4..302e773121 100644 --- a/packages/libp2p/test/registrar/registrar.spec.ts +++ b/packages/libp2p/test/registrar/registrar.spec.ts @@ -168,12 +168,11 @@ describe('registrar', () => { }) // remote peer connects - events.safeDispatchEvent('peer:update', { + events.safeDispatchEvent('peer:identify', { detail: { - peer: { - id: remotePeerId, - protocols: [protocol] - } + peerId: remotePeerId, + protocols: [protocol], + connection: conn } }) await onConnectDefer.promise @@ -210,12 +209,11 @@ describe('registrar', () => { await registrar.register(protocol, topology) // remote peer connects - events.safeDispatchEvent('peer:update', { + events.safeDispatchEvent('peer:identify', { detail: { - peer: { - id: remotePeerId, - protocols: [protocol] - } + peerId: remotePeerId, + protocols: [protocol], + connection: conn } }) @@ -293,12 +291,11 @@ describe('registrar', () => { await registrar.register(protocol, topology) // remote peer connects - events.safeDispatchEvent('peer:update', { + events.safeDispatchEvent('peer:identify', { detail: { - peer: { - id: remotePeerId, - protocols: [protocol] - } + peerId: remotePeerId, + protocols: [protocol], + connection: conn } }) @@ -337,12 +334,11 @@ describe('registrar', () => { await registrar.register(protocol, topology) // remote peer connects - events.safeDispatchEvent('peer:update', { + events.safeDispatchEvent('peer:identify', { detail: { - peer: { - id: remotePeerId, - protocols: [protocol] - } + peerId: remotePeerId, + protocols: [protocol], + connection: conn } }) @@ -381,13 +377,21 @@ describe('registrar', () => { nonTransientConnection ]) - // remote peer connects - events.safeDispatchEvent('peer:update', { + // remote peer connects over transient connection + events.safeDispatchEvent('peer:identify', { detail: { - peer: { - id: remotePeerId, - protocols: [protocol] - } + peerId: remotePeerId, + protocols: [protocol], + connection: transientConnection + } + }) + + // remote peer opens non-transient connection + events.safeDispatchEvent('peer:identify', { + detail: { + peerId: remotePeerId, + protocols: [protocol], + connection: nonTransientConnection } }) From 0dbe5924a21c4ecb5f90a49b11972b635f810871 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 31 Oct 2023 18:02:29 +0000 Subject: [PATCH 6/7] chore: fix interop test --- packages/pubsub/src/index.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/packages/pubsub/src/index.ts b/packages/pubsub/src/index.ts index af8fa223de..8f02e881f2 100644 --- a/packages/pubsub/src/index.ts +++ b/packages/pubsub/src/index.ts @@ -213,6 +213,13 @@ export abstract class PubSubBaseProtocol = Pu protected _onPeerConnected (peerId: PeerId, conn: Connection): void { log('connected %p', peerId) + const existing = this.peers.get(peerId) + + // If peer streams already exists, do nothing + if (existing != null) { + return + } + void Promise.resolve().then(async () => { try { const stream = await conn.newStream(this.multicodecs) From bb28354d3556918ac228be3fd18437b7e87e8ae4 Mon Sep 17 00:00:00 2001 From: achingbrain Date: Tue, 31 Oct 2023 18:24:54 +0000 Subject: [PATCH 7/7] chore: fix interop, again --- packages/pubsub/src/index.ts | 6 ++---- packages/pubsub/test/utils/index.ts | 6 ++++-- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/pubsub/src/index.ts b/packages/pubsub/src/index.ts index 8f02e881f2..ba467f6e88 100644 --- a/packages/pubsub/src/index.ts +++ b/packages/pubsub/src/index.ts @@ -213,10 +213,8 @@ export abstract class PubSubBaseProtocol = Pu protected _onPeerConnected (peerId: PeerId, conn: Connection): void { log('connected %p', peerId) - const existing = this.peers.get(peerId) - - // If peer streams already exists, do nothing - if (existing != null) { + // if this connection is already in use for pubsub, ignore it + if (conn.streams.find(stream => stream.protocol != null && this.multicodecs.includes(stream.protocol)) != null) { return } diff --git a/packages/pubsub/test/utils/index.ts b/packages/pubsub/test/utils/index.ts index b98c78b76f..fdd7e5b535 100644 --- a/packages/pubsub/test/utils/index.ts +++ b/packages/pubsub/test/utils/index.ts @@ -136,7 +136,8 @@ export const ConnectionPair = (): [Connection, Connection] => { ...d0, protocol: protocol[0], closeWrite: async () => {} - }) + }), + streams: [] }, { // @ts-expect-error incomplete implementation @@ -144,7 +145,8 @@ export const ConnectionPair = (): [Connection, Connection] => { ...d1, protocol: protocol[0], closeWrite: async () => {} - }) + }), + streams: [] } ] }