From b1c3da08d445ce448519ed0c38a6cb7e8101cd4b Mon Sep 17 00:00:00 2001 From: noffle Date: Sun, 19 May 2019 18:51:31 +0200 Subject: [PATCH 1/6] PROTOCOL-BREAKING: use discovery-swarm to shuttle around peer id --- README.md | 5 +++-- package.json | 3 ++- swarm.js | 56 +++++++++++++++------------------------------------- 3 files changed, 21 insertions(+), 43 deletions(-) diff --git a/README.md b/README.md index f653ea2..f0efdf7 100644 --- a/README.md +++ b/README.md @@ -103,9 +103,10 @@ documented types include > var swarm = require('cabal-core/swarm') -#### swarm(cabal) +#### swarm(cabal, cb) -Join the P2P swarm for a cabal, start connecting to peers and replicating messages. +Join the P2P swarm for a cabal, start connecting to peers and replicating +messages. Returns a [discovery-swarm](https://github.com/mafintosh/discovery-swarm). diff --git a/package.json b/package.json index 1bf160d..a572611 100644 --- a/package.json +++ b/package.json @@ -41,11 +41,12 @@ "kappa-view-level": "^1.1.0", "memdb": "^1.3.1", "monotonic-timestamp": "0.0.9", + "multifeed": "^2.0.5", "randombytes": "^2.0.6", "read-only-stream": "^2.0.0", "strftime": "^0.10.0", "through2": "^2.0.3", - "thunky": "^1.0.2", + "thunky": "^1.0.3", "xtend": "^4.0.1" }, "devDependencies": { diff --git a/swarm.js b/swarm.js index 2bcf2f6..48878b8 100644 --- a/swarm.js +++ b/swarm.js @@ -1,52 +1,28 @@ +var pump = require('pump') var discovery = require('discovery-swarm') var swarmDefaults = require('dat-swarm-defaults') -module.exports = function (cabal) { - var swarm = discovery(swarmDefaults()) - swarm.join(cabal.key.toString('hex')) - swarm.on('connection', function (conn, info) { - var remoteKey - var ended = false +module.exports = function (cabal, cb) { + cb = cb || function () {} - cabal.getLocalKey(function (err, key) { - if (key) { - // send local key to remote - conn.write(new Buffer(key, 'hex')) + cabal.getLocalKey(function (err, key) { + if (err) return cb(err) - // read remote key from remote - conn.once('readable', onReadable) + var swarm = discovery(Object.assign({}, swarmDefaults(), { id: key })) + swarm.join(cabal.key.toString('hex')) + swarm.on('connection', function (conn, info) { + conn.once('error', function () { if (info.id) cabal._removeConnection(info.id) }) + conn.once('end', function () { if (info.id) cabal._removeConnection(info.id) }) - conn.once('end', function () { - ended = true - }) - - function onReadable () { - if (ended) return - var rkey = conn.read(32) - if (!rkey) { - conn.once('readable', onReadable) - return - } + var r = cabal.replicate() + pump(conn, r, conn, function (err) { + // TODO: report somehow + }) - remoteKey = rkey.toString('hex') - cabal._addConnection(remoteKey) - replicate() - } - } else { - throw new Error('UNEXPECTED STATE: no local key!') - } + cabal._addConnection(info.id) }) - function replicate () { - var r = cabal.replicate() - conn.pipe(r).pipe(conn) - r.on('error', noop) - } - - conn.once('error', function () { if (remoteKey) cabal._removeConnection(remoteKey) }) - conn.once('end', function () { if (remoteKey) cabal._removeConnection(remoteKey) }) + cb(null, swarm) }) - return swarm } -function noop () {} From 0584aea8ae0ce01a9981bff1199cfb3f8512bd37 Mon Sep 17 00:00:00 2001 From: noffle Date: Sun, 19 May 2019 19:47:41 +0200 Subject: [PATCH 2/6] send correct key --- swarm.js | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/swarm.js b/swarm.js index 48878b8..dbf7445 100644 --- a/swarm.js +++ b/swarm.js @@ -8,18 +8,19 @@ module.exports = function (cabal, cb) { cabal.getLocalKey(function (err, key) { if (err) return cb(err) - var swarm = discovery(Object.assign({}, swarmDefaults(), { id: key })) + var swarm = discovery(Object.assign({}, swarmDefaults(), { id: Buffer.from(key, 'hex') })) swarm.join(cabal.key.toString('hex')) swarm.on('connection', function (conn, info) { - conn.once('error', function () { if (info.id) cabal._removeConnection(info.id) }) - conn.once('end', function () { if (info.id) cabal._removeConnection(info.id) }) + var remoteKey = info.id.toString('hex') + conn.once('error', function () { if (remoteKey) cabal._removeConnection(remoteKey) }) + conn.once('end', function () { if (remoteKey) cabal._removeConnection(remoteKey) }) var r = cabal.replicate() pump(conn, r, conn, function (err) { // TODO: report somehow }) - cabal._addConnection(info.id) + cabal._addConnection(remoteKey) }) cb(null, swarm) From a53163b8dddea16b1efc78c78750c840ffcc3c8a Mon Sep 17 00:00:00 2001 From: noffle Date: Mon, 20 May 2019 13:36:42 +0200 Subject: [PATCH 3/6] feat: expose "swarm" API on the cabal itself --- README.md | 23 ++++++++++------------- index.js | 19 ++++++++++--------- 2 files changed, 20 insertions(+), 22 deletions(-) diff --git a/README.md b/README.md index f0efdf7..784374c 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # cabal-core -Core database, replication, and chat APIs for cabal. +Core database, replication, swarming, and chat APIs for cabal. ## Usage @@ -20,7 +20,7 @@ If this is a new database, `key` can be omitted and will be generated. ### cabal.getLocalKey(cb) -Returns the local user's key (as a string). +Returns the local user's key (as a hex string). ### var ds = cabal.replicate() @@ -55,6 +55,14 @@ Calls `fn` with every new message that arrives in `channel`. ### Network +> var swarm = require('cabal-core/swarm') + +#### cabal.swarm(cb) + +Joins the P2P swarm for a cabal. This seeks out peers who are also part of this cabal by various means (internet, local network), connects to them, and replicates cabal messages between them. + +The returned object is an instance of [discovery-swarm](https://github.com/mafintosh/discovery-swarm). + #### cabal.on('peer-added', function (key) {}) Emitted when you connect to a peer. `key` is a hex string of their public key. @@ -99,17 +107,6 @@ documented types include } ``` -### swarm - -> var swarm = require('cabal-core/swarm') - -#### swarm(cabal, cb) - -Join the P2P swarm for a cabal, start connecting to peers and replicating -messages. - -Returns a [discovery-swarm](https://github.com/mafintosh/discovery-swarm). - ## License AGPLv3 diff --git a/index.js b/index.js index e9aab2e..7fa8555 100644 --- a/index.js +++ b/index.js @@ -8,6 +8,7 @@ var createChannelView = require('./views/channels') var createMessagesView = require('./views/messages') var createTopicsView = require('./views/topics') var createUsersView = require('./views/users') +var swarm = require('./swarm') var DATABASE_VERSION = 1 @@ -105,7 +106,7 @@ Cabal.prototype.publish = function (message, opts, cb) { if (!opts) opts = {} this.feed(function (feed) { - message.timestamp = timestamp() + message.timestamp = message.timestamp || timestamp() feed.append(message, function (err) { cb(err, err ? null : message) }) @@ -154,14 +155,14 @@ Cabal.prototype.getLocalKey = function (cb) { }) } -/** - * Replication stream for the mesh. - */ -Cabal.prototype.replicate = function () { - return this.db.replicate({ - live: true, - maxFeeds: this.maxFeeds - }) +Cabal.prototype.swarm = function (cb) { + swarm(this, cb) +} + +Cabal.prototype.replicate = function (opts) { + opts = opts || {} + opts = Object.assign({}, {live:true}, opts) + return this.db.replicate(opts) } Cabal.prototype._addConnection = function (key) { From 3ab13bf5b0904e3b4cd7858a9295f31f3d713460 Mon Sep 17 00:00:00 2001 From: noffle Date: Mon, 20 May 2019 13:36:54 +0200 Subject: [PATCH 4/6] test: local and network replication --- package.json | 1 + test/test.js | 131 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 132 insertions(+) diff --git a/package.json b/package.json index a572611..c66602c 100644 --- a/package.json +++ b/package.json @@ -52,6 +52,7 @@ "devDependencies": { "collect-stream": "^1.2.1", "documentation": "^6.3.2", + "pump": "^3.0.0", "random-access-memory": "^3.0.0", "standard": "^11.0.1", "tape": "^4.9.1" diff --git a/test/test.js b/test/test.js index 032005f..79c4b9a 100644 --- a/test/test.js +++ b/test/test.js @@ -2,6 +2,7 @@ var collect = require('collect-stream') var Cabal = require('..') var test = require('tape') var ram = require('random-access-memory') +var pump = require('pump') test('create a cabal + channel', function (t) { var cabal = Cabal(ram) @@ -130,3 +131,133 @@ test('listening for live messages', function (t) { }) }) }) + +test('local replication', function (t) { + t.plan(15) + + function create (id, cb) { + var cabal = Cabal(ram) + cabal.db.ready(function () { + var msg = { + type: 'chat/text', + content: { + text: 'hello from ' + id, + channel: 'general', + timestamp: Number(id) * 1000 + } + } + cabal.getLocalKey(function (err, key) { + if (err) return cb(err) + cabal.key = key + cabal.publish(msg, function (err) { + if (err) cb(err) + else cb(null, cabal) + }) + }) + }) + } + + create(1, function (err, c1) { + t.error(err) + create(2, function (err, c2) { + t.error(err) + sync(c1, c2, function (err) { + t.error(err, 'sync ok') + + function check (cabal) { + var r = cabal.messages.read('general') + collect(r, function (err, data) { + t.error(err) + t.same(data.length, 2, '2 messages') + t.same(data[0].key, c2.key) + t.same(data[0].seq, 0) + t.same(data[1].key, c1.key) + t.same(data[1].seq, 0) + }) + } + + check(c1) + check(c2) + }) + }) + }) +}) + +test.only('local replication', function (t) { + t.plan(15) + + function create (id, cb) { + var cabal = Cabal(ram) + cabal.db.ready(function () { + var msg = { + type: 'chat/text', + content: { + text: 'hello from ' + id, + channel: 'general', + timestamp: Number(id) * 1000 + } + } + cabal.getLocalKey(function (err, key) { + if (err) return cb(err) + cabal.key = key + cabal.publish(msg, function (err) { + if (err) cb(err) + else cb(null, cabal) + }) + }) + }) + } + + create(1, function (err, c1) { + t.error(err) + create(2, function (err, c2) { + t.error(err) + syncNetwork(c1, c2, function (err) { + t.error(err, 'sync ok') + + function check (cabal) { + var r = cabal.messages.read('general') + collect(r, function (err, data) { + t.error(err) + t.same(data.length, 2, '2 messages') + t.same(data[0].key, c2.key) + t.same(data[0].seq, 0) + t.same(data[1].key, c1.key) + t.same(data[1].seq, 0) + }) + } + + check(c1) + check(c2) + }) + }) + }) +}) + +function sync (a, b, cb) { + var r = a.replicate({live:false}) + pump(r, b.replicate({live:false}), r, cb) +} + +function syncNetwork (a, b, cb) { + var pending = 2 + + a.swarm(function (err, swarm1) { + if (err) return cb(err) + b.swarm(function (err, swarm2) { + if (err) return cb(err) + a.on('peer-added', function (key) { + console.log('a-add', key) + setTimeout(function () { + if (!--pending) cb() + }, 1000) + }) + b.on('peer-added', function (key) { + console.log('b-add', key) + setTimeout(function () { + if (!--pending) cb() + }, 1000) + }) + }) + }) +} From abf2f121260f7b6b166aa2ae9c1da1d54f3f5d87 Mon Sep 17 00:00:00 2001 From: noffle Date: Mon, 20 May 2019 06:02:51 -0700 Subject: [PATCH 5/6] test: fix network swarming --- test/test.js | 28 +++++++++++++++++----------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/test/test.js b/test/test.js index 79c4b9a..c21fe01 100644 --- a/test/test.js +++ b/test/test.js @@ -183,11 +183,11 @@ test('local replication', function (t) { }) }) -test.only('local replication', function (t) { +test('swarm network replication', function (t) { t.plan(15) function create (id, cb) { - var cabal = Cabal(ram) + var cabal = Cabal(ram, 'fake') cabal.db.ready(function () { var msg = { type: 'chat/text', @@ -199,7 +199,7 @@ test.only('local replication', function (t) { } cabal.getLocalKey(function (err, key) { if (err) return cb(err) - cabal.key = key + cabal._localkey = key cabal.publish(msg, function (err) { if (err) cb(err) else cb(null, cabal) @@ -220,9 +220,9 @@ test.only('local replication', function (t) { collect(r, function (err, data) { t.error(err) t.same(data.length, 2, '2 messages') - t.same(data[0].key, c2.key) + t.same(data[0].key, c2._localkey) t.same(data[0].seq, 0) - t.same(data[1].key, c1.key) + t.same(data[1].key, c1._localkey) t.same(data[1].seq, 0) }) } @@ -246,17 +246,23 @@ function syncNetwork (a, b, cb) { if (err) return cb(err) b.swarm(function (err, swarm2) { if (err) return cb(err) + + function end () { + if (!--pending) { + swarm1.destroy(function () { + swarm2.destroy(cb) + }) + } + } + a.on('peer-added', function (key) { console.log('a-add', key) - setTimeout(function () { - if (!--pending) cb() - }, 1000) + setTimeout(end, 2000) }) + b.on('peer-added', function (key) { console.log('b-add', key) - setTimeout(function () { - if (!--pending) cb() - }, 1000) + setTimeout(end, 2000) }) }) }) From ce38211f989670953c9b1af4947aca53ee7a30ef Mon Sep 17 00:00:00 2001 From: noffle Date: Mon, 20 May 2019 06:09:04 -0700 Subject: [PATCH 6/6] chore: remove unused dep --- package.json | 1 - 1 file changed, 1 deletion(-) diff --git a/package.json b/package.json index c66602c..277cd11 100644 --- a/package.json +++ b/package.json @@ -41,7 +41,6 @@ "kappa-view-level": "^1.1.0", "memdb": "^1.3.1", "monotonic-timestamp": "0.0.9", - "multifeed": "^2.0.5", "randombytes": "^2.0.6", "read-only-stream": "^2.0.0", "strftime": "^0.10.0",