From a342a15ddebdfe8cdb50c404d7d30954473b6046 Mon Sep 17 00:00:00 2001 From: Volker Mische Date: Fri, 12 Mar 2021 14:11:53 +0100 Subject: [PATCH] feat: add new BlockService This commits adds the BlockService directly to IPFS. It takes js-multiformats CIDs as input and also returns new style blocks and no longer IpldBlocks. It currently converts those CIDs and blocks into the legacy format, in order to be compatible with the current ipfs-repo and ipfs-bitswap implementations. In the future this kind of conversion will disappear, once the full stack is using js-multiformats. --- packages/ipfs-core-utils/src/as-legacy-cid.js | 29 +++ packages/ipfs-core/package.json | 3 + packages/ipfs-core/src/block-service.js | 204 +++++++++++++++++ packages/ipfs-core/test/block-service.spec.js | 208 ++++++++++++++++++ .../ipfs-message-port-server/src/block.js | 3 + 5 files changed, 447 insertions(+) create mode 100644 packages/ipfs-core-utils/src/as-legacy-cid.js create mode 100644 packages/ipfs-core/src/block-service.js create mode 100644 packages/ipfs-core/test/block-service.spec.js diff --git a/packages/ipfs-core-utils/src/as-legacy-cid.js b/packages/ipfs-core-utils/src/as-legacy-cid.js new file mode 100644 index 0000000000..a927db39ca --- /dev/null +++ b/packages/ipfs-core-utils/src/as-legacy-cid.js @@ -0,0 +1,29 @@ +'use strict' + +const LegacyCID = require('cids') +const { CID } = require('multiformats') +const errCode = require('err-code') + +/** + * Makes sure a CID is a legacy one. + * + * If it is already a legacy one, it is returned, if it is a new CID, it's + * converted to a legacy one. + * + * @param {CID|LegacyCID} cid - The object to do the transformation on + */ +const asLegacyCid = (cid) => { + if (LegacyCID.isCID(cid)) { + return cid + } + + const newCid = CID.asCID(cid) + if (newCid) { + const { version, code, multihash } = newCid + return new LegacyCID(version, code, multihash.bytes) + } else { + throw errCode(new Error('invalid CID'), 'ERR_INVALID_CID') + } +} + +module.exports = asLegacyCid diff --git a/packages/ipfs-core/package.json b/packages/ipfs-core/package.json index e3c36de3d4..8cd2b782db 100644 --- a/packages/ipfs-core/package.json +++ b/packages/ipfs-core/package.json @@ -88,6 +88,7 @@ "it-all": "^1.0.4", "it-first": "^1.0.4", "it-last": "^1.0.4", + "it-map": "^1.0.5", "it-pipe": "^1.1.0", "libp2p": "^0.30.7", "libp2p-bootstrap": "^0.12.1", @@ -109,6 +110,7 @@ "multiaddr-to-uri": "^6.0.0", "multibase": "^4.0.0", "multicodec": "^3.0.1", + "multiformats": "^4.0.0", "multihashing-async": "^2.0.1", "native-abort-controller": "^1.0.3", "p-queue": "^6.6.1", @@ -125,6 +127,7 @@ "ipfsd-ctl": "^7.2.0", "ipld-git": "^0.6.1", "iso-url": "^1.0.0", + "lodash.range": "^3.2.0", "nanoid": "^3.1.12", "rimraf": "^3.0.2", "sinon": "^9.0.3" diff --git a/packages/ipfs-core/src/block-service.js b/packages/ipfs-core/src/block-service.js new file mode 100644 index 0000000000..f9c18ee122 --- /dev/null +++ b/packages/ipfs-core/src/block-service.js @@ -0,0 +1,204 @@ +'use strict' + +const errCode = require('err-code') +const IpldBlock = require('ipld-block') +const map = require('it-map') +const { CID } = require('multiformats') + +const asLegacyCid = require('ipfs-core-utils/src/as-legacy-cid') + +/** + * @typedef {import('ipfs-core-types/src/bitswap').Bitswap} BitSwap + * @typedef {import('ipfs-repo')} IPFSRepo + * + * @typedef {object} Block + * @property {Uint8Array} bytes + * @property {CID} cid + */ + +/** + * BlockService is a hybrid block datastore. It stores data in a local + * datastore and may retrieve data from a remote Exchange. + * It uses an internal `datastore.Datastore` instance to store values. + */ +class BlockService { + /** + * Create a new BlockService + * + * @param {IPFSRepo} ipfsRepo + */ + constructor (ipfsRepo) { + this._repo = ipfsRepo + this._bitswap = null + } + + /** + * Add a bitswap instance that communicates with the + * network to retreive blocks that are not in the local store. + * + * If the node is online all requests for blocks first + * check locally and afterwards ask the network for the blocks. + * + * @param {BitSwap} bitswap + */ + setExchange (bitswap) { + this._bitswap = bitswap + } + + /** + * Go offline, i.e. drop the reference to bitswap. + */ + unsetExchange () { + this._bitswap = null + } + + /** + * Is the blockservice online, i.e. is bitswap present. + */ + hasExchange () { + return this._bitswap !== null + } + + /** + * Put a block to the underlying datastore. + * + * @param {Block} block + * @param {object} [options] - Options is an object with the following properties + * @param {AbortSignal} [options.signal] - A signal that can be used to abort any long-lived operations that are started as a result of this operation + * @returns {Promise} + */ + async put (block, options) { + const legacyBlock = new IpldBlock(block.bytes, asLegacyCid(block.cid)) + + if (this._bitswap !== null) { + await this._bitswap.put(legacyBlock, options) + } else { + await this._repo.blocks.put(legacyBlock, options) + } + return block + } + + /** + * Put a multiple blocks to the underlying datastore. + * + * @param {AsyncIterable | Iterable} blocks + * @param {object} [options] - Options is an object with the following properties + * @param {AbortSignal} [options.signal] - A signal that can be used to abort any long-lived operations that are started as a result of this operation + * @returns {AsyncIterable} + */ + putMany (blocks, options) { + const legacyBlocks = map(blocks, (block) => { + return new IpldBlock(block.bytes, asLegacyCid(block.cid)) + }) + + let result + if (this._bitswap !== null) { + result = this._bitswap.putMany(legacyBlocks, options) + } else { + result = this._repo.blocks.putMany(legacyBlocks, options) + } + + return map(result, (legacyBlock) => { + return { + cid: CID.decode(legacyBlock.cid.bytes), + bytes: legacyBlock.data + } + }) + } + + /** + * Get a block by cid. + * + * @param {CID} cid + * @param {object} [options] - Options is an object with the following properties + * @param {AbortSignal} [options.signal] - A signal that can be used to abort any long-lived operations that are started as a result of this operation + * @returns {Promise} + */ + async get (cid, options) { + const legacyCid = asLegacyCid(cid) + + let legacyBlock + if (this._bitswap !== null) { + legacyBlock = await this._bitswap.get(legacyCid, options) + } else { + legacyBlock = await this._repo.blocks.get(legacyCid, options) + } + + return { + cid: CID.decode(legacyBlock.cid.bytes), + bytes: legacyBlock.data + } + } + + /** + * Get multiple blocks back from an array of cids. + * + * @param {AsyncIterable | Iterable} cids + * @param {object} [options] - Options is an object with the following properties + * @param {AbortSignal} [options.signal] - A signal that can be used to abort any long-lived operations that are started as a result of this operation + * @returns {AsyncIterable} + */ + getMany (cids, options) { + if (!Array.isArray(cids)) { + throw new Error('first arg must be an array of cids') + } + + const legacyCids = map(cids, asLegacyCid) + + let result + if (this._bitswap !== null) { + result = this._bitswap.getMany(legacyCids, options) + } else { + result = this._repo.blocks.getMany(legacyCids, options) + } + + return map(result, (legacyBlock) => { + return { + cid: CID.decode(legacyBlock.cid.bytes), + bytes: legacyBlock.data + } + }) + } + + /** + * Delete a block from the blockstore. + * + * @param {CID} cid + * @param {object} [options] - Options is an object with the following properties + * @param {AbortSignal} [options.signal] - A signal that can be used to abort any long-lived operations that are started as a result of this operation + */ + async delete (cid, options) { + const legacyCid = asLegacyCid(cid) + + if (!await this._repo.blocks.has(legacyCid)) { + throw errCode(new Error('blockstore: block not found'), 'ERR_BLOCK_NOT_FOUND') + } + + return this._repo.blocks.delete(legacyCid, options) + } + + /** + * Delete multiple blocks from the blockstore. + * + * @param {AsyncIterable | Iterable} cids + * @param {object} [options] - Options is an object with the following properties + * @param {AbortSignal} [options.signal] - A signal that can be used to abort any long-lived operations that are started as a result of this operation + */ + deleteMany (cids, options) { + const repo = this._repo + + const existingCids = map(cids, async (cid) => { + const legacyCid = asLegacyCid(cid) + + if (!await repo.blocks.has(legacyCid)) { + throw errCode(new Error('blockstore: block not found'), 'ERR_BLOCK_NOT_FOUND') + } + + return legacyCid + }) + + return this._repo.blocks.deleteMany(existingCids, options) + } +} + +module.exports = BlockService diff --git a/packages/ipfs-core/test/block-service.spec.js b/packages/ipfs-core/test/block-service.spec.js new file mode 100644 index 0000000000..84e4331612 --- /dev/null +++ b/packages/ipfs-core/test/block-service.spec.js @@ -0,0 +1,208 @@ +/* eslint-env mocha */ +'use strict' + +const { expect } = require('aegir/utils/chai') + +const IpldBlock = require('ipld-block') +const range = require('lodash.range') +const all = require('it-all') +const rawCodec = require('multiformats/codecs/raw') +const { sha256 } = require('multiformats/hashes/sha2') +const CID = require('multiformats/cid') +const uint8ArrayFromString = require('uint8arrays/from-string') +const drain = require('it-drain') +const asLegacyCid = require('ipfs-core-utils/src/as-legacy-cid') + +// This gets replaced by `create-repo-browser.js` in the browser +const createTempRepo = require('./utils/create-repo-nodejs.js') + +/** + * @typedef {import('ipfs-repo')} IPFSRepo + */ + +const BlockService = require('../src/block-service.js') + +// Creates a new block from string. It hashes the data and creates a CIDv1 +// with RAW codec. +const blockFromString = async (data) => { + const bytes = uint8ArrayFromString(data) + const hash = await sha256.digest(bytes) + return { + cid: CID.create(1, rawCodec.code, hash), + bytes + } +} + +describe('block-service', () => { + /** @type {IPFSRepo} */ + const repo = createTempRepo() + + /** @type {BlockService} */ + let bs + /** @type {Block[]} */ + let testBlocks + + before(async () => { + await repo.init({}) + await repo.open() + bs = new BlockService(repo) + + const data = [ + '1', + '2', + '3', + 'A random data block' + ] + + testBlocks = await Promise.all(data.map(async (d) => { + return blockFromString(d) + })) + }) + + describe('fetch only from local Repo', () => { + it('store and get a block', async () => { + const b = testBlocks[3] + + await bs.put(b) + const res = await bs.get(b.cid) + expect(res).to.eql(b) + }) + + it('get a non stored yet block', async () => { + const b = testBlocks[2] + + try { + await bs.get(b.cid) + } catch (err) { + expect(err).to.exist() + } + }) + + it('store many blocks', async () => { + await drain(bs.putMany(testBlocks)) + + expect( + await Promise.all( + testBlocks.map(b => bs.get(b.cid)) + ) + ).to.deep.equal( + testBlocks + ) + }) + + it('get many blocks through .get', async () => { + const blocks = await Promise.all(testBlocks.map(b => bs.get(b.cid))) + expect(blocks).to.eql(testBlocks) + }) + + it('get many blocks through .getMany', async () => { + const cids = testBlocks.map(b => b.cid) + const blocks = await all(bs.getMany(cids)) + expect(blocks).to.eql(testBlocks) + }) + + it('delete a block', async () => { + const block = await blockFromString('Will not live that much') + + await bs.put(block) + await bs.delete(block.cid) + const res = await bs._repo.blocks.has(asLegacyCid(block.cid)) + expect(res).to.be.eql(false) + }) + + it('does not delete a block it does not have', async () => { + const block = await blockFromString('Will not live that much ' + Date.now()) + + await bs.delete(block.cid) + .then( + () => expect.fail('Should have thrown'), + (err) => expect(err).to.have.property('code', 'ERR_BLOCK_NOT_FOUND') + ) + }) + + it('deletes lots of blocks', async () => { + const block = await blockFromString('Will not live that much') + + await bs.put(block) + await drain(bs.deleteMany([block.cid])) + const res = await bs._repo.blocks.has(asLegacyCid(block.cid)) + expect(res).to.be.false() + }) + + it('does not delete a blocks it does not have', async () => { + const block = await blockFromString('Will not live that much ' + Date.now()) + + await expect(drain(bs.deleteMany([block.cid]))).to.eventually.be.rejected().with.property('code', 'ERR_BLOCK_NOT_FOUND') + }) + + it('stores and gets lots of blocks', async function () { + this.timeout(20 * 1000) + + const blocks = await Promise.all(range(200).map(async (i) => { + return blockFromString(`hello-${i}-${Math.random()}`) + })) + + await drain(bs.putMany(blocks)) + const res = await Promise.all(blocks.map(b => bs.get(b.cid))) + expect(res).to.be.eql(blocks) + }) + + it('sets and unsets exchange', () => { + bs = new BlockService(repo) + bs.setExchange({}) + expect(bs.hasExchange()).to.be.eql(true) + bs.unsetExchange() + expect(bs.hasExchange()).to.be.eql(false) + }) + }) + + describe('fetch through Bitswap (has exchange)', () => { + beforeEach(() => { + bs = new BlockService(repo) + }) + + it('hasExchange returns true when online', () => { + bs.setExchange({}) + expect(bs.hasExchange()).to.be.eql(true) + }) + + it('retrieves a block through bitswap', async () => { + // returns a block with a value equal to its key + const bitswap = { + /** + * @param {CID} cid + */ + get (cid) { + return new IpldBlock(uint8ArrayFromString('secret'), cid) + } + } + + bs.setExchange(bitswap) + + const block = await blockFromString('secret') + const result = await bs.get(block.cid) + + expect(result.bytes).to.be.eql(block.bytes) + }) + + it('puts the block through bitswap', async () => { + /** @type {Block[]} */ + const puts = [] + const bitswap = { + /** + * @param {Block} block + */ + put (block) { + puts.push(block) + } + } + bs.setExchange(bitswap) + + const block = await blockFromString('secret sauce') + + await bs.put(block) + + expect(puts).to.have.length(1) + }) + }) +}) diff --git a/packages/ipfs-message-port-server/src/block.js b/packages/ipfs-message-port-server/src/block.js index 0e19ec581a..bc2b64fe53 100644 --- a/packages/ipfs-message-port-server/src/block.js +++ b/packages/ipfs-message-port-server/src/block.js @@ -45,6 +45,7 @@ exports.BlockService = class BlockService { const block = await this.ipfs.block.get(cid, query) /** @type {Transferable[]} */ const transfer = [] + // @ts-ignore TODO vmx 2021-03-12 fix this return { transfer, block: encodeBlock(block, transfer) } } @@ -80,6 +81,7 @@ exports.BlockService = class BlockService { }) } else { const block = decodeBlock(input) + // @ts-ignore TODO vmx 2021-03-12 fix this result = await this.ipfs.block.put(block, { ...query, cid: undefined @@ -88,6 +90,7 @@ exports.BlockService = class BlockService { /** @type {Transferable[]} */ const transfer = [] + // @ts-ignore TODO vmx 2021-03-12 fix this return { transfer, block: encodeBlock(result, transfer) } }