diff --git a/package.json b/package.json index 6b374ed..baf84d2 100644 --- a/package.json +++ b/package.json @@ -42,10 +42,11 @@ "dirty-chai": "^2.0.1", "interface-transport": "~0.3.6", "lodash.isfunction": "^3.0.9", - "pull-stream": "^3.6.9" + "pull-stream": "^3.6.9", + "sinon": "^7.3.1" }, "dependencies": { - "async-iterator-to-pull-stream": "^1.3.0", + "abort-controller": "^3.0.0", "class-is": "^1.1.0", "debug": "^3.1.0", "err-code": "^1.1.2", diff --git a/src/adapter.js b/src/adapter.js index f1cfa6f..e206a8b 100644 --- a/src/adapter.js +++ b/src/adapter.js @@ -1,79 +1,16 @@ 'use strict' -const { Connection } = require('interface-connection') +const { Adapter } = require('interface-transport') const withIs = require('class-is') -const toPull = require('async-iterator-to-pull-stream') -const error = require('pull-stream/sources/error') -const drain = require('pull-stream/sinks/drain') -const TCP = require('./') -const noop = () => {} - -function callbackify (fn) { - return async function (...args) { - let cb = args.pop() - if (typeof cb !== 'function') { - args.push(cb) - cb = noop - } - let res - try { - res = await fn(...args) - } catch (err) { - return cb(err) - } - cb(null, res) - } -} +const TCP = require('.') // Legacy adapter to old transport & connection interface -class TcpAdapter extends TCP { - dial (ma, options, callback) { - if (typeof options === 'function') { - callback = options - options = {} - } - - callback = callback || noop - - const conn = new Connection() - - super.dial(ma, options) - .then(socket => { - conn.setInnerConn(toPull.duplex(socket)) - conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket)) - conn.close = callbackify(socket.close.bind(socket)) - callback(null, conn) - }) - .catch(err => { - conn.setInnerConn({ sink: drain(), source: error(err) }) - callback(err) - }) - - return conn - } - - createListener (options, handler) { - if (typeof options === 'function') { - handler = options - options = {} - } - - const server = super.createListener(options, socket => { - const conn = new Connection(toPull.duplex(socket)) - conn.getObservedAddrs = callbackify(socket.getObservedAddrs.bind(socket)) - handler(conn) - }) - - const proxy = { - listen: callbackify(server.listen.bind(server)), - close: callbackify(server.close.bind(server)), - getAddrs: callbackify(server.getAddrs.bind(server)), - getObservedAddrs: callbackify(() => server.getObservedAddrs()) - } - - return new Proxy(server, { get: (_, prop) => proxy[prop] || server[prop] }) +class TcpAdapter extends Adapter { + constructor () { + super(new TCP()) } } + module.exports = withIs(TcpAdapter, { className: 'TCP', symbolName: '@libp2p/js-libp2p-tcp/tcp' diff --git a/src/index.js b/src/index.js index 51a8706..e59da74 100644 --- a/src/index.js +++ b/src/index.js @@ -11,6 +11,7 @@ const log = debug('libp2p:tcp:dial') const Libp2pSocket = require('./socket') const createListener = require('./listener') +const { AbortError } = require('interface-transport') function noop () {} @@ -19,12 +20,16 @@ class TCP { const cOpts = ma.toOptions() log('Connecting to %s:%s', cOpts.host, cOpts.port) - const rawSocket = await this._connect(cOpts) + const rawSocket = await this._connect(cOpts, options) return new Libp2pSocket(rawSocket, ma, options) } - _connect (cOpts) { + _connect (cOpts, options = {}) { return new Promise((resolve, reject) => { + if ((options.signal || {}).aborted) { + return reject(new AbortError()) + } + const start = Date.now() const rawSocket = net.connect(cOpts) @@ -44,10 +49,17 @@ class TCP { done(null, rawSocket) } + const onAbort = () => { + log('Connect to %s:%s aborted', cOpts.host, cOpts.port) + rawSocket.destroy() + done(new AbortError()) + } + const done = (err, res) => { rawSocket.removeListener('error', onError) rawSocket.removeListener('timeout', onTimeout) rawSocket.removeListener('connect', onConnect) + options.signal && options.signal.removeEventListener(onAbort) err ? reject(err) : resolve(res) } @@ -55,6 +67,7 @@ class TCP { rawSocket.once('error', onError) rawSocket.once('timeout', onTimeout) rawSocket.once('connect', onConnect) + options.signal && options.signal.addEventListener('abort', onAbort) }) } diff --git a/test/compliance.spec.js b/test/compliance.spec.js index 54cedee..11fd244 100644 --- a/test/compliance.spec.js +++ b/test/compliance.spec.js @@ -1,8 +1,10 @@ /* eslint-env mocha */ 'use strict' +const sinon = require('sinon') const tests = require('interface-transport') const multiaddr = require('multiaddr') +const net = require('net') const TCP = require('../src') describe('interface-transport compliance', () => { @@ -15,7 +17,27 @@ describe('interface-transport compliance', () => { multiaddr('/ip4/127.0.0.1/tcp/9093'), multiaddr('/dns4/ipfs.io') ] - return { transport: tcp, addrs } + + // Used by the dial tests to simulate a delayed connect + const connector = { + delay (delayMs) { + const netConnect = net.connect + sinon.replace(net, 'connect', (opts) => { + const socket = netConnect(opts) + const socketEmit = socket.emit.bind(socket) + sinon.replace(socket, 'emit', (...args) => { + const time = args[0] === 'connect' ? delayMs : 0 + setTimeout(() => socketEmit(...args), time) + }) + return socket + }) + }, + restore () { + sinon.restore() + } + } + + return { transport: tcp, addrs, connector } } }) })