diff --git a/doc/API.md b/doc/API.md index d0c5b604db..651be03d00 100644 --- a/doc/API.md +++ b/doc/API.md @@ -22,6 +22,7 @@ * [`pubsub.publish`](#pubsubpublish) * [`pubsub.subscribe`](#pubsubsubscribe) * [`pubsub.unsubscribe`](#pubsubunsubscribe) + * [`connectionManager.setPeerValue`](#connectionmanagersetpeervalue) * [`metrics.global`](#metricsglobal) * [`metrics.peers`](#metricspeers) * [`metrics.protocols`](#metricsprotocols) @@ -92,7 +93,7 @@ Required keys in the `options` object: -Once you have a libp2p instance, you are able to listen to several events it emmits, so that you can be noticed of relevant network events. +Once you have a libp2p instance, you are able to listen to several events it emits, so that you can be noticed of relevant network events.
Events @@ -666,12 +667,8 @@ Enables users to change the value of certain peers in a range of 0 to 1. Peers w #### Example ```js -const topic = 'topic' -const handler = (msg) => { - // msg.data - pubsub data received -} - -libp2p.pubsub.unsubscribe(topic, handler) +libp2p.connectionManager.setPeerValue(highPriorityPeerId, 1) +libp2p.connectionManager.setPeerValue(lowPriorityPeerId, 0) ``` ### metrics.global diff --git a/doc/DIALER.md b/doc/DIALER.md index 920372b1c9..bba39d5b70 100644 --- a/doc/DIALER.md +++ b/doc/DIALER.md @@ -1,6 +1,7 @@ # js-libp2p Dialer **Synopsis** +* Parallel dials to the same peer will yield the same connection/error when the first dial settles. * All Dial Requests in js-libp2p must request a token(s) from the Dialer. * The number of tokens requested should be between 1 and the MAX_PER_PEER_DIALS max set in the Dialer. * If the number of available tokens is less than requested, the Dialer may return less than requested. diff --git a/src/circuit/index.js b/src/circuit/index.js index d5d293bfef..e55846d68d 100644 --- a/src/circuit/index.js +++ b/src/circuit/index.js @@ -109,7 +109,7 @@ class Circuit { let disconnectOnFailure = false let relayConnection = this._registrar.getConnection(new PeerInfo(relayPeer)) if (!relayConnection) { - relayConnection = await this._dialer.connectToMultiaddr(relayAddr, options) + relayConnection = await this._dialer.connectToPeer(relayAddr, options) disconnectOnFailure = true } diff --git a/src/circuit/listener.js b/src/circuit/listener.js index 2569c50039..3d48d685fe 100644 --- a/src/circuit/listener.js +++ b/src/circuit/listener.js @@ -24,7 +24,7 @@ module.exports = (circuit) => { listener.listen = async (addr) => { const [addrString] = String(addr).split('/p2p-circuit').slice(-1) - const relayConn = await circuit._dialer.connectToMultiaddr(multiaddr(addrString)) + const relayConn = await circuit._dialer.connectToPeer(multiaddr(addrString)) const relayedAddr = relayConn.remoteAddr.encapsulate('/p2p-circuit') listeningAddrs.set(relayConn.remotePeer.toB58String(), relayedAddr) diff --git a/src/dialer/index.js b/src/dialer/index.js index 224f4e309c..b51938a17f 100644 --- a/src/dialer/index.js +++ b/src/dialer/index.js @@ -4,6 +4,8 @@ const multiaddr = require('multiaddr') const errCode = require('err-code') const TimeoutController = require('timeout-abort-controller') const anySignal = require('any-signal') +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') const debug = require('debug') const log = debug('libp2p:dialer') log.error = debug('libp2p:dialer:error') @@ -38,7 +40,7 @@ class Dialer { this.timeout = timeout this.perPeerLimit = perPeerLimit this.tokens = [...new Array(concurrency)].map((_, index) => index) - this._pendingDials = new Set() + this._pendingDials = new Map() } /** @@ -56,72 +58,111 @@ class Dialer { } /** - * Connects to the first success of a given list of `Multiaddr`. `addrs` should - * include the id of the peer being dialed, it will be used for encryption verification. + * Connects to a given `PeerId` or `Multiaddr` by dialing all of its known addresses. + * The dial to the first address that is successfully able to upgrade a connection + * will be used. * - * @param {Array|Multiaddr} addrs + * @param {PeerInfo|Multiaddr} peer The peer to dial * @param {object} [options] * @param {AbortSignal} [options.signal] An AbortController signal * @returns {Promise} */ - async connectToMultiaddr (addrs, options = {}) { - if (!Array.isArray(addrs)) addrs = [multiaddr(addrs)] - - const dialAction = (addr, options) => { - if (options.signal.aborted) throw errCode(new Error('already aborted'), 'ERR_ALREADY_ABORTED') - return this.transportManager.dial(addr, options) + async connectToPeer (peer, options = {}) { + const dialTarget = this._createDialTarget(peer) + if (dialTarget.addrs.length === 0) { + throw errCode(new Error('The dial request has no addresses'), 'ERR_NO_DIAL_MULTIADDRS') } - const dialRequest = new DialRequest({ - addrs, - dialAction, - dialer: this - }) - - // Combine the timeout signal and options.signal, if provided - const timeoutController = new TimeoutController(this.timeout) - const signals = [timeoutController.signal] - options.signal && signals.push(options.signal) - const signal = anySignal(signals) - - const dial = { - dialRequest, - controller: timeoutController - } - this._pendingDials.add(dial) + const pendingDial = this._pendingDials.get(dialTarget.id) || this._createPendingDial(dialTarget, options) try { - const dialResult = await dialRequest.run({ ...options, signal }) - log('dial succeeded to %s', dialResult.remoteAddr) - return dialResult + const connection = await pendingDial.promise + log('dial succeeded to %s', dialTarget.id) + return connection } catch (err) { // Error is a timeout - if (timeoutController.signal.aborted) { + if (pendingDial.controller.signal.aborted) { err.code = codes.ERR_TIMEOUT } log.error(err) throw err } finally { - timeoutController.clear() - this._pendingDials.delete(dial) + pendingDial.destroy() } } /** - * Connects to a given `PeerInfo` or `PeerId` by dialing all of its known addresses. - * The dial to the first address that is successfully able to upgrade a connection - * will be used. - * - * @param {PeerId} peerId The remote peer id to dial + * @typedef DialTarget + * @property {string} id + * @property {Multiaddr[]} addrs + */ + + /** + * Creates a DialTarget. The DialTarget is used to create and track + * the DialRequest to a given peer. + * @private + * @param {PeerInfo|Multiaddr} peer A PeerId or Multiaddr + * @returns {DialTarget} + */ + _createDialTarget (peer) { + const dialable = Dialer.getDialable(peer) + if (multiaddr.isMultiaddr(dialable)) { + return { + id: dialable.toString(), + addrs: [dialable] + } + } + const addrs = this.peerStore.multiaddrsForPeer(dialable) + return { + id: dialable.id.toString(), + addrs + } + } + + /** + * @typedef PendingDial + * @property {DialRequest} dialRequest + * @property {TimeoutController} controller + * @property {Promise} promise + * @property {function():void} destroy + */ + + /** + * Creates a PendingDial that wraps the underlying DialRequest + * @private + * @param {DialTarget} dialTarget * @param {object} [options] * @param {AbortSignal} [options.signal] An AbortController signal - * @returns {Promise} + * @returns {PendingDial} */ - connectToPeer (peerId, options = {}) { - const addrs = this.peerStore.multiaddrsForPeer(peerId) + _createPendingDial (dialTarget, options) { + const dialAction = (addr, options) => { + if (options.signal.aborted) throw errCode(new Error('already aborted'), 'ERR_ALREADY_ABORTED') + return this.transportManager.dial(addr, options) + } - // TODO: ensure the peer id is on the multiaddr + const dialRequest = new DialRequest({ + addrs: dialTarget.addrs, + dialAction, + dialer: this + }) + + // Combine the timeout signal and options.signal, if provided + const timeoutController = new TimeoutController(this.timeout) + const signals = [timeoutController.signal] + options.signal && signals.push(options.signal) + const signal = anySignal(signals) - return this.connectToMultiaddr(addrs, options) + const pendingDial = { + dialRequest, + controller: timeoutController, + promise: dialRequest.run({ ...options, signal }), + destroy: () => { + timeoutController.clear() + this._pendingDials.delete(dialTarget.id) + } + } + this._pendingDials.set(dialTarget.id, pendingDial) + return pendingDial } getTokens (num) { @@ -137,6 +178,37 @@ class Dialer { log('token %d released', token) this.tokens.push(token) } + + /** + * Converts the given `peer` into a `PeerInfo` or `Multiaddr`. + * @static + * @param {PeerInfo|PeerId|Multiaddr|string} peer + * @returns {PeerInfo|Multiaddr} + */ + static getDialable (peer) { + if (PeerInfo.isPeerInfo(peer)) return peer + if (typeof peer === 'string') { + peer = multiaddr(peer) + } + + let addr + if (multiaddr.isMultiaddr(peer)) { + addr = peer + try { + peer = PeerId.createFromCID(peer.getPeerId()) + } catch (err) { + // Couldn't get the PeerId, just use the address + return peer + } + } + + if (PeerId.isPeerId(peer)) { + peer = new PeerInfo(peer) + } + + addr && peer.multiaddrs.add(addr) + return peer + } } module.exports = Dialer diff --git a/src/index.js b/src/index.js index c84b0c6e3d..3fc56692ac 100644 --- a/src/index.js +++ b/src/index.js @@ -6,12 +6,11 @@ const log = debug('libp2p') log.error = debug('libp2p:error') const PeerInfo = require('peer-info') -const multiaddr = require('multiaddr') const peerRouting = require('./peer-routing') const contentRouting = require('./content-routing') const pubsub = require('./pubsub') -const { getPeerInfo, getPeerInfoRemote } = require('./get-peer-info') +const { getPeerInfo } = require('./get-peer-info') const { validate: validateConfig } = require('./config') const { codes } = require('./errors') @@ -51,8 +50,6 @@ class Libp2p extends EventEmitter { this._transport = [] // Transport instances/references this._discovery = new Map() // Discovery service instances/references - this.peerStore = new PeerStore() - if (this._options.metrics.enabled) { this.metrics = new Metrics(this._options.metrics) } @@ -62,7 +59,7 @@ class Libp2p extends EventEmitter { localPeer: this.peerInfo.id, metrics: this.metrics, onConnection: (connection) => { - const peerInfo = this.peerStore.put(new PeerInfo(connection.remotePeer)) + const peerInfo = this.peerStore.put(new PeerInfo(connection.remotePeer), { silent: true }) this.registrar.onConnect(peerInfo, connection) this.connectionManager.onConnect(connection) this.emit('peer:connect', peerInfo) @@ -74,7 +71,7 @@ class Libp2p extends EventEmitter { } }, onConnectionEnd: (connection) => { - const peerInfo = getPeerInfo(connection.remotePeer) + const peerInfo = Dialer.getDialable(connection.remotePeer) this.registrar.onDisconnect(peerInfo, connection) this.connectionManager.onDisconnect(connection) @@ -266,27 +263,22 @@ class Libp2p extends EventEmitter { * @returns {Promise} */ async dialProtocol (peer, protocols, options) { + const dialable = Dialer.getDialable(peer) let connection - if (multiaddr.isMultiaddr(peer)) { - connection = await this.dialer.connectToMultiaddr(peer, options) - } else { - peer = await getPeerInfoRemote(peer, this) - connection = await this.dialer.connectToPeer(peer.id, options) + if (PeerInfo.isPeerInfo(dialable)) { + this.peerStore.put(dialable, { silent: true }) + connection = this.registrar.getConnection(dialable) } - const peerInfo = getPeerInfo(connection.remotePeer) + if (!connection) { + connection = await this.dialer.connectToPeer(dialable, options) + } // If a protocol was provided, create a new stream if (protocols) { - const stream = await connection.newStream(protocols) - - peerInfo.protocols.add(stream.protocol) - this.peerStore.put(peerInfo) - - return stream + return connection.newStream(protocols) } - this.peerStore.put(peerInfo) return connection } @@ -428,11 +420,10 @@ class Libp2p extends EventEmitter { // If auto dialing is on and we have no connection to the peer, check if we should dial if (this._config.peerDiscovery.autoDial === true && !this.registrar.getConnection(peerInfo)) { const minPeers = this._options.connectionManager.minPeers || 0 - // TODO: This does not account for multiple connections to a peer - if (minPeers > this.registrar.connections.size) { - log('connecting to discovered peer') + if (minPeers > this.connectionManager._connections.size) { + log('connecting to discovered peer %s', peerInfo.id.toString()) try { - await this.dialer.connectToPeer(peerInfo.id) + await this.dialer.connectToPeer(peerInfo) } catch (err) { log.error('could not connect to discovered peer', err) } diff --git a/src/peer-store/index.js b/src/peer-store/index.js index 321fd204e9..72a7667512 100644 --- a/src/peer-store/index.js +++ b/src/peer-store/index.js @@ -36,11 +36,16 @@ class PeerStore extends EventEmitter { /** * Stores the peerInfo of a new peer. - * If already exist, its info is updated. + * If already exist, its info is updated. If `silent` is set to + * true, no 'peer' event will be emitted. This can be useful if you + * are already in the process of dialing the peer. The peer is technically + * known, but may not have been added to the PeerStore yet. * @param {PeerInfo} peerInfo + * @param {object} [options] + * @param {boolean} [options.silent] (Default=false) * @return {PeerInfo} */ - put (peerInfo) { + put (peerInfo, options = { silent: false }) { assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') let peer @@ -50,8 +55,8 @@ class PeerStore extends EventEmitter { } else { peer = this.add(peerInfo) - // Emit the new peer found - this.emit('peer', peerInfo) + // Emit the peer if silent = false + !options.silent && this.emit('peer', peerInfo) } return peer } @@ -219,13 +224,12 @@ class PeerStore extends EventEmitter { } /** - * Returns the known multiaddrs for a given `PeerId` - * @param {PeerId} peerId + * Returns the known multiaddrs for a given `PeerInfo` + * @param {PeerInfo} peer * @returns {Array} */ - multiaddrsForPeer (peerId) { - const peerInfo = this.get(peerId.toB58String()) - return peerInfo.multiaddrs.toArray() + multiaddrsForPeer (peer) { + return this.put(peer, true).multiaddrs.toArray() } } diff --git a/src/registrar.js b/src/registrar.js index f4c3868069..d91f2bba17 100644 --- a/src/registrar.js +++ b/src/registrar.js @@ -119,8 +119,11 @@ class Registrar { assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info') const connections = this.connections.get(peerInfo.id.toString()) - // TODO: what should we return - return connections ? connections[0] : null + // Return the first, open connection + if (connections) { + return connections.find(connection => connection.stat.status === 'open') + } + return null } /** diff --git a/src/upgrader.js b/src/upgrader.js index afef9d192c..146735b90f 100644 --- a/src/upgrader.js +++ b/src/upgrader.js @@ -98,7 +98,6 @@ class Upgrader { } catch (err) { log.error('Failed to upgrade inbound connection', err) await maConn.close(err) - // TODO: We shouldn't throw here, as there isn't anything to catch the failure throw err } @@ -248,9 +247,11 @@ class Upgrader { pipe(muxedConnection, muxer, muxedConnection) maConn.timeline.upgraded = Date.now() - const timelineProxy = new Proxy(maConn.timeline, { + const _timeline = maConn.timeline + maConn.timeline = new Proxy(_timeline, { set: (...args) => { - if (args[1] === 'close' && args[2]) { + if (args[1] === 'close' && args[2] && !_timeline.close) { + connection.stat.status = 'closed' this.onConnectionEnd(connection) } @@ -266,7 +267,7 @@ class Upgrader { remotePeer: remotePeer, stat: { direction, - timeline: timelineProxy, + timeline: maConn.timeline, multiplexer: Muxer.multicodec, encryption: cryptoProtocol }, diff --git a/test/dialing/direct.node.js b/test/dialing/direct.node.js index fd4ec73b78..d6b4588d09 100644 --- a/test/dialing/direct.node.js +++ b/test/dialing/direct.node.js @@ -14,8 +14,10 @@ const PeerId = require('peer-id') const PeerInfo = require('peer-info') const delay = require('delay') const pDefer = require('p-defer') +const pSettle = require('p-settle') const pipe = require('it-pipe') const AggregateError = require('aggregate-error') +const { Connection } = require('libp2p-interfaces/src/connection') const { AbortError } = require('libp2p-interfaces/src/transport/errors') const Libp2p = require('../../src') @@ -29,6 +31,7 @@ const swarmKeyBuffer = Buffer.from(require('../fixtures/swarm.key')) const mockUpgrader = require('../utils/mockUpgrader') const createMockConnection = require('../utils/mockConnection') const Peers = require('../fixtures/peers') +const { createPeerInfo } = require('../utils/creators/peer') const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') const unsupportedAddr = multiaddr('/ip4/127.0.0.1/tcp/9999/ws') @@ -65,7 +68,7 @@ describe('Dialing (direct, TCP)', () => { it('should be able to connect to a remote node via its multiaddr', async () => { const dialer = new Dialer({ transportManager: localTM }) - const connection = await dialer.connectToMultiaddr(remoteAddr) + const connection = await dialer.connectToPeer(remoteAddr) expect(connection).to.exist() await connection.close() }) @@ -73,7 +76,8 @@ describe('Dialing (direct, TCP)', () => { it('should be able to connect to a remote node via its stringified multiaddr', async () => { const dialer = new Dialer({ transportManager: localTM }) - const connection = await dialer.connectToMultiaddr(remoteAddr.toString()) + const dialable = Dialer.getDialable(remoteAddr.toString()) + const connection = await dialer.connectToPeer(dialable) expect(connection).to.exist() await connection.close() }) @@ -81,7 +85,7 @@ describe('Dialing (direct, TCP)', () => { it('should fail to connect to an unsupported multiaddr', async () => { const dialer = new Dialer({ transportManager: localTM }) - await expect(dialer.connectToMultiaddr(unsupportedAddr)) + await expect(dialer.connectToPeer(unsupportedAddr)) .to.eventually.be.rejectedWith(AggregateError) .and.to.have.nested.property('._errors[0].code', ErrorCodes.ERR_TRANSPORT_UNAVAILABLE) }) @@ -113,7 +117,7 @@ describe('Dialing (direct, TCP)', () => { peerInfo.multiaddrs.add(remoteAddr) peerStore.put(peerInfo) - const connection = await dialer.connectToPeer(peerId) + const connection = await dialer.connectToPeer(peerInfo) expect(connection).to.exist() await connection.close() }) @@ -147,15 +151,23 @@ describe('Dialing (direct, TCP)', () => { throw new AbortError() }) - await expect(dialer.connectToMultiaddr(remoteAddr)) + await expect(dialer.connectToPeer(remoteAddr)) .to.eventually.be.rejectedWith(Error) .and.to.have.property('code', ErrorCodes.ERR_TIMEOUT) }) it('should dial to the max concurrency', async () => { + const addrs = [ + '/ip4/0.0.0.0/tcp/8000', + '/ip4/0.0.0.0/tcp/8001', + '/ip4/0.0.0.0/tcp/8002' + ] const dialer = new Dialer({ transportManager: localTM, - concurrency: 2 + concurrency: 2, + peerStore: { + multiaddrsForPeer: () => addrs + } }) expect(dialer.tokens).to.have.length(2) @@ -163,8 +175,10 @@ describe('Dialing (direct, TCP)', () => { const deferredDial = pDefer() sinon.stub(localTM, 'dial').callsFake(() => deferredDial.promise) + const [peerInfo] = await createPeerInfo() + // Perform 3 multiaddr dials - dialer.connectToMultiaddr([remoteAddr, remoteAddr, remoteAddr]) + dialer.connectToPeer(peerInfo) // Let the call stack run await delay(0) @@ -206,9 +220,10 @@ describe('Dialing (direct, TCP)', () => { connEncryption: [Crypto] } }) + remoteLibp2p.peerInfo.multiaddrs.add(listenAddr) remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => pipe(stream, stream)) - await remoteLibp2p.transportManager.listen([listenAddr]) + await remoteLibp2p.start() remoteAddr = remoteLibp2p.transportManager.getAddrs()[0] }) @@ -230,7 +245,7 @@ describe('Dialing (direct, TCP)', () => { } }) - sinon.spy(libp2p.dialer, 'connectToMultiaddr') + sinon.spy(libp2p.dialer, 'connectToPeer') const connection = await libp2p.dial(remoteAddr) expect(connection).to.exist() @@ -238,7 +253,7 @@ describe('Dialing (direct, TCP)', () => { expect(stream).to.exist() expect(protocol).to.equal('/echo/1.0.0') await connection.close() - expect(libp2p.dialer.connectToMultiaddr.callCount).to.equal(1) + expect(libp2p.dialer.connectToPeer.callCount).to.equal(1) }) it('should use the dialer for connecting to a peer', async () => { @@ -251,7 +266,7 @@ describe('Dialing (direct, TCP)', () => { } }) - sinon.spy(libp2p.dialer, 'connectToMultiaddr') + sinon.spy(libp2p.dialer, 'connectToPeer') const remotePeer = new PeerInfo(remoteLibp2p.peerInfo.id) remotePeer.multiaddrs.add(remoteAddr) @@ -261,7 +276,7 @@ describe('Dialing (direct, TCP)', () => { expect(stream).to.exist() expect(protocol).to.equal('/echo/1.0.0') await connection.close() - expect(libp2p.dialer.connectToMultiaddr.callCount).to.equal(1) + expect(libp2p.dialer.connectToPeer.callCount).to.equal(1) }) it('should be able to use hangup to close connections', async () => { @@ -296,7 +311,7 @@ describe('Dialing (direct, TCP)', () => { sinon.spy(libp2p.upgrader.protector, 'protect') sinon.stub(remoteLibp2p.upgrader, 'protector').value(new Protector(swarmKeyBuffer)) - const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr) + const connection = await libp2p.dialer.connectToPeer(remoteAddr) expect(connection).to.exist() const { stream, protocol } = await connection.newStream('/echo/1.0.0') expect(stream).to.exist() @@ -304,5 +319,95 @@ describe('Dialing (direct, TCP)', () => { await connection.close() expect(libp2p.upgrader.protector.protect.callCount).to.equal(1) }) + + it('should coalesce parallel dials to the same peer (no id in multiaddr)', async () => { + libp2p = new Libp2p({ + peerInfo, + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [Crypto] + } + }) + const dials = 10 + + const dialResults = await Promise.all([...new Array(dials)].map((_, index) => { + if (index % 2 === 0) return libp2p.dial(remoteLibp2p.peerInfo) + return libp2p.dial(remoteLibp2p.peerInfo.multiaddrs.toArray()[0]) + })) + + // All should succeed and we should have ten results + expect(dialResults).to.have.length(10) + for (const connection of dialResults) { + expect(Connection.isConnection(connection)).to.equal(true) + } + + // We will have two connections, since the multiaddr dial doesn't have a peer id + expect(libp2p.connectionManager._connections.size).to.equal(2) + expect(remoteLibp2p.connectionManager._connections.size).to.equal(2) + }) + + it('should coalesce parallel dials to the same peer (id in multiaddr)', async () => { + libp2p = new Libp2p({ + peerInfo, + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [Crypto] + } + }) + const dials = 10 + + const fullAddress = remoteAddr.encapsulate(`/p2p/${remoteLibp2p.peerInfo.id.toString()}`) + const dialResults = await Promise.all([...new Array(dials)].map((_, index) => { + if (index % 2 === 0) return libp2p.dial(remoteLibp2p.peerInfo) + return libp2p.dial(fullAddress) + })) + + // All should succeed and we should have ten results + expect(dialResults).to.have.length(10) + for (const connection of dialResults) { + expect(Connection.isConnection(connection)).to.equal(true) + } + + // 1 connection, because we know the peer in the multiaddr + expect(libp2p.connectionManager._connections.size).to.equal(1) + expect(remoteLibp2p.connectionManager._connections.size).to.equal(1) + }) + + it('should coalesce parallel dials to the same error on failure', async () => { + libp2p = new Libp2p({ + peerInfo, + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [Crypto] + } + }) + const dials = 10 + const error = new Error('Boom') + sinon.stub(libp2p.transportManager, 'dial').callsFake(() => Promise.reject(error)) + + const fullAddress = remoteAddr.encapsulate(`/p2p/${remoteLibp2p.peerInfo.id.toString()}`) + const dialResults = await pSettle([...new Array(dials)].map((_, index) => { + if (index % 2 === 0) return libp2p.dial(remoteLibp2p.peerInfo) + return libp2p.dial(fullAddress) + })) + + // All should succeed and we should have ten results + expect(dialResults).to.have.length(10) + for (const result of dialResults) { + expect(result).to.have.property('isRejected', true) + expect(result.reason).to.be.an.instanceof(AggregateError) + // All errors should be the exact same as `error` + for (const err of result.reason) { + expect(err).to.equal(error) + } + } + + // 1 connection, because we know the peer in the multiaddr + expect(libp2p.connectionManager._connections.size).to.equal(0) + expect(remoteLibp2p.connectionManager._connections.size).to.equal(0) + }) }) }) diff --git a/test/dialing/direct.spec.js b/test/dialing/direct.spec.js index 5281acd574..e7cff9c979 100644 --- a/test/dialing/direct.spec.js +++ b/test/dialing/direct.spec.js @@ -28,6 +28,7 @@ const Peers = require('../fixtures/peers') const { MULTIADDRS_WEBSOCKETS } = require('../fixtures/browser') const mockUpgrader = require('../utils/mockUpgrader') const createMockConnection = require('../utils/mockConnection') +const { createPeerId } = require('../utils/creators/peer') const unsupportedAddr = multiaddr('/ip4/127.0.0.1/tcp/9999/ws') const remoteAddr = MULTIADDRS_WEBSOCKETS[0] @@ -80,17 +81,27 @@ describe('Dialing (direct, WebSockets)', () => { }) it('should be able to connect to a remote node via its multiaddr', async () => { - const dialer = new Dialer({ transportManager: localTM }) + const dialer = new Dialer({ + transportManager: localTM, + peerStore: { + multiaddrsForPeer: () => [remoteAddr] + } + }) - const connection = await dialer.connectToMultiaddr(remoteAddr) + const connection = await dialer.connectToPeer(remoteAddr) expect(connection).to.exist() await connection.close() }) it('should be able to connect to a remote node via its stringified multiaddr', async () => { - const dialer = new Dialer({ transportManager: localTM }) + const dialer = new Dialer({ + transportManager: localTM, + peerStore: { + multiaddrsForPeer: () => [remoteAddr] + } + }) - const connection = await dialer.connectToMultiaddr(remoteAddr.toString()) + const connection = await dialer.connectToPeer(remoteAddr.toString()) expect(connection).to.exist() await connection.close() }) @@ -98,7 +109,7 @@ describe('Dialing (direct, WebSockets)', () => { it('should fail to connect to an unsupported multiaddr', async () => { const dialer = new Dialer({ transportManager: localTM }) - await expect(dialer.connectToMultiaddr(unsupportedAddr)) + await expect(dialer.connectToPeer(unsupportedAddr)) .to.eventually.be.rejectedWith(AggregateError) .and.to.have.nested.property('._errors[0].code', ErrorCodes.ERR_TRANSPORT_DIAL_FAILED) }) @@ -134,7 +145,10 @@ describe('Dialing (direct, WebSockets)', () => { it('should abort dials on queue task timeout', async () => { const dialer = new Dialer({ transportManager: localTM, - timeout: 50 + timeout: 50, + peerStore: { + multiaddrsForPeer: () => [remoteAddr] + } }) sinon.stub(localTM, 'dial').callsFake(async (addr, options) => { expect(options.signal).to.exist() @@ -145,7 +159,7 @@ describe('Dialing (direct, WebSockets)', () => { throw new AbortError() }) - await expect(dialer.connectToMultiaddr(remoteAddr)) + await expect(dialer.connectToPeer(remoteAddr)) .to.eventually.be.rejected() .and.to.have.property('code', ErrorCodes.ERR_TIMEOUT) }) @@ -153,7 +167,10 @@ describe('Dialing (direct, WebSockets)', () => { it('should dial to the max concurrency', async () => { const dialer = new Dialer({ transportManager: localTM, - concurrency: 2 + concurrency: 2, + peerStore: { + multiaddrsForPeer: () => [remoteAddr, remoteAddr, remoteAddr] + } }) expect(dialer.tokens).to.have.length(2) @@ -161,8 +178,9 @@ describe('Dialing (direct, WebSockets)', () => { const deferredDial = pDefer() sinon.stub(localTM, 'dial').callsFake(() => deferredDial.promise) + const [peerId] = await createPeerId() // Perform 3 multiaddr dials - dialer.connectToMultiaddr([remoteAddr, remoteAddr, remoteAddr]) + dialer.connectToPeer(peerId) // Let the call stack run await delay(0) @@ -185,7 +203,10 @@ describe('Dialing (direct, WebSockets)', () => { it('.destroy should abort pending dials', async () => { const dialer = new Dialer({ transportManager: localTM, - concurrency: 2 + concurrency: 2, + peerStore: { + multiaddrsForPeer: () => [remoteAddr, remoteAddr, remoteAddr] + } }) expect(dialer.tokens).to.have.length(2) @@ -201,7 +222,8 @@ describe('Dialing (direct, WebSockets)', () => { }) // Perform 3 multiaddr dials - const dialPromise = dialer.connectToMultiaddr([remoteAddr, remoteAddr, remoteAddr]) + const [peerId] = await createPeerId() + const dialPromise = dialer.connectToPeer(peerId) // Let the call stack run await delay(0) @@ -265,7 +287,8 @@ describe('Dialing (direct, WebSockets)', () => { } }) - sinon.spy(libp2p.dialer, 'connectToMultiaddr') + sinon.spy(libp2p.dialer, 'connectToPeer') + sinon.spy(libp2p.peerStore, 'put') const connection = await libp2p.dial(remoteAddr) expect(connection).to.exist() @@ -273,7 +296,8 @@ describe('Dialing (direct, WebSockets)', () => { expect(stream).to.exist() expect(protocol).to.equal('/echo/1.0.0') await connection.close() - expect(libp2p.dialer.connectToMultiaddr.callCount).to.equal(1) + expect(libp2p.dialer.connectToPeer.callCount).to.equal(1) + expect(libp2p.peerStore.put.callCount).to.be.at.least(1) }) it('should run identify automatically after connecting', async () => { @@ -290,7 +314,7 @@ describe('Dialing (direct, WebSockets)', () => { sinon.spy(libp2p.peerStore, 'replace') sinon.spy(libp2p.upgrader, 'onConnection') - const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr) + const connection = await libp2p.dial(remoteAddr) expect(connection).to.exist() // Wait for onConnection to be called diff --git a/test/dialing/relay.node.js b/test/dialing/relay.node.js index 4f477680a5..c1b42e8598 100644 --- a/test/dialing/relay.node.js +++ b/test/dialing/relay.node.js @@ -42,13 +42,19 @@ describe('Dialing (via relay, TCP)', () => { // Reset multiaddrs and start libp2p.peerInfo.multiaddrs.clear() libp2p.peerInfo.multiaddrs.add('/ip4/0.0.0.0/tcp/0') - libp2p.start() + return libp2p.start() })) }) afterEach(() => { // Stop each node - return Promise.all([srcLibp2p, relayLibp2p, dstLibp2p].map(libp2p => libp2p.stop())) + return Promise.all([srcLibp2p, relayLibp2p, dstLibp2p].map(async libp2p => { + await libp2p.stop() + // Clear the peer stores + for (const peerId of libp2p.peerStore.peers.keys()) { + libp2p.peerStore.remove(peerId) + } + })) }) it('should be able to connect to a peer over a relay with active connections', async () => { diff --git a/test/identify/index.spec.js b/test/identify/index.spec.js index 5eede683db..70563566ca 100644 --- a/test/identify/index.spec.js +++ b/test/identify/index.spec.js @@ -200,7 +200,7 @@ describe('Identify', () => { sinon.spy(libp2p.identifyService, 'identify') sinon.spy(libp2p.peerStore, 'replace') - const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr) + const connection = await libp2p.dialer.connectToPeer(remoteAddr) expect(connection).to.exist() // Wait for nextTick to trigger the identify call await delay(1) @@ -221,7 +221,7 @@ describe('Identify', () => { sinon.spy(libp2p.identifyService, 'push') sinon.spy(libp2p.peerStore, 'update') - const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr) + const connection = await libp2p.dialer.connectToPeer(remoteAddr) expect(connection).to.exist() // Wait for nextTick to trigger the identify call await delay(1) diff --git a/test/peer-store/peer-store.spec.js b/test/peer-store/peer-store.spec.js index 95be069f99..5988615eff 100644 --- a/test/peer-store/peer-store.spec.js +++ b/test/peer-store/peer-store.spec.js @@ -7,18 +7,12 @@ const { expect } = chai const sinon = require('sinon') const pDefer = require('p-defer') -const mergeOptions = require('merge-options') -const Libp2p = require('../../src') const PeerStore = require('../../src/peer-store') const multiaddr = require('multiaddr') - -const baseOptions = require('../utils/base-options') const peerUtils = require('../utils/creators/peer') -const mockConnection = require('../utils/mockConnection') const addr = multiaddr('/ip4/127.0.0.1/tcp/8000') -const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0') describe('peer-store', () => { let peerStore @@ -168,51 +162,6 @@ describe('peer-store', () => { }) }) -describe('peer-store on dial', () => { - let peerInfo - let remotePeerInfo - let libp2p - let remoteLibp2p - - before(async () => { - [peerInfo, remotePeerInfo] = await peerUtils.createPeerInfo({ number: 2 }) - remoteLibp2p = new Libp2p(mergeOptions(baseOptions, { - peerInfo: remotePeerInfo - })) - }) - - after(async () => { - sinon.restore() - await remoteLibp2p.stop() - libp2p && await libp2p.stop() - }) - - it('should put the remote peerInfo after dial and emit event', async () => { - const remoteId = remotePeerInfo.id.toB58String() - - libp2p = new Libp2p(mergeOptions(baseOptions, { - peerInfo - })) - - sinon.spy(libp2p.peerStore, 'put') - sinon.spy(libp2p.peerStore, 'add') - sinon.spy(libp2p.peerStore, 'update') - sinon.stub(libp2p.dialer, 'connectToMultiaddr').returns(mockConnection({ - remotePeer: remotePeerInfo.id - })) - - const connection = await libp2p.dial(listenAddr) - await connection.close() - - expect(libp2p.peerStore.put.callCount).to.equal(1) - expect(libp2p.peerStore.add.callCount).to.equal(1) - expect(libp2p.peerStore.update.callCount).to.equal(0) - - const storedPeer = libp2p.peerStore.get(remoteId) - expect(storedPeer).to.exist() - }) -}) - describe('peer-store on discovery', () => { // TODO: implement with discovery })