Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Migration of Blockstore to use multihash instead of CID as key #2522

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,11 @@
"ipfs-bitswap": "^0.26.0",
"ipfs-block": "~0.8.1",
"ipfs-block-service": "~0.16.0",
"ipfs-http-client": "^38.2.0",
"ipfs-http-client": "github:ipfs/js-ipfs-http-client#auhau/feat/multihash_keys_in_datastore",
"ipfs-http-response": "~0.3.1",
"ipfs-mfs": "^0.13.0",
"ipfs-multipart": "^0.2.0",
"ipfs-repo": "^0.28.0",
"ipfs-repo": "github:ipfs/js-ipfs-repo#auhau/feat/multihash_keys_in_datastore",
"ipfs-unixfs": "~0.1.16",
"ipfs-unixfs-exporter": "^0.38.0",
"ipfs-unixfs-importer": "^0.40.0",
Expand Down Expand Up @@ -203,7 +203,7 @@
"execa": "^2.0.4",
"form-data": "^2.5.1",
"hat": "0.0.3",
"interface-ipfs-core": "^0.117.2",
"interface-ipfs-core": "github:ipfs/interface-js-ipfs-core#auhau/feat/multihash_keys_in_datastore",
"ipfs-interop": "^0.1.1",
"ipfsd-ctl": "^0.47.2",
"libp2p-websocket-star": "~0.10.2",
Expand Down
14 changes: 12 additions & 2 deletions src/cli/commands/refs-local.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,22 @@ module.exports = {

describe: 'List all local references.',

handler ({ getIpfs, print, resolve }) {
builder (yargs) {
return yargs
.option('multihash', {
desc: 'Shows base32 encoded multihashes instead of reconstructed CIDs',
type: 'boolean',
default: false
})
.epilog('CIDs are reconstructed therefore they might differ from those under which the blocks were originally stored.')
},

handler ({ getIpfs, print, resolve, multihash }) {
resolve((async () => {
const ipfs = await getIpfs()

return new Promise((resolve, reject) => {
const stream = ipfs.refs.localReadableStream()
const stream = ipfs.refs.localReadableStream({ multihash })

stream.on('error', reject)
stream.on('end', resolve)
Expand Down
2 changes: 1 addition & 1 deletion src/cli/commands/repo/gc.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ module.exports = {
if (r.err) {
streamErrors && print(r.err.message, true, true)
} else {
print((quiet ? '' : 'removed ') + r.cid)
print((quiet ? '' : 'removed ') + r.multihash)
}
}
})())
Expand Down
24 changes: 8 additions & 16 deletions src/core/components/files-regular/refs-local-pull-stream.js
Original file line number Diff line number Diff line change
@@ -1,26 +1,18 @@
'use strict'

const CID = require('cids')
const base32 = require('base32.js')
const { keyToCid } = require('ipfs-repo/src/blockstore-utils')
const itToPull = require('async-iterator-to-pull-stream')

module.exports = function (self) {
return () => {
return ({ multihash }) => {
return itToPull((async function * () {
for await (const result of self._repo.blocks.query({ keysOnly: true })) {
yield dsKeyToRef(result.key)
for await (const { key: k } of self._repo.blocks.query({ keysOnly: true })) {
try {
yield { ref: multihash ? k.toString().substr(1) : keyToCid(k).toString() }
} catch (err) {
yield { err: `Could not convert block with key '${k.toString()}' to CID: ${err.message}` }
}
}
})())
}
}

function dsKeyToRef (key) {
try {
// Block key is of the form /<base32 encoded string>
const decoder = new base32.Decoder()
const buff = Buffer.from(decoder.write(key.toString().slice(1)).finalize())
return { ref: new CID(buff).toString() }
} catch (err) {
return { err: `Could not convert block with key '${key}' to CID: ${err.message}` }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
const toStream = require('pull-stream-to-stream')

module.exports = function (self) {
return (ipfsPath, options) => {
return toStream.source(self.refs.localPullStream())
return (options) => {
return toStream.source(self.refs.localPullStream(options))
}
}
11 changes: 9 additions & 2 deletions src/core/components/files-regular/refs-local.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,16 @@ const promisify = require('promisify-es6')
const pull = require('pull-stream')

module.exports = function (self) {
return promisify((callback) => {
return promisify((options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}

options = options || {}

pull(
self.refs.localPullStream(),
self.refs.localPullStream(options),
pull.collect((err, values) => {
if (err) {
return callback(err)
Expand Down
62 changes: 22 additions & 40 deletions src/core/components/pin/gc.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
const CID = require('cids')
const base32 = require('base32.js')
const callbackify = require('callbackify')
const { cidToString } = require('../../../utils/cid')
const log = require('debug')('ipfs:gc')
const { default: Queue } = require('p-queue')
// TODO: Use exported key from root when upgraded to ipfs-mfs@>=13
Expand Down Expand Up @@ -47,15 +46,16 @@ module.exports = function gc (self) {
})
}

// Get Set of CIDs of blocks to keep
// Get Set of multihashes of blocks to keep
async function createMarkedSet (ipfs) {
const output = new Set()

const addPins = pins => {
log(`Found ${pins.length} pinned blocks`)

pins.forEach(pin => {
output.add(cidToString(new CID(pin), { base: 'base32' }))
const cid = new CID(pin)
output.add(base32.encode(cid.multihash))
})
}

Expand Down Expand Up @@ -91,7 +91,6 @@ async function getDescendants (ipfs, cid) {
const refs = await ipfs.refs(cid, { recursive: true })
const cids = [cid, ...refs.map(r => new CID(r.ref))]
log(`Found ${cids.length} MFS blocks`)
// log(' ' + cids.join('\n '))

return cids
}
Expand All @@ -100,54 +99,37 @@ async function getDescendants (ipfs, cid) {
async function deleteUnmarkedBlocks (ipfs, markedSet, blockKeys) {
// Iterate through all blocks and find those that are not in the marked set
// The blockKeys variable has the form [ { key: Key() }, { key: Key() }, ... ]
const unreferenced = []
const result = []
let blockCounter = 0

const queue = new Queue({
concurrency: BLOCK_RM_CONCURRENCY
})

for await (const { key: k } of blockKeys) {
try {
const cid = dsKeyToCid(k)
const b32 = cid.toV1().toString('base32')
if (!markedSet.has(b32)) {
unreferenced.push(cid)

queue.add(async () => {
const res = {
cid
}

try {
await ipfs._repo.blocks.delete(cid)
} catch (err) {
res.err = new Error(`Could not delete block with CID ${cid}: ${err.message}`)
}

result.push(res)
})
}
} catch (err) {
const msg = `Could not convert block with key '${k}' to CID`
log(msg, err)
result.push({ err: new Error(msg + `: ${err.message}`) })
blockCounter++
const multihashString = k.toString().substr(1)
if (!markedSet.has(multihashString)) {
queue.add(async () => {
const res = {
multihash: multihashString
}

try {
await ipfs._repo.blocks.delete(base32.decode(multihashString))
} catch (err) {
res.err = new Error(`Could not delete block with multihash ${multihashString}: ${err.message}`)
}

result.push(res)
})
}
}

await queue.onIdle()

log(`Marked set has ${markedSet.size} unique blocks. Blockstore has ${blockKeys.length} blocks. ` +
`Deleted ${unreferenced.length} blocks.`)
log(`Marked set has ${markedSet.size} unique blocks. Blockstore has ${blockCounter} blocks. ` +
`Deleted ${result.filter(res => res.err === undefined).length} blocks.`)

return result
}

// TODO: Use exported utility when upgrade to ipfs-repo@>=0.27.1
// https://github.com/ipfs/js-ipfs-repo/pull/206
function dsKeyToCid (key) {
// Block key is of the form /<base32 encoded string>
const decoder = new base32.Decoder()
const buff = decoder.write(key.toString().slice(1)).finalize()
return new CID(Buffer.from(buff))
}
15 changes: 12 additions & 3 deletions src/http/api/resources/files-regular.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ exports.parseKey = (request, h) => {
const { arg } = request.query

if (!arg) {
throw Boom.badRequest("Argument 'key' is required")
throw Boom.badRequest('Argument \'key\' is required')
}

const isArray = Array.isArray(arg)
Expand Down Expand Up @@ -235,7 +235,7 @@ exports.add = {
)
.then(() => {
if (!filesParsed) {
throw new Error("File argument 'data' is required.")
throw new Error('File argument \'data\' is required.')
}
})
.catch(err => {
Expand Down Expand Up @@ -342,10 +342,19 @@ exports.refs = {
}

exports.refs.local = {
validate: {
query: Joi.object().keys({
multihash: Joi.boolean().default(false),
}).unknown()
},

// main route handler
handler (request, h) {
const { ipfs } = request.server.app
const source = ipfs.refs.localPullStream()

const multihash = request.query.multihash

const source = ipfs.refs.localPullStream({ multihash })
return sendRefsReplyStream(request, h, 'local refs', source)
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/http/api/resources/repo.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ exports.gc = {
const response = filtered.map(r => {
return {
Err: r.err && r.err.message,
Key: !r.err && { '/': r.cid.toString() }
Key: !r.err && r.multihash
}
})
return h.response(response)
Expand Down
4 changes: 2 additions & 2 deletions test/cli/refs-local.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ describe('refs-local', () => runOnAndOff((thing) => {
const out = await ipfs('refs-local')
const lines = out.split('\n')

expect(lines.includes('QmPkWYfSLCEBLZu7BZt4kigGDMe3cpogMbeVf97gN2xJDN')).to.eql(true)
expect(lines.includes('QmUhUuiTKkkK8J6JZ9zmj8iNHPuNfGYcszgRumzhHBxEEU')).to.eql(true)
expect(lines.includes('bafkreicjl7v3vyyv4zlryihez5xhunqmriry6styhil7z5lhd3r4prnz6y')).to.eql(true)
expect(lines.includes('bafkreidj5bovvm25wszvajfshj7m7m2efpswcs6dsz7giz52ovlquxc4o4')).to.eql(true)
})
}))
Loading