feat!: auto-shard based on node size #171

Merged (6 commits, Feb 9, 2023). Changes from 5 commits.
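In short: instead of sharding a directory once it holds more than a fixed number of entries, the importer now estimates the serialized size of the directory node and switches to a HAMT shard when it would exceed `shardSplitThresholdBytes`. A minimal usage sketch of the renamed option, assuming the public API the tests below exercise (the file count, names and `MemoryBlockstore` are illustrative only):

```js
import { importer } from 'ipfs-unixfs-importer'
import { MemoryBlockstore } from 'blockstore-core'
import last from 'it-last'

const blockstore = new MemoryBlockstore()
const files = []

// enough entries that the serialized wrapping directory node
// outgrows the threshold and is written as a HAMT shard
for (let i = 0; i < 5000; i++) {
  files.push({
    path: `file-${i}.txt`,
    content: new TextEncoder().encode(`hello ${i}`)
  })
}

const imported = await last(importer(files, blockstore, {
  wrapWithDirectory: true,
  shardSplitThresholdBytes: 262144 // the new default, 256 KiB
}))

console.log(imported.cid) // root of a hamt-sharded-directory node
```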
3 changes: 0 additions & 3 deletions packages/ipfs-unixfs-exporter/package.json
@@ -169,17 +169,14 @@
},
"devDependencies": {
"@types/sinon": "^10.0.0",
"abort-controller": "^3.0.0",
"aegir": "^38.1.2",
"blockstore-core": "^3.0.0",
"crypto-browserify": "^3.12.0",
"delay": "^5.0.0",
"ipfs-unixfs-importer": "^12.0.0",
"it-all": "^2.0.0",
"it-buffer-stream": "^3.0.0",
"it-first": "^2.0.0",
"merge-options": "^3.0.4",
"native-abort-controller": "^1.0.3",
"sinon": "^15.0.0"
},
"browser": {
7 changes: 4 additions & 3 deletions packages/ipfs-unixfs-exporter/test/exporter-sharded.spec.js
@@ -45,7 +45,7 @@ describe('exporter sharded', function () {
*/
const createShardWithFiles = async (files) => {
const result = await last(importer(files, block, {
-      shardSplitThreshold: SHARD_SPLIT_THRESHOLD,
+      shardSplitThresholdBytes: SHARD_SPLIT_THRESHOLD,
wrapWithDirectory: true
}))

@@ -60,7 +60,8 @@ describe('exporter sharded', function () {
/** @type {{ [key: string]: { content: Uint8Array, cid?: CID }}} */
const files = {}

-    for (let i = 0; i < (SHARD_SPLIT_THRESHOLD + 1); i++) {
+    // needs to result in a block that is larger than SHARD_SPLIT_THRESHOLD bytes
+    for (let i = 0; i < 100; i++) {
files[`file-${Math.random()}.txt`] = {
content: uint8ArrayConcat(await all(randomBytes(100)))
}
@@ -71,7 +72,7 @@
content: asAsyncIterable(files[path].content)
})), block, {
wrapWithDirectory: true,
-      shardSplitThreshold: SHARD_SPLIT_THRESHOLD
+      shardSplitThresholdBytes: SHARD_SPLIT_THRESHOLD
}))

const dirCid = imported.pop()?.cid
1 change: 0 additions & 1 deletion packages/ipfs-unixfs-exporter/test/exporter.spec.js
@@ -14,7 +14,6 @@ import all from 'it-all'
import last from 'it-last'
import first from 'it-first'
import randomBytes from 'it-buffer-stream'
-import { AbortController } from 'native-abort-controller'
import blockApi from './helpers/block.js'
import { concat as uint8ArrayConcat } from 'uint8arrays/concat'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
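The dependency and import removals above go together: `AbortController` has been a global in modern browsers and in Node.js since v15, so the specs no longer need the `native-abort-controller` polyfill (my reading of the cleanup, not stated in the PR):

```js
// no import needed on Node.js 15+ or modern browsers
const controller = new AbortController()
controller.abort() // signals controller.signal
```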
@@ -26,7 +26,7 @@ describe('builder: directory sharding', () => {
path: 'a/b',
content: asAsyncIterable(content)
}], block, {
-      shardSplitThreshold: Infinity // never shard
+      shardSplitThresholdBytes: Infinity // never shard
}))

expect(nodes.length).to.equal(2)
@@ -62,7 +62,7 @@ describe('builder: directory sharding', () => {
path: 'a/b',
content: asAsyncIterable(uint8ArrayFromString('i have the best bytes'))
}], block, {
-      shardSplitThreshold: 0 // always shard
+      shardSplitThresholdBytes: 0 // always shard
}))

expect(nodes.length).to.equal(2)
@@ -84,7 +84,7 @@ describe('builder: directory sharding', () => {
path: 'a/b',
content: asAsyncIterable(uint8ArrayFromString(content))
}], block, {
-      shardSplitThreshold: Infinity // never shard
+      shardSplitThresholdBytes: Infinity // never shard
}))

const nonShardedHash = nodes[1].cid
@@ -121,7 +121,7 @@ describe('builder: directory sharding', () => {
path: 'a/b',
content: asAsyncIterable(uint8ArrayFromString(content))
}], block, {
-      shardSplitThreshold: 0 // always shard
+      shardSplitThresholdBytes: 0 // always shard
}))

const shardedHash = nodes[1].cid
4 changes: 2 additions & 2 deletions packages/ipfs-unixfs-exporter/test/importer.spec.js
@@ -724,7 +724,7 @@ strategies.forEach((strategy) => {
const options = {
cidVersion: 1,
// Ensures we use DirSharded for the data below
-        shardSplitThreshold: 3
+        shardSplitThresholdBytes: 3
}

const files = await all(importer(inputFiles.map(file => ({
@@ -941,7 +941,7 @@ strategies.forEach((strategy) => {
}, {
path: '/foo/qux'
}], block, {
-      shardSplitThreshold: 0
+      shardSplitThresholdBytes: 0
}))

const nodes = await all(recursive(entries[entries.length - 1].cid, block))
2 changes: 1 addition & 1 deletion packages/ipfs-unixfs-importer/README.md
@@ -120,7 +120,7 @@ The input's file paths and directory structure will be preserved in the [`dag-pb`
`options` is a JavaScript object that might include the following keys:

- `wrapWithDirectory` (boolean, defaults to false): if true, a wrapping node will be created
-- `shardSplitThreshold` (positive integer, defaults to 1000): the number of directory entries above which we decide to use a sharding directory builder (instead of the default flat one)
+- `shardSplitThresholdBytes` (positive integer, defaults to 256KiB): if the serialized node is larger than this, it will be converted to a HAMT sharded directory
- `chunker` (string, defaults to `"fixed"`): the chunking strategy. Supports:
- `fixed`
- `rabin`
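For a rough feel of what the new 256KiB default means in entry counts (back-of-the-envelope figures, not from the PR), each `dag-pb` link costs roughly a CID plus protobuf framing plus the name:

```js
// assumed per-link costs in an encoded dag-pb directory node
const cidBytes = 36      // CIDv1 with a sha2-256 multihash
const framingBytes = 12  // approximate protobuf field/length overhead
const nameBytes = 16     // a typical file name length

const perLink = cidBytes + framingBytes + nameBytes // 64 bytes
console.log(Math.floor((256 * 1024) / perLink))     // ~4096 entries before sharding
```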
2 changes: 1 addition & 1 deletion packages/ipfs-unixfs-importer/package.json
@@ -169,9 +169,9 @@
},
"devDependencies": {
"aegir": "^38.1.2",
"assert": "^2.0.0",
"blockstore-core": "^3.0.0",
"it-buffer-stream": "^3.0.0",
"it-last": "^2.0.0",
"wherearewe": "^2.0.1"
},
"browser": {
66 changes: 48 additions & 18 deletions packages/ipfs-unixfs-importer/src/dir-flat.js
@@ -1,6 +1,6 @@
import { encode, prepare } from '@ipld/dag-pb'
import { UnixFS } from 'ipfs-unixfs'
-import Dir from './dir.js'
+import { Dir, CID_V0, CID_V1 } from './dir.js'
import persist from './utils/persist.js'

/**
@@ -21,8 +21,8 @@ class DirFlat extends Dir {
constructor (props, options) {
super(props, options)

-    /** @type {{ [key: string]: InProgressImportResult | Dir }} */
-    this._children = {}
+    /** @type {Map<string, InProgressImportResult | Dir>} */
+    this._children = new Map()
}

/**
@@ -32,53 +32,83 @@
async put (name, value) {
this.cid = undefined
this.size = undefined
+    this.nodeSize = undefined

-    this._children[name] = value
+    this._children.set(name, value)
}

/**
* @param {string} name
*/
get (name) {
-    return Promise.resolve(this._children[name])
+    return Promise.resolve(this._children.get(name))
}

childCount () {
-    return Object.keys(this._children).length
+    return this._children.size
}

directChildrenCount () {
return this.childCount()
}

onlyChild () {
-    return this._children[Object.keys(this._children)[0]]
+    return this._children.values().next().value
}

async * eachChildSeries () {
-    const keys = Object.keys(this._children)
+    for (const [key, child] of this._children.entries()) {
+      yield {
+        key,
+        child
+      }
+    }
+  }
+
-    for (let i = 0; i < keys.length; i++) {
-      const key = keys[i]
+  calculateNodeSize () {
+    if (this.nodeSize !== undefined) {
+      return this.nodeSize
+    }
+
-      yield {
-        key: key,
-        child: this._children[key]
+    const links = []
+
+    for (const [name, child] of this._children.entries()) {
+      let size
+
+      if (child instanceof Dir) {
+        size = child.calculateNodeSize()
+      } else {
+        size = child.size
+      }
+
+      if (size != null) {
+        links.push({
+          Name: name,
+          Tsize: size,
+          Hash: this.options.cidVersion === 0 ? CID_V0 : CID_V1
+        })
+      }
+    }
+
+    const unixfs = new UnixFS({
+      type: 'directory',
+      mtime: this.mtime,
+      mode: this.mode
+    })
+
+    this.nodeSize = encode(prepare({ Data: unixfs.marshal(), Links: links })).length
+
+    return this.nodeSize
+  }

/**
* @param {Blockstore} block
* @returns {AsyncIterable<ImportResult>}
*/
async * flush (block) {
-    const children = Object.keys(this._children)
const links = []

-    for (let i = 0; i < children.length; i++) {
-      let child = this._children[children[i]]
+    for (let [name, child] of this._children.entries()) {
if (child instanceof Dir) {
for await (const entry of child.flush(block)) {
child = entry
@@ -89,7 +119,7 @@

if (child.size != null && child.cid) {
links.push({
-          Name: children[i],
+          Name: name,
Tsize: child.size,
Hash: child.cid
})
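Note how `calculateNodeSize` pushes `CID_V0` or `CID_V1` instead of a real `Hash`: child CIDs are unknown until flush, but any syntactically valid CID of the configured version has the same encoded length, which is all a size estimate needs. A sketch of what such placeholder constants could look like (the actual values live in `./dir.js`, which this diff does not show):

```js
import { CID } from 'multiformats/cid'

// hypothetical stand-ins: only their encoded length matters,
// the digests are never stored or compared
export const CID_V0 = CID.parse('QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n')
export const CID_V1 = CID.parse('zdj7WbTaiJT1fgatdet9Ei9iDB5hdCxkbVyhyh8YTUnXMiwYi')
```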
91 changes: 89 additions & 2 deletions packages/ipfs-unixfs-importer/src/dir-sharded.js
@@ -1,6 +1,6 @@
import { encode, prepare } from '@ipld/dag-pb'
import { UnixFS } from 'ipfs-unixfs'
-import Dir from './dir.js'
+import { Dir, CID_V0, CID_V1 } from './dir.js'
import persist from './utils/persist.js'
import { createHAMT, Bucket } from 'hamt-sharding'

@@ -35,6 +35,10 @@ class DirSharded extends Dir {
* @param {InProgressImportResult | Dir} value
*/
async put (name, value) {
+    this.cid = undefined
+    this.size = undefined
+    this.nodeSize = undefined

await this._bucket.put(name, value)
}

@@ -66,6 +70,16 @@ class DirSharded extends Dir {
}
}

+  calculateNodeSize () {
+    if (this.nodeSize !== undefined) {
+      return this.nodeSize
+    }
+
+    this.nodeSize = calculateSize(this._bucket, this, this.options)
+
+    return this.nodeSize
+  }

/**
* @param {Blockstore} blockstore
* @returns {AsyncIterable<ImportResult>}
@@ -85,7 +99,7 @@ export default DirSharded
/**
* @param {Bucket<?>} bucket
* @param {Blockstore} blockstore
- * @param {*} shardRoot
+ * @param {DirSharded | null} shardRoot
* @param {ImporterOptions} options
* @returns {AsyncIterable<ImportResult>}
*/
@@ -183,3 +197,76 @@ async function * flush (bucket, blockstore, shardRoot, options) {
size
}
}

+/**
+ * @param {Bucket<?>} bucket
+ * @param {DirSharded | null} shardRoot
+ * @param {ImporterOptions} options
+ */
+function calculateSize (bucket, shardRoot, options) {
+  const children = bucket._children
+  const links = []
+
+  for (let i = 0; i < children.length; i++) {
+    const child = children.get(i)
+
+    if (!child) {
+      continue
+    }
+
+    const labelPrefix = i.toString(16).toUpperCase().padStart(2, '0')
+
+    if (child instanceof Bucket) {
+      const size = calculateSize(child, null, options)
+
+      links.push({
+        Name: labelPrefix,
+        Tsize: size,
+        Hash: options.cidVersion === 0 ? CID_V0 : CID_V1
+      })
+    } else if (typeof child.value.flush === 'function') {
+      const dir = child.value
+      const size = dir.calculateNodeSize()
+
+      links.push({
+        Name: labelPrefix + child.key,
+        Tsize: size,
+        Hash: options.cidVersion === 0 ? CID_V0 : CID_V1
+      })
+    } else {
+      const value = child.value
+
+      if (!value.cid) {
+        continue
+      }
+
+      const label = labelPrefix + child.key
+      const size = value.size
+
+      links.push({
+        Name: label,
+        Tsize: size,
+        Hash: value.cid
+      })
+    }
+  }
+
+  // go-ipfs uses little endian, that's why we have to
+  // reverse the bit field before storing it
+  const data = Uint8Array.from(children.bitField().reverse())
+  const dir = new UnixFS({
+    type: 'hamt-sharded-directory',
+    data,
+    fanout: bucket.tableSize(),
+    hashType: options.hamtHashCode,
+    mtime: shardRoot && shardRoot.mtime,
+    mode: shardRoot && shardRoot.mode
+  })
+
+  const buffer = encode(prepare({
+    Data: dir.marshal(),
+    Links: links
+  }))
+
+  return buffer.length
+}
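The `Name` values mirror how HAMT shards label their links on disk: the child's position in the bucket as two uppercase hex digits, with the original key appended for leaf entries, so the estimated link names have the same lengths `flush` will eventually write. A quick illustration (assuming the default fanout of 256):

```js
const labelPrefix = (pos) => pos.toString(16).toUpperCase().padStart(2, '0')

console.log(labelPrefix(5))              // '05': a link to a sub-shard
console.log(labelPrefix(5) + 'file.txt') // '05file.txt': a leaf entry
console.log(labelPrefix(171))            // 'AB'
```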