-
Notifications
You must be signed in to change notification settings - Fork 445
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* feat: peer-store v0 * feat: registrar * chore: apply suggestions from code review Co-Authored-By: Jacob Heun <jacobheun@gmail.com> * chore: address review * chore: support multiple conns * chore: address review * fix: no remote peer from topology on disconnect
- Loading branch information
1 parent
f3e276e
commit 797d8f0
Showing
8 changed files
with
594 additions
and
8 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
'use strict' | ||
|
||
const assert = require('assert') | ||
|
||
class Topology { | ||
/** | ||
* @param {Object} props | ||
* @param {number} props.min minimum needed connections (default: 0) | ||
* @param {number} props.max maximum needed connections (default: Infinity) | ||
* @param {Array<string>} props.multicodecs protocol multicodecs | ||
* @param {Object} props.handlers | ||
* @param {function} props.handlers.onConnect protocol "onConnect" handler | ||
* @param {function} props.handlers.onDisconnect protocol "onDisconnect" handler | ||
* @constructor | ||
*/ | ||
constructor ({ | ||
min = 0, | ||
max = Infinity, | ||
multicodecs, | ||
handlers | ||
}) { | ||
assert(multicodecs, 'one or more multicodec should be provided') | ||
assert(handlers, 'the handlers should be provided') | ||
assert(handlers.onConnect && typeof handlers.onConnect === 'function', | ||
'the \'onConnect\' handler must be provided') | ||
assert(handlers.onDisconnect && typeof handlers.onDisconnect === 'function', | ||
'the \'onDisconnect\' handler must be provided') | ||
|
||
this.multicodecs = Array.isArray(multicodecs) ? multicodecs : [multicodecs] | ||
this.min = min | ||
this.max = max | ||
|
||
// Handlers | ||
this._onConnect = handlers.onConnect | ||
this._onDisconnect = handlers.onDisconnect | ||
|
||
this.peers = new Map() | ||
this._registrar = undefined | ||
|
||
this._onProtocolChange = this._onProtocolChange.bind(this) | ||
} | ||
|
||
set registrar (registrar) { | ||
this._registrar = registrar | ||
this._registrar.peerStore.on('change:protocols', this._onProtocolChange) | ||
|
||
// Update topology peers | ||
this._updatePeers(this._registrar.peerStore.peers.values()) | ||
} | ||
|
||
/** | ||
* Update topology. | ||
* @param {Array<PeerInfo>} peerInfoIterable | ||
* @returns {void} | ||
*/ | ||
_updatePeers (peerInfoIterable) { | ||
for (const peerInfo of peerInfoIterable) { | ||
if (this.multicodecs.filter(multicodec => peerInfo.protocols.has(multicodec))) { | ||
// Add the peer regardless of whether or not there is currently a connection | ||
this.peers.set(peerInfo.id.toB58String(), peerInfo) | ||
// If there is a connection, call _onConnect | ||
const connection = this._registrar.getConnection(peerInfo) | ||
connection && this._onConnect(peerInfo, connection) | ||
} else { | ||
// Remove any peers we might be tracking that are no longer of value to us | ||
this.peers.delete(peerInfo.id.toB58String()) | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Notify protocol of peer disconnected. | ||
* @param {PeerInfo} peerInfo | ||
* @param {Error} [error] | ||
* @returns {void} | ||
*/ | ||
disconnect (peerInfo, error) { | ||
this._onDisconnect(peerInfo, error) | ||
} | ||
|
||
/** | ||
* Check if a new peer support the multicodecs for this topology. | ||
* @param {Object} props | ||
* @param {PeerInfo} props.peerInfo | ||
* @param {Array<string>} props.protocols | ||
*/ | ||
_onProtocolChange ({ peerInfo, protocols }) { | ||
const existingPeer = this.peers.get(peerInfo.id.toB58String()) | ||
const hasProtocol = protocols.filter(protocol => this.multicodecs.includes(protocol)) | ||
|
||
// Not supporting the protocol anymore? | ||
if (existingPeer && hasProtocol.length === 0) { | ||
this._onDisconnect({ | ||
peerInfo | ||
}) | ||
} | ||
|
||
// New to protocol support | ||
for (const protocol of protocols) { | ||
if (this.multicodecs.includes(protocol)) { | ||
this._updatePeers([peerInfo]) | ||
return | ||
} | ||
} | ||
} | ||
} | ||
|
||
module.exports = Topology |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
'use strict' | ||
|
||
const assert = require('assert') | ||
const debug = require('debug') | ||
const log = debug('libp2p:peer-store') | ||
log.error = debug('libp2p:peer-store:error') | ||
|
||
const { Connection } = require('libp2p-interfaces/src/connection') | ||
const PeerInfo = require('peer-info') | ||
const Toplogy = require('./connection-manager/topology') | ||
|
||
/** | ||
* Responsible for notifying registered protocols of events in the network. | ||
*/ | ||
class Registrar { | ||
/** | ||
* @param {Object} props | ||
* @param {PeerStore} props.peerStore | ||
* @constructor | ||
*/ | ||
constructor ({ peerStore }) { | ||
this.peerStore = peerStore | ||
|
||
/** | ||
* Map of connections per peer | ||
* TODO: this should be handled by connectionManager | ||
* @type {Map<string, Array<conn>>} | ||
*/ | ||
this.connections = new Map() | ||
|
||
/** | ||
* Map of topologies | ||
* | ||
* @type {Map<string, object>} | ||
*/ | ||
this.topologies = new Map() | ||
|
||
this._handle = undefined | ||
} | ||
|
||
get handle () { | ||
return this._handle | ||
} | ||
|
||
set handle (handle) { | ||
this._handle = handle | ||
} | ||
|
||
/** | ||
* Add a new connected peer to the record | ||
* TODO: this should live in the ConnectionManager | ||
* @param {PeerInfo} peerInfo | ||
* @param {Connection} conn | ||
* @returns {void} | ||
*/ | ||
onConnect (peerInfo, conn) { | ||
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') | ||
assert(Connection.isConnection(conn), 'conn must be an instance of interface-connection') | ||
|
||
const id = peerInfo.id.toB58String() | ||
const storedConn = this.connections.get(id) | ||
|
||
if (storedConn) { | ||
storedConn.push(conn) | ||
} else { | ||
this.connections.set(id, [conn]) | ||
} | ||
} | ||
|
||
/** | ||
* Remove a disconnected peer from the record | ||
* TODO: this should live in the ConnectionManager | ||
* @param {PeerInfo} peerInfo | ||
* @param {Connection} connection | ||
* @param {Error} [error] | ||
* @returns {void} | ||
*/ | ||
onDisconnect (peerInfo, connection, error) { | ||
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') | ||
|
||
const id = peerInfo.id.toB58String() | ||
let storedConn = this.connections.get(id) | ||
|
||
if (storedConn && storedConn.length > 1) { | ||
storedConn = storedConn.filter((conn) => conn.id === connection.id) | ||
} else if (storedConn) { | ||
for (const [, topology] of this.topologies) { | ||
topology.disconnect(peerInfo, error) | ||
} | ||
|
||
this.connections.delete(peerInfo.id.toB58String()) | ||
} | ||
} | ||
|
||
/** | ||
* Get a connection with a peer. | ||
* @param {PeerInfo} peerInfo | ||
* @returns {Connection} | ||
*/ | ||
getConnection (peerInfo) { | ||
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') | ||
|
||
// TODO: what should we return | ||
return this.connections.get(peerInfo.id.toB58String())[0] | ||
} | ||
|
||
/** | ||
* Register handlers for a set of multicodecs given | ||
* @param {Object} topologyProps properties for topology | ||
* @param {Array<string>|string} topologyProps.multicodecs | ||
* @param {Object} topologyProps.handlers | ||
* @param {function} topologyProps.handlers.onConnect | ||
* @param {function} topologyProps.handlers.onDisconnect | ||
* @return {string} registrar identifier | ||
*/ | ||
register (topologyProps) { | ||
// Create multicodec topology | ||
const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now() | ||
const topology = new Toplogy(topologyProps) | ||
|
||
this.topologies.set(id, topology) | ||
|
||
// Set registrar | ||
topology.registrar = this | ||
|
||
return id | ||
} | ||
|
||
/** | ||
* Unregister topology. | ||
* @param {string} id registrar identifier | ||
* @return {boolean} unregistered successfully | ||
*/ | ||
unregister (id) { | ||
return this.topologies.delete(id) | ||
} | ||
} | ||
|
||
module.exports = Registrar |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
'use strict' | ||
/* eslint-env mocha */ | ||
|
||
const chai = require('chai') | ||
chai.use(require('dirty-chai')) | ||
const { expect } = chai | ||
const sinon = require('sinon') | ||
|
||
const mergeOptions = require('merge-options') | ||
|
||
const multiaddr = require('multiaddr') | ||
const Libp2p = require('../../src') | ||
|
||
const baseOptions = require('../utils/base-options') | ||
const peerUtils = require('../utils/creators/peer') | ||
const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') | ||
|
||
describe('registrar on dial', () => { | ||
let peerInfo | ||
let remotePeerInfo | ||
let libp2p | ||
let remoteLibp2p | ||
let remoteAddr | ||
|
||
before(async () => { | ||
[peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2) | ||
remoteLibp2p = new Libp2p(mergeOptions(baseOptions, { | ||
peerInfo: remotePeerInfo | ||
})) | ||
|
||
await remoteLibp2p.transportManager.listen([listenAddr]) | ||
remoteAddr = remoteLibp2p.transportManager.getAddrs()[0] | ||
}) | ||
|
||
after(async () => { | ||
sinon.restore() | ||
await remoteLibp2p.stop() | ||
libp2p && await libp2p.stop() | ||
}) | ||
|
||
it('should inform registrar of a new connection', async () => { | ||
libp2p = new Libp2p(mergeOptions(baseOptions, { | ||
peerInfo | ||
})) | ||
|
||
sinon.spy(remoteLibp2p.registrar, 'onConnect') | ||
|
||
await libp2p.dial(remoteAddr) | ||
expect(remoteLibp2p.registrar.onConnect.callCount).to.equal(1) | ||
|
||
const libp2pConn = libp2p.registrar.getConnection(remotePeerInfo) | ||
expect(libp2pConn).to.exist() | ||
|
||
const remoteConn = remoteLibp2p.registrar.getConnection(peerInfo) | ||
expect(remoteConn).to.exist() | ||
}) | ||
}) |
Oops, something went wrong.