Skip to content

Commit

Permalink
feat!: yield uint8arraylists
Browse files Browse the repository at this point in the history
The general pattern of stream muxers is to yield protocol stream data
framed by some additional metadata - stream id, flags, etc.

The frame data can be prepended/appended to the protocol stream data
by using a `Uint8ArrayList` instead of a `Uint8Array`, this removes the
need to copy the protocol data into a new `Uint8Array` for every frame.

The new `@libp2p/interface` version allows muxers to emit
`Uint8ArrayList`s as well as `Uint8Array`s so we can send protocol
stream data to a transport in a no-copy operation.
  • Loading branch information
achingbrain committed Nov 29, 2023
1 parent f2492e9 commit 9a48fd3
Show file tree
Hide file tree
Showing 10 changed files with 66 additions and 58 deletions.
9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -168,8 +168,8 @@
"docs": "aegir docs"
},
"dependencies": {
"@libp2p/interface": "^0.1.0",
"@libp2p/logger": "^3.0.0",
"@libp2p/interface": "^1.0.0",
"@libp2p/utils": "^5.0.0",
"get-iterator": "^2.0.1",
"it-foreach": "^2.0.3",
"it-pipe": "^3.0.1",
Expand All @@ -178,8 +178,9 @@
},
"devDependencies": {
"@dapplion/benchmark": "^0.2.4",
"@libp2p/interface-compliance-tests": "^4.0.0",
"@libp2p/mplex": "^9.0.0",
"@libp2p/interface-compliance-tests": "^5.0.0",
"@libp2p/logger": "^4.0.0",
"@libp2p/mplex": "^10.0.0",
"aegir": "^41.1.10",
"it-drain": "^3.0.2",
"it-pair": "^2.0.6",
Expand Down
9 changes: 0 additions & 9 deletions src/config.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,8 @@
import { CodeError } from '@libp2p/interface/errors'
import { logger, type Logger } from '@libp2p/logger'
import { ERR_INVALID_CONFIG, INITIAL_STREAM_WINDOW, MAX_STREAM_WINDOW } from './constants.js'

// TOOD use config items or delete them
export interface Config {
/**
* Used to control the log destination
*
* It can be disabled by explicitly setting to `undefined`
*/
log?: Logger

/**
* Used to do periodic keep alive messages using a ping.
*/
Expand Down Expand Up @@ -55,7 +47,6 @@ export interface Config {
}

export const defaultConfig: Config = {
log: logger('libp2p:yamux'),
enableKeepAlive: true,
keepAliveInterval: 30_000,
maxInboundStreams: 1_000,
Expand Down
2 changes: 1 addition & 1 deletion src/decode.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { CodeError } from '@libp2p/interface/errors'
import { CodeError } from '@libp2p/interface'
import { Uint8ArrayList } from 'uint8arraylist'
import { ERR_DECODE_INVALID_VERSION, ERR_DECODE_IN_PROGRESS } from './constants.js'
import { type FrameHeader, FrameType, HEADER_LENGTH, YAMUX_VERSION } from './frame.js'
Expand Down
10 changes: 7 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,11 +79,15 @@

import { Yamux } from './muxer.js'
import type { YamuxMuxerInit } from './muxer.js'
import type { StreamMuxerFactory } from '@libp2p/interface/stream-muxer'
import type { ComponentLogger, StreamMuxerFactory } from '@libp2p/interface'

export { GoAwayCode, type FrameHeader, type FrameType } from './frame.js'
export type { YamuxMuxerInit }

export function yamux (init: YamuxMuxerInit = {}): () => StreamMuxerFactory {
return () => new Yamux(init)
export interface YamuxMuxerComponents {
logger: ComponentLogger
}

export function yamux (init: YamuxMuxerInit = {}): (components: YamuxMuxerComponents) => StreamMuxerFactory {
return (components) => new Yamux(components, init)
}
34 changes: 18 additions & 16 deletions src/muxer.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
import { CodeError } from '@libp2p/interface/errors'
import { setMaxListeners } from '@libp2p/interface/events'
import { logger, type Logger } from '@libp2p/logger'
import { CodeError, setMaxListeners } from '@libp2p/interface'
import { getIterator } from 'get-iterator'
import { pushable, type Pushable } from 'it-pushable'
import { Uint8ArrayList } from 'uint8arraylist'
import { type Config, defaultConfig, verifyConfig } from './config.js'
import { ERR_BOTH_CLIENTS, ERR_INVALID_FRAME, ERR_MAX_OUTBOUND_STREAMS_EXCEEDED, ERR_MUXER_LOCAL_CLOSED, ERR_MUXER_REMOTE_CLOSED, ERR_NOT_MATCHING_PING, ERR_STREAM_ALREADY_EXISTS, ERR_UNREQUESTED_PING, PROTOCOL_ERRORS } from './constants.js'
import { Decoder } from './decode.js'
import { encodeHeader } from './encode.js'
import { Flag, type FrameHeader, FrameType, GoAwayCode } from './frame.js'
import { StreamState, YamuxStream } from './stream.js'
import type { AbortOptions } from '@libp2p/interface'
import type { Stream } from '@libp2p/interface/connection'
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface/stream-muxer'
import type { YamuxMuxerComponents } from './index.js'
import type { AbortOptions, ComponentLogger, Logger, Stream, StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface'
import type { Sink, Source } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

const YAMUX_PROTOCOL_ID = '/yamux/1.0.0'
const CLOSE_TIMEOUT = 500
Expand All @@ -23,14 +20,16 @@ export interface YamuxMuxerInit extends StreamMuxerInit, Partial<Config> {

export class Yamux implements StreamMuxerFactory {
protocol = YAMUX_PROTOCOL_ID
private readonly _components: YamuxMuxerComponents
private readonly _init: YamuxMuxerInit

constructor (init: YamuxMuxerInit = {}) {
constructor (components: YamuxMuxerComponents, init: YamuxMuxerInit = {}) {
this._components = components
this._init = init
}

createStreamMuxer (init?: YamuxMuxerInit): YamuxMuxer {
return new YamuxMuxer({
return new YamuxMuxer(this._components, {
...this._init,
...init
})
Expand All @@ -43,11 +42,12 @@ export interface CloseOptions extends AbortOptions {

export class YamuxMuxer implements StreamMuxer {
protocol = YAMUX_PROTOCOL_ID
source: Pushable<Uint8Array>
source: Pushable<Uint8ArrayList | Uint8Array>
sink: Sink<Source<Uint8ArrayList | Uint8Array>, Promise<void>>

private readonly config: Config
private readonly log?: Logger
private readonly logger: ComponentLogger

/** Used to close the muxer from either the sink or source */
private readonly closeController: AbortController
Expand Down Expand Up @@ -78,10 +78,11 @@ export class YamuxMuxer implements StreamMuxer {
private readonly onIncomingStream?: (stream: Stream) => void
private readonly onStreamEnd?: (stream: Stream) => void

constructor (init: YamuxMuxerInit) {
constructor (components: YamuxMuxerComponents, init: YamuxMuxerInit) {
this.client = init.direction === 'outbound'
this.config = { ...defaultConfig, ...init }
this.log = this.config.log
this.logger = components.logger
this.log = this.logger.forComponent('libp2p:yamux')
verifyConfig(this.config)

this.closeController = new AbortController()
Expand Down Expand Up @@ -363,7 +364,7 @@ export class YamuxMuxer implements StreamMuxer {
this.closeStream(id)
this.onStreamEnd?.(stream)
},
log: logger(`libp2p:yamux:${direction}:${id}`),
log: this.logger.forComponent(`libp2p:yamux:${direction}:${id}`),
config: this.config,
getRTT: this.getRTT.bind(this)
})
Expand Down Expand Up @@ -554,14 +555,15 @@ export class YamuxMuxer implements StreamMuxer {
this.onIncomingStream?.(stream)
}

private sendFrame (header: FrameHeader, data?: Uint8Array): void {
private sendFrame (header: FrameHeader, data?: Uint8ArrayList): void {
this.log?.trace('sending frame %o', header)
if (header.type === FrameType.Data) {
if (data === undefined) {
throw new CodeError('invalid frame', ERR_INVALID_FRAME)
}
this.source.push(encodeHeader(header))
this.source.push(data)
this.source.push(
new Uint8ArrayList(encodeHeader(header), data)
)
} else {
this.source.push(encodeHeader(header))
}
Expand Down
10 changes: 5 additions & 5 deletions src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CodeError } from '@libp2p/interface/errors'
import { AbstractStream, type AbstractStreamInit } from '@libp2p/interface/stream-muxer/stream'
import { CodeError } from '@libp2p/interface'
import { AbstractStream, type AbstractStreamInit } from '@libp2p/utils/abstract-stream'
import each from 'it-foreach'
import { ERR_RECV_WINDOW_EXCEEDED, ERR_STREAM_ABORT, INITIAL_STREAM_WINDOW } from './constants.js'
import { Flag, type FrameHeader, FrameType, HEADER_LENGTH } from './frame.js'
Expand All @@ -17,7 +17,7 @@ export enum StreamState {

export interface YamuxStreamInit extends AbstractStreamInit {
name?: string
sendFrame(header: FrameHeader, body?: Uint8Array): void
sendFrame(header: FrameHeader, body?: Uint8ArrayList): void
getRTT(): number
config: Config
state: StreamState
Expand Down Expand Up @@ -49,7 +49,7 @@ export class YamuxStream extends AbstractStream {
private epochStart: number
private readonly getRTT: () => number

private readonly sendFrame: (header: FrameHeader, body?: Uint8Array) => void
private readonly sendFrame: (header: FrameHeader, body?: Uint8ArrayList) => void

constructor (init: YamuxStreamInit) {
super({
Expand Down Expand Up @@ -115,7 +115,7 @@ export class YamuxStream extends AbstractStream {
flag: flags,
streamID: this._id,
length: toSend
}, buf.subarray(0, toSend))
}, buf.sublist(0, toSend))

this.sendWindowCapacity -= toSend

Expand Down
5 changes: 4 additions & 1 deletion test/compliance.spec.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
/* eslint-env mocha */

import tests from '@libp2p/interface-compliance-tests/stream-muxer'
import { defaultLogger } from '@libp2p/logger'
import { TestYamux } from './util.js'

describe('compliance', () => {
tests({
async setup () {
return new TestYamux({})
return new TestYamux({
logger: defaultLogger()
})
},
async teardown () {}
})
Expand Down
16 changes: 10 additions & 6 deletions test/mplex.util.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import { defaultLogger } from '@libp2p/logger'
import { mplex } from '@libp2p/mplex'
import { duplexPair } from 'it-pair/duplex'
import { pipe } from 'it-pipe'
import { type Uint8ArrayList } from 'uint8arraylist'
import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interface/stream-muxer'
import type { Source, Transform } from 'it-stream-types'

const factory = mplex()()
const factory = mplex()({
logger: defaultLogger()
})

export function testYamuxMuxer (name: string, client: boolean, conf: StreamMuxerInit = {}): StreamMuxer {
return factory.createStreamMuxer({
Expand Down Expand Up @@ -54,14 +58,14 @@ export function testClientServer (conf: StreamMuxerInit = {}): {
unpauseWrite(): void
}
} {
const pair = duplexPair<Uint8Array>()
const pair = duplexPair<Uint8Array | Uint8ArrayList>()
const client = testYamuxMuxer('libp2p:mplex:client', true, conf)
const server = testYamuxMuxer('libp2p:mplex:server', false, conf)

const clientReadTransform = pauseableTransform<Uint8Array>()
const clientWriteTransform = pauseableTransform<Uint8Array>()
const serverReadTransform = pauseableTransform<Uint8Array>()
const serverWriteTransform = pauseableTransform<Uint8Array>()
const clientReadTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()
const clientWriteTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()
const serverReadTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()
const serverWriteTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()

void pipe(pair[0], clientReadTransform.transform, client, clientWriteTransform.transform, pair[0])
void pipe(pair[1], serverReadTransform.transform, server, serverWriteTransform.transform, pair[1])
Expand Down
5 changes: 3 additions & 2 deletions test/muxer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import { expect } from 'aegir/chai'
import { duplexPair } from 'it-pair/duplex'
import { pipe } from 'it-pipe'
import { type Uint8ArrayList } from 'uint8arraylist'
import { ERR_MUXER_LOCAL_CLOSED } from '../src/constants.js'
import { sleep, testClientServer, testYamuxMuxer, type YamuxFixture } from './util.js'

Expand All @@ -29,7 +30,7 @@ describe('muxer', () => {
})

it('test client<->client', async () => {
const pair = duplexPair<Uint8Array>()
const pair = duplexPair<Uint8Array | Uint8ArrayList>()
const client1 = testYamuxMuxer('libp2p:yamux:1', true)
const client2 = testYamuxMuxer('libp2p:yamux:2', true)
void pipe(pair[0], client1, pair[0])
Expand All @@ -44,7 +45,7 @@ describe('muxer', () => {
})

it('test server<->server', async () => {
const pair = duplexPair<Uint8Array>()
const pair = duplexPair<Uint8Array | Uint8ArrayList>()
const client1 = testYamuxMuxer('libp2p:yamux:1', false)
const client2 = testYamuxMuxer('libp2p:yamux:2', false)
void pipe(pair[0], client1, pair[0])
Expand Down
24 changes: 13 additions & 11 deletions test/util.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { logger } from '@libp2p/logger'
import { prefixLogger } from '@libp2p/logger'
import { duplexPair } from 'it-pair/duplex'
import { pipe } from 'it-pipe'
import { type Uint8ArrayList } from 'uint8arraylist'
import { Yamux, YamuxMuxer, type YamuxMuxerInit } from '../src/muxer.js'
import type { Config } from '../src/config.js'
import type { Source, Transform } from 'it-stream-types'
Expand Down Expand Up @@ -28,16 +29,17 @@ export const testConf: Partial<Config> = {
export class TestYamux extends Yamux {
createStreamMuxer (init?: YamuxMuxerInit): YamuxMuxer {
const client = isClient()
return super.createStreamMuxer({ ...testConf, ...init, direction: client ? 'outbound' : 'inbound', log: logger(`libp2p:yamux${client ? 1 : 2}`) })
return super.createStreamMuxer({ ...testConf, ...init, direction: client ? 'outbound' : 'inbound' })
}
}

export function testYamuxMuxer (name: string, client: boolean, conf: YamuxMuxerInit = {}): YamuxMuxer {
return new YamuxMuxer({
logger: prefixLogger(name)
}, {
...testConf,
...conf,
direction: client ? 'outbound' : 'inbound',
log: logger(name)
direction: client ? 'outbound' : 'inbound'
})
}

Expand Down Expand Up @@ -79,14 +81,14 @@ export function testClientServer (conf: YamuxMuxerInit = {}): {
client: YamuxFixture
server: YamuxFixture
} {
const pair = duplexPair<Uint8Array>()
const client = testYamuxMuxer('libp2p:yamux:client', true, conf)
const server = testYamuxMuxer('libp2p:yamux:server', false, conf)
const pair = duplexPair<Uint8Array | Uint8ArrayList>()
const client = testYamuxMuxer('client', true, conf)
const server = testYamuxMuxer('server', false, conf)

const clientReadTransform = pauseableTransform<Uint8Array>()
const clientWriteTransform = pauseableTransform<Uint8Array>()
const serverReadTransform = pauseableTransform<Uint8Array>()
const serverWriteTransform = pauseableTransform<Uint8Array>()
const clientReadTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()
const clientWriteTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()
const serverReadTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()
const serverWriteTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()

void pipe(pair[0], clientReadTransform.transform, client, clientWriteTransform.transform, pair[0])
void pipe(pair[1], serverReadTransform.transform, server, serverWriteTransform.transform, pair[1])
Expand Down

0 comments on commit 9a48fd3

Please sign in to comment.