Skip to content

Commit

Permalink
feat: invoke progress events during dialing (#2596)
Browse files Browse the repository at this point in the history
If the user passes an `onProgress` callback to a dial request,
invoke the callback with progress events during the dial.
  • Loading branch information
achingbrain authored Jul 2, 2024
1 parent 15eb664 commit 6573cb8
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 22 deletions.
1 change: 1 addition & 0 deletions packages/interface-internal/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
"@libp2p/interface": "^1.5.0",
"@libp2p/peer-collections": "^5.2.4",
"@multiformats/multiaddr": "^12.2.3",
"progress-events": "^1.0.0",
"uint8arraylist": "^2.4.8"
},
"devDependencies": {
Expand Down
13 changes: 11 additions & 2 deletions packages/interface-internal/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
import type { AbortOptions, PendingDial, Connection, MultiaddrConnection, PeerId, IsDialableOptions } from '@libp2p/interface'
import type { TransportManagerDialProgressEvents } from '../transport-manager/index.js'
import type { AbortOptions, PendingDial, Connection, MultiaddrConnection, PeerId, IsDialableOptions, Address } from '@libp2p/interface'
import type { PeerMap } from '@libp2p/peer-collections'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { ProgressOptions, ProgressEvent } from 'progress-events'

export interface OpenConnectionOptions extends AbortOptions {
export type OpenConnectionProgressEvents =
TransportManagerDialProgressEvents |
ProgressEvent<'dial:already-connected'> |
ProgressEvent<'dial:already-in-dial-queue'> |
ProgressEvent<'dial:add-to-dial-queue'> |
ProgressEvent<'dial:calculate-addresses', Address[]>

export interface OpenConnectionOptions extends AbortOptions, ProgressOptions<OpenConnectionProgressEvents> {
/**
* Connection requests with a higher priority will be executed before those
* with a lower priority. (default: 50)
Expand Down
12 changes: 10 additions & 2 deletions packages/interface-internal/src/transport-manager/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
import type { Connection, Listener, Transport } from '@libp2p/interface'
import type { AbortOptions, Connection, Listener, Transport } from '@libp2p/interface'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { ProgressOptions, ProgressEvent } from 'progress-events'

export type TransportManagerDialProgressEvents =
ProgressEvent<'dial:selected-transport', string>

export interface TransportManagerDialOptions extends AbortOptions, ProgressOptions<TransportManagerDialProgressEvents> {

}

export interface TransportManager {
/**
Expand All @@ -13,7 +21,7 @@ export interface TransportManager {
* a multiaddr, you may want to call openConnection on the connection manager
* instead.
*/
dial(ma: Multiaddr, options?: any): Promise<Connection>
dial(ma: Multiaddr, options?: TransportManagerDialOptions): Promise<Connection>

/**
* Return all addresses currently being listened on
Expand Down
1 change: 1 addition & 0 deletions packages/libp2p/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@
"merge-options": "^3.0.4",
"multiformats": "^13.1.0",
"p-defer": "^4.0.1",
"progress-events": "^1.0.0",
"race-event": "^1.3.0",
"race-signal": "^1.0.2",
"uint8arrays": "^5.1.0"
Expand Down
22 changes: 12 additions & 10 deletions packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { type Multiaddr, type Resolver, resolvers, multiaddr } from '@multiforma
import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers'
import { Circuit } from '@multiformats/multiaddr-matcher'
import { type ClearableSignal, anySignal } from 'any-signal'
import { CustomProgressEvent } from 'progress-events'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { codes } from '../errors.js'
import { getPeerAddress } from '../get-peer.js'
Expand All @@ -19,21 +20,17 @@ import {
} from './constants.js'
import { resolveMultiaddrs } from './utils.js'
import { DEFAULT_DIAL_PRIORITY } from './index.js'
import type { AddressSorter, AbortOptions, ComponentLogger, Logger, Connection, ConnectionGater, Metrics, PeerId, Address, PeerStore, PeerRouting, IsDialableOptions } from '@libp2p/interface'
import type { TransportManager } from '@libp2p/interface-internal'
import type { AddressSorter, ComponentLogger, Logger, Connection, ConnectionGater, Metrics, PeerId, Address, PeerStore, PeerRouting, IsDialableOptions } from '@libp2p/interface'
import type { OpenConnectionOptions, OpenConnectionProgressEvents, TransportManager } from '@libp2p/interface-internal'
import type { DNS } from '@multiformats/dns'
import type { ProgressOptions } from 'progress-events'

export interface PendingDialTarget {
resolve(value: any): void
reject(err: Error): void
}

export interface DialOptions extends AbortOptions {
priority?: number
force?: boolean
}

interface DialQueueJobOptions extends PriorityQueueJobOptions {
interface DialQueueJobOptions extends PriorityQueueJobOptions, ProgressOptions<OpenConnectionProgressEvents> {
peerId?: PeerId
multiaddrs: Set<string>
}
Expand Down Expand Up @@ -134,7 +131,7 @@ export class DialQueue {
* The dial to the first address that is successfully able to upgrade a
* connection will be used, all other dials will be aborted when that happens.
*/
async dial (peerIdOrMultiaddr: PeerId | Multiaddr | Multiaddr[], options: DialOptions = {}): Promise<Connection> {
async dial (peerIdOrMultiaddr: PeerId | Multiaddr | Multiaddr[], options: OpenConnectionOptions = {}): Promise<Connection> {
const { peerId, multiaddrs } = getPeerAddress(peerIdOrMultiaddr)

// make sure we don't have an existing connection to any of the addresses we
Expand All @@ -155,6 +152,7 @@ export class DialQueue {

if (existingConnection != null) {
this.log('already connected to %a', existingConnection.remoteAddr)
options.onProgress?.(new CustomProgressEvent('dial:already-connected'))
return existingConnection
}

Expand Down Expand Up @@ -189,6 +187,7 @@ export class DialQueue {
existingDial.options.multiaddrs.add(multiaddr.toString())
}

options.onProgress?.(new CustomProgressEvent('dial:already-in-dial-queue'))
return existingDial.join(options)
}

Expand All @@ -198,6 +197,7 @@ export class DialQueue {

this.log('creating dial target for %p', peerId, multiaddrs.map(ma => ma.toString()))

options.onProgress?.(new CustomProgressEvent('dial:add-to-dial-queue'))
return this.queue.add(async (options) => {
// create abort conditions - need to do this before `calculateMultiaddrs` as
// we may be about to resolve a dns addr which can time out
Expand All @@ -212,6 +212,8 @@ export class DialQueue {
signal
})

options?.onProgress?.(new CustomProgressEvent<Address[]>('dial:calculate-addresses', addrsToDial))

addrsToDial.map(({ multiaddr }) => multiaddr.toString()).forEach(addr => {
options?.multiaddrs.add(addr)
})
Expand Down Expand Up @@ -299,7 +301,7 @@ export class DialQueue {
}

// eslint-disable-next-line complexity
private async calculateMultiaddrs (peerId?: PeerId, multiaddrs: Set<string> = new Set<string>(), options: DialOptions = {}): Promise<Address[]> {
private async calculateMultiaddrs (peerId?: PeerId, multiaddrs: Set<string> = new Set<string>(), options: OpenConnectionOptions = {}): Promise<Address[]> {
const addrs: Address[] = [...multiaddrs].map(ma => ({
multiaddr: multiaddr(ma),
isCertified: false
Expand Down
2 changes: 2 additions & 0 deletions packages/libp2p/src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { defaultAddressSort } from '@libp2p/utils/address-sort'
import { RateLimiter } from '@libp2p/utils/rate-limiter'
import { type Multiaddr, type Resolver, multiaddr } from '@multiformats/multiaddr'
import { dnsaddrResolver } from '@multiformats/multiaddr/resolvers'
import { CustomProgressEvent } from 'progress-events'
import { codes } from '../errors.js'
import { getPeerAddress } from '../get-peer.js'
import { AutoDial } from './auto-dial.js'
Expand Down Expand Up @@ -509,6 +510,7 @@ export class DefaultConnectionManager implements ConnectionManager, Startable {
if (existingConnection != null) {
this.log('had an existing non-transient connection to %p', peerId)

options.onProgress?.(new CustomProgressEvent('dial:already-connected'))
return existingConnection
}
}
Expand Down
9 changes: 6 additions & 3 deletions packages/libp2p/src/transport-manager.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { CodeError, FaultTolerance } from '@libp2p/interface'
import { trackedMap } from '@libp2p/utils/tracked-map'
import { CustomProgressEvent } from 'progress-events'
import { codes } from './errors.js'
import type { Libp2pEvents, AbortOptions, ComponentLogger, Logger, Connection, TypedEventTarget, Metrics, Startable, Listener, Transport, Upgrader } from '@libp2p/interface'
import type { AddressManager, TransportManager } from '@libp2p/interface-internal'
import type { Libp2pEvents, ComponentLogger, Logger, Connection, TypedEventTarget, Metrics, Startable, Listener, Transport, Upgrader } from '@libp2p/interface'
import type { AddressManager, TransportManager, TransportManagerDialOptions } from '@libp2p/interface-internal'
import type { Multiaddr } from '@multiformats/multiaddr'

export interface TransportManagerInit {
Expand Down Expand Up @@ -107,13 +108,15 @@ export class DefaultTransportManager implements TransportManager, Startable {
/**
* Dials the given Multiaddr over it's supported transport
*/
async dial (ma: Multiaddr, options?: AbortOptions): Promise<Connection> {
async dial (ma: Multiaddr, options?: TransportManagerDialOptions): Promise<Connection> {
const transport = this.dialTransportForMultiaddr(ma)

if (transport == null) {
throw new CodeError(`No transport available for address ${String(ma)}`, codes.ERR_TRANSPORT_UNAVAILABLE)
}

options?.onProgress?.(new CustomProgressEvent<string>('dial:selected-transport', transport[Symbol.toStringTag]))

try {
return await transport.dial(ma, {
...options,
Expand Down
10 changes: 5 additions & 5 deletions packages/libp2p/test/connection-manager/direct.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,11 @@ describe('dialing (direct, WebSockets)', () => {
})

sinon.stub(localTM, 'dial').callsFake(async (addr, options) => {
expect(options.signal).to.exist()
expect(options.signal.aborted).to.equal(false)
expect(options?.signal).to.exist()
expect(options?.signal?.aborted).to.equal(false)
expect(addr.toString()).to.eql(remoteAddr.toString())
await delay(60)
expect(options.signal.aborted).to.equal(true)
expect(options?.signal?.aborted).to.equal(true)
throw new AbortError()
})

Expand Down Expand Up @@ -235,10 +235,10 @@ describe('dialing (direct, WebSockets)', () => {
sinon.stub(localTM, 'dial').callsFake(async (_, options) => {
const deferredDial = pDefer<Connection>()
const onAbort = (): void => {
options.signal.removeEventListener('abort', onAbort)
options?.signal?.removeEventListener('abort', onAbort)
deferredDial.reject(new AbortError())
}
options.signal.addEventListener('abort', onAbort)
options?.signal?.addEventListener('abort', onAbort)
return deferredDial.promise
})

Expand Down

0 comments on commit 6573cb8

Please sign in to comment.