Skip to content

Commit

Permalink
fix: use optimistic protocol negotation (#2253)
Browse files Browse the repository at this point in the history
When negotiating connection encrypters, multiplexers or stream protocols, if we only have one protocol to negotiate there's no point sending it, then waiting for a response, then sending some data, we can just send it, and the data, then assuming the remote doesn't immediately close the negotiation channel, read the response at our leisure.

This saves a round trip for the first chunk of stream data and reduces our connection latency in the [libp2p perf tests](https://observablehq.com/@libp2p-workspace/performance-dashboard?branch=c8022cb77397759bd7e71a73e93f9074854989fe) from 0.45 to 0.3ms.

It changes stream behaviour a little, since we now don't start the protocol negotiation until we interact with the stream (e.g. try to read or write data) and most of our tests assume that negotiation has succeeded when the stream is returned so it's not been a straightforward fix.
  • Loading branch information
achingbrain authored Nov 23, 2023
1 parent 6b6ba9a commit 0b4a2ee
Show file tree
Hide file tree
Showing 19 changed files with 408 additions and 97 deletions.
2 changes: 1 addition & 1 deletion packages/integration-tests/test/circuit-relay.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -522,7 +522,7 @@ describe('circuit-relay', () => {
expect(conns).to.have.lengthOf(1)

// this should fail as the local peer has HOP disabled
await expect(conns[0].newStream(RELAY_V2_HOP_CODEC))
await expect(conns[0].newStream([RELAY_V2_HOP_CODEC, '/other/1.0.0']))
.to.be.rejected()

// we should still be connected to the relay
Expand Down
6 changes: 3 additions & 3 deletions packages/interface-compliance-tests/src/pubsub/two-nodes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ export default (common: TestSetup<PubSub, PubSubArgs>): void => {
expect(psA.getTopics()).to.deep.equal([topic])
expect(psB.getPeers()).to.have.lengthOf(1)
expect(psB.getSubscribers(topic).map(p => p.toString())).to.deep.equal([componentsA.peerId.toString()])
expect(changedPeerId).to.deep.equal(psB.getPeers()[0])
expect(changedPeerId.toString()).to.equal(psB.getPeers()[0].toString())
expect(changedSubs).to.have.lengthOf(1)
expect(changedSubs[0].topic).to.equal(topic)
expect(changedSubs[0].subscribe).to.equal(true)
Expand Down Expand Up @@ -243,7 +243,7 @@ export default (common: TestSetup<PubSub, PubSubArgs>): void => {
const { peerId: changedPeerId, subscriptions: changedSubs } = evt.detail
expect(psB.getPeers()).to.have.lengthOf(1)
expect(psB.getTopics()).to.be.empty()
expect(changedPeerId).to.deep.equal(psB.getPeers()[0])
expect(changedPeerId.toString()).to.equal(psB.getPeers()[0].toString())
expect(changedSubs).to.have.lengthOf(1)
expect(changedSubs[0].topic).to.equal(topic)
expect(changedSubs[0].subscribe).to.equal(true)
Expand All @@ -252,7 +252,7 @@ export default (common: TestSetup<PubSub, PubSubArgs>): void => {
const { peerId: changedPeerId, subscriptions: changedSubs } = evt.detail
expect(psB.getPeers()).to.have.lengthOf(1)
expect(psB.getTopics()).to.be.empty()
expect(changedPeerId).to.deep.equal(psB.getPeers()[0])
expect(changedPeerId.toString()).to.equal(psB.getPeers()[0].toString())
expect(changedSubs).to.have.lengthOf(1)
expect(changedSubs[0].topic).to.equal(topic)
expect(changedSubs[0].subscribe).to.equal(false)
Expand Down
9 changes: 3 additions & 6 deletions packages/libp2p/.aegir.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export default {
const { plaintext } = await import('@libp2p/plaintext')
const { circuitRelayServer, circuitRelayTransport } = await import('@libp2p/circuit-relay-v2')
const { identify } = await import('@libp2p/identify')
const { echo, ECHO_PROTOCOL } = await import('./dist/test/fixtures/echo-service.js')

const peerId = await createEd25519PeerId()
const libp2p = await createLibp2p({
Expand Down Expand Up @@ -49,14 +50,10 @@ export default {
reservations: {
maxReservations: Infinity
}
})
}),
echo: echo()
}
})
// Add the echo protocol
await libp2p.handle('/echo/1.0.0', ({ stream }) => {
pipe(stream, stream)
.catch() // sometimes connections are closed before multistream-select finishes which causes an error
})

return {
libp2p,
Expand Down
2 changes: 1 addition & 1 deletion packages/libp2p/src/connection-manager/dial-queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ export class DialQueue {
// internal peer dial queue - only one dial per peer at a time
const peerDialQueue = new PQueue({ concurrency: 1 })
peerDialQueue.on('error', (err) => {
this.log.error('error dialing [%s] %o', pendingDial.multiaddrs, err)
this.log.error('error dialing %s %o', pendingDial.multiaddrs, err)
})

const conn = await Promise.any(pendingDial.multiaddrs.map(async (addr, i) => {
Expand Down
79 changes: 63 additions & 16 deletions packages/libp2p/src/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,12 @@ export class DefaultUpgrader implements Upgrader {
private readonly muxers: Map<string, StreamMuxerFactory>
private readonly inboundUpgradeTimeout: number
private readonly events: TypedEventTarget<Libp2pEvents>
private readonly logger: ComponentLogger
private readonly log: Logger

constructor (components: DefaultUpgraderComponents, init: UpgraderInit) {
this.components = components
this.connectionEncryption = new Map()
this.log = components.logger.forComponent('libp2p:upgrader')
this.logger = components.logger

init.connectionEncryption.forEach(encrypter => {
this.connectionEncryption.set(encrypter.protocol, encrypter)
Expand Down Expand Up @@ -415,6 +413,21 @@ export class DefaultUpgrader implements Upgrader {
muxedStream.sink = stream.sink
muxedStream.protocol = protocol

// allow closing the write end of a not-yet-negotiated stream
if (stream.closeWrite != null) {
muxedStream.closeWrite = stream.closeWrite
}

// allow closing the read end of a not-yet-negotiated stream
if (stream.closeRead != null) {
muxedStream.closeRead = stream.closeRead
}

// make sure we don't try to negotiate a stream we are closing
if (stream.close != null) {
muxedStream.close = stream.close
}

// If a protocol stream has been successfully negotiated and is to be passed to the application,
// the peerstore should ensure that the peer is registered with that protocol
await this.components.peerStore.merge(remotePeer, {
Expand All @@ -426,7 +439,7 @@ export class DefaultUpgrader implements Upgrader {
this._onStream({ connection, stream: muxedStream, protocol })
})
.catch(async err => {
this.log.error('error handling incoming stream id %d', muxedStream.id, err.message, err.code, err.stack)
this.log.error('error handling incoming stream id %s', muxedStream.id, err.message, err.code, err.stack)

if (muxedStream.timeline.close == null) {
await muxedStream.close()
Expand All @@ -440,13 +453,13 @@ export class DefaultUpgrader implements Upgrader {
throw new CodeError('Stream is not multiplexed', codes.ERR_MUXER_UNAVAILABLE)
}

connection.log('starting new stream for protocols [%s]', protocols)
connection.log('starting new stream for protocols %s', protocols)
const muxedStream = await muxer.newStream()
connection.log.trace('starting new stream %s for protocols [%s]', muxedStream.id, protocols)
connection.log.trace('started new stream %s for protocols %s', muxedStream.id, protocols)

try {
if (options.signal == null) {
this.log('No abort signal was passed while trying to negotiate protocols [%s] falling back to default timeout', protocols)
this.log('No abort signal was passed while trying to negotiate protocols %s falling back to default timeout', protocols)

const signal = AbortSignal.timeout(DEFAULT_PROTOCOL_SELECT_TIMEOUT)
setMaxListeners(Infinity, signal)
Expand All @@ -457,13 +470,18 @@ export class DefaultUpgrader implements Upgrader {
}
}

const { stream, protocol } = await mss.select(muxedStream, protocols, {
muxedStream.log.trace('selecting protocol from protocols %s', protocols)

const {
stream,
protocol
} = await mss.select(muxedStream, protocols, {
...options,
log: muxedStream.log,
yieldBytes: false
yieldBytes: true
})

connection.log('negotiated protocol stream %s with id %s', protocol, muxedStream.id)
muxedStream.log('selected protocol %s', protocol)

const outgoingLimit = findOutgoingStreamLimit(protocol, this.components.registrar, options)
const streamCount = countStreams(protocol, 'outbound', connection)
Expand All @@ -487,6 +505,21 @@ export class DefaultUpgrader implements Upgrader {
muxedStream.sink = stream.sink
muxedStream.protocol = protocol

// allow closing the write end of a not-yet-negotiated stream
if (stream.closeWrite != null) {
muxedStream.closeWrite = stream.closeWrite
}

// allow closing the read end of a not-yet-negotiated stream
if (stream.closeRead != null) {
muxedStream.closeRead = stream.closeRead
}

// make sure we don't try to negotiate a stream we are closing
if (stream.close != null) {
muxedStream.close = stream.close
}

this.components.metrics?.trackProtocolStream(muxedStream, connection)

return muxedStream
Expand Down Expand Up @@ -637,16 +670,23 @@ export class DefaultUpgrader implements Upgrader {
this.log('selecting outbound crypto protocol', protocols)

try {
const { stream, protocol } = await mss.select(connection, protocols, {
log: this.logger.forComponent('libp2p:mss:select')
connection.log.trace('selecting encrypter from %s', protocols)

const {
stream,
protocol
} = await mss.select(connection, protocols, {
log: connection.log,
yieldBytes: true
})

const encrypter = this.connectionEncryption.get(protocol)

if (encrypter == null) {
throw new Error(`no crypto module found for ${protocol}`)
}

this.log('encrypting outbound connection to %p', remotePeerId)
connection.log('encrypting outbound connection to %p using %p', remotePeerId)

return {
...await encrypter.secureOutbound(this.components.peerId, stream, remotePeerId),
Expand All @@ -665,15 +705,22 @@ export class DefaultUpgrader implements Upgrader {
const protocols = Array.from(muxers.keys())
this.log('outbound selecting muxer %s', protocols)
try {
const { stream, protocol } = await mss.select(connection, protocols, {
log: this.logger.forComponent('libp2p:mss:select')
connection.log.trace('selecting stream muxer from %s', protocols)

const {
stream,
protocol
} = await mss.select(connection, protocols, {
log: connection.log,
yieldBytes: true
})
this.log('%s selected as muxer protocol', protocol)

connection.log('selected %s as muxer protocol', protocol)
const muxerFactory = muxers.get(protocol)

return { stream, muxerFactory }
} catch (err: any) {
this.log.error('error multiplexing outbound stream', err)
connection.log.error('error multiplexing outbound stream', err)
throw new CodeError(String(err), codes.ERR_MUXER_UNAVAILABLE)
}
}
Expand Down
23 changes: 12 additions & 11 deletions packages/libp2p/test/connection-manager/direct.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { codes as ErrorCodes } from '../../src/errors.js'
import { createLibp2pNode, type Libp2pNode } from '../../src/libp2p.js'
import { DefaultTransportManager } from '../../src/transport-manager.js'
import { ECHO_PROTOCOL, echo } from '../fixtures/echo-service.js'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { TransportManager } from '@libp2p/interface-internal/transport-manager'
import type { Multiaddr } from '@multiformats/multiaddr'
Expand Down Expand Up @@ -303,10 +304,10 @@ describe('libp2p.dialer (direct, TCP)', () => {
],
connectionEncryption: [
plaintext()
]
})
await remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => {
void pipe(stream, stream)
],
services: {
echo: echo()
}
})

await remoteLibp2p.start()
Expand Down Expand Up @@ -348,9 +349,9 @@ describe('libp2p.dialer (direct, TCP)', () => {

const connection = await libp2p.dial(remotePeerId)
expect(connection).to.exist()
const stream = await connection.newStream('/echo/1.0.0')
const stream = await connection.newStream(ECHO_PROTOCOL)
expect(stream).to.exist()
expect(stream).to.have.property('protocol', '/echo/1.0.0')
expect(stream).to.have.property('protocol', ECHO_PROTOCOL)
await connection.close()
})

Expand Down Expand Up @@ -388,7 +389,7 @@ describe('libp2p.dialer (direct, TCP)', () => {
const connection = await libp2p.dial(remoteLibp2p.getMultiaddrs())

// Create local to remote streams
const stream = await connection.newStream('/echo/1.0.0')
const stream = await connection.newStream([ECHO_PROTOCOL, '/other/1.0.0'])
await connection.newStream('/stream-count/3')
await libp2p.dialProtocol(remoteLibp2p.peerId, '/stream-count/4')

Expand All @@ -398,8 +399,8 @@ describe('libp2p.dialer (direct, TCP)', () => {
source.push(uint8ArrayFromString('hello'))

// Create remote to local streams
await remoteLibp2p.dialProtocol(libp2p.peerId, '/stream-count/1')
await remoteLibp2p.dialProtocol(libp2p.peerId, '/stream-count/2')
await remoteLibp2p.dialProtocol(libp2p.peerId, ['/stream-count/1', '/other/1.0.0'])
await remoteLibp2p.dialProtocol(libp2p.peerId, ['/stream-count/2', '/other/1.0.0'])

// Verify stream count
const remoteConn = remoteLibp2p.getConnections(libp2p.peerId)
Expand Down Expand Up @@ -497,9 +498,9 @@ describe('libp2p.dialer (direct, TCP)', () => {

const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist()
const stream = await connection.newStream('/echo/1.0.0')
const stream = await connection.newStream(ECHO_PROTOCOL)
expect(stream).to.exist()
expect(stream).to.have.property('protocol', '/echo/1.0.0')
expect(stream).to.have.property('protocol', ECHO_PROTOCOL)
await connection.close()
expect(protectorProtectSpy.callCount).to.equal(1)
})
Expand Down
21 changes: 19 additions & 2 deletions packages/libp2p/test/connection-manager/index.node.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { start } from '@libp2p/interface/startable'
import { mockConnection, mockDuplex, mockMultiaddrConnection } from '@libp2p/interface-compliance-tests/mocks'
import { expect } from 'aegir/chai'
import delay from 'delay'
import all from 'it-all'
import { pipe } from 'it-pipe'
import pWaitFor from 'p-wait-for'
import sinon from 'sinon'
import { stubInterface } from 'sinon-ts'
Expand All @@ -13,6 +15,7 @@ import { DefaultConnectionManager } from '../../src/connection-manager/index.js'
import { codes } from '../../src/errors.js'
import { createBaseOptions } from '../fixtures/base-options.browser.js'
import { createNode, createPeerId } from '../fixtures/creators/peer.js'
import { ECHO_PROTOCOL, echo } from '../fixtures/echo-service.js'
import type { Libp2p } from '../../src/index.js'
import type { Libp2pNode } from '../../src/libp2p.js'
import type { ConnectionGater } from '@libp2p/interface/connection-gater'
Expand Down Expand Up @@ -401,6 +404,9 @@ describe('libp2p.connections', () => {
peerId: peerIds[1],
addresses: {
listen: ['/ip4/127.0.0.1/tcp/0/ws']
},
services: {
echo: echo()
}
})
})
Expand Down Expand Up @@ -591,16 +597,23 @@ describe('libp2p.connections', () => {
},
connectionGater: {
denyInboundUpgradedConnection
},
services: {
echo: echo()
}
})
})
await remoteLibp2p.peerStore.patch(libp2p.peerId, {
multiaddrs: libp2p.getMultiaddrs()
})
await remoteLibp2p.dial(libp2p.peerId)
const connection = await remoteLibp2p.dial(libp2p.peerId)
const stream = await connection.newStream(ECHO_PROTOCOL)
const input = [Uint8Array.from([0])]
const output = await pipe(input, stream, async (source) => all(source))

expect(denyInboundUpgradedConnection.called).to.be.true()
expect(denyInboundUpgradedConnection.getCall(0)).to.have.nested.property('args[0].multihash.digest').that.equalBytes(remoteLibp2p.peerId.multihash.digest)
expect(output.map(b => b.subarray())).to.deep.equal(input)
})

it('intercept outbound upgraded', async () => {
Expand All @@ -620,10 +633,14 @@ describe('libp2p.connections', () => {
await libp2p.peerStore.patch(remoteLibp2p.peerId, {
multiaddrs: remoteLibp2p.getMultiaddrs()
})
await libp2p.dial(remoteLibp2p.peerId)
const connection = await libp2p.dial(remoteLibp2p.peerId)
const stream = await connection.newStream(ECHO_PROTOCOL)
const input = [Uint8Array.from([0])]
const output = await pipe(input, stream, async (source) => all(source))

expect(denyOutboundUpgradedConnection.called).to.be.true()
expect(denyOutboundUpgradedConnection.getCall(0)).to.have.nested.property('args[0].multihash.digest').that.equalBytes(remoteLibp2p.peerId.multihash.digest)
expect(output.map(b => b.subarray())).to.deep.equal(input)
})
})
})
3 changes: 2 additions & 1 deletion packages/libp2p/test/content-routing/dht/utils.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export const subsystemMulticodecs = [
'/ipfs/lan/kad/1.0.0'
'/ipfs/lan/kad/1.0.0',
'/other/1.0.0'
]
Loading

0 comments on commit 0b4a2ee

Please sign in to comment.