Skip to content

Commit

Permalink
fix: do not find peer when DHT yields peers without multiaddrs
Browse files Browse the repository at this point in the history
go-libp2p recently reduced the length of time it stores multiaddrs
for peers in case they go out of date.  This means that queries have
started finding provdiers/closer peers without multiaddrs as those
addreses expire.

We switched to running a find peer when these addressless peers are
discovered but it is incredibly expensive since a single find peer
can result in multiple dials.

Instead honor the peer without multiaddrs, a separate PR will be
opened to allow doing the find peer at dial time if the user choses
to dial the discovered peer.
  • Loading branch information
achingbrain committed Jan 6, 2024
1 parent 581574d commit 49e232a
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 521 deletions.
92 changes: 12 additions & 80 deletions packages/kad-dht/src/kad-dht.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { CodeError, CustomEvent, TypedEventEmitter, contentRoutingSymbol, peerDiscoverySymbol, peerRoutingSymbol } from '@libp2p/interface'
import drain from 'it-drain'
import map from 'it-map'
import parallel from 'it-parallel'
import pDefer from 'p-defer'
import { PROTOCOL } from './constants.js'
import { ContentFetching } from './content-fetching/index.js'
Expand All @@ -22,70 +20,29 @@ import {
removePrivateAddressesMapper
} from './utils.js'
import type { KadDHTComponents, KadDHTInit, Validators, Selectors, KadDHT as KadDHTInterface, QueryEvent, PeerInfoMapper } from './index.js'
import type { AbortOptions, ContentRouting, Logger, PeerDiscovery, PeerDiscoveryEvents, PeerId, PeerInfo, PeerRouting, RoutingOptions, Startable } from '@libp2p/interface'
import type { ContentRouting, Logger, PeerDiscovery, PeerDiscoveryEvents, PeerId, PeerInfo, PeerRouting, RoutingOptions, Startable } from '@libp2p/interface'
import type { CID } from 'multiformats/cid'

async function * ensurePeerInfoHasMultiaddrs (source: AsyncGenerator<PeerInfo>, peerRouting: PeerRouting, log: Logger, options: AbortOptions = {}): AsyncGenerator<() => Promise<PeerInfo | undefined>, void, undefined> {
yield * map(source, prov => {
return async () => {
if (prov.multiaddrs.length > 0) {
return prov
}

try {
return await peerRouting.findPeer(prov.id, {
...options,
useCache: false
})
} catch (err) {
log.error('could not find peer', err)
}
}
})
}

/**
* Wrapper class to convert events into returned values
*/
class DHTContentRouting implements ContentRouting {
private readonly dht: KadDHTInterface
private readonly peerInfoMapper: PeerInfoMapper
private readonly peerRouting: PeerRouting
private readonly log: Logger

constructor (dht: KadDHTInterface, peerInfoMapper: PeerInfoMapper, peerRouting: PeerRouting, log: Logger) {
constructor (dht: KadDHTInterface) {
this.dht = dht
this.peerInfoMapper = peerInfoMapper
this.peerRouting = peerRouting
this.log = log
}

async provide (cid: CID, options: RoutingOptions = {}): Promise<void> {
await drain(this.dht.provide(cid, options))
}

async * findProviders (cid: CID, options: RoutingOptions = {}): AsyncGenerator<PeerInfo, void, undefined> {
const self = this
const source = async function * (): AsyncGenerator<PeerInfo, void, undefined> {
for await (const event of self.dht.findProviders(cid, options)) {
if (event.name === 'PROVIDER') {
yield * event.providers
}
for await (const event of this.dht.findProviders(cid, options)) {
if (event.name === 'PROVIDER') {
yield * event.providers
}
}
for await (let peerInfo of parallel(ensurePeerInfoHasMultiaddrs(source(), this.peerRouting, this.log, options))) {
if (peerInfo == null) {
continue
}

peerInfo = this.peerInfoMapper(peerInfo)

if (peerInfo.multiaddrs.length === 0) {
continue
}

yield peerInfo
}
}

async put (key: Uint8Array, value: Uint8Array, options?: RoutingOptions): Promise<void> {
Expand All @@ -108,51 +65,26 @@ class DHTContentRouting implements ContentRouting {
*/
class DHTPeerRouting implements PeerRouting {
private readonly dht: KadDHTInterface
private readonly peerInfoMapper: PeerInfoMapper
private readonly log: Logger

constructor (dht: KadDHTInterface, peerInfoMapper: PeerInfoMapper, log: Logger) {
constructor (dht: KadDHTInterface) {
this.dht = dht
this.peerInfoMapper = peerInfoMapper
this.log = log
}

async findPeer (peerId: PeerId, options: RoutingOptions = {}): Promise<PeerInfo> {
for await (const event of this.dht.findPeer(peerId, options)) {
if (event.name === 'FINAL_PEER') {
const peer = this.peerInfoMapper(event.peer)

if (peer.multiaddrs.length > 0) {
return event.peer
}
return event.peer
}
}

throw new CodeError('Not found', 'ERR_NOT_FOUND')
}

async * getClosestPeers (key: Uint8Array, options: RoutingOptions = {}): AsyncIterable<PeerInfo> {
const self = this
const source = async function * (): AsyncGenerator<PeerInfo, void, undefined> {
for await (const event of self.dht.getClosestPeers(key, options)) {
if (event.name === 'FINAL_PEER') {
yield event.peer
}
}
}

for await (let peerInfo of parallel(ensurePeerInfoHasMultiaddrs(source(), this, this.log, options))) {
if (peerInfo == null) {
continue
}

peerInfo = this.peerInfoMapper(peerInfo)

if (peerInfo.multiaddrs.length === 0) {
continue
for await (const event of this.dht.getClosestPeers(key, options)) {
if (event.name === 'FINAL_PEER') {
yield event.peer
}

yield peerInfo
}
}
}
Expand Down Expand Up @@ -347,8 +279,8 @@ export class KadDHT extends TypedEventEmitter<PeerDiscoveryEvents> implements Ka
})
})

this.dhtPeerRouting = new DHTPeerRouting(this, this.peerInfoMapper, this.log)
this.dhtContentRouting = new DHTContentRouting(this, this.peerInfoMapper, this.dhtPeerRouting, this.log)
this.dhtPeerRouting = new DHTPeerRouting(this)
this.dhtContentRouting = new DHTContentRouting(this)

// if client mode has not been explicitly specified, auto-switch to server
// mode when the node's peer data is updated with publicly dialable
Expand Down
Loading

0 comments on commit 49e232a

Please sign in to comment.