Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: opt-in to toplogy notifications on transient connections #2049

Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/interface/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ export interface IdentifyResult {
* If sent by the remote peer this is the deserialized signed peer record
*/
signedPeerRecord?: SignedPeerRecord

/**
* The connection that the identify protocol ran over
*/
connection: Connection
}

/**
Expand Down
15 changes: 15 additions & 0 deletions packages/interface/src/topology/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,21 @@ export interface Topology {
min?: number
max?: number

/**
* If true, invoke `onConnect` for this topology on transient (e.g. short-lived
* and/or data-limited) connections. (default: false)
*/
notifyOnTransient?: boolean

/**
* Invoked when a new connection is opened to a peer that supports the
* registered protocol
*/
onConnect?(peerId: PeerId, conn: Connection): void

/**
* Invoked when the last connection to a peer that supports the registered
* protocol closes
*/
onDisconnect?(peerId: PeerId): void
}
1 change: 1 addition & 0 deletions packages/libp2p/src/circuit-relay/transport/discovery.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export class RelayDiscovery extends EventEmitter<RelayDiscoveryEvents> implement
// register a topology listener for when new peers are encountered
// that support the hop protocol
this.topologyId = await this.registrar.register(RELAY_V2_HOP_CODEC, {
notifyOnTransient: true,
onConnect: (peerId) => {
this.safeDispatchEvent('relay:discover', { detail: peerId })
}
Expand Down
1 change: 1 addition & 0 deletions packages/libp2p/src/dcutr/dcutr.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export class DefaultDCUtRService implements Startable {
// register for notifications of when peers that support DCUtR connect
// nb. requires the identify service to be enabled
this.topologyId = await this.registrar.register(multicodec, {
notifyOnTransient: true,
onConnect: (peerId, connection) => {
if (!connection.transient) {
// the connection is already direct, no upgrade is required
Expand Down
47 changes: 23 additions & 24 deletions packages/libp2p/src/identify/identify.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,22 +318,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
this.addressManager.addObservedAddr(cleanObservedAddr)
}

const signedPeerRecord = await this.#consumeIdentifyMessage(connection.remotePeer, message)

const result: IdentifyResult = {
peerId: id,
protocolVersion: message.protocolVersion,
agentVersion: message.agentVersion,
publicKey: message.publicKey,
listenAddrs: message.listenAddrs.map(buf => multiaddr(buf)),
observedAddr: message.observedAddr == null ? undefined : multiaddr(message.observedAddr),
protocols: message.protocols,
signedPeerRecord
}

this.events.safeDispatchEvent('peer:identify', { detail: result })

return result
return this.#consumeIdentifyMessage(connection, message)
}

/**
Expand Down Expand Up @@ -411,7 +396,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
const message = await pb.read(options)
await stream.close(options)

await this.#consumeIdentifyMessage(connection.remotePeer, message)
await this.#consumeIdentifyMessage(connection, message)
} catch (err: any) {
log.error('received invalid message', err)
stream.abort(err)
Expand All @@ -421,8 +406,8 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
log('handled push from %p', connection.remotePeer)
}

async #consumeIdentifyMessage (remotePeer: PeerId, message: Identify): Promise<SignedPeerRecord | undefined> {
log('received identify from %p', remotePeer)
async #consumeIdentifyMessage (connection: Connection, message: Identify): Promise<IdentifyResult> {
log('received identify from %p', connection.remotePeer)

if (message == null) {
throw new Error('Message was null or undefined')
Expand All @@ -442,7 +427,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService {

// if the peer record has been sent, prefer the addresses in the record as they are signed by the remote peer
if (message.signedPeerRecord != null) {
log('received signedPeerRecord in push from %p', remotePeer)
log('received signedPeerRecord in push from %p', connection.remotePeer)

let peerRecordEnvelope = message.signedPeerRecord
const envelope = await RecordEnvelope.openAndCertify(peerRecordEnvelope, PeerRecord.DOMAIN)
Expand All @@ -454,7 +439,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
}

// Make sure remote peer is the one sending the record
if (!remotePeer.equals(peerRecord.peerId)) {
if (!connection.remotePeer.equals(peerRecord.peerId)) {
throw new Error('signing key does not match remote PeerId')
}

Expand Down Expand Up @@ -500,7 +485,7 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
addresses: peerRecord.multiaddrs
}
} else {
log('%p did not send a signed peer record', remotePeer)
log('%p did not send a signed peer record', connection.remotePeer)
}

if (message.agentVersion != null) {
Expand All @@ -511,9 +496,23 @@ export class DefaultIdentifyService implements Startable, IdentifyService {
peer.metadata.set('ProtocolVersion', uint8ArrayFromString(message.protocolVersion))
}

await this.peerStore.patch(remotePeer, peer)
await this.peerStore.patch(connection.remotePeer, peer)

return output
const result: IdentifyResult = {
peerId: connection.remotePeer,
protocolVersion: message.protocolVersion,
agentVersion: message.agentVersion,
publicKey: message.publicKey,
listenAddrs: message.listenAddrs.map(buf => multiaddr(buf)),
observedAddr: message.observedAddr == null ? undefined : multiaddr(message.observedAddr),
protocols: message.protocols,
signedPeerRecord: output,
connection
}

this.events.safeDispatchEvent('peer:identify', { detail: result })

return result
}
}

Expand Down
55 changes: 7 additions & 48 deletions packages/libp2p/src/registrar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,8 @@ export class DefaultRegistrar implements Registrar {

this._onDisconnect = this._onDisconnect.bind(this)
this._onPeerUpdate = this._onPeerUpdate.bind(this)
this._onConnect = this._onConnect.bind(this)

this.components.events.addEventListener('peer:disconnect', this._onDisconnect)
this.components.events.addEventListener('peer:connect', this._onConnect)
this.components.events.addEventListener('peer:update', this._onPeerUpdate)
}

Expand Down Expand Up @@ -183,47 +181,7 @@ export class DefaultRegistrar implements Registrar {
}

/**
* On peer connected if we already have their protocols. Usually used for reconnects
* as change:protocols event won't be emitted due to identical protocols.
*/
_onConnect (evt: CustomEvent<PeerId>): void {
const remotePeer = evt.detail

void this.components.peerStore.get(remotePeer)
.then(peer => {
const connection = this.components.connectionManager.getConnections(peer.id)[0]

if (connection == null) {
log('peer %p connected but the connection manager did not have a connection', peer)
// peer disconnected while we were loading their details from the peer store
return
}

for (const protocol of peer.protocols) {
const topologies = this.topologies.get(protocol)

if (topologies == null) {
// no topologies are interested in this protocol
continue
}

for (const topology of topologies.values()) {
topology.onConnect?.(remotePeer, connection)
}
}
})
.catch(err => {
if (err.code === codes.ERR_NOT_FOUND) {
// peer has not completed identify so they are not in the peer store
return
}

log.error('could not inform topologies of connecting peer %p', remotePeer, err)
})
}

/**
* Check if a new peer support the multicodecs for this topology
* Check if a new peer supports the multicodecs for this topology
*/
_onPeerUpdate (evt: CustomEvent<PeerUpdate>): void {
const { peer, previous } = evt.detail
Expand Down Expand Up @@ -251,13 +209,14 @@ export class DefaultRegistrar implements Registrar {
continue
}

for (const topology of topologies.values()) {
const connection = this.components.connectionManager.getConnections(peer.id)[0]
for (const connection of this.components.connectionManager.getConnections(peer.id)) {
for (const topology of topologies.values()) {
if (connection.transient && topology.notifyOnTransient !== true) {
continue
}

if (connection == null) {
continue
topology.onConnect?.(peer.id, connection)
}
topology.onConnect?.(peer.id, connection)
}
}
}
Expand Down
26 changes: 15 additions & 11 deletions packages/libp2p/test/identify/service.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,13 +65,15 @@ describe('identify', () => {
expect(connection).to.exist()

// wait for identify to run on the new connection
await eventPromise
const identifyResult = await eventPromise

// should have run on the new connection
expect(identifyResult).to.have.nested.property('detail.connection', connection)

// assert we have received certified announce addresses
const peer = await libp2p.peerStore.get(remoteLibp2p.peerId)
expect(peer.addresses).to.have.lengthOf(1)
expect(peer.addresses[0].isCertified).to.be.true('did not receive certified address via identify')
expect(peer.addresses[0].multiaddr.toString()).to.startWith('/dns4/localhost/', 'did not receive announce address via identify')
expect(identifyResult).to.have.deep.nested.property('detail.signedPeerRecord.addresses', [
multiaddr(`/dns4/localhost/tcp/${REMOTE_PORT}`)
], 'did not receive announce address via identify')
})

it('should run identify automatically for inbound connections', async () => {
Expand All @@ -88,14 +90,16 @@ describe('identify', () => {
const connection = await remoteLibp2p.dial(multiaddr(`/ip4/127.0.0.1/tcp/${LOCAL_PORT}/p2p/${libp2p.peerId.toString()}`))
expect(connection).to.exist()

// wait for identify to run on the new connection
await eventPromise
// wait for identify to run
const identifyResult = await eventPromise

// should have run on the new connection
expect(identifyResult).to.have.nested.property('detail.connection', connection)

// assert we have received certified announce addresses
const peer = await libp2p.peerStore.get(remoteLibp2p.peerId)
expect(peer.addresses).to.have.lengthOf(1)
expect(peer.addresses[0].isCertified).to.be.true('did not receive certified address via identify')
expect(peer.addresses[0].multiaddr.toString()).to.startWith('/dns4/localhost/', 'did not receive announce address via identify')
expect(identifyResult).to.have.deep.nested.property('detail.signedPeerRecord.addresses', [
multiaddr(`/dns4/localhost/tcp/${LOCAL_PORT}`)
], 'did not receive announce address via identify')
})

it('should identify connection on dial and get proper announce addresses', async () => {
Expand Down
Loading
Loading