Skip to content

Commit

Permalink
fix: close streams when protocol limits are reached (#1301)
Browse files Browse the repository at this point in the history
- If a stream is opened that exceeds inbound/outbound limits, reset that stream (if it is incoming) or abort and throw (if it is outgoing)
- Make the error message more helpful (say which protocol has breached the limit)
- Increase the default stream limits so we don't trigger this by accident when a remote dials us with a protocol we don't support
  • Loading branch information
achingbrain authored Jul 22, 2022
1 parent 54450d4 commit 3c0fb13
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 17 deletions.
6 changes: 3 additions & 3 deletions examples/delegated-routing/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@
"version": "0.1.0",
"private": true,
"dependencies": {
"@chainsafe/libp2p-noise": "^6.2.0",
"@chainsafe/libp2p-noise": "^7.0.1",
"ipfs-core": "^0.14.1",
"libp2p": "../../",
"@libp2p/delegated-content-routing": "^2.0.1",
"@libp2p/delegated-peer-routing": "^2.0.1",
"@libp2p/kad-dht": "^3.0.0",
"@libp2p/mplex": "^3.0.0",
"@libp2p/webrtc-star": "^2.0.1",
"@libp2p/mplex": "^4.0.1",
"@libp2p/webrtc-star": "^3.0.0",
"@libp2p/websockets": "^3.0.0",
"react": "^17.0.2",
"react-dom": "^17.0.2",
Expand Down
6 changes: 3 additions & 3 deletions examples/libp2p-in-the-browser/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@
},
"license": "ISC",
"dependencies": {
"@chainsafe/libp2p-noise": "^6.2.0",
"@chainsafe/libp2p-noise": "^7.0.1",
"@libp2p/bootstrap": "^2.0.0",
"@libp2p/mplex": "^3.0.0",
"@libp2p/webrtc-star": "^2.0.1",
"@libp2p/mplex": "^4.0.1",
"@libp2p/webrtc-star": "^3.0.0",
"@libp2p/websockets": "^3.0.0",
"libp2p": "../../"
},
Expand Down
4 changes: 2 additions & 2 deletions examples/webrtc-direct/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
"license": "ISC",
"dependencies": {
"@libp2p/webrtc-direct": "^2.0.0",
"@chainsafe/libp2p-noise": "^6.2.0",
"@chainsafe/libp2p-noise": "^7.0.1",
"@libp2p/bootstrap": "^2.0.0",
"@libp2p/mplex": "^3.0.0",
"@libp2p/mplex": "^4.0.1",
"libp2p": "../../",
"wrtc": "^0.4.7"
},
Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -171,16 +171,16 @@
"@libp2p/bootstrap": "^2.0.0",
"@libp2p/daemon-client": "^2.0.0",
"@libp2p/daemon-server": "^2.0.0",
"@libp2p/delegated-content-routing": "^2.0.0",
"@libp2p/delegated-peer-routing": "^2.0.0",
"@libp2p/delegated-content-routing": "^2.0.1",
"@libp2p/delegated-peer-routing": "^2.0.1",
"@libp2p/floodsub": "^3.0.0",
"@libp2p/interface-compliance-tests": "^3.0.1",
"@libp2p/interface-connection-encrypter-compliance-tests": "^1.0.0",
"@libp2p/interface-mocks": "^3.0.1",
"@libp2p/interop": "^2.0.0",
"@libp2p/kad-dht": "^3.0.0",
"@libp2p/mdns": "^3.0.0",
"@libp2p/mplex": "^4.0.0",
"@libp2p/mplex": "^4.0.1",
"@libp2p/pubsub": "^3.0.1",
"@libp2p/tcp": "^3.0.0",
"@libp2p/topology": "^3.0.0",
Expand Down
4 changes: 2 additions & 2 deletions src/registrar.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import type { Components } from '@libp2p/components'

const log = logger('libp2p:registrar')

export const DEFAULT_MAX_INBOUND_STREAMS = 1
export const DEFAULT_MAX_OUTBOUND_STREAMS = 1
export const DEFAULT_MAX_INBOUND_STREAMS = 32
export const DEFAULT_MAX_OUTBOUND_STREAMS = 64

/**
* Responsible for notifying registered protocols of events in the network.
Expand Down
9 changes: 7 additions & 2 deletions src/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,9 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
const streamCount = countStreams(protocol, 'inbound', connection)

if (streamCount === incomingLimit) {
throw errCode(new Error('Too many incoming protocol streams'), codes.ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS)
muxedStream.abort(errCode(new Error(`Too many inbound protocol streams for protocol "${protocol}" - limit ${incomingLimit}`), codes.ERR_TOO_MANY_INBOUND_PROTOCOL_STREAMS))

return
}

muxedStream.stat.protocol = protocol
Expand Down Expand Up @@ -430,7 +432,10 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
const streamCount = countStreams(protocol, 'outbound', connection)

if (streamCount === outgoingLimit) {
throw errCode(new Error('Too many outgoing protocol streams'), codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS)
const err = errCode(new Error(`Too many outbound protocol streams for protocol "${protocol}" - limit ${outgoingLimit}`), codes.ERR_TOO_MANY_OUTBOUND_PROTOCOL_STREAMS)
muxedStream.abort(err)

throw err
}

muxedStream.stat.protocol = protocol
Expand Down
10 changes: 9 additions & 1 deletion test/identify/service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,19 @@ describe('libp2p.dialer.identifyService', () => {
await identityServiceIdentifySpy.firstCall.returnValue
sinon.stub(libp2p, 'isStarted').returns(true)

// Cause supported protocols to change
await libp2p.handle('/echo/2.0.0', () => {})

// Wait for push to complete
await pWaitFor(() => identityServicePushSpy.callCount === 1)
await identityServicePushSpy.firstCall.returnValue

// Cause supported protocols to change back
await libp2p.unhandle('/echo/2.0.0')

// the protocol change event listener in the identity service is async
// Wait for push to complete a second time
await pWaitFor(() => identityServicePushSpy.callCount === 2)
await identityServicePushSpy.secondCall.returnValue

// Verify the remote peer is notified of both changes
expect(identityServicePushSpy.callCount).to.equal(2)
Expand Down
2 changes: 1 addition & 1 deletion test/upgrading/upgrader.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -654,7 +654,7 @@ describe('libp2p.upgrader', () => {
expect(streamCount).to.equal(1)

await expect(localToRemote.newStream(protocol)).to.eventually.be.rejected()
.with.property('code', 'ERR_UNDER_READ')
.with.property('code', 'ERR_MPLEX_STREAM_RESET')
})

it('should limit the number of outgoing streams that can be opened using a protocol', async () => {
Expand Down

0 comments on commit 3c0fb13

Please sign in to comment.