From 3f3ce8a998b42b15623aaed3ed80606cf5ed91cc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Antunes?= Date: Sun, 25 Mar 2018 03:54:20 +0100 Subject: [PATCH] feat: streamable ping and optional packet number (#723) --- README.md | 4 +- package.json | 1 + src/ping-pull-stream.js | 29 +++++ src/ping-readable-stream.js | 33 +++++ src/ping.js | 24 ++-- src/utils/load-commands.js | 2 + test/ping.spec.js | 251 +++++++++++++++++++++++++++++++++--- test/sub-modules.spec.js | 4 + 8 files changed, 314 insertions(+), 34 deletions(-) create mode 100644 src/ping-pull-stream.js create mode 100644 src/ping-readable-stream.js diff --git a/README.md b/README.md index 58c48048e..9e9a92a14 100644 --- a/README.md +++ b/README.md @@ -252,7 +252,9 @@ $ ipfs config --json API.HTTPHeaders.Access-Control-Allow-Methods "[\"PUT\", \"P - [miscellaneous operations](https://github.com/ipfs/interface-ipfs-core/tree/master/SPEC/MISCELLANEOUS.md) - [`ipfs.id([callback])`](https://github.com/ipfs/interface-ipfs-core/tree/master/SPEC/MISCELLANEOUS.md#id) - [`ipfs.version([callback])`](https://github.com/ipfs/interface-ipfs-core/tree/master/SPEC/MISCELLANEOUS.md#version) - - [`ipfs.ping()`](https://github.com/ipfs/interface-ipfs-core/tree/master/SPEC/MISCELLANEOUS.md#ping) + - [`ipfs.ping(id, [options, callback])`](https://github.com/ipfs/interface-ipfs-core/tree/master/SPEC/MISCELLANEOUS.md#ping) + - `ipfs.pingPullStream(id, [options])` + - `ipfs.pingReadableStream(id, [options])` - [`ipfs.dns(domain, [callback])`](https://github.com/ipfs/interface-ipfs-core/tree/master/SPEC/MISCELLANEOUS.md#dns) - [`ipfs.stop([callback])`](https://github.com/ipfs/interface-ipfs-core/tree/master/SPEC/MISCELLANEOUS.md#stop). Alias to `ipfs.shutdown`. diff --git a/package.json b/package.json index f1ec47ba9..04a39b2ad 100644 --- a/package.json +++ b/package.json @@ -77,6 +77,7 @@ "ipfs": "~0.28.2", "ipfsd-ctl": "~0.30.4", "pre-commit": "^1.2.2", + "pull-stream": "^3.6.2", "socket.io": "^2.0.4", "socket.io-client": "^2.0.4", "stream-equal": "^1.1.1" diff --git a/src/ping-pull-stream.js b/src/ping-pull-stream.js new file mode 100644 index 000000000..9c18140e2 --- /dev/null +++ b/src/ping-pull-stream.js @@ -0,0 +1,29 @@ +'use strict' + +const toPull = require('stream-to-pull-stream') +const deferred = require('pull-defer') +const moduleConfig = require('./utils/module-config') + +module.exports = (arg) => { + const send = moduleConfig(arg) + + return (id, opts = {}) => { + // Default number of packtes to 1 + if (!opts.n && !opts.count) { + opts.n = 1 + } + const request = { + path: 'ping', + args: id, + qs: opts + } + const p = deferred.source() + + send(request, (err, stream) => { + if (err) { return p.abort(err) } + p.resolve(toPull.source(stream)) + }) + + return p + } +} diff --git a/src/ping-readable-stream.js b/src/ping-readable-stream.js new file mode 100644 index 000000000..6281a44de --- /dev/null +++ b/src/ping-readable-stream.js @@ -0,0 +1,33 @@ +'use strict' + +const Stream = require('readable-stream') +const pump = require('pump') +const moduleConfig = require('./utils/module-config') + +module.exports = (arg) => { + const send = moduleConfig(arg) + + return (id, opts = {}) => { + // Default number of packtes to 1 + if (!opts.n && !opts.count) { + opts.n = 1 + } + const request = { + path: 'ping', + args: id, + qs: opts + } + // ndjson streams objects + const pt = new Stream.PassThrough({ + objectMode: true + }) + + send(request, (err, stream) => { + if (err) { return pt.destroy(err) } + + pump(stream, pt) + }) + + return pt + } +} diff --git a/src/ping.js b/src/ping.js index 4dbb77b6c..8aca6c09d 100644 --- a/src/ping.js +++ b/src/ping.js @@ -7,30 +7,30 @@ const streamToValue = require('./utils/stream-to-value') module.exports = (arg) => { const send = moduleConfig(arg) - return promisify((id, callback) => { + return promisify((id, opts, callback) => { + if (typeof opts === 'function') { + callback = opts + opts = {} + } + // Default number of packtes to 1 + if (!opts.n && !opts.count) { + opts.n = 1 + } const request = { path: 'ping', args: id, - qs: { n: 1 } + qs: opts } // Transform the response stream to a value: - // { Success: , Time: , Text: } + // [{ Success: , Time: , Text: }] const transform = (res, callback) => { streamToValue(res, (err, res) => { if (err) { return callback(err) } - // go-ipfs http api currently returns 3 lines for a ping. - // they're a little messed, so take the correct values from each lines. - const pingResult = { - Success: res[1].Success, - Time: res[1].Time, - Text: res[2].Text - } - - callback(null, pingResult) + callback(null, res) }) } diff --git a/src/utils/load-commands.js b/src/utils/load-commands.js index f1b5f6923..e86982e18 100644 --- a/src/utils/load-commands.js +++ b/src/utils/load-commands.js @@ -31,6 +31,8 @@ function requireCommands () { object: require('../object'), pin: require('../pin'), ping: require('../ping'), + pingReadableStream: require('../ping-readable-stream'), + pingPullStream: require('../ping-pull-stream'), refs: require('../refs'), repo: require('../repo'), stop: require('../stop'), diff --git a/test/ping.spec.js b/test/ping.spec.js index 747464798..09ab34402 100644 --- a/test/ping.spec.js +++ b/test/ping.spec.js @@ -3,6 +3,8 @@ const chai = require('chai') const dirtyChai = require('dirty-chai') +const pull = require('pull-stream') +const collect = require('pull-stream/sinks/collect') const expect = chai.expect chai.use(dirtyChai) @@ -12,11 +14,12 @@ const series = require('async/series') const IPFSApi = require('../src') const f = require('./utils/factory') -describe.skip('.ping', () => { +describe('.ping', function () { let ipfs let ipfsd let other let otherd + let otherId before(function (done) { this.timeout(20 * 1000) // slow CI @@ -43,7 +46,14 @@ describe.skip('.ping', () => { ipfsd.api.id((err, id) => { expect(err).to.not.exist() const ma = id.addresses[0] - other.api.swarm.connect(ma, cb) + other.swarm.connect(ma, cb) + }) + }, + (cb) => { + other.id((err, id) => { + expect(err).to.not.exist() + otherId = id.id + cb() }) } ], done) @@ -57,35 +67,234 @@ describe.skip('.ping', () => { }) describe('callback API', () => { - it('ping another peer', (done) => { - other.id((err, id) => { + it('ping another peer with default packet count', (done) => { + ipfs.ping(otherId, (err, res) => { expect(err).to.not.exist() + expect(res).to.be.an('array') + expect(res).to.have.lengthOf(3) + res.forEach(packet => { + expect(packet).to.have.keys('Success', 'Time', 'Text') + expect(packet.Time).to.be.a('number') + }) + const resultMsg = res.find(packet => packet.Text.includes('Average latency')) + expect(resultMsg).to.exist() + done() + }) + }) - ipfs.ping(id.id, (err, res) => { - expect(err).to.not.exist() - expect(res).to.have.a.property('Success') - expect(res).to.have.a.property('Time') - expect(res).to.have.a.property('Text') - expect(res.Text).to.contain('Average latency') - expect(res.Time).to.be.a('number') - done() + it('ping another peer with a specifc packet count through parameter count', (done) => { + ipfs.ping(otherId, {count: 3}, (err, res) => { + expect(err).to.not.exist() + expect(res).to.be.an('array') + expect(res).to.have.lengthOf(5) + res.forEach(packet => { + expect(packet).to.have.keys('Success', 'Time', 'Text') + expect(packet.Time).to.be.a('number') }) + const resultMsg = res.find(packet => packet.Text.includes('Average latency')) + expect(resultMsg).to.exist() + done() + }) + }) + + it('ping another peer with a specifc packet count through parameter n', (done) => { + ipfs.ping(otherId, {n: 3}, (err, res) => { + expect(err).to.not.exist() + expect(res).to.be.an('array') + expect(res).to.have.lengthOf(5) + res.forEach(packet => { + expect(packet).to.have.keys('Success', 'Time', 'Text') + expect(packet.Time).to.be.a('number') + }) + const resultMsg = res.find(packet => packet.Text.includes('Average latency')) + expect(resultMsg).to.exist() + done() + }) + }) + + it('sending both n and count should fail', (done) => { + ipfs.ping(otherId, {count: 10, n: 10}, (err, res) => { + expect(err).to.exist() + done() }) }) }) describe('promise API', () => { - it('ping another peer', () => { - return other.id() - .then((id) => { - return ipfs.ping(id.id) + it('ping another peer with default packet count', () => { + return ipfs.ping(otherId) + .then((res) => { + expect(res).to.be.an('array') + expect(res).to.have.lengthOf(3) + res.forEach(packet => { + expect(packet).to.have.keys('Success', 'Time', 'Text') + expect(packet.Time).to.be.a('number') + }) + const resultMsg = res.find(packet => packet.Text.includes('Average latency')) + expect(resultMsg).to.exist() }) + }) + + it('ping another peer with a specifc packet count through parameter count', () => { + return ipfs.ping(otherId, {count: 3}) .then((res) => { - expect(res).to.have.a.property('Success') - expect(res).to.have.a.property('Time') - expect(res).to.have.a.property('Text') - expect(res.Text).to.contain('Average latency') - expect(res.Time).to.be.a('number') + expect(res).to.be.an('array') + expect(res).to.have.lengthOf(5) + res.forEach(packet => { + expect(packet).to.have.keys('Success', 'Time', 'Text') + expect(packet.Time).to.be.a('number') + }) + const resultMsg = res.find(packet => packet.Text.includes('Average latency')) + expect(resultMsg).to.exist() + }) + }) + + it('ping another peer with a specifc packet count through parameter n', () => { + return ipfs.ping(otherId, {n: 3}) + .then((res) => { + expect(res).to.be.an('array') + expect(res).to.have.lengthOf(5) + res.forEach(packet => { + expect(packet).to.have.keys('Success', 'Time', 'Text') + expect(packet.Time).to.be.a('number') + }) + const resultMsg = res.find(packet => packet.Text.includes('Average latency')) + expect(resultMsg).to.exist() + }) + }) + + it('sending both n and count should fail', (done) => { + ipfs.ping(otherId, {n: 3, count: 3}) + .catch(err => { + expect(err).to.exist() + done() + }) + }) + }) + + describe('pull stream API', () => { + it('ping another peer with the default packet count', (done) => { + pull( + ipfs.pingPullStream(otherId), + collect((err, data) => { + expect(err).to.not.exist() + expect(data).to.be.an('array') + expect(data).to.have.lengthOf(3) + data.forEach(packet => { + expect(packet).to.have.keys('Success', 'Time', 'Text') + expect(packet.Time).to.be.a('number') + }) + const resultMsg = data.find(packet => packet.Text.includes('Average latency')) + expect(resultMsg).to.exist() + done() + }) + ) + }) + + it('ping another peer with a specifc packet count through parameter count', (done) => { + pull( + ipfs.pingPullStream(otherId, {count: 3}), + collect((err, data) => { + expect(err).to.not.exist() + expect(data).to.be.an('array') + expect(data).to.have.lengthOf(5) + data.forEach(packet => { + expect(packet).to.have.keys('Success', 'Time', 'Text') + expect(packet.Time).to.be.a('number') + }) + const resultMsg = data.find(packet => packet.Text.includes('Average latency')) + expect(resultMsg).to.exist() + done() + }) + ) + }) + + it('ping another peer with a specifc packet count through parameter n', (done) => { + pull( + ipfs.pingPullStream(otherId, {n: 3}), + collect((err, data) => { + expect(err).to.not.exist() + expect(data).to.be.an('array') + expect(data).to.have.lengthOf(5) + data.forEach(packet => { + expect(packet).to.have.keys('Success', 'Time', 'Text') + expect(packet.Time).to.be.a('number') + }) + const resultMsg = data.find(packet => packet.Text.includes('Average latency')) + expect(resultMsg).to.exist() + done() + }) + ) + }) + + it('sending both n and count should fail', (done) => { + pull( + ipfs.pingPullStream(otherId, {n: 3, count: 3}), + collect(err => { + expect(err).to.exist() + done() + }) + ) + }) + }) + + describe('readable stream API', () => { + it('ping another peer with the default packet count', (done) => { + let packetNum = 0 + ipfs.pingReadableStream(otherId) + .on('data', data => { + packetNum++ + expect(data).to.be.an('object') + expect(data).to.have.keys('Success', 'Time', 'Text') + }) + .on('error', err => { + expect(err).not.to.exist() + }) + .on('end', () => { + expect(packetNum).to.equal(3) + done() + }) + }) + + it('ping another peer with a specifc packet count through parameter count', (done) => { + let packetNum = 0 + ipfs.pingReadableStream(otherId, {count: 3}) + .on('data', data => { + packetNum++ + expect(data).to.be.an('object') + expect(data).to.have.keys('Success', 'Time', 'Text') + }) + .on('error', err => { + expect(err).not.to.exist() + }) + .on('end', () => { + expect(packetNum).to.equal(5) + done() + }) + }) + + it('ping another peer with a specifc packet count through parameter n', (done) => { + let packetNum = 0 + ipfs.pingReadableStream(otherId, {n: 3}) + .on('data', data => { + packetNum++ + expect(data).to.be.an('object') + expect(data).to.have.keys('Success', 'Time', 'Text') + }) + .on('error', err => { + expect(err).not.to.exist() + }) + .on('end', () => { + expect(packetNum).to.equal(5) + done() + }) + }) + + it('sending both n and count should fail', (done) => { + ipfs.pingReadableStream(otherId, {n: 3, count: 3}) + .on('error', err => { + expect(err).to.exist() + done() }) }) }) diff --git a/test/sub-modules.spec.js b/test/sub-modules.spec.js index 3512731cc..1921b0228 100644 --- a/test/sub-modules.spec.js +++ b/test/sub-modules.spec.js @@ -69,8 +69,12 @@ describe('submodules', () => { it('ping', () => { const ping = require('../src/ping')(config) + const pingPullStream = require('../src/ping-pull-stream')(config) + const pingReadableStream = require('../src/ping-readable-stream')(config) expect(ping).to.be.a('function') + expect(pingPullStream).to.be.a('function') + expect(pingReadableStream).to.be.a('function') }) it('log', () => {