Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
feat: limit internal message buffer size (#174)
Browse files Browse the repository at this point in the history
Adds a new config option `maxStreamBufferSize` which controls how
large the input message buffer for a multiplexed stream is allowed to
grow - defaults to 4MB [similar to go-mplex](https://github.com/libp2p/go-mplex/blob/master/multiplex.go#L26).

If a stream handler does not process messages fast enough and the input
buffer reaches this size, the stream will be reset and an error thrown
to the handler.

See option 2 in the [spec implementation notes](https://github.com/libp2p/specs/blob/master/mplex/README.md#implementation-notes).
  • Loading branch information
achingbrain committed Jun 13, 2022
1 parent 4da0829 commit 0c8e1b0
Show file tree
Hide file tree
Showing 6 changed files with 118 additions and 4 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ Creates a factory that can be used to create new muxers.

* `maxMsgSize` - a number that defines how large mplex data messages can be in bytes, if messages are larger than this they will be sent as multiple messages (default: 1048576 - e.g. 1MB)
* `maxStreamsPerConnection` - a number that defines how many streams are allowed per connection (default: 1024)
* `maxStreamBufferSize` - a number that defines how large the message buffer is allowed to grow (default: 1024 * 1024 * 4 - e.g. 4MB)

### `const muxer = factory.createStreamMuxer(components, [options])`

Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@
"any-signal": "^3.0.0",
"err-code": "^3.0.1",
"it-pipe": "^2.0.3",
"it-pushable": "^2.0.1",
"it-pushable": "^3.0.0",
"it-stream-types": "^1.0.4",
"uint8arraylist": "^1.4.0",
"uint8arrays": "^3.0.0",
Expand All @@ -158,6 +158,7 @@
"@types/varint": "^6.0.0",
"aegir": "^37.0.10",
"cborg": "^1.8.1",
"delay": "^5.0.0",
"iso-random-stream": "^2.0.2",
"it-all": "^1.0.6",
"it-drain": "^1.0.5",
Expand Down
7 changes: 7 additions & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@ export interface MplexInit {
* one time. An attempt to open more than this will throw.
*/
maxStreamsPerConnection?: number

/**
* Incoming stream messages are buffered until processed by the stream
* handler. If the buffer reaches this size in bytes the stream will
* be reset.
*/
maxStreamBufferSize?: number
}

export class Mplex implements StreamMuxerFactory {
Expand Down
22 changes: 21 additions & 1 deletion src/mplex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import type { MplexInit } from './index.js'
const log = logger('libp2p:mplex')

const MAX_STREAMS_PER_CONNECTION = 1024
const MAX_STREAM_BUFFER_SIZE = 1024 * 1024 * 4 // 4MB

function printMessage (msg: Message) {
const output: any = {
Expand Down Expand Up @@ -222,7 +223,10 @@ export class MplexStreamMuxer implements StreamMuxer {
}
}
}
const source = pushableV<Message>({ onEnd })
const source = pushableV<Message>({
objectMode: true,
onEnd
})

return Object.assign(encode(source), {
push: source.push,
Expand Down Expand Up @@ -258,9 +262,25 @@ export class MplexStreamMuxer implements StreamMuxer {
return
}

const maxBufferSize = this._init.maxStreamBufferSize ?? MAX_STREAM_BUFFER_SIZE

switch (type) {
case MessageTypes.MESSAGE_INITIATOR:
case MessageTypes.MESSAGE_RECEIVER:
if (stream.source.readableLength > maxBufferSize) {
// Stream buffer has got too large, reset the stream
this._source.push({
id: message.id,
type: type === MessageTypes.MESSAGE_INITIATOR ? MessageTypes.RESET_RECEIVER : MessageTypes.RESET_INITIATOR
})

// Inform the stream consumer they are not fast enough
const error = errCode(new Error('Input buffer full - increase Mplex maxBufferSize to accomodate slow consumers'), 'ERR_STREAM_INPUT_BUFFER_FULL')
stream.abort(error)

return
}

// We got data from the remote, push it into our local stream
stream.source.push(message.data.slice())
break
Expand Down
2 changes: 1 addition & 1 deletion src/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ export function createStream (options: Options): MplexStream {
onSinkEnd()
},

source: pushable<Uint8Array>({
source: pushable({
onEnd: onSourceEnd
}),

Expand Down
87 changes: 86 additions & 1 deletion test/mplex.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,15 @@
import { expect } from 'aegir/chai'
import { Mplex } from '../src/index.js'
import { Components } from '@libp2p/interfaces/components'
import type { NewStreamMessage } from '../src/message-types.js'
import { CloseInitiatorMessage, Message, MessageInitiatorMessage, MessageTypes, NewStreamMessage } from '../src/message-types.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { concat as uint8ArrayConcat } from 'uint8arrays/concat'
import { encode } from '../src/encode.js'
import all from 'it-all'
import type { Source } from 'it-stream-types'
import delay from 'delay'
import pDefer from 'p-defer'
import { decode } from '../src/decode.js'

describe('mplex', () => {
it('should restrict number of initiator streams per connection', async () => {
Expand Down Expand Up @@ -54,4 +58,85 @@ describe('mplex', () => {

await expect(all(muxer.source)).to.eventually.be.rejected.with.property('code', 'ERR_TOO_MANY_STREAMS')
})

it('should reset a stream that fills the message buffer', async () => {
let sent = 0
const streamSourceError = pDefer<Error>()
const maxStreamBufferSize = 1024 * 1024 // 1MB
const id = 17

// simulate a new incoming stream that sends lots of data
const input: Source<Message> = (async function * send () {
const newStreamMessage: NewStreamMessage = {
id,
type: MessageTypes.NEW_STREAM,
data: new Uint8Array(1024)
}
yield newStreamMessage

await delay(10)

for (let i = 0; i < 100; i++) {
const dataMessage: MessageInitiatorMessage = {
id,
type: MessageTypes.MESSAGE_INITIATOR,
data: new Uint8Array(1024 * 1024)
}
yield dataMessage

sent++

await delay(10)
}

await delay(10)

const closeMessage: CloseInitiatorMessage = {
id,
type: MessageTypes.CLOSE_INITIATOR
}
yield closeMessage
})()

// create the muxer
const factory = new Mplex({
maxStreamBufferSize
})
const muxer = factory.createStreamMuxer(new Components(), {
onIncomingStream () {
// do nothing with the stream so the buffer fills up
},
onStreamEnd (stream) {
void all(stream.source)
.then(() => {
streamSourceError.reject(new Error('Stream source did not error'))
})
.catch(err => {
// should have errored before all messages were sent
expect(sent).to.equal(2)
streamSourceError.resolve(err)
})
}
})

// collect outgoing mplex messages
const muxerFinished = pDefer()
let messages: Message[][] = []
void Promise.resolve().then(async () => {
messages = await all(decode(muxer.source))
muxerFinished.resolve()
})

// the muxer processes the messages
await muxer.sink(encode(input))

// source should have errored with appropriate code
const err = await streamSourceError.promise
expect(err).to.have.property('code', 'ERR_STREAM_INPUT_BUFFER_FULL')

// should have sent reset message to peer for this stream
await muxerFinished.promise
expect(messages).to.have.nested.property('[0][0].id', id)
expect(messages).to.have.nested.property('[0][0].type', MessageTypes.RESET_RECEIVER)
})
})

0 comments on commit 0c8e1b0

Please sign in to comment.