From 5009c2ca49f5e3f1ce8e8fe5f0bd88edbc13b5d8 Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Mon, 22 Apr 2019 16:00:06 +0200 Subject: [PATCH] feat: listen to array of multiaddrs (#104) * feat: support listen on array * chore: fix missing deps * chore: update interface version * docs: update readme for array listen * test: use port 0 * docs: add some more jsdocs * chore: fix travis support for ip6 on linux * refactor: clean up some code --- .travis.yml | 11 +- README.md | 6 +- package.json | 11 +- src/index.js | 2 +- src/listener.js | 289 ++++++++++++++++++++----------- test/adapter/listen-dial.spec.js | 4 +- test/listen-dial.spec.js | 16 +- 7 files changed, 215 insertions(+), 124 deletions(-) diff --git a/.travis.yml b/.travis.yml index 99fe0ef..d6481b1 100644 --- a/.travis.yml +++ b/.travis.yml @@ -9,20 +9,23 @@ stages: node_js: - '10' -os: - - linux - - osx - script: npx nyc -s npm run test:node -- --bail after_success: npx nyc report --reporter=text-lcov > coverage.lcov && npx codecov jobs: include: + - os: linux + sudo: false + before_script: sudo sh -c 'echo 0 > /proc/sys/net/ipv6/conf/all/disable_ipv6' + - os: windows filter_secrets: false cache: false + - os: osx + - stage: check + os: linux script: - npx aegir build --bundlesize - npx aegir commitlint --travis diff --git a/README.md b/README.md index 60519ca..b01f9a2 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,7 @@ const multiaddr = require('multiaddr') const pipe = require('it-pipe') const { collect } = require('streaming-iterables') -const mh = multiaddr('/ip4/127.0.0.1/tcp/9090') +const addr = multiaddr('/ip4/127.0.0.1/tcp/9090') const tcp = new TCP() @@ -56,10 +56,10 @@ const listener = tcp.createListener((socket) => { ) }) -await listener.listen(mh) +await listener.listen([addr]) console.log('listening') -const socket = await tcp.dial(mh) +const socket = await tcp.dial(addr) const values = await pipe( socket, collect diff --git a/package.json b/package.json index baf84d2..defd978 100644 --- a/package.json +++ b/package.json @@ -40,7 +40,6 @@ "aegir": "^18.1.1", "chai": "^4.1.2", "dirty-chai": "^2.0.1", - "interface-transport": "~0.3.6", "lodash.isfunction": "^3.0.9", "pull-stream": "^3.6.9", "sinon": "^7.3.1" @@ -48,15 +47,17 @@ "dependencies": { "abort-controller": "^3.0.0", "class-is": "^1.1.0", - "debug": "^3.1.0", + "debug": "^4.1.1", "err-code": "^1.1.2", "interface-connection": "~0.3.3", - "interface-transport": "libp2p/interface-transport#feat/async-await", + "interface-transport": "~0.4.0", "ip-address": "^5.8.9", + "it-pipe": "^1.0.0", "lodash.includes": "^4.3.0", "lodash.isfunction": "^3.0.9", - "mafmt": "^6.0.2", - "multiaddr": "^5.0.0" + "mafmt": "^6.0.7", + "multiaddr": "^6.0.6", + "streaming-iterables": "^4.1.0" }, "contributors": [ "Alan Shaw ", diff --git a/src/index.js b/src/index.js index e59da74..288c41e 100644 --- a/src/index.js +++ b/src/index.js @@ -78,7 +78,7 @@ class TCP { } handler = handler || noop - return createListener(handler) + return createListener(options, handler) } filter (multiaddrs) { diff --git a/src/listener.js b/src/listener.js index b7764b9..ec40998 100644 --- a/src/listener.js +++ b/src/listener.js @@ -2,11 +2,12 @@ const multiaddr = require('multiaddr') const os = require('os') -const includes = require('lodash.includes') const net = require('net') const EventEmitter = require('events').EventEmitter +const { AllListenersFailedError } = require('interface-transport') const debug = require('debug') const log = debug('libp2p:tcp:listen') +log.error = debug('libp2p:tcp:listen:error') const Libp2pSocket = require('./socket') const getMultiaddr = require('./get-multiaddr') @@ -14,144 +15,228 @@ const c = require('./constants') function noop () {} -module.exports = (handler) => { - const listener = new EventEmitter() +class Listener extends EventEmitter { + /** + * @constructor + * @param {object} options + * @param {function(Connection)} handler + */ + constructor (options, handler) { + super() + this._options = options + this._connectionHandler = handler + this._servers = new Set() + this.__connections = new Map() + } - const server = net.createServer((socket) => { - // Avoid uncaught errors caused by unstable connections - socket.on('error', noop) + /** + * Whether or not there is currently at least 1 server listening + * @private + * @returns {boolean} + */ + _isListening () { + return [...this._servers].filter(server => server.listening).length > 0 + } - const addr = getMultiaddr(socket) - if (!addr) { - if (socket.remoteAddress === undefined) { - log('connection closed before p2p connection made') - } else { - log('error interpreting incoming p2p connection') - } + /** + * Closes all open servers + * @param {object} options + * @param {number} options.timeout how long before closure is forced, defaults to 2000 ms + * @returns {Promise} + */ + close (options = {}) { + if (!this._isListening()) { return } - log('new connection', addr.toString()) - - const s = new Libp2pSocket(socket, addr) - trackSocket(server, socket) + // Close all running servers in parallel + return Promise.all( + [...this._servers].map(server => { + return new Promise((resolve) => { + const start = Date.now() + + // Attempt to stop the server. If it takes longer than the timeout, + // resolve the promise. Any remaining connections will be destroyed after + const timeout = setTimeout(() => { + log('Timeout closing server after %dms, destroying connections manually', Date.now() - start) + resolve() + }, options.timeout || c.CLOSE_TIMEOUT) + + // Just clear the timeout, cleanup listeners are added on server creation + server.once('close', () => clearTimeout(timeout)) + server.close((err) => { + // log the error and resolve so we don't exit early + err && log.error('an error occurred closing the server', err) + resolve() + }) + }) + }) + ).then(() => { + // Destroy all remaining connections + this.__connections.forEach((connection, key) => { + log('destroying %s', key) + connection.destroy() + }) + this.__connections.clear() + this._servers.clear() + }) + } - handler && handler(s) - listener.emit('connection', s) - }) + /** + * Creates servers listening on the given `addrs` + * @async + * @param {Array} addrs + */ + async listen (addrs) { + addrs = Array.isArray(addrs) ? addrs : [addrs] + + let listeners = [] + let errors = [] + + // Filter out duplicate ports, unless it's port 0 + addrs = uniqueBy(addrs, (addr) => { + const port = Number(addr.toOptions().port) + return isNaN(port) || port === 0 ? addr.toString() : port + }) - server.on('listening', () => listener.emit('listening')) - server.on('error', (err) => listener.emit('error', err)) - server.on('close', () => listener.emit('close')) + for (const ma of addrs) { + const lOpts = ma.toOptions() - // Keep track of open connections to destroy in case of timeout - server.__connections = {} + listeners.push( + new Promise((resolve) => { + const server = net.createServer(this._onSocket.bind(this)) + this._servers.add(server) - listener.close = (options = {}) => { - if (!server.listening) { - return - } + server.on('listening', () => this.emit('listening')) + server.on('close', () => { + this._removeServer(server) + }) + server.on('error', (err) => this.emit('error', err)) - return new Promise((resolve, reject) => { - const start = Date.now() + server.listen(lOpts.port, lOpts.host, (err) => { + if (err) { + errors.push(err) + return resolve() + } - // Attempt to stop the server. If it takes longer than the timeout, - // destroy all the underlying sockets manually. - const timeout = setTimeout(() => { - log('Timeout closing server after %dms, destroying connections manually', Date.now() - start) - Object.keys(server.__connections).forEach((key) => { - log('destroying %s', key) - server.__connections[key].destroy() + log('Listening on %s %s', lOpts.port, lOpts.host) + resolve() + }) }) - resolve() - }, options.timeout || c.CLOSE_TIMEOUT) - - server.once('close', () => clearTimeout(timeout)) - - server.close((err) => err ? reject(err) : resolve()) - }) - } - - let ipfsId - let listeningAddr - - listener.listen = (ma) => { - listeningAddr = ma - if (includes(ma.protoNames(), 'ipfs')) { - ipfsId = getIpfsId(ma) - listeningAddr = ma.decapsulate('ipfs') + ) } - const lOpts = listeningAddr.toOptions() - return new Promise((resolve, reject) => { - server.listen(lOpts.port, lOpts.host, (err) => { - if (err) { - return reject(err) - } + return Promise.all(listeners) + .then(() => { + errors.forEach((err) => { + log.error('received an error while attempting to listen', err) + }) - log('Listening on %s %s', lOpts.port, lOpts.host) - resolve() + // All servers failed to listen, throw an error + if (errors.length === listeners.length) { + throw new AllListenersFailedError() + } }) - }) } - listener.getAddrs = () => { - const multiaddrs = [] - const address = server.address() - - if (!address) { - throw new Error('Listener is not ready yet') + /** + * Removes the server from tracking and performs cleanup. + * If all servers have been closed, `close` will be emitted by + * the listener. + * @private + * @param {net.Server} server + */ + _removeServer (server) { + // only emit if we're not listening + if (!this._isListening()) { + this.emit('close') } + this._servers.delete(server) + server.removeAllListeners() + } - // Because TCP will only return the IPv6 version - // we need to capture from the passed multiaddr - if (listeningAddr.toString().indexOf('ip4') !== -1) { - let m = listeningAddr.decapsulate('tcp') - m = m.encapsulate('/tcp/' + address.port) - if (ipfsId) { - m = m.encapsulate('/ipfs/' + ipfsId) - } + /** + * Return the addresses we are listening on + * @throws + * @returns {Array} + */ + getAddrs () { + const multiaddrs = [] + this._servers.forEach(server => { + const address = server.address() - if (m.toString().indexOf('0.0.0.0') !== -1) { + if (address.address === '0.0.0.0') { const netInterfaces = os.networkInterfaces() Object.keys(netInterfaces).forEach((niKey) => { netInterfaces[niKey].forEach((ni) => { - if (ni.family === 'IPv4') { - multiaddrs.push(multiaddr(m.toString().replace('0.0.0.0', ni.address))) + if (address.family === ni.family) { + multiaddrs.push( + multiaddr.fromNodeAddress({ + ...address, + address: ni.address + }, 'tcp') + ) } }) }) } else { - multiaddrs.push(m) - } - } - - if (address.family === 'IPv6') { - let ma = multiaddr('/ip6/' + address.address + '/tcp/' + address.port) - if (ipfsId) { - ma = ma.encapsulate('/ipfs/' + ipfsId) + multiaddrs.push(multiaddr.fromNodeAddress(address, 'tcp')) } + }) - multiaddrs.push(ma) + if (multiaddrs.length === 0) { + throw new Error('Listener is not ready yet') } return multiaddrs } - return listener -} + /** + * Handler for new sockets from `net.createServer` + * @param {net.Socket} socket + */ + _onSocket (socket) { + // Avoid uncaught errors caused by unstable connections + socket.on('error', noop) + + const addr = getMultiaddr(socket) + if (!addr) { + if (socket.remoteAddress === undefined) { + log('connection closed before p2p connection made') + } else { + log('error interpreting incoming p2p connection') + } + return + } -function getIpfsId (ma) { - return ma.stringTuples().filter((tuple) => { - return tuple[0] === c.IPFS_MA_CODE - })[0][1] + log('new connection', addr.toString()) + + const s = new Libp2pSocket(socket, addr) + + // Track the connection + const key = `${socket.remoteAddress}:${socket.remotePort}` + this.__connections.set(key, socket) + socket.once('close', () => { + this.__connections.delete(key) + socket.removeAllListeners() + }) + + this._connectionHandler(s) + this.emit('connection', s) + } } -function trackSocket (server, socket) { - const key = `${socket.remoteAddress}:${socket.remotePort}` - server.__connections[key] = socket +module.exports = (options, handler) => { + return new Listener(options, handler) +} - socket.once('close', () => { - delete server.__connections[key] - }) +/** + * Get unique values from `arr` using `getValue` to determine + * what is used for uniqueness + * @param {Array} arr The array to get unique values for + * @param {function(value)} getValue The function to determine what is compared + * @returns {Array} + */ +function uniqueBy (arr, getValue) { + return [...new Map(arr.map((i) => [getValue(i), i])).values()] } diff --git a/test/adapter/listen-dial.spec.js b/test/adapter/listen-dial.spec.js index 97e6980..72c3cfc 100644 --- a/test/adapter/listen-dial.spec.js +++ b/test/adapter/listen-dial.spec.js @@ -114,14 +114,14 @@ describe('listen', () => { }) }) - it('getAddrs preserves IPFS Id', (done) => { + it('getAddrs does not preserve IPFS Id', (done) => { const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const listener = tcp.createListener((conn) => {}) listener.listen(mh, () => { listener.getAddrs((err, multiaddrs) => { expect(err).to.not.exist() expect(multiaddrs.length).to.equal(1) - expect(multiaddrs[0]).to.deep.equal(mh) + expect(multiaddrs[0]).to.deep.equal(mh.decapsulate('ipfs')) listener.close(done) }) }) diff --git a/test/listen-dial.spec.js b/test/listen-dial.spec.js index e14e759..83a1876 100644 --- a/test/listen-dial.spec.js +++ b/test/listen-dial.spec.js @@ -38,7 +38,7 @@ describe('listen', () => { await new Promise((resolve) => { socket1.on('connect', async () => { - await listener.close() + await listener.close({ timeout: 100 }) resolve() }) }) @@ -115,14 +115,14 @@ describe('listen', () => { await listener.close() }) - it('getAddrs preserves IPFS Id', async () => { + it('getAddrs does not preserve IPFS Id', async () => { const mh = multiaddr('/ip4/127.0.0.1/tcp/9090/ipfs/Qmb6owHp6eaWArVbcJJbQSyifyJBttMMjYV76N2hMbf5Vw') const listener = tcp.createListener((conn) => {}) await listener.listen(mh) const multiaddrs = listener.getAddrs() expect(multiaddrs.length).to.equal(1) - expect(multiaddrs[0]).to.deep.equal(mh) + expect(multiaddrs[0]).to.deep.equal(mh.decapsulate('ipfs')) await listener.close() }) @@ -183,7 +183,7 @@ describe('dial', () => { handled = resolve }) - const ma = multiaddr('/ip6/::/tcp/9067') + const ma = multiaddr('/ip6/::/tcp/0') const listener = tcp.createListener(async (conn) => { await pipe( @@ -194,7 +194,8 @@ describe('dial', () => { }) await listener.listen(ma) - await pipe(await tcp.dial(ma)) + const addrs = listener.getAddrs() + await pipe(await tcp.dial(addrs[0])) await handledPromise await listener.close() @@ -210,7 +211,7 @@ describe('dial', () => { handled = resolve }) - const ma = multiaddr('/ip6/::/tcp/9068') + const ma = multiaddr('/ip6/::/tcp/0') const listener = tcp.createListener(async (conn) => { // pull(conn, pull.onEnd(destroyed)) @@ -219,7 +220,8 @@ describe('dial', () => { }) await listener.listen(ma) - await pipe([], await tcp.dial(ma)) + const addrs = listener.getAddrs() + await pipe([], await tcp.dial(addrs[0])) await handledPromise await listener.close()