Skip to content

Commit

Permalink
feat!: yield uint8arraylists
Browse files Browse the repository at this point in the history
The general pattern of stream muxers is to yield protocol stream data
framed by some additional metadata - stream id, flags, etc.

The frame data can be prepended/appended to the protocol stream data
by using a `Uint8ArrayList` instead of a `Uint8Array`, this removes the
need to copy the protocol data into a new `Uint8Array` for every frame.

The new `@libp2p/interface` version allows muxers to emit
`Uint8ArrayList`s as well as `Uint8Array`s so we can send protocol
stream data to a transport in a no-copy operation.
  • Loading branch information
achingbrain committed Nov 25, 2023
1 parent 018257f commit 8495ae3
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 26 deletions.
9 changes: 5 additions & 4 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,9 @@
"docs": "aegir docs"
},
"dependencies": {
"@libp2p/interface": "^0.1.0",
"@libp2p/logger": "^3.0.0",
"@libp2p/interface": "next",
"@libp2p/logger": "next",
"@libp2p/utils": "next",
"get-iterator": "^2.0.1",
"it-foreach": "^2.0.3",
"it-pipe": "^3.0.1",
Expand All @@ -181,8 +182,8 @@
},
"devDependencies": {
"@dapplion/benchmark": "^0.2.4",
"@libp2p/interface-compliance-tests": "^4.0.0",
"@libp2p/mplex": "^9.0.0",
"@libp2p/interface-compliance-tests": "next",
"@libp2p/mplex": "next",
"aegir": "^40.0.1",
"it-drain": "^3.0.2",
"it-pair": "^2.0.6",
Expand Down
11 changes: 6 additions & 5 deletions src/muxer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { setMaxListeners } from '@libp2p/interface/events'
import { logger, type Logger } from '@libp2p/logger'
import { getIterator } from 'get-iterator'
import { pushable, type Pushable } from 'it-pushable'
import { Uint8ArrayList } from 'uint8arraylist'
import { type Config, defaultConfig, verifyConfig } from './config.js'
import { ERR_BOTH_CLIENTS, ERR_INVALID_FRAME, ERR_MAX_OUTBOUND_STREAMS_EXCEEDED, ERR_MUXER_LOCAL_CLOSED, ERR_MUXER_REMOTE_CLOSED, ERR_NOT_MATCHING_PING, ERR_STREAM_ALREADY_EXISTS, ERR_UNREQUESTED_PING, PROTOCOL_ERRORS } from './constants.js'
import { Decoder } from './decode.js'
Expand All @@ -13,7 +14,6 @@ import type { AbortOptions } from '@libp2p/interface'
import type { Stream } from '@libp2p/interface/connection'
import type { StreamMuxer, StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interface/stream-muxer'
import type { Sink, Source } from 'it-stream-types'
import type { Uint8ArrayList } from 'uint8arraylist'

const YAMUX_PROTOCOL_ID = '/yamux/1.0.0'
const CLOSE_TIMEOUT = 500
Expand Down Expand Up @@ -43,7 +43,7 @@ export interface CloseOptions extends AbortOptions {

export class YamuxMuxer implements StreamMuxer {
protocol = YAMUX_PROTOCOL_ID
source: Pushable<Uint8Array>
source: Pushable<Uint8ArrayList | Uint8Array>
sink: Sink<Source<Uint8ArrayList | Uint8Array>, Promise<void>>

private readonly config: Config
Expand Down Expand Up @@ -554,14 +554,15 @@ export class YamuxMuxer implements StreamMuxer {
this.onIncomingStream?.(stream)
}

private sendFrame (header: FrameHeader, data?: Uint8Array): void {
private sendFrame (header: FrameHeader, data?: Uint8ArrayList): void {
this.log?.trace('sending frame %o', header)
if (header.type === FrameType.Data) {
if (data === undefined) {
throw new CodeError('invalid frame', ERR_INVALID_FRAME)
}
this.source.push(encodeHeader(header))
this.source.push(data)
this.source.push(
new Uint8ArrayList(encodeHeader(header), data)
)
} else {
this.source.push(encodeHeader(header))
}
Expand Down
8 changes: 4 additions & 4 deletions src/stream.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { CodeError } from '@libp2p/interface/errors'
import { AbstractStream, type AbstractStreamInit } from '@libp2p/interface/stream-muxer/stream'
import { AbstractStream, type AbstractStreamInit } from '@libp2p/utils/abstract-stream'
import each from 'it-foreach'
import { ERR_RECV_WINDOW_EXCEEDED, ERR_STREAM_ABORT, INITIAL_STREAM_WINDOW } from './constants.js'
import { Flag, type FrameHeader, FrameType, HEADER_LENGTH } from './frame.js'
Expand All @@ -17,7 +17,7 @@ export enum StreamState {

export interface YamuxStreamInit extends AbstractStreamInit {
name?: string
sendFrame: (header: FrameHeader, body?: Uint8Array) => void
sendFrame: (header: FrameHeader, body?: Uint8ArrayList) => void
getRTT: () => number
config: Config
state: StreamState
Expand Down Expand Up @@ -49,7 +49,7 @@ export class YamuxStream extends AbstractStream {
private epochStart: number
private readonly getRTT: () => number

private readonly sendFrame: (header: FrameHeader, body?: Uint8Array) => void
private readonly sendFrame: (header: FrameHeader, body?: Uint8ArrayList) => void

constructor (init: YamuxStreamInit) {
super({
Expand Down Expand Up @@ -115,7 +115,7 @@ export class YamuxStream extends AbstractStream {
flag: flags,
streamID: this._id,
length: toSend
}, buf.subarray(0, toSend))
}, buf.sublist(0, toSend))

this.sendWindowCapacity -= toSend

Expand Down
16 changes: 10 additions & 6 deletions test/mplex.util.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import { defaultLogger } from '@libp2p/logger'
import { mplex } from '@libp2p/mplex'
import { duplexPair } from 'it-pair/duplex'
import { pipe } from 'it-pipe'
import { type Uint8ArrayList } from 'uint8arraylist'
import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interface/stream-muxer'
import type { Source, Transform } from 'it-stream-types'

const factory = mplex()()
const factory = mplex()({
logger: defaultLogger()
})

export function testYamuxMuxer (name: string, client: boolean, conf: StreamMuxerInit = {}): StreamMuxer {
return factory.createStreamMuxer({
Expand Down Expand Up @@ -54,14 +58,14 @@ export function testClientServer (conf: StreamMuxerInit = {}): {
unpauseWrite: () => void
}
} {
const pair = duplexPair<Uint8Array>()
const pair = duplexPair<Uint8Array | Uint8ArrayList>()
const client = testYamuxMuxer('libp2p:mplex:client', true, conf)
const server = testYamuxMuxer('libp2p:mplex:server', false, conf)

const clientReadTransform = pauseableTransform<Uint8Array>()
const clientWriteTransform = pauseableTransform<Uint8Array>()
const serverReadTransform = pauseableTransform<Uint8Array>()
const serverWriteTransform = pauseableTransform<Uint8Array>()
const clientReadTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()
const clientWriteTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()
const serverReadTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()
const serverWriteTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()

void pipe(pair[0], clientReadTransform.transform, client, clientWriteTransform.transform, pair[0])
void pipe(pair[1], serverReadTransform.transform, server, serverWriteTransform.transform, pair[1])
Expand Down
5 changes: 3 additions & 2 deletions test/muxer.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import { expect } from 'aegir/chai'
import { duplexPair } from 'it-pair/duplex'
import { pipe } from 'it-pipe'
import { type Uint8ArrayList } from 'uint8arraylist'
import { ERR_MUXER_LOCAL_CLOSED } from '../src/constants.js'
import { sleep, testClientServer, testYamuxMuxer, type YamuxFixture } from './util.js'

Expand All @@ -29,7 +30,7 @@ describe('muxer', () => {
})

it('test client<->client', async () => {
const pair = duplexPair<Uint8Array>()
const pair = duplexPair<Uint8Array | Uint8ArrayList>()
const client1 = testYamuxMuxer('libp2p:yamux:1', true)
const client2 = testYamuxMuxer('libp2p:yamux:2', true)
void pipe(pair[0], client1, pair[0])
Expand All @@ -44,7 +45,7 @@ describe('muxer', () => {
})

it('test server<->server', async () => {
const pair = duplexPair<Uint8Array>()
const pair = duplexPair<Uint8Array | Uint8ArrayList>()
const client1 = testYamuxMuxer('libp2p:yamux:1', false)
const client2 = testYamuxMuxer('libp2p:yamux:2', false)
void pipe(pair[0], client1, pair[0])
Expand Down
11 changes: 6 additions & 5 deletions test/util.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { logger } from '@libp2p/logger'
import { duplexPair } from 'it-pair/duplex'
import { pipe } from 'it-pipe'
import { type Uint8ArrayList } from 'uint8arraylist'
import { Yamux, YamuxMuxer, type YamuxMuxerInit } from '../src/muxer.js'
import type { Config } from '../src/config.js'
import type { Source, Transform } from 'it-stream-types'
Expand Down Expand Up @@ -79,14 +80,14 @@ export function testClientServer (conf: YamuxMuxerInit = {}): {
client: YamuxFixture
server: YamuxFixture
} {
const pair = duplexPair<Uint8Array>()
const pair = duplexPair<Uint8Array | Uint8ArrayList>()
const client = testYamuxMuxer('libp2p:yamux:client', true, conf)
const server = testYamuxMuxer('libp2p:yamux:server', false, conf)

const clientReadTransform = pauseableTransform<Uint8Array>()
const clientWriteTransform = pauseableTransform<Uint8Array>()
const serverReadTransform = pauseableTransform<Uint8Array>()
const serverWriteTransform = pauseableTransform<Uint8Array>()
const clientReadTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()
const clientWriteTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()
const serverReadTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()
const serverWriteTransform = pauseableTransform<Uint8Array | Uint8ArrayList>()

void pipe(pair[0], clientReadTransform.transform, client, clientWriteTransform.transform, pair[0])
void pipe(pair[1], serverReadTransform.transform, server, serverWriteTransform.transform, pair[1])
Expand Down

0 comments on commit 8495ae3

Please sign in to comment.