Skip to content
This repository has been archived by the owner on Jul 21, 2023. It is now read-only.

Commit

Permalink
fix: add per-connection stream limit (#173)
Browse files Browse the repository at this point in the history
Limits the total number of streams that can be opened to 1024 per connection.

This can be overridden by passing an option to the constructor.
  • Loading branch information
achingbrain authored Jun 8, 2022
1 parent 00b8cf1 commit 21371e7
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 10 deletions.
15 changes: 13 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ npm install @libp2p/mplex
import { Mplex } from '@libp2p/mplex'
import { pipe } from 'it-pipe'

const muxer = new Mplex({
const factory = new Mplex()

const muxer = factory.createStreamMuxer(components, {
onStream: stream => { // Receive a duplex stream from the remote
// ...receive data from the remote and optionally send data back
},
Expand All @@ -46,7 +48,16 @@ pipe([1, 2, 3], stream)

## API

### `const muxer = new Mplex([options])`
### `const factory = new Mplex([options])`

Creates a factory that can be used to create new muxers.

`options` is an optional `Object` that may have the following properties:

* `maxMsgSize` - a number that defines how large mplex data messages can be in bytes, if messages are larger than this they will be sent as multiple messages (default: 1048576 - e.g. 1MB)
* `maxStreamsPerConnection` - a number that defines how many streams are allowed per connection (default: 1024)

### `const muxer = factory.createStreamMuxer(components, [options])`

Create a new _duplex_ stream that can be piped together with a connection in order to allow multiplexed communications.

Expand Down
25 changes: 22 additions & 3 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,33 @@ import type { Components } from '@libp2p/interfaces/components'
import type { StreamMuxerFactory, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer'
import { MplexStreamMuxer } from './mplex.js'

export interface MplexInit extends StreamMuxerInit {
export interface MplexInit {
/**
* The maximum size of message that can be sent in one go in bytes.
* Messages larger than this will be split into multiple smaller
* messages.
*/
maxMsgSize?: number

/**
* The maximum number of multiplexed streams that can be open at any
* one time. An attempt to open more than this will throw.
*/
maxStreamsPerConnection?: number
}

export class Mplex implements StreamMuxerFactory {
public protocol = '/mplex/6.7.0'
private readonly init: MplexInit

constructor (init: MplexInit = {}) {
this.init = init
}

createStreamMuxer (components: Components, init?: MplexInit) {
return new MplexStreamMuxer(components, init)
createStreamMuxer (components: Components, init: StreamMuxerInit = {}) {
return new MplexStreamMuxer(components, {
...init,
...this.init
})
}
}
17 changes: 12 additions & 5 deletions src/mplex.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@ import type { Components } from '@libp2p/interfaces/components'
import type { Sink } from 'it-stream-types'
import type { StreamMuxer, StreamMuxerInit } from '@libp2p/interfaces/stream-muxer'
import type { Stream } from '@libp2p/interfaces/connection'
import type { MplexInit } from './index.js'

const log = logger('libp2p:mplex')

const MAX_STREAMS_PER_CONNECTION = 1024

function printMessage (msg: Message) {
const output: any = {
...msg,
Expand All @@ -38,9 +41,7 @@ export interface MplexStream extends Stream {
source: Pushable<Uint8Array>
}

export interface MplexInit extends StreamMuxerInit {
maxMsgSize?: number
}
interface MplexStreamMuxerInit extends MplexInit, StreamMuxerInit {}

export class MplexStreamMuxer implements StreamMuxer {
public protocol = '/mplex/6.7.0'
Expand All @@ -50,10 +51,10 @@ export class MplexStreamMuxer implements StreamMuxer {

private _streamId: number
private readonly _streams: { initiators: Map<number, MplexStream>, receivers: Map<number, MplexStream> }
private readonly _init: MplexInit
private readonly _init: MplexStreamMuxerInit
private readonly _source: { push: (val: Message) => void, end: (err?: Error) => void }

constructor (components: Components, init?: MplexInit) {
constructor (components: Components, init?: MplexStreamMuxerInit) {
init = init ?? {}

this._streamId = 0
Expand Down Expand Up @@ -122,6 +123,12 @@ export class MplexStreamMuxer implements StreamMuxer {
}

_newStream (options: { id: number, name: string, type: 'initiator' | 'receiver', registry: Map<number, MplexStream> }) {
const maxStreams = this._init.maxStreamsPerConnection ?? MAX_STREAMS_PER_CONNECTION

if ((this._streams.initiators.size + this._streams.receivers.size) === maxStreams) {
throw errCode(new Error('To many streams open'), 'ERR_TOO_MANY_STREAMS')
}

const { id, name, type, registry } = options

log('new %s stream %s %s', type, id, name)
Expand Down
57 changes: 57 additions & 0 deletions test/mplex.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/* eslint-env mocha */
/* eslint max-nested-callbacks: ["error", 5] */

import { expect } from 'aegir/chai'
import { Mplex } from '../src/index.js'
import { Components } from '@libp2p/interfaces/components'
import type { NewStreamMessage } from '../src/message-types.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { concat as uint8ArrayConcat } from 'uint8arrays/concat'
import { encode } from '../src/encode.js'
import all from 'it-all'

describe('mplex', () => {
it('should restrict number of initiator streams per connection', async () => {
const maxStreamsPerConnection = 10
const factory = new Mplex({
maxStreamsPerConnection
})
const components = new Components()
const muxer = factory.createStreamMuxer(components)

// max out the streams for this connection
for (let i = 0; i < maxStreamsPerConnection; i++) {
muxer.newStream()
}

// open one more
expect(() => muxer.newStream()).to.throw().with.property('code', 'ERR_TOO_MANY_STREAMS')
})

it('should restrict number of recipient streams per connection', async () => {
const maxStreamsPerConnection = 10
const factory = new Mplex({
maxStreamsPerConnection
})
const components = new Components()
const muxer = factory.createStreamMuxer(components)

// max out the streams for this connection
for (let i = 0; i < maxStreamsPerConnection; i++) {
muxer.newStream()
}

// simulate a new incoming stream
const source: NewStreamMessage[] = [{
id: 17,
type: 0,
data: uint8ArrayFromString('17')
}]

const data = uint8ArrayConcat(await all(encode(source)))

await muxer.sink([data])

await expect(all(muxer.source)).to.eventually.be.rejected.with.property('code', 'ERR_TOO_MANY_STREAMS')
})
})

0 comments on commit 21371e7

Please sign in to comment.