Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: opt-in to toplogy notifications on transient connections #2049

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/interface/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ export interface IdentifyResult {
* If sent by the remote peer this is the deserialized signed peer record
*/
signedPeerRecord?: SignedPeerRecord

/**
* The connection that the identify protocol ran over
*/
connection: Connection
}

/**
Expand Down
15 changes: 15 additions & 0 deletions packages/interface/src/topology/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,21 @@ export interface Topology {
min?: number
max?: number

/**
* If true, invoke `onConnect` for this topology on transient (e.g. short-lived
* and/or data-limited) connections. (default: false)
*/
notifyOnTransient?: boolean

/**
* Invoked when a new connection is opened to a peer that supports the
* registered protocol
*/
onConnect?: (peerId: PeerId, conn: Connection) => void

/**
* Invoked when the last connection to a peer that supports the registered
* protocol closes
*/
onDisconnect?: (peerId: PeerId) => void
}
1 change: 1 addition & 0 deletions packages/libp2p/src/circuit-relay/transport/discovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export class RelayDiscovery extends EventEmitter<RelayDiscoveryEvents> implement
// register a topology listener for when new peers are encountered
// that support the hop protocol
this.topologyId = await this.registrar.register(RELAY_V2_HOP_CODEC, {
notifyOnTransient: true,
onConnect: (peerId) => {
this.safeDispatchEvent('relay:discover', { detail: peerId })
}
Expand Down
1 change: 1 addition & 0 deletions packages/libp2p/src/dcutr/dcutr.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export class DefaultDCUtRService implements Startable {
// register for notifications of when peers that support DCUtR connect
// nb. requires the identify service to be enabled
this.topologyId = await this.registrar.register(multicodec, {
notifyOnTransient: true,
onConnect: (peerId, connection) => {
if (!connection.transient) {
// the connection is already direct, no upgrade is required
Expand Down
3 changes: 2 additions & 1 deletion packages/libp2p/src/identify/identify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,8 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
listenAddrs: message.listenAddrs.map(buf => multiaddr(buf)),
observedAddr: message.observedAddr == null ? undefined : multiaddr(message.observedAddr),
protocols: message.protocols,
signedPeerRecord
signedPeerRecord,
connection
}

this.events.safeDispatchEvent('peer:identify', { detail: result })
Expand Down
53 changes: 19 additions & 34 deletions packages/libp2p/src/registrar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { CodeError } from '@libp2p/interface/errors'
import { logger } from '@libp2p/logger'
import merge from 'merge-options'
import { codes } from './errors.js'
import type { Libp2pEvents, PeerUpdate } from '@libp2p/interface'
import type { IdentifyResult, Libp2pEvents, PeerUpdate } from '@libp2p/interface'
import type { EventEmitter } from '@libp2p/interface/events'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { PeerStore } from '@libp2p/interface/peer-store'
Expand Down Expand Up @@ -37,10 +37,10 @@ export class DefaultRegistrar implements Registrar {

this._onDisconnect = this._onDisconnect.bind(this)
this._onPeerUpdate = this._onPeerUpdate.bind(this)
this._onConnect = this._onConnect.bind(this)
this._onIdentify = this._onIdentify.bind(this)

this.components.events.addEventListener('peer:disconnect', this._onDisconnect)
this.components.events.addEventListener('peer:connect', this._onConnect)
this.components.events.addEventListener('peer:identify', this._onIdentify)
this.components.events.addEventListener('peer:update', this._onPeerUpdate)
}

Expand Down Expand Up @@ -183,43 +183,28 @@ export class DefaultRegistrar implements Registrar {
}

/**
* On peer connected if we already have their protocols. Usually used for reconnects
* as change:protocols event won't be emitted due to identical protocols.
* When identify runs over a new connection, notify any topologies interested
* in the protocols the peer supports
*/
_onConnect (evt: CustomEvent<PeerId>): void {
const remotePeer = evt.detail

void this.components.peerStore.get(remotePeer)
.then(peer => {
const connection = this.components.connectionManager.getConnections(peer.id)[0]

if (connection == null) {
log('peer %p connected but the connection manager did not have a connection', peer)
// peer disconnected while we were loading their details from the peer store
return
}
_onIdentify (evt: CustomEvent<IdentifyResult>): void {
const result = evt.detail

for (const protocol of peer.protocols) {
const topologies = this.topologies.get(protocol)
for (const protocol of result.protocols) {
const topologies = this.topologies.get(protocol)

if (topologies == null) {
// no topologies are interested in this protocol
continue
}
if (topologies == null) {
// no topologies are interested in this protocol
continue
}

for (const topology of topologies.values()) {
topology.onConnect?.(remotePeer, connection)
}
}
})
.catch(err => {
if (err.code === codes.ERR_NOT_FOUND) {
// peer has not completed identify so they are not in the peer store
return
for (const topology of topologies.values()) {
if (result.connection.transient && topology.notifyOnTransient !== true) {
continue
}

log.error('could not inform topologies of connecting peer %p', remotePeer, err)
})
topology.onConnect?.(result.peerId, result.connection)
}
}
}

/**
Expand Down
105 changes: 93 additions & 12 deletions packages/libp2p/test/registrar/registrar.spec.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
/* eslint-env mocha */

import { yamux } from '@chainsafe/libp2p-yamux'
import { CodeError } from '@libp2p/interface/errors'
import { EventEmitter } from '@libp2p/interface/events'
import { mockDuplex, mockMultiaddrConnection, mockUpgrader, mockConnection } from '@libp2p/interface-compliance-tests/mocks'
import { mplex } from '@libp2p/mplex'
Expand All @@ -14,7 +13,6 @@ import pDefer from 'p-defer'
import { type StubbedInstance, stubInterface } from 'sinon-ts'
import { type Components, defaultComponents } from '../../src/components.js'
import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { codes } from '../../src/errors.js'
import { plaintext } from '../../src/insecure/index.js'
import { createLibp2pNode, type Libp2pNode } from '../../src/libp2p.js'
import { DefaultRegistrar } from '../../src/registrar.js'
Expand Down Expand Up @@ -140,9 +138,6 @@ describe('registrar', () => {
const remotePeerId = await createEd25519PeerId()
const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId))

// return connection from connection manager
connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn])

const topology: Topology = {
onConnect: (peerId, connection) => {
expect(peerId.equals(remotePeerId)).to.be.true()
Expand Down Expand Up @@ -170,8 +165,12 @@ describe('registrar', () => {
})

// remote peer connects
events.safeDispatchEvent('peer:connect', {
detail: remotePeerId
events.safeDispatchEvent('peer:identify', {
detail: {
peerId: remotePeerId,
protocols: [protocol],
connection: conn
}
})
await onConnectDefer.promise

Expand Down Expand Up @@ -206,12 +205,13 @@ describe('registrar', () => {
// Register protocol
await registrar.register(protocol, topology)

// No details before identify
peerStore.get.withArgs(matchPeerId(conn.remotePeer)).rejects(new CodeError('Not found', codes.ERR_NOT_FOUND))

// remote peer connects
events.safeDispatchEvent('peer:connect', {
detail: remotePeerId
events.safeDispatchEvent('peer:identify', {
detail: {
peerId: remotePeerId,
protocols: [protocol],
connection: conn
}
})

// Can get details after identify
Expand Down Expand Up @@ -261,6 +261,87 @@ describe('registrar', () => {
await onDisconnectDefer.promise
})

it('should not call topology handlers for transient connection', async () => {
const onConnectDefer = pDefer()
const onDisconnectDefer = pDefer()

// Setup connections before registrar
const remotePeerId = await createEd25519PeerId()
const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId))

// connection is transient
conn.transient = true

// return connection from connection manager
connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn])

const topology: Topology = {
onConnect: () => {
onConnectDefer.reject(new Error('Topolgy onConnect called for transient connection'))
},
onDisconnect: () => {
onDisconnectDefer.reject(new Error('Topolgy onDisconnect called for transient connection'))
}
}

// Register topology for protocol
await registrar.register(protocol, topology)

// remote peer connects
events.safeDispatchEvent('peer:identify', {
detail: {
peerId: remotePeerId,
protocols: [protocol],
connection: conn
}
})

await expect(Promise.any([
onConnectDefer.promise,
onDisconnectDefer.promise,
new Promise<void>((resolve) => {
setTimeout(() => {
resolve()
}, 1000)
})
])).to.eventually.not.be.rejected()
})

it('should call topology onConnect handler for transient connection when explicitly requested', async () => {
const onConnectDefer = pDefer()

// Setup connections before registrar
const remotePeerId = await createEd25519PeerId()
const conn = mockConnection(mockMultiaddrConnection(mockDuplex(), remotePeerId))

// connection is transient
conn.transient = true

// return connection from connection manager
connectionManager.getConnections.withArgs(matchPeerId(remotePeerId)).returns([conn])

const topology: Topology = {
notifyOnTransient: true,
onConnect: () => {
onConnectDefer.resolve()
}
}

// Register topology for protocol
await registrar.register(protocol, topology)

// remote peer connects
events.safeDispatchEvent('peer:identify', {
detail: {
peerId: remotePeerId,
protocols: [protocol],
connection: conn
}
})

await expect(onConnectDefer.promise).to.eventually.be.undefined()
})

it('should be able to register and unregister a handler', async () => {
const deferred = pDefer<Components>()

Expand Down
Loading