Skip to content
This repository has been archived by the owner on Feb 12, 2024. It is now read-only.

Commit

Permalink
feat: gc uses multihashes instead CIDs
Browse files Browse the repository at this point in the history
  • Loading branch information
AuHau committed Oct 9, 2019
1 parent 6d6e8c6 commit 57de7b5
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 73 deletions.
62 changes: 22 additions & 40 deletions src/core/components/pin/gc.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
const CID = require('cids')
const base32 = require('base32.js')
const callbackify = require('callbackify')
const { cidToString } = require('../../../utils/cid')
const log = require('debug')('ipfs:gc')
const { default: Queue } = require('p-queue')
// TODO: Use exported key from root when upgraded to ipfs-mfs@>=13
Expand Down Expand Up @@ -47,15 +46,16 @@ module.exports = function gc (self) {
})
}

// Get Set of CIDs of blocks to keep
// Get Set of multihashes of blocks to keep
async function createMarkedSet (ipfs) {
const output = new Set()

const addPins = pins => {
log(`Found ${pins.length} pinned blocks`)

pins.forEach(pin => {
output.add(cidToString(new CID(pin), { base: 'base32' }))
const cid = new CID(pin)
output.add(base32.encode(cid.multihash))
})
}

Expand Down Expand Up @@ -91,7 +91,6 @@ async function getDescendants (ipfs, cid) {
const refs = await ipfs.refs(cid, { recursive: true })
const cids = [cid, ...refs.map(r => new CID(r.ref))]
log(`Found ${cids.length} MFS blocks`)
// log(' ' + cids.join('\n '))

return cids
}
Expand All @@ -100,54 +99,37 @@ async function getDescendants (ipfs, cid) {
async function deleteUnmarkedBlocks (ipfs, markedSet, blockKeys) {
// Iterate through all blocks and find those that are not in the marked set
// The blockKeys variable has the form [ { key: Key() }, { key: Key() }, ... ]
const unreferenced = []
const result = []
let blockCounter = 0

const queue = new Queue({
concurrency: BLOCK_RM_CONCURRENCY
})

for await (const { key: k } of blockKeys) {
try {
const cid = dsKeyToCid(k)
const b32 = cid.toV1().toString('base32')
if (!markedSet.has(b32)) {
unreferenced.push(cid)

queue.add(async () => {
const res = {
cid
}

try {
await ipfs._repo.blocks.delete(cid)
} catch (err) {
res.err = new Error(`Could not delete block with CID ${cid}: ${err.message}`)
}

result.push(res)
})
}
} catch (err) {
const msg = `Could not convert block with key '${k}' to CID`
log(msg, err)
result.push({ err: new Error(msg + `: ${err.message}`) })
blockCounter++
const multihashString = k.toString().substr(1)
if (!markedSet.has(multihashString)) {
queue.add(async () => {
const res = {
multihash: multihashString
}

try {
await ipfs._repo.blocks.delete(Buffer.from(multihashString))
} catch (err) {
res.err = new Error(`Could not delete block with multihash ${multihashString}: ${err.message}`)
}

result.push(res)
})
}
}

await queue.onIdle()

log(`Marked set has ${markedSet.size} unique blocks. Blockstore has ${blockKeys.length} blocks. ` +
`Deleted ${unreferenced.length} blocks.`)
log(`Marked set has ${markedSet.size} unique blocks. Blockstore has ${blockCounter} blocks. ` +
`Deleted ${result.filter(res => res.err === undefined).length} blocks.`)

return result
}

// TODO: Use exported utility when upgrade to ipfs-repo@>=0.27.1
// https://github.com/ipfs/js-ipfs-repo/pull/206
function dsKeyToCid (key) {
// Block key is of the form /<base32 encoded string>
const decoder = new base32.Decoder()
const buff = decoder.write(key.toString().slice(1)).finalize()
return new CID(Buffer.from(buff))
}
2 changes: 1 addition & 1 deletion src/http/api/resources/repo.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ exports.gc = {
const response = filtered.map(r => {
return {
Err: r.err && r.err.message,
Key: !r.err && { '/': r.cid.toString() }
Key: !r.err && r.multihash
}
})
return h.response(response)
Expand Down
71 changes: 39 additions & 32 deletions test/core/gc.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ const IPFSFactory = require('ipfsd-ctl')
const pEvent = require('p-event')
const env = require('ipfs-utils/src/env')
const IPFS = require('../../src/core')
const CID = require('cids')
const base32 = require('base32.js')
const { Errors } = require('interface-datastore')

// We need to detect when a readLock or writeLock is requested for the tests
Expand Down Expand Up @@ -90,17 +92,17 @@ describe('gc', function () {
name: 'add',
add1: () => ipfs.add(fixtures[0], { pin: false }),
add2: () => ipfs.add(fixtures[1], { pin: false }),
resToCid: (res) => res[0].hash
resToMultihash: (res) => base32.encode(new CID(res[0].hash).multihash)
}, {
name: 'object put',
add1: () => ipfs.object.put({ Data: 'obj put 1', Links: [] }),
add2: () => ipfs.object.put({ Data: 'obj put 2', Links: [] }),
resToCid: (res) => res.toString()
resToMultihash: (res) => base32.encode(res.multihash)
}, {
name: 'block put',
add1: () => ipfs.block.put(Buffer.from('block put 1'), null),
add2: () => ipfs.block.put(Buffer.from('block put 2'), null),
resToCid: (res) => res.cid.toString()
resToMultihash: (res) => base32.encode(res.cid.multihash)
}]

describe('locks', function () {
Expand All @@ -122,9 +124,9 @@ describe('gc', function () {
await gcStarted
const add2 = test.add2()

const deleted = (await gc).map(i => i.cid.toString())
const add1Res = test.resToCid(await add1)
const add2Res = test.resToCid(await add2)
const deleted = (await gc).map(i => i.multihash)
const add1Res = test.resToMultihash(await add1)
const add2Res = test.resToMultihash(await add2)

// Should have garbage collected blocks from first add, because GC should
// have waited for first add to finish
Expand Down Expand Up @@ -152,9 +154,9 @@ describe('gc', function () {
await gcStarted
const add2 = ipfs.add(fixtures[3], { pin: true })

const deleted = (await gc).map(i => i.cid.toString())
const add1Res = (await add1)[0].hash
const add2Res = (await add2)[0].hash
const deleted = (await gc).map(i => i.multihash)
const add1Res = base32.encode(new CID((await add1)[0].hash).multihash)
const add2Res = base32.encode(new CID((await add2)[0].hash).multihash)

// Should not have garbage collected blocks from first add, because GC should
// have waited for first add + pin to finish (protected by pin)
Expand All @@ -168,7 +170,9 @@ describe('gc', function () {
it('garbage collection should wait for pending block rm to finish', async () => {
// Add two blocks so that we can remove them
const cid1 = (await ipfs.block.put(Buffer.from('block to rm 1'), null)).cid
const cid1Multihash = base32.encode(cid1.multihash)
const cid2 = (await ipfs.block.put(Buffer.from('block to rm 2'), null)).cid
const cid2Multihash = base32.encode(cid2.multihash)

// Remove first block from IPFS
// Note: block rm will take a write lock
Expand All @@ -185,33 +189,34 @@ describe('gc', function () {
await gcStarted
const rm2 = ipfs.block.rm(cid2)

const deleted = (await gc).map(i => i.cid.toString())
await rm1

// Second rm should fail because GC has already removed that block
try {
await rm2
} catch (err) {
expect(err.code).eql(Errors.dbDeleteFailedError().code)
}
const deleted = (await gc).map(i => i.multihash)
const rm1Out = await rm1
expect(rm1Out[0]).to.not.have.property('error')

// Confirm second block has been removed
const localRefs = (await ipfs.refs.local()).map(r => r.ref)
expect(localRefs).not.includes(cid2.toString())
const localMultihashes = (await ipfs.refs.local()).map(r => base32.encode(new CID(r.ref).multihash))
expect(localMultihashes).not.includes(cid2Multihash)

// Second rm should fail because GC has already removed that block
expect((await rm2)[0])
.to.have.property('error')
.that.has.property('code').that.equal(Errors.dbDeleteFailedError().code)

// Should not have garbage collected block from first block put, because
// GC should have waited for first rm (removing first block put) to finish
expect(deleted).not.includes(cid1.toString())
expect(deleted).not.includes(cid1Multihash)

// Should have garbage collected block from second block put, because GC
// should have completed before second rm (removing second block put)
expect(deleted).includes(cid2.toString())
expect(deleted).includes(cid2Multihash)
})

it('garbage collection should wait for pending pin add to finish', async () => {
// Add two blocks so that we can pin them
const cid1 = (await ipfs.block.put(Buffer.from('block to pin add 1'), null)).cid
const cid2 = (await ipfs.block.put(Buffer.from('block to pin add 2'), null)).cid
const cid1 = (await ipfs.block.put(Buffer.from('block to test pin add 1'), null)).cid
const cid2 = (await ipfs.block.put(Buffer.from('block to test pin add 2'), null)).cid
const cid1Multihash = base32.encode(cid1.multihash)
const cid2Multihash = base32.encode(cid2.multihash)

// Pin first block
// Note: pin add will take a read lock
Expand All @@ -221,30 +226,32 @@ describe('gc', function () {
// Once pin lock has been requested, start GC
await pinLockRequested
const gc = ipfs.repo.gc()
const deleted = (await gc).map(i => i.cid.toString())
const deleted = (await gc).map(i => i.multihash)
await pin1

// TODO: Adding pin for removed block never returns, which means the lock
// never gets released
// const pin2 = ipfs.pin.add(cid2)

// Confirm second second block has been removed
const localRefs = (await ipfs.refs.local()).map(r => r.ref)
expect(localRefs).not.includes(cid2.toString())
const localMultihashes = (await ipfs.refs.local()).map(r => base32.encode(new CID(r.ref).multihash))
expect(localMultihashes).not.includes(cid2Multihash)

// Should not have garbage collected block from first block put, because
// GC should have waited for pin (protecting first block put) to finish
expect(deleted).not.includes(cid1.toString())
expect(deleted).not.includes(cid1Multihash)

// Should have garbage collected block from second block put, because GC
// should have completed before second pin
expect(deleted).includes(cid2.toString())
expect(deleted).includes(cid2Multihash)
})

it('garbage collection should wait for pending pin rm to finish', async () => {
// Add two blocks so that we can pin them
const cid1 = (await ipfs.block.put(Buffer.from('block to pin rm 1'), null)).cid
const cid2 = (await ipfs.block.put(Buffer.from('block to pin rm 2'), null)).cid
const cid1Multihash = base32.encode(cid1.multihash)
const cid2Multihash = base32.encode(cid2.multihash)

// Pin blocks
await ipfs.pin.add(cid1)
Expand All @@ -265,17 +272,17 @@ describe('gc', function () {
await gcStarted
const pinRm2 = ipfs.pin.rm(cid2)

const deleted = (await gc).map(i => i.cid.toString())
const deleted = (await gc).map(i => i.multihash)
await pinRm1
await pinRm2

// Should have garbage collected block from first block put, because
// GC should have waited for pin rm (unpinning first block put) to finish
expect(deleted).includes(cid1.toString())
expect(deleted).includes(cid1Multihash)

// Should not have garbage collected block from second block put, because
// GC should have completed before second block was unpinned
expect(deleted).not.includes(cid2.toString())
expect(deleted).not.includes(cid2Multihash)
})
})
})

0 comments on commit 57de7b5

Please sign in to comment.