Skip to content

Commit

Permalink
fix: remove abortable iterator from muxer (#63)
Browse files Browse the repository at this point in the history
* fix: remove abortable iterator from muxer

We use an abortable source for every connection - this introduces
a lot of latency.

This PR removes abortable iterator to just listen on the abort event
on the shutdown controller's abort signal and close the connection's
iterator.

This is the same thing that abortable source does except we don't
race every `next` from the source.

I've added a simple benchmark - in my testing it increases
throughput by 30% or so, this seems to translate to 10-15% in "real world"
performance using the `@libp2p/perf` testing protocol.

* chore: log error if causing sink to return fails
  • Loading branch information
achingbrain committed Nov 12, 2023
1 parent fe69086 commit 064bf1c
Show file tree
Hide file tree
Showing 4 changed files with 99 additions and 15 deletions.
15 changes: 15 additions & 0 deletions benchmark/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"name": "yamux-benchmarks",
"version": "0.0.1",
"description": "Benchmark for Libp2p's Yamux specification in Js",
"main": "benchmark.js",
"type": "module",
"author": "",
"license": "ISC",
"dependencies": {
"it-drain": "^3.0.3",
"it-pair": "^2.0.6",
"it-pipe": "^3.0.1",
"uint8arraylist": "^2.4.3"
}
}
56 changes: 56 additions & 0 deletions benchmark/stream-transfer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import drain from 'it-drain'
import { duplexPair } from 'it-pair/duplex'
import { pipe } from 'it-pipe'
import { Uint8ArrayList } from 'uint8arraylist'
import { yamux } from '../dist/src/index.js'

const DATA_LENGTH = 1024 * 1024 * 1024 * 5
const CHUNK_SIZE = (1024 * 1024) / 4
const ITERATIONS = 10

const results = []

for (let i = 0; i < ITERATIONS; i++) {
const p = duplexPair()
const muxerA = yamux()().createStreamMuxer({
direction: 'outbound'
})
const muxerB = yamux()().createStreamMuxer({
direction: 'inbound',
onIncomingStream: (stream) => {
// echo stream back to itself
pipe(stream, stream)
}
})

// pipe data through muxers
pipe(p[0], muxerA, p[0])
pipe(p[1], muxerB, p[1])

const stream = await muxerA.newStream()

const start = Date.now()

await pipe(
async function * () {
for (let i = 0; i < DATA_LENGTH; i += CHUNK_SIZE) {
yield * new Uint8ArrayList(new Uint8Array(CHUNK_SIZE))
}
},
stream,
(source) => drain(source)
)

const finish = Date.now() - start

muxerA.close()
muxerB.close()

results.push(finish)
}

const megs = DATA_LENGTH / (1024 * 1024)
const secs = (results.reduce((acc, curr) => acc + curr, 0) / results.length) / 1000

// eslint-disable-next-line no-console
console.info((megs / secs).toFixed(2), 'MB/s')
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@
"dependencies": {
"@libp2p/interface": "^0.1.0",
"@libp2p/logger": "^3.0.0",
"abortable-iterator": "^5.0.1",
"get-iterator": "^2.0.1",
"it-foreach": "^2.0.3",
"it-pipe": "^3.0.1",
"it-pushable": "^3.2.0",
Expand Down
41 changes: 27 additions & 14 deletions src/muxer.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import { CodeError } from '@libp2p/interface/errors'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger, type Logger } from '@libp2p/logger'
import { abortableSource } from 'abortable-iterator'
import { pipe } from 'it-pipe'
import { getIterator } from 'get-iterator'
import { pushable, type Pushable } from 'it-pushable'
import { type Config, defaultConfig, verifyConfig } from './config.js'
import { ERR_BOTH_CLIENTS, ERR_INVALID_FRAME, ERR_MAX_OUTBOUND_STREAMS_EXCEEDED, ERR_MUXER_LOCAL_CLOSED, ERR_MUXER_REMOTE_CLOSED, ERR_NOT_MATCHING_PING, ERR_STREAM_ALREADY_EXISTS, ERR_UNREQUESTED_PING, PROTOCOL_ERRORS } from './constants.js'
Expand Down Expand Up @@ -104,23 +103,33 @@ export class YamuxMuxer implements StreamMuxer {
})

this.sink = async (source: Source<Uint8ArrayList | Uint8Array>): Promise<void> => {
source = abortableSource(
source,
this.closeController.signal,
{ returnOnAbort: true }
)
const shutDownListener = (): void => {
const iterator = getIterator(source)

if (iterator.return != null) {
const res = iterator.return()

if (isPromise(res)) {
res.catch(err => {
this.log?.('could not cause sink source to return', err)
})
}
}
}

let reason, error
try {
const decoder = new Decoder(source)
await pipe(
decoder.emitFrames.bind(decoder),
async source => {
for await (const { header, readData } of source) {
await this.handleFrame(header, readData)
}

try {
this.closeController.signal.addEventListener('abort', shutDownListener)

for await (const frame of decoder.emitFrames()) {
await this.handleFrame(frame.header, frame.readData)
}
)
} finally {
this.closeController.signal.removeEventListener('abort', shutDownListener)
}

reason = GoAwayCode.NormalTermination
} catch (err: unknown) {
Expand Down Expand Up @@ -580,3 +589,7 @@ export class YamuxMuxer implements StreamMuxer {
})
}
}

function isPromise <T = unknown> (thing: any): thing is Promise<T> {
return thing != null && typeof thing.then === 'function'
}

0 comments on commit 064bf1c

Please sign in to comment.