Skip to content

Commit

Permalink
feat: add message byte batching
Browse files Browse the repository at this point in the history
  • Loading branch information
wemeetagain committed Aug 14, 2023
1 parent e4fac56 commit 0902fdb
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 9 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@
"@libp2p/interface": "^0.1.0",
"@libp2p/logger": "^3.0.0",
"abortable-iterator": "^5.0.1",
"it-batched-bytes": "^2.0.3",
"it-foreach": "^2.0.3",
"it-pipe": "^3.0.1",
"it-pushable": "^3.2.0",
Expand Down
18 changes: 17 additions & 1 deletion src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,18 @@ export interface Config {
* This ensures that a single stream doesn't hog a connection.
*/
maxMessageSize: number

/**
* Each byte array written into a multiplexed stream is converted to one or
* more messages which are sent as byte arrays to the remote node. Sending
* lots of small messages can be expensive - use this setting to batch up
* the serialized bytes of all messages sent during the current tick up to
* this limit to send in one go similar to Nagle's algorithm. N.b. you
* should benchmark your application carefully when using this setting as it
* may cause the opposite of the desired effect. Omit this setting to send
* all messages as they become available. (default: 1)
*/
minSendBytes: number
}

export const defaultConfig: Config = {
Expand All @@ -62,7 +74,8 @@ export const defaultConfig: Config = {
maxOutboundStreams: 1_000,
initialStreamWindowSize: INITIAL_STREAM_WINDOW,
maxStreamWindowSize: MAX_STREAM_WINDOW,
maxMessageSize: 64 * 1024
maxMessageSize: 64 * 1024,
minSendBytes: 1
}

export function verifyConfig (config: Config): void {
Expand All @@ -87,4 +100,7 @@ export function verifyConfig (config: Config): void {
if (config.maxMessageSize < 1024) {
throw new CodeError('MaxMessageSize must be greater than a kilobyte', ERR_INVALID_CONFIG)
}
if (config.minSendBytes <= 0) {
throw new CodeError('MinSendBytes must be greater than 0', ERR_INVALID_CONFIG)
}
}
28 changes: 20 additions & 8 deletions src/muxer.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { CodeError } from '@libp2p/interface/errors'
import { logger, type Logger } from '@libp2p/logger'
import { abortableSource } from 'abortable-iterator'
import batchedBytes from 'it-batched-bytes'
import { pipe } from 'it-pipe'
import { pushable, type Pushable } from 'it-pushable'
import { pushableV, type PushableV } from 'it-pushable'
import { type Config, defaultConfig, verifyConfig } from './config.js'
import { ERR_BOTH_CLIENTS, ERR_INVALID_FRAME, ERR_MAX_OUTBOUND_STREAMS_EXCEEDED, ERR_MUXER_LOCAL_CLOSED, ERR_MUXER_REMOTE_CLOSED, ERR_NOT_MATCHING_PING, ERR_STREAM_ALREADY_EXISTS, ERR_UNREQUESTED_PING, PROTOCOL_ERRORS } from './constants.js'
import { Decoder } from './decode.js'
Expand Down Expand Up @@ -43,12 +44,13 @@ export interface CloseOptions extends AbortOptions {

export class YamuxMuxer implements StreamMuxer {
protocol = YAMUX_PROTOCOL_ID
source: Pushable<Uint8Array>
source: AsyncGenerator<Uint8Array>
sink: Sink<Source<Uint8ArrayList | Uint8Array>, Promise<void>>

private readonly config: Config
private readonly log?: Logger

private readonly _source: PushableV<Uint8Array>
/** Used to close the muxer from either the sink or source */
private readonly closeController: AbortController

Expand Down Expand Up @@ -80,7 +82,7 @@ export class YamuxMuxer implements StreamMuxer {

constructor (init: YamuxMuxerInit) {
this.client = init.direction === 'outbound'
this.config = { ...defaultConfig, ...init }
const config = this.config = { ...defaultConfig, ...init }
this.log = this.config.log
verifyConfig(this.config)

Expand All @@ -91,7 +93,7 @@ export class YamuxMuxer implements StreamMuxer {

this._streams = new Map()

this.source = pushable({
this._source = pushableV({
onEnd: (): void => {
this.log?.trace('muxer source ended')

Expand All @@ -100,6 +102,16 @@ export class YamuxMuxer implements StreamMuxer {
})
}
})
this.source = pipe(
this._source,
async function * (source) {
yield * batchedBytes(source, {
size: config.minSendBytes,
serialize: (bufs, list) => { list.appendAll(bufs) }

})
}
)

this.sink = async (source: Source<Uint8ArrayList | Uint8Array>): Promise<void> => {
source = abortableSource(
Expand Down Expand Up @@ -322,7 +334,7 @@ export class YamuxMuxer implements StreamMuxer {
this.closeController.abort()

// stop the source
this.source.end()
this._source.end()
}

/** Create a new stream */
Expand Down Expand Up @@ -538,10 +550,10 @@ export class YamuxMuxer implements StreamMuxer {
if (data === undefined) {
throw new CodeError('invalid frame', ERR_INVALID_FRAME)
}
this.source.push(encodeHeader(header))
this.source.push(data)
this._source.push(encodeHeader(header))
this._source.push(data)
} else {
this.source.push(encodeHeader(header))
this._source.push(encodeHeader(header))
}
}

Expand Down

0 comments on commit 0902fdb

Please sign in to comment.