Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add pending connection limit #1423

Merged
merged 3 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion doc/LIMITS.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ In order to prevent excessive resource consumption by a libp2p node it's importa

It's possible to limit the amount of incoming and outgoing connections a node is able to make. When this limit is reached and an attempt to open a new connection is made, existing connections may be closed to make room for the new connection.

We can also limit the number of connections in a "pending" state. These connections have been opened by a remote peer but peer IDs have yet to be exchanged and/or connection encryption and multiplexing negotiated. Once this limit is hit further connections will be closed unless the remote peer has an address in the [allow list](#allowdeny-lists).

```js
const node = await createLibp2pNode({
connectionManager: {
Expand All @@ -32,7 +34,12 @@ const node = await createLibp2pNode({
* If the number of open connections goes below this number, the node
* will try to connect to nearby peers from the peer store
*/
minConnections: 20
minConnections: 20,

/**
* How many connections can be open but not yet upgraded
*/
maxIncomingPendingConnections: 10
}
})
```
Expand Down
2 changes: 1 addition & 1 deletion examples/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"license": "MIT",
"dependencies": {
"@libp2p/pubsub-peer-discovery": "^6.0.2",
"@libp2p/floodsub": "^4.0.0",
"@libp2p/floodsub": "^4.0.1",
"@nodeutils/defaults-deep": "^1.1.0",
"execa": "^6.1.0",
"fs-extra": "^10.1.0",
Expand Down
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@
"@libp2p/interface-peer-info": "^1.0.3",
"@libp2p/interface-peer-routing": "^1.0.1",
"@libp2p/interface-peer-store": "^1.2.2",
"@libp2p/interface-pubsub": "^2.1.0",
"@libp2p/interface-pubsub": "^3.0.0",
"@libp2p/interface-registrar": "^2.0.3",
"@libp2p/interface-stream-muxer": "^3.0.0",
"@libp2p/interface-transport": "^2.0.0",
Expand Down Expand Up @@ -174,7 +174,7 @@
"@libp2p/bootstrap": "^4.0.0",
"@libp2p/daemon-client": "^3.0.1",
"@libp2p/daemon-server": "^3.0.1",
"@libp2p/floodsub": "^4.0.0",
"@libp2p/floodsub": "^4.0.1",
"@libp2p/interface-compliance-tests": "^3.0.2",
"@libp2p/interface-connection-encrypter-compliance-tests": "^2.0.2",
"@libp2p/interface-mocks": "^6.0.1",
Expand Down Expand Up @@ -211,4 +211,4 @@
"browser": {
"nat-api": false
}
}
}
24 changes: 22 additions & 2 deletions src/connection-manager/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ const defaultOptions: Partial<ConnectionManagerInit> = {
pollInterval: 2000,
autoDialInterval: 10000,
movingAverageInterval: 60000,
inboundConnectionThreshold: 5
inboundConnectionThreshold: 5,
maxIncomingPendingConnections: 10
}

const METRICS_SYSTEM = 'libp2p'
Expand Down Expand Up @@ -152,6 +153,12 @@ export interface ConnectionManagerInit {
* host, reject subsequent connections
*/
inboundConnectionThreshold?: number

/**
* The maximum number of parallel incoming connections allowed that have yet to
* complete the connection upgrade - e.g. choosing connection encryption, muxer, etc
*/
maxIncomingPendingConnections?: number
}

export interface ConnectionManagerEvents {
Expand All @@ -175,6 +182,7 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
private readonly allow: Multiaddr[]
private readonly deny: Multiaddr[]
private readonly inboundConnectionRateLimiter: RateLimiterMemory
private incomingPendingConnections: number

constructor (init: ConnectionManagerInit) {
super()
Expand Down Expand Up @@ -218,6 +226,8 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
points: this.opts.inboundConnectionThreshold,
duration: 1
})

this.incomingPendingConnections = 0
}

init (components: Components): void {
Expand Down Expand Up @@ -719,9 +729,17 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
})

if (allowConnection) {
this.incomingPendingConnections++

return true
}

// check pending connections
if (this.incomingPendingConnections === this.opts.maxIncomingPendingConnections) {
log('connection from %s refused - incomingPendingConnections exceeded by peer %s', maConn.remoteAddr)
return false
}

if (maConn.remoteAddr.isThinWaistAddress()) {
const host = maConn.remoteAddr.nodeAddress().address

Expand All @@ -734,6 +752,8 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
}

if (this.getConnections().length < this.opts.maxConnections) {
this.incomingPendingConnections++

return true
}

Expand All @@ -742,6 +762,6 @@ export class DefaultConnectionManager extends EventEmitter<ConnectionManagerEven
}

afterUpgradeInbound () {

this.incomingPendingConnections--
}
}
4 changes: 3 additions & 1 deletion src/pubsub/dummy-pubsub.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { EventEmitter } from '@libp2p/interfaces/events'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PublishResult, PubSub, PubSubEvents, StrictNoSign, StrictSign } from '@libp2p/interface-pubsub'
import type { PublishResult, PubSub, PubSubEvents, StrictNoSign, StrictSign, TopicValidatorFn } from '@libp2p/interface-pubsub'
import errCode from 'err-code'
import { messages, codes } from '../errors.js'

export class DummyPubSub extends EventEmitter<PubSubEvents> implements PubSub {
public topicValidators = new Map<string, TopicValidatorFn>()

isStarted (): boolean {
return false
}
Expand Down
3 changes: 1 addition & 2 deletions src/upgrader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,6 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
const accept = await this.components.getConnectionManager().acceptIncomingConnection(maConn)

if (!accept) {
await maConn.close()
throw errCode(new Error('connection denied'), codes.ERR_CONNECTION_DENIED)
}

Expand Down Expand Up @@ -201,7 +200,6 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
}
} catch (err: any) {
log.error('Failed to upgrade inbound connection', err)
await maConn.close(err)
throw err
}

Expand All @@ -228,6 +226,7 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
remotePeer
})
} finally {
this.components.getConnectionManager().afterUpgradeInbound()
timeoutController.clear()
}
}
Expand Down
43 changes: 43 additions & 0 deletions test/connection-manager/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -304,4 +304,47 @@ describe('Connection Manager', () => {
await expect(connectionManager.acceptIncomingConnection(maConn))
.to.eventually.be.true()
})

it('should limit the number of inbound pending connections', async () => {
const connectionManager = new DefaultConnectionManager({
...defaultOptions,
maxIncomingPendingConnections: 1
})

const dialer = stubInterface<Dialer>()
dialer.dial.resolves(stubInterface<Connection>())

const components = new Components({
dialer
})

// set mocks
connectionManager.init(components)

// start the upgrade
const maConn1 = mockMultiaddrConnection({
source: [],
sink: async () => {}
}, await createEd25519PeerId())

await expect(connectionManager.acceptIncomingConnection(maConn1))
.to.eventually.be.true()

// start the upgrade
const maConn2 = mockMultiaddrConnection({
source: [],
sink: async () => {}
}, await createEd25519PeerId())

// should be false because we have not completed the upgrade of maConn1
await expect(connectionManager.acceptIncomingConnection(maConn2))
.to.eventually.be.false()

// finish the maConn1 pending upgrade
connectionManager.afterUpgradeInbound()

// should be true because we have now completed the upgrade of maConn1
await expect(connectionManager.acceptIncomingConnection(maConn2))
.to.eventually.be.true()
})
})