diff --git a/package.json b/package.json index 9a2728c..d825884 100644 --- a/package.json +++ b/package.json @@ -174,6 +174,7 @@ "@libp2p/interface": "^0.1.0", "@libp2p/logger": "^3.0.0", "abortable-iterator": "^5.0.1", + "it-batched-bytes": "^2.0.3", "it-foreach": "^2.0.3", "it-pipe": "^3.0.1", "it-pushable": "^3.2.0", diff --git a/src/config.ts b/src/config.ts index 887e461..16ed35d 100644 --- a/src/config.ts +++ b/src/config.ts @@ -52,6 +52,18 @@ export interface Config { * This ensures that a single stream doesn't hog a connection. */ maxMessageSize: number + + /** + * Each byte array written into a multiplexed stream is converted to one or + * more messages which are sent as byte arrays to the remote node. Sending + * lots of small messages can be expensive - use this setting to batch up + * the serialized bytes of all messages sent during the current tick up to + * this limit to send in one go similar to Nagle's algorithm. N.b. you + * should benchmark your application carefully when using this setting as it + * may cause the opposite of the desired effect. Omit this setting to send + * all messages as they become available. (default: 1) + */ + minSendBytes: number } export const defaultConfig: Config = { @@ -62,7 +74,8 @@ export const defaultConfig: Config = { maxOutboundStreams: 1_000, initialStreamWindowSize: INITIAL_STREAM_WINDOW, maxStreamWindowSize: MAX_STREAM_WINDOW, - maxMessageSize: 64 * 1024 + maxMessageSize: 64 * 1024, + minSendBytes: 1 } export function verifyConfig (config: Config): void { @@ -87,4 +100,7 @@ export function verifyConfig (config: Config): void { if (config.maxMessageSize < 1024) { throw new CodeError('MaxMessageSize must be greater than a kilobyte', ERR_INVALID_CONFIG) } + if (config.minSendBytes <= 0) { + throw new CodeError('MinSendBytes must be greater than 0', ERR_INVALID_CONFIG) + } } diff --git a/src/muxer.ts b/src/muxer.ts index 9f3f6ac..0bad656 100644 --- a/src/muxer.ts +++ b/src/muxer.ts @@ -1,8 +1,9 @@ import { CodeError } from '@libp2p/interface/errors' import { logger, type Logger } from '@libp2p/logger' import { abortableSource } from 'abortable-iterator' +import batchedBytes from 'it-batched-bytes' import { pipe } from 'it-pipe' -import { pushable, type Pushable } from 'it-pushable' +import { pushableV, type PushableV } from 'it-pushable' import { type Config, defaultConfig, verifyConfig } from './config.js' import { ERR_BOTH_CLIENTS, ERR_INVALID_FRAME, ERR_MAX_OUTBOUND_STREAMS_EXCEEDED, ERR_MUXER_LOCAL_CLOSED, ERR_MUXER_REMOTE_CLOSED, ERR_NOT_MATCHING_PING, ERR_STREAM_ALREADY_EXISTS, ERR_UNREQUESTED_PING, PROTOCOL_ERRORS } from './constants.js' import { Decoder } from './decode.js' @@ -43,12 +44,13 @@ export interface CloseOptions extends AbortOptions { export class YamuxMuxer implements StreamMuxer { protocol = YAMUX_PROTOCOL_ID - source: Pushable + source: AsyncGenerator sink: Sink, Promise> private readonly config: Config private readonly log?: Logger + private readonly _source: PushableV /** Used to close the muxer from either the sink or source */ private readonly closeController: AbortController @@ -80,7 +82,7 @@ export class YamuxMuxer implements StreamMuxer { constructor (init: YamuxMuxerInit) { this.client = init.direction === 'outbound' - this.config = { ...defaultConfig, ...init } + const config = this.config = { ...defaultConfig, ...init } this.log = this.config.log verifyConfig(this.config) @@ -91,7 +93,7 @@ export class YamuxMuxer implements StreamMuxer { this._streams = new Map() - this.source = pushable({ + this._source = pushableV({ onEnd: (): void => { this.log?.trace('muxer source ended') @@ -100,6 +102,16 @@ export class YamuxMuxer implements StreamMuxer { }) } }) + this.source = pipe( + this._source, + async function * (source) { + yield * batchedBytes(source, { + size: config.minSendBytes, + serialize: (bufs, list) => { list.appendAll(bufs) } + + }) + } + ) this.sink = async (source: Source): Promise => { source = abortableSource( @@ -322,7 +334,7 @@ export class YamuxMuxer implements StreamMuxer { this.closeController.abort() // stop the source - this.source.end() + this._source.end() } /** Create a new stream */ @@ -538,10 +550,10 @@ export class YamuxMuxer implements StreamMuxer { if (data === undefined) { throw new CodeError('invalid frame', ERR_INVALID_FRAME) } - this.source.push(encodeHeader(header)) - this.source.push(data) + this._source.push(encodeHeader(header)) + this._source.push(data) } else { - this.source.push(encodeHeader(header)) + this._source.push(encodeHeader(header)) } }