diff --git a/src/core/ipfs/pinner-utils.js b/src/core/ipfs/pinner-utils.js
index b4228028d8..6050514c42 100644
--- a/src/core/ipfs/pinner-utils.js
+++ b/src/core/ipfs/pinner-utils.js
@@ -14,8 +14,9 @@ const emptyKey = new Buffer(bs58.decode(emptyKeyHash))
 const defaultFanout = 256
 const maxItems = 8192
 
-// from go-ipfs/pin/internal/pb/header.proto
+// Protobuf schema for the pin-set header
 const pbSchema = (
+  // from go-ipfs/pin/internal/pb/header.proto
   'message Set { ' +
     // 1 for now
    'optional uint32 version = 1; ' +
@@ -26,251 +27,244 @@ const pbSchema = (
   '}'
 )
 const pb = protobuf(pbSchema)
+function readHeader (rootNode) {
+  // rootNode.data should be a buffer of the format:
+  // < varint(headerLength) | header | itemData... >
+  var rootData = rootNode.data
+  var hdrLength = varint.decode(rootData)
+  var vBytes = varint.decode.bytes
+  if (vBytes <= 0) {
+    return { err: 'Invalid Set header length' }
+  }
+  if (vBytes + hdrLength > rootData.length) {
+    return { err: 'Impossibly large set header length' }
+  }
+  var hdrSlice = rootData.slice(vBytes, hdrLength + vBytes)
+  var header = pb.Set.decode(hdrSlice)
+  if (header.version !== 1) {
+    return { err: 'Unsupported Set version: ' + header.version }
+  }
+  if (header.fanout > rootNode.links.length) {
+    return { err: 'Impossibly large fanout' }
+  }
+  return {
+    header: header,
+    data: rootData.slice(hdrLength + vBytes)
+  }
+}
 
-exports = module.exports = function PinUtils (dagS) {
-  // should this be part of `object` rather than `pinner`?
-  this.hasChild = (root, childhash, callback, _links, _checked, _seen) => {
-    // callback (err, has)
-
-    if (callback.fired) { return }
-    if (typeof childhash === 'object') {
-      childhash = bs58.encode(childhash).toString()
-    }
-    _links = _links || root.links.length
-    _checked = _checked || 0
-    _seen = _seen || {}
+exports = module.exports = function (dagS) {
+  var pinnerUtils = {
+    // should this be part of `object` rather than `pinner`?
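+    // hasChild searches the DAG below `root` for a link to `childhash`,
+    // fetching subtrees from dagS; `callback.fired` guards against
+    // the callback firing more than once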
+    hasChild: (root, childhash, callback, _links, _checked, _seen) => {
+      // callback (err, has)
+      if (callback.fired) { return }
+      if (typeof childhash === 'object') {
+        childhash = bs58.encode(childhash).toString()
+      }
+      _links = _links || root.links.length
+      _checked = _checked || 0
+      _seen = _seen || {}
 
-    if (!root.links.length && _links === _checked) {
-      // all nodes have been checked
-      return callback(null, false)
-    }
-    root.links.forEach((link) => {
-      var bs58link = bs58.encode(link.hash).toString()
-      if (bs58link === childhash) {
-        callback.fired = true
-        return callback(null, true)
+      if (!root.links.length && _links === _checked) {
+        // all nodes have been checked
+        return callback(null, false)
       }
-      dagS.get(link.hash, (err, obj) => {
-        if (err) {
+      root.links.forEach((link) => {
+        var bs58link = bs58.encode(link.hash).toString()
+        if (bs58link === childhash) {
           callback.fired = true
-          return callback(err)
+          return callback(null, true)
         }
-        // don't check the same links twice
-        if (bs58link in _seen) { return }
-        _seen[bs58link] = true
+        dagS.get(link.hash, (err, obj) => {
+          if (err) {
+            callback.fired = true
+            return callback(err)
+          }
+          // don't check the same links twice
+          if (bs58link in _seen) { return }
+          _seen[bs58link] = true
 
-        _checked++
-        _links += obj.links.length
-        this.hasChild(obj, childhash, callback, _links, _checked, _seen)
+          _checked++
+          _links += obj.links.length
+          pinnerUtils.hasChild(obj, childhash, callback, _links, _checked, _seen)
+        })
       })
-    })
-  }
+    },
 
-  this.storeSet = (keys, logInternalKey, callback) => {
-    // callback (err, rootNode)
-
-    var items = keys.map((key) => {
-      return {
-        key: key,
-        data: null
-      }
-    })
-    this.storeItems(items, logInternalKey, (err, rootNode) => {
-      if (err) { return callback(err) }
-      dagS.add(rootNode, (err) => {
+    storeSet: (keys, logInternalKey, callback) => {
+      // callback (err, rootNode)
+      var items = keys.map((key) => {
+        return {
+          key: key,
+          data: null
+        }
+      })
+      pinnerUtils.storeItems(items, logInternalKey, (err, rootNode) => {
         if (err) { return callback(err) }
-        logInternalKey(rootNode.multihash())
-        callback(null, rootNode)
+        dagS.add(rootNode, (err) => {
+          if (err) { return callback(err) }
+          logInternalKey(rootNode.multihash())
+          callback(null, rootNode)
+        })
       })
-    })
-  }
-
-  this.storeItems = (items, logInternalKey, callback, _subcalls, _done) => {
-    // callback (err, rootNode)
-
-    var seed = crypto.randomBytes(4).readUInt32LE(0, true)
-    var pbHeader = pb.Set.encode({
-      version: 1,
-      fanout: defaultFanout,
-      seed: seed
-    })
-    var rootData = Buffer.concat([
-      new Buffer(varint.encode(pbHeader.length)), pbHeader
-    ])
-    var rootLinks = []
-    var i
-    for (i = 0; i < defaultFanout; i++) {
-      rootLinks.push(new DAGLink('', null, emptyKey))
-    }
-    logInternalKey(emptyKey)
-
-    if (items.length <= maxItems) {
-      // the items will fit in a single root node
+    },
 
-      var itemLinks = []
-      var itemData = []
-      var indices = []
-      for (i = 0; i < items.length; i++) {
-        itemLinks.push(new DAGLink('', null, items[i].key))
-        itemData.push(items[i].data || new Buffer([]))
-        indices.push(i)
-      }
-      indices.sort((a, b) => {
-        var x = Buffer.compare(itemLinks[a].hash, itemLinks[b].hash)
-        if (x) { return x }
-        return (a < b ? -1 : 1)
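+    // stores items in a single node when they fit (<= maxItems); otherwise
+    // buckets them into `defaultFanout` child sets by a seeded fnv hash and
+    // recurses (go-ipfs' "wasteful but simple" layout)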
+    storeItems: (items, logInternalKey, callback, _subcalls, _done) => {
+      // callback (err, rootNode)
+      var seed = crypto.randomBytes(4).readUInt32LE(0, true)
+      var pbHeader = pb.Set.encode({
+        version: 1,
+        fanout: defaultFanout,
+        seed: seed
       })
-      var sortedLinks = indices.map((i) => { return itemLinks[i] })
-      var sortedData = indices.map((i) => { return itemData[i] })
-      rootLinks = rootLinks.concat(sortedLinks)
-      rootData = Buffer.concat([rootData].concat(sortedData))
-      readHeader(new DAGNode(rootData, rootLinks)) //
-      return callback(null, new DAGNode(rootData, rootLinks))
-    } else {
-      // need to split up the items into multiple root nodes
-      // (using go-ipfs "wasteful but simple" approach for consistency)
-      _subcalls = _subcalls || 0
-      _done = _done || 0
-
-      var h
-      var hashed = {}
-      var hashFn = (seed, key) => {
-        var buf = new Buffer(4)
-        var h = new fnv.FNV()
-        buf.writeUInt32LE(seed, 0)
-        h.update(buf)
-        h.update(bs58.encode(key).toString())
-        return h.value()
-      }
-      // items will be distributed among `defaultFanout` bins
-      for (i = 0; i < items.length; i++) {
-        h = hashFn(seed, items[i].key) % defaultFanout
-        hashed[h] = hashed[h] || []
-        hashed[h].push(items[i])
+      var rootData = Buffer.concat([
+        new Buffer(varint.encode(pbHeader.length)), pbHeader
+      ])
+      var rootLinks = []
+      var i
+      for (i = 0; i < defaultFanout; i++) {
+        rootLinks.push(new DAGLink('', null, emptyKey))
       }
+      logInternalKey(emptyKey)
 
-      var storeItemsCb = (err, child) => {
-        if (callback.fired) { return }
-        if (err) {
-          callback.fired = true
-          return callback(err)
+      if (items.length <= maxItems) {
+        // the items will fit in a single root node
+        var itemLinks = []
+        var itemData = []
+        var indices = []
+        for (i = 0; i < items.length; i++) {
+          itemLinks.push(new DAGLink('', null, items[i].key))
+          itemData.push(items[i].data || new Buffer([]))
+          indices.push(i)
+        }
+        indices.sort((a, b) => {
+          var x = Buffer.compare(itemLinks[a].hash, itemLinks[b].hash)
+          if (x) { return x }
+          return (a < b ? -1 : 1)
+        })
+        var sortedLinks = indices.map((i) => { return itemLinks[i] })
+        var sortedData = indices.map((i) => { return itemData[i] })
+        rootLinks = rootLinks.concat(sortedLinks)
+        rootData = Buffer.concat([rootData].concat(sortedData))
+        return callback(null, new DAGNode(rootData, rootLinks))
+      } else {
+        // need to split up the items into multiple root nodes
+        // (using go-ipfs "wasteful but simple" approach for consistency)
+        _subcalls = _subcalls || 0
+        _done = _done || 0
+        var h
+        var hashed = {}
+        var hashFn = (seed, key) => {
+          var buf = new Buffer(4)
+          var h = new fnv.FNV()
+          buf.writeUInt32LE(seed, 0)
+          h.update(buf)
+          h.update(bs58.encode(key).toString())
+          return h.value()
+        }
+        // items will be distributed among `defaultFanout` bins
+        for (i = 0; i < items.length; i++) {
+          h = hashFn(seed, items[i].key) % defaultFanout
+          hashed[h] = hashed[h] || []
+          hashed[h].push(items[i])
         }
-        dagS.add(child, (err) => {
+        // arrow functions ignore bind(), so capture the bin index `h`
+        // through a closure rather than via `this`
+        var storeItemsCb = (h) => (err, child) => {
          if (callback.fired) { return }
          if (err) {
            callback.fired = true
            return callback(err)
          }
-          logInternalKey(child.multihash())
-          rootLinks[this.h] = new DAGLink(
-            '', child.size(), child.multihash()
-          )
-          _done++
-          if (_done === _subcalls) {
-            // all finished
-            return callback(null, new DAGNode(rootData, rootLinks))
+          dagS.add(child, (err) => {
+            if (callback.fired) { return }
+            if (err) {
+              callback.fired = true
+              return callback(err)
+            }
+            logInternalKey(child.multihash())
+            rootLinks[h] = new DAGLink(
+              '', child.size(), child.multihash()
+            )
+            _done++
+            if (_done === _subcalls) {
+              // all finished
+              return callback(null, new DAGNode(rootData, rootLinks))
+            }
+          })
+        }
+        _subcalls += Object.keys(hashed).length
+        for (h in hashed) {
+          if (hashed.hasOwnProperty(h)) {
+            pinnerUtils.storeItems(
+              hashed[h],
+              logInternalKey,
+              storeItemsCb(h),
+              _subcalls,
+              _done
+            )
          }
-        })
-      }
-
-      _subcalls += Object.keys(hashed).length
-      for (h in hashed) {
-        if (hashed.hasOwnProperty(h)) {
-          this.storeItems(
-            hashed[h],
-            logInternalKey,
-            storeItemsCb.bind({h: h}),
-            _subcalls,
-            _done
-          )
        }
      }
-    }
-  }
-
-  this.loadSet = (rootNode, name, logInternalKey, callback) => {
-    // callback (err, keys)
+    },
 
-    var link = rootNode.links.filter((link) => {
-      return link.name === name
-    }).pop()
-    if (!link) { return callback('No link found with name ' + name) }
-    logInternalKey(link.hash)
-    dagS.get(link.hash, (err, obj) => {
-      if (err) { return callback(err) }
-      var keys = []
-      var walkerFn = (link) => {
-        keys.push(link.hash)
-      }
-      this.walkItems(obj, walkerFn, logInternalKey, (err) => {
+    loadSet: (rootNode, name, logInternalKey, callback) => {
+      // callback (err, keys)
+      var link = rootNode.links.filter((link) => {
+        return link.name === name
+      }).pop()
+      if (!link) { return callback('No link found with name ' + name) }
+      logInternalKey(link.hash)
+      dagS.get(link.hash, (err, obj) => {
        if (err) { return callback(err) }
-        return callback(null, keys)
+        var keys = []
+        var walkerFn = (link) => {
+          keys.push(link.hash)
+        }
+        pinnerUtils.walkItems(obj, walkerFn, logInternalKey, (err) => {
+          if (err) { return callback(err) }
+          return callback(null, keys)
+        })
      })
-    })
-  }
+    },
 
-  this.walkItems = (node, walkerFn, logInternalKey, callback) => {
-    // callback (err)
+    walkItems: (node, walkerFn, logInternalKey, callback) => {
+      // callback (err)
+      var h = readHeader(node)
+      if (h.err) { return callback(h.err) }
+      var fanout = h.header.fanout
+      var subwalks = 0
+      var finished = 0
 
-    var h = readHeader(node)
-    if (h.err) { return callback(h.err) }
-    var fanout = h.header.fanout
-    var subwalks = 0
-    var finished = 0
-
-    var walkCb = (err) => {
-      if (err) { return callback(err) }
-      finished++
-      if (subwalks === finished) {
-        return callback()
-      }
-    }
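+      // the first `fanout` links are subtree slots (emptyKey when unused);
+      // any links after that are item links appended by storeItems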
+      var walkCb = (err) => {
+        if (err) { return callback(err) }
+        finished++
+        if (subwalks === finished) {
+          return callback()
+        }
+      }
-    for (var i = 0; i < node.links.length; i++) {
-      var link = node.links[i]
-      if (i >= fanout) {
-        // item link
-        walkerFn(link, i, h.data)
-      } else {
-        // fanout link
-        logInternalKey(link.hash)
-        if (!emptyKey.equals(link.hash)) {
-          subwalks++
-          dagS.get(link.hash, (err, obj) => {
-            if (err) { return callback(err) }
-            this.walkItems(obj, walkerFn, logInternalKey, walkCb)
-          })
+      for (var i = 0; i < node.links.length; i++) {
+        var link = node.links[i]
+        if (i >= fanout) {
+          // item link
+          walkerFn(link, i, h.data)
+        } else {
+          // fanout link
+          logInternalKey(link.hash)
+          if (!emptyKey.equals(link.hash)) {
+            subwalks++
+            dagS.get(link.hash, (err, obj) => {
+              if (err) { return callback(err) }
+              pinnerUtils.walkItems(obj, walkerFn, logInternalKey, walkCb)
+            })
+          }
        }
      }
-    if (!subwalks) {
-      return callback()
-    }
-  }
-}
-
-function readHeader (rootNode) {
-  // rootNode.data should be a buffer of the format:
-  // < varint(headerLength) | header | itemData... >
-  var rootData = rootNode.data
-  var hdrLength = varint.decode(rootData)
-  var vBytes = varint.decode.bytes
-  if (vBytes <= 0) {
-    return { err: 'Invalid Set header length' }
-  }
-  if (vBytes + hdrLength > rootData.length) {
-    return { err: 'Impossibly large set header length' }
-  }
-  var hdrSlice = rootData.slice(vBytes, hdrLength + vBytes)
-  var header = pb.Set.decode(hdrSlice)
-  if (header.version !== 1) {
-    return { err: 'Unsupported Set version: ' + header.version }
-  }
-  if (header.fanout > rootNode.links.length) {
-    return { err: 'Impossibly large fanout' }
-  }
-  return {
-    header: header,
-    data: rootData.slice(hdrLength + vBytes)
-  }
+      if (!subwalks) {
+        return callback()
+      }
+    }
   }
+  return pinnerUtils
 }
diff --git a/src/core/ipfs/pinner.js b/src/core/ipfs/pinner.js
index 67a7605669..acc286646a 100644
--- a/src/core/ipfs/pinner.js
+++ b/src/core/ipfs/pinner.js
@@ -3,12 +3,46 @@ const bs58 = require('bs58')
 const mDAG = require('ipfs-merkle-dag')
 const DAGNode = mDAG.DAGNode
-const PinnerUtils = require('./pinner-utils')
+const pinnerUtils = require('./pinner-utils')
+
+function KeySet (keys) {
+  // Buffers with identical data are still different objects, so
+  // they need to be cast to strings to prevent duplicates in Sets
+  this.keys = new Set()
+  this.keyStrings = new Set()
+  this.add = (key) => {
+    if (!this.has(key)) {
+      var keyString = bs58.encode(key).toString()
+      this.keyStrings.add(keyString)
+      this.keys.add(key)
+    }
+  }
+  this.delete = (key) => {
+    var keyString = bs58.encode(key).toString()
+    if (!this.keyStrings.delete(keyString)) { return }
+    // the caller's buffer may not be the same object that was stored,
+    // so look the stored buffer up by its encoded string
+    this.keys.forEach((stored) => {
+      if (bs58.encode(stored).toString() === keyString) {
+        this.keys.delete(stored)
+      }
+    })
+  }
+  this.clear = () => {
+    this.keys.clear()
+    this.keyStrings.clear()
+  }
+  this.has = (key) => {
+    return this.keyStrings.has(bs58.encode(key).toString())
+  }
+  this.toArray = () => {
+    return Array.from(this.keys)
+  }
+  this.toStringArray = () => {
+    return Array.from(this.keyStrings)
+  }
+  keys = keys || []
+  keys.forEach(this.add)
+}
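+// e.g. two distinct Buffers holding the same bytes count as one key:
+//   var ks = new KeySet([ new Buffer('abcd', 'hex') ])
+//   ks.has(new Buffer('abcd', 'hex'))   // => true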
 
 module.exports = function (self) {
-  const directPins = new Set()
-  const recursivePins = new Set()
-  var internalPins = new Set()
+  var directPins = new KeySet()
+  var recursivePins = new KeySet()
+  var internalPins = new KeySet()
 
 // repo.datastore makes a subfolder using first 8 chars of key, so
 // pin data will be saved under /blocks/internal/internal_pins.data
@@ -26,17 +60,16 @@ module.exports = function (self) {
      all: 'all'
    },
 
-    _clear: () => {
+    clear: () => {
      directPins.clear()
      recursivePins.clear()
+      internalPins.clear()
    },
 
-    utils: new PinnerUtils(dagS),
+    utils: pinnerUtils(dagS),
 
    pin: (obj, recursive, callback) => {
      // callback (err)
-
-      // TODO locking
      if (typeof recursive === 'function') {
        callback = recursive
        recursive = true
@@ -71,8 +104,6 @@ module.exports = function (self) {
 
    unpin: (multihash, recursive, callback) => {
      // callback (err)
-
-      // TODO locking
      if (typeof recursive === 'function') {
        callback = recursive
        recursive = true
@@ -100,8 +131,6 @@ module.exports = function (self) {
 
    isPinned: (multihash, callback) => {
      // callback (err, pinned, reason)
-
-      // TODO locking
      pinner.isPinnedWithType(multihash, pinner.types.all, callback)
    },
 
@@ -177,24 +206,33 @@ module.exports = function (self) {
    },
 
    directKeys: () => {
-      return Array.from(directPins)
+      return directPins.toArray()
+    },
+
+    directKeyStrings: () => {
+      return directPins.toStringArray()
    },
 
    recursiveKeys: () => {
-      return Array.from(recursivePins)
+      return recursivePins.toArray()
    },
 
-    internalPins: () => {
-      return Array.from(internalPins)
+    recursiveKeyStrings: () => {
+      return recursivePins.toStringArray()
+    },
+
+    internalKeys: () => {
+      return internalPins.toArray()
+    },
+
+    internalKeyStrings: () => {
+      return internalPins.toStringArray()
    },
 
    // encodes and writes pinner key sets to the datastore
    flush: (callback) => {
      // callback (err, root)
-
-      // TODO locking
-
-      var newInternalPins = new Set()
+      var newInternalPins = new KeySet()
      var logInternalKey = (multihash) => {
        newInternalPins.add(multihash)
      }
@@ -229,6 +267,32 @@ module.exports = function (self) {
          })
        })
      })
+    },
+
+    load: (callback) => {
+      repo.datastore.get(pinDataStoreKey, (err, pseudoblock) => {
+        if (err) { return callback(err) }
+        var rootBytes = pseudoblock.data
+        var root = (new DAGNode()).unMarshal(rootBytes)
+        var newInternalPins = new KeySet([root.multihash()])
+        var logInternalKey = (multihash) => {
+          newInternalPins.add(multihash)
+        }
+        pinner.utils.loadSet(
+          root, pinner.types.recursive, logInternalKey, (err, keys) => {
+            if (err) { return callback(err) }
+            recursivePins = new KeySet(keys)
+            pinner.utils.loadSet(
+              root, pinner.types.direct, logInternalKey, (err, keys) => {
+                if (err) { return callback(err) }
+                directPins = new KeySet(keys)
+                internalPins = newInternalPins
+                return callback()
+              }
+            )
+          }
+        )
+      })
    }
  }
  return pinner
diff --git a/test/core-tests/test-pinner.js b/test/core-tests/test-pinner.js
index 9d318d0c74..de0cfbc653 100644
--- a/test/core-tests/test-pinner.js
+++ b/test/core-tests/test-pinner.js
@@ -9,6 +9,8 @@ const mDAG = require('ipfs-merkle-dag')
 const DAGNode = mDAG.DAGNode
 const createTempRepo = require('../utils/temp-repo')
 
+const emptyKeyHash = 'QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n'
+
 describe('pinner', function () {
   this.timeout(20000)
   var ipfs
@@ -16,6 +18,10 @@ describe('pinner', function () {
   var pinner
   var Obj = {}
 
+  function encode (key) {
+    return bs58.encode(key).toString()
+  }
+
   before((done) => {
    repo = createTempRepo()
    ipfs = new IPFS(repo)
@@ -54,7 +60,7 @@ describe('pinner', function () {
  })
 
  beforeEach((done) => {
-    pinner._clear()
+    pinner.clear()
    done()
  })
 
@@ -93,9 +99,7 @@ describe('pinner', function () {
        expect(err).to.not.exist
        expect(pinned).to.be.true
        // indirect pin 'reason' is the b58 hash of recursive root pin
-        expect(reason).to.equal(
-          bs58.encode(Obj.B.multihash()).toString()
-        )
+        expect(reason).to.equal(encode(Obj.B.multihash()))
        done()
      })
    })
@@ -110,7 +114,7 @@ describe('pinner', function () {
      expect(err).to.not.exist
      // direct pin B should fail
      pinner.pin(Obj.B, false, (err) => {
-        expect(err).to.equal(bs58.encode(Obj.B.multihash()).toString() +
+        expect(err).to.equal(encode(Obj.B.multihash()) +
          ' already pinned recursively')
        // pinning recursively again should succeed
        pinner.pin(Obj.B, (err) => {
@@ -159,8 +163,8 @@ describe('pinner', function () {
  })
 
  it('unpins recursively by default', (done) => {
-    const bs58A = bs58.encode(Obj.A.multihash()).toString()
-    const bs58B = bs58.encode(Obj.B.multihash()).toString()
+    const bs58A = encode(Obj.A.multihash())
+    const bs58B = encode(Obj.B.multihash())
    // recursive pin B which has children A and C
    pinner.pin(Obj.B, (err) => {
      expect(err).to.not.exist
@@ -187,8 +191,25 @@ describe('pinner', function () {
    })
  })
 
-  describe('flush', () => {
-    it('encodes and writes pinner key sets to the datastore', (done) => {
+  describe('flush and load (roundtrip)', () => {
+    it('writes pinned keys to datastore and reads them back', (done) => {
+      var checkInternal = (root) => {
+        var internal = pinner.internalKeyStrings()
+        expect(internal.length).to.equal(4)
+        internal = new Set(internal)
+        expect(internal.has(emptyKeyHash)).to.be.true
+        expect(internal.has(encode(root.multihash()))).to.be.true
+        expect(root.links.length).to.equal(2)
+        root.links.forEach((link) => {
+          expect(internal.has(encode(link.hash))).to.be.true
+        })
+      }
+      var checkClear = () => {
+        expect(pinner.directKeys().length).to.equal(0)
+        expect(pinner.recursiveKeys().length).to.equal(0)
+        expect(pinner.internalKeys().length).to.equal(0)
+      }
+      checkClear()
      // recursive pin
      pinner.pin(Obj.B, (err) => {
        expect(err).to.not.exist
@@ -197,18 +218,27 @@ describe('pinner', function () {
          expect(err).to.not.exist
          // save to datastore
          pinner.flush((err, root) => {
+            // internalPins should have a recursive root node, a direct root
+            // node, a root header node with links to both, and an empty node
            expect(err).to.not.exist
-            // test roundtrip by loading the root sets
-            pinner.utils.loadSet(root, 'recursive', () => {}, (err, keys) => {
+            checkInternal(root)
+            // clear from memory
+            pinner.clear()
+            checkClear()
+            // load from datastore
+            pinner.load((err) => {
              expect(err).to.not.exist
-              expect(keys.length).to.equal(1)
-              expect(keys[0].equals(Obj.B.multihash()))
-              pinner.utils.loadSet(root, 'direct', () => {}, (err, keys) => {
-                expect(err).to.not.exist
-                expect(keys.length).to.equal(1)
-                expect(keys[0].equals(Obj.E.multihash()))
-                done()
-              })
+              // Obj.E should be restored as a direct pin
+              var direct = pinner.directKeyStrings()
+              expect(direct.length).to.equal(1)
+              expect(direct[0]).to.equal(encode(Obj.E.multihash()))
+              // Obj.B should be restored as a recursive pin
+              var recursive = pinner.recursiveKeyStrings()
+              expect(recursive.length).to.equal(1)
+              expect(recursive[0]).to.equal(encode(Obj.B.multihash()))
+              // Internal should be the same as before
+              checkInternal(root)
+              done()
            })
          })
        })
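
For reference, the round-trip this patch introduces can be exercised roughly as below. This is a minimal sketch, not part of the diff: it assumes a `pinner` wired to a repo and DAG service as in src/core/ipfs/pinner.js, and `node` stands for any DAGNode already stored in the DAG; error handling is reduced to throws.

    pinner.pin(node, true, (err) => {       // pin `node` recursively
      if (err) { throw err }
      pinner.flush((err, root) => {         // persist pin sets to repo.datastore
        if (err) { throw err }
        pinner.clear()                      // drop the in-memory key sets
        pinner.load((err) => {              // decode the sets back from disk
          if (err) { throw err }
          console.log(pinner.recursiveKeyStrings())   // => ['Qm...']
        })
      })
    })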