Skip to content

Commit

Permalink
fix: notify topology listeners after identify
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed Oct 31, 2023
1 parent a47d1dc commit b2f8d0d
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 37 deletions.
32 changes: 21 additions & 11 deletions packages/libp2p/src/registrar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { CodeError } from '@libp2p/interface/errors'
import { logger } from '@libp2p/logger'
import merge from 'merge-options'
import { codes } from './errors.js'
import type { Libp2pEvents, PeerUpdate } from '@libp2p/interface'
import type { IdentifyResult, Libp2pEvents, PeerUpdate } from '@libp2p/interface'
import type { TypedEventTarget } from '@libp2p/interface/events'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { PeerStore } from '@libp2p/interface/peer-store'
Expand Down Expand Up @@ -37,9 +37,11 @@ export class DefaultRegistrar implements Registrar {

this._onDisconnect = this._onDisconnect.bind(this)
this._onPeerUpdate = this._onPeerUpdate.bind(this)
this._onPeerIdentify = this._onPeerIdentify.bind(this)

this.components.events.addEventListener('peer:disconnect', this._onDisconnect)
this.components.events.addEventListener('peer:update', this._onPeerUpdate)
this.components.events.addEventListener('peer:identify', this._onPeerIdentify)
}

getProtocols (): string[] {
Expand Down Expand Up @@ -181,12 +183,12 @@ export class DefaultRegistrar implements Registrar {
}

/**
* Check if a new peer supports the multicodecs for this topology
* When a peer is updated, if they have removed supported protocols notify any
* topologies interested in the removed protocols.
*/
_onPeerUpdate (evt: CustomEvent<PeerUpdate>): void {
const { peer, previous } = evt.detail
const removed = (previous?.protocols ?? []).filter(protocol => !peer.protocols.includes(protocol))
const added = peer.protocols.filter(protocol => !(previous?.protocols ?? []).includes(protocol))

for (const protocol of removed) {
const topologies = this.topologies.get(protocol)
Expand All @@ -200,23 +202,31 @@ export class DefaultRegistrar implements Registrar {
topology.onDisconnect?.(peer.id)
}
}
}

for (const protocol of added) {
/**
* After identify has completed and we have received the list of supported
* protocols, notify any topologies interested in those protocols.
*/
_onPeerIdentify (evt: CustomEvent<IdentifyResult>): void {
const protocols = evt.detail.protocols
const connection = evt.detail.connection
const peerId = evt.detail.peerId

for (const protocol of protocols) {
const topologies = this.topologies.get(protocol)

if (topologies == null) {
// no topologies are interested in this protocol
continue
}

for (const connection of this.components.connectionManager.getConnections(peer.id)) {
for (const topology of topologies.values()) {
if (connection.transient && topology.notifyOnTransient !== true) {
continue
}

topology.onConnect?.(peer.id, connection)
for (const topology of topologies.values()) {
if (connection.transient && topology.notifyOnTransient !== true) {
continue
}

topology.onConnect?.(peerId, connection)
}
}
}
Expand Down
56 changes: 30 additions & 26 deletions packages/libp2p/test/registrar/registrar.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,12 +168,11 @@ describe('registrar', () => {
})

// remote peer connects
events.safeDispatchEvent('peer:update', {
events.safeDispatchEvent('peer:identify', {
detail: {
peer: {
id: remotePeerId,
protocols: [protocol]
}
peerId: remotePeerId,
protocols: [protocol],
connection: conn
}
})
await onConnectDefer.promise
Expand Down Expand Up @@ -210,12 +209,11 @@ describe('registrar', () => {
await registrar.register(protocol, topology)

// remote peer connects
events.safeDispatchEvent('peer:update', {
events.safeDispatchEvent('peer:identify', {
detail: {
peer: {
id: remotePeerId,
protocols: [protocol]
}
peerId: remotePeerId,
protocols: [protocol],
connection: conn
}
})

Expand Down Expand Up @@ -293,12 +291,11 @@ describe('registrar', () => {
await registrar.register(protocol, topology)

// remote peer connects
events.safeDispatchEvent('peer:update', {
events.safeDispatchEvent('peer:identify', {
detail: {
peer: {
id: remotePeerId,
protocols: [protocol]
}
peerId: remotePeerId,
protocols: [protocol],
connection: conn
}
})

Expand Down Expand Up @@ -337,12 +334,11 @@ describe('registrar', () => {
await registrar.register(protocol, topology)

// remote peer connects
events.safeDispatchEvent('peer:update', {
events.safeDispatchEvent('peer:identify', {
detail: {
peer: {
id: remotePeerId,
protocols: [protocol]
}
peerId: remotePeerId,
protocols: [protocol],
connection: conn
}
})

Expand Down Expand Up @@ -381,13 +377,21 @@ describe('registrar', () => {
nonTransientConnection
])

// remote peer connects
events.safeDispatchEvent('peer:update', {
// remote peer connects over transient connection
events.safeDispatchEvent('peer:identify', {
detail: {
peer: {
id: remotePeerId,
protocols: [protocol]
}
peerId: remotePeerId,
protocols: [protocol],
connection: transientConnection
}
})

// remote peer opens non-transient connection
events.safeDispatchEvent('peer:identify', {
detail: {
peerId: remotePeerId,
protocols: [protocol],
connection: nonTransientConnection
}
})

Expand Down

0 comments on commit b2f8d0d

Please sign in to comment.