Skip to content

Commit

Permalink
Merge pull request #713 from jbenet/feat/trickledag
Browse files Browse the repository at this point in the history
implement trickledag for faster unixfs operations
  • Loading branch information
whyrusleeping committed Feb 4, 2015
2 parents 1afb281 + 1e93ee0 commit adb7ad9
Show file tree
Hide file tree
Showing 11 changed files with 1,372 additions and 724 deletions.
443 changes: 443 additions & 0 deletions importer/balanced/balanced_test.go

Large diffs are not rendered by default.

66 changes: 66 additions & 0 deletions importer/balanced/builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package balanced

import (
"errors"

h "github.com/jbenet/go-ipfs/importer/helpers"
dag "github.com/jbenet/go-ipfs/merkledag"
)

// BalancedLayout consumes all data available from db and arranges it into a
// balanced unixfs DAG, returning the root dag node after adding it via db.Add.
//
// The tree is grown one level at a time: the first pass produces a single
// data-carrying node; every later pass creates a new root, adopts the previous
// root as its first child and then fills the remaining link slots with
// subtrees of the same depth as that previous root.
//
// If the input yields no data at all, an empty node is added and returned.
// An error is returned when the input needs more than one level but
// db.Maxlinks() is below 2, since such a tree can never absorb more data.
func BalancedLayout(db *h.DagBuilderHelper) (*dag.Node, error) {
	var root *h.UnixfsNode
	for level := 0; !db.Done(); level++ {
		// From the second level on, the new root already spends one link on
		// the old root, so it needs room for at least one more child to make
		// progress. Without this check a Maxlinks below 2 would loop forever,
		// wrapping the old root in an endless chain of parents while never
		// consuming any input.
		if level > 0 && db.Maxlinks() < 2 {
			return nil, errors.New("balanced layout needs Maxlinks >= 2 to hold more than one chunk")
		}

		nroot := h.NewUnixfsNode()

		// add our old root as a child of the new root.
		if root != nil { // nil if it's the first node.
			if err := nroot.AddChild(root, db); err != nil {
				return nil, err
			}
		}

		// fill it up.
		if err := fillNodeRec(db, nroot, level); err != nil {
			return nil, err
		}

		root = nroot
	}

	// no input at all: still produce a (empty) root node.
	if root == nil {
		root = h.NewUnixfsNode()
	}

	return db.Add(root)
}

// fillNodeRec populates node from the builder's input source, building
// subtrees 'depth' levels of indirection deep.
//
// At depth 0 the node itself receives the next chunk of data. At greater
// depths, children are created, filled recursively with depth-1 and attached
// to node until node holds db.Maxlinks() children or the input runs out.
// A negative depth is rejected with an error. The only return value is the
// error, if any.
//
// warning: **children** pinned indirectly, but input node IS NOT pinned.
func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error {
	switch {
	case depth < 0:
		return errors.New("attempt to fillNode at depth < 0")
	case depth == 0:
		// leaf level: the node carries data directly.
		return db.FillNodeWithData(node)
	}

	for {
		// stop once the node is full; only then-or-otherwise ask whether
		// the input is exhausted (the capacity check comes first on purpose).
		if node.NumChildren() >= db.Maxlinks() || db.Done() {
			return nil
		}

		sub := h.NewUnixfsNode()
		if err := fillNodeRec(db, sub, depth-1); err != nil {
			return err
		}
		if err := node.AddChild(sub, db); err != nil {
			return err
		}
	}
}
50 changes: 0 additions & 50 deletions importer/calc_test.go

This file was deleted.

141 changes: 141 additions & 0 deletions importer/helpers/dagbuilder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package helpers

import (
dag "github.com/jbenet/go-ipfs/merkledag"
"github.com/jbenet/go-ipfs/pin"
)

// DagBuilderHelper wraps together a bunch of objects needed to
// efficiently create unixfs dag trees
type DagBuilderHelper struct {
// dserv is the DAG service that built nodes are added to.
dserv dag.DAGService
// mp is the pinner used for added nodes; pinning is skipped when it is nil.
mp pin.ManualPinner
// in is the channel data chunks are received from; a nil channel is
// treated as "no data".
in <-chan []byte
nextData []byte // the next item to return.
// maxlinks is the maximum number of children given to a node while filling.
maxlinks int
}

// DagBuilderParams holds the settings from which a DagBuilderHelper is
// created (see its New method).
type DagBuilderParams struct {
// Maximum number of links per intermediate node
Maxlinks int

// DAGService to write blocks to (required)
Dagserv dag.DAGService

// Pinner to use for pinning files (optionally nil)
Pinner pin.ManualPinner
}

// New creates a DagBuilderHelper configured from these params that reads its
// data chunks from the channel 'in'.
func (dbp *DagBuilderParams) New(in <-chan []byte) *DagBuilderHelper {
	helper := new(DagBuilderHelper)
	helper.in = in
	helper.dserv = dbp.Dagserv
	helper.mp = dbp.Pinner
	helper.maxlinks = dbp.Maxlinks
	return helper
}

// prepareNext makes sure the nextData field holds the upcoming chunk, if one
// is available. It lets callers "peek" at whether the input is finished
// without losing the chunk that was read to find out.
//
// It does nothing when there is no input channel (treated as "no data at
// all") or when a chunk is already buffered. Otherwise it receives from the
// channel; a closed channel yields nil, which marks the end of the input.
func (db *DagBuilderHelper) prepareNext() {
	if db.in != nil && db.nextData == nil {
		db.nextData = <-db.in
	}
}

// Done reports whether the incoming data has been fully consumed.
// It may be called before Next, so it first buffers the upcoming chunk
// (which can block on the input channel) and then checks whether any
// chunk is pending.
func (db *DagBuilderHelper) Done() bool {
	db.prepareNext()
	exhausted := db.nextData == nil
	return exhausted
}

// Next hands out the next chunk of data to be inserted into the dag and
// clears the internal buffer so the following call fetches a fresh chunk.
// A nil result means the stream has ended and the current building
// operation should finish.
func (db *DagBuilderHelper) Next() []byte {
	db.prepareNext()

	// take the buffered chunk and mark it as consumed in one step.
	var chunk []byte
	chunk, db.nextData = db.nextData, nil
	return chunk
}

// FillNodeLayer attaches data nodes as children of the given node until the
// node has db.maxlinks children or the input is exhausted. Each child holds
// one chunk of input data.
//
// warning: **children** pinned indirectly, but input node IS NOT pinned.
func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {
	for {
		// capacity is checked first; the input is only probed when there
		// is still room for another child.
		if node.NumChildren() >= db.maxlinks || db.Done() {
			return nil
		}

		leaf := NewUnixfsNode()
		if err := db.FillNodeWithData(leaf); err != nil {
			return err
		}
		if err := node.AddChild(leaf, db); err != nil {
			return err
		}
	}
}

// FillNodeWithData takes the next chunk from the input and stores it as the
// data of node. When the input is finished the node is left untouched and
// nil is returned. A chunk longer than BlockSizeLimit is consumed but not
// stored, and ErrSizeLimitExceeded is returned.
func (db *DagBuilderHelper) FillNodeWithData(node *UnixfsNode) error {
	chunk := db.Next()

	switch {
	case chunk == nil: // we're done!
		return nil
	case len(chunk) > BlockSizeLimit:
		return ErrSizeLimitExceeded
	}

	node.setData(chunk)
	return nil
}

// Add finalizes node into its dag node, adds it to the DAG service and
// returns the dag node. When a pinner is configured, the node is pinned
// recursively and the pinner is flushed; a flush failure is returned as
// the error.
func (db *DagBuilderHelper) Add(node *UnixfsNode) (*dag.Node, error) {
	dagnode, err := node.GetDagNode()
	if err != nil {
		return nil, err
	}

	k, err := db.dserv.Add(dagnode)
	if err != nil {
		return nil, err
	}

	// without a pinner there is nothing left to do.
	if db.mp == nil {
		return dagnode, nil
	}

	db.mp.PinWithMode(k, pin.Recursive)
	if err := db.mp.Flush(); err != nil {
		return nil, err
	}
	return dagnode, nil
}

// Maxlinks returns the maximum number of links per node that this helper
// was configured with.
func (db *DagBuilderHelper) Maxlinks() int {
return db.maxlinks
}
99 changes: 99 additions & 0 deletions importer/helpers/helpers.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package helpers

import (
"fmt"

chunk "github.com/jbenet/go-ipfs/importer/chunk"
dag "github.com/jbenet/go-ipfs/merkledag"
"github.com/jbenet/go-ipfs/pin"
ft "github.com/jbenet/go-ipfs/unixfs"
)

// BlockSizeLimit specifies the maximum size an imported block can have.
var BlockSizeLimit = 1048576 // 1 MB

// rough estimates on expected sizes
var roughDataBlockSize = chunk.DefaultBlockSize
var roughLinkBlockSize = 1 << 13 // 8KB
// NOTE(review): the 258 term is labelled as the sha256 multihash size, but a
// sha2-256 multihash is normally 34 bytes. Confirm whether 258 is intended;
// changing it alters DefaultLinksPerBlock and therefore the DAG layout.
var roughLinkSize = 258 + 8 + 5 // sha256 multihash + size + no name + protobuf framing

// DefaultLinksPerBlock governs how the importer decides how many links there
// will be per block. This calculation is based on expected distributions of:
// * the expected distribution of block sizes
// * the expected distribution of link sizes
// * desired access speed
// For now, we use:
//
// var roughLinkBlockSize = 1 << 13 // 8KB
// var roughLinkSize = 258 + 8 + 5 // = 271: multihash + size + framing, no name
// var DefaultLinksPerBlock = (roughLinkBlockSize / roughLinkSize)
//
// With these values the integer division 8192 / 271 yields 30.
//
// See calc_test.go
// NOTE(review): confirm calc_test.go still exists; it may have been removed.
var DefaultLinksPerBlock = (roughLinkBlockSize / roughLinkSize)

// ErrSizeLimitExceeded signals that a block is larger than BlockSizeLimit.
var ErrSizeLimitExceeded = fmt.Errorf("object size limit exceeded")

// UnixfsNode is a struct created to aid in the generation
// of unixfs DAG trees
type UnixfsNode struct {
// node is the dag node being built; it receives the links to children and,
// in GetDagNode, the serialized unixfs data.
node *dag.Node
// ufmt is the unixfs metadata (data payload and child block sizes) that is
// serialized into node.Data by GetDagNode.
ufmt *ft.MultiBlock
}

// NewUnixfsNode returns an empty UnixfsNode backed by a fresh dag node and
// fresh unixfs metadata.
func NewUnixfsNode() *UnixfsNode {
	fresh := &UnixfsNode{}
	fresh.node = &dag.Node{}
	fresh.ufmt = &ft.MultiBlock{}
	return fresh
}

// NumChildren returns the number of children as reported by the node's
// unixfs metadata.
func (n *UnixfsNode) NumChildren() int {
return n.ufmt.NumChildren()
}

// AddChild attaches the given UnixfsNode as a child of the receiver.
// The passed-in DagBuilderHelper is used to store the child node and, when a
// pinner is configured, to pin it indirectly so it doesn't get lost.
//
// The child's file size is recorded on the receiver before any of the
// fallible steps run, so it stays recorded even when an error is returned.
func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error {
	size := child.ufmt.FileSize()
	n.ufmt.AddBlockSize(size)

	childDag, err := child.GetDagNode()
	if err != nil {
		return err
	}

	// Link to the child without keeping a reference to its memory, so that
	// finished nodes do not pile up and consume all of our RAM.
	if err := n.node.AddNodeLinkClean("", childDag); err != nil {
		return err
	}

	k, err := db.dserv.Add(childDag)
	if err != nil {
		return err
	}

	// Pin the child node indirectly
	if pinner := db.mp; pinner != nil {
		pinner.PinWithMode(k, pin.Indirect)
	}
	return nil
}

// setData sets the data payload of the node's unixfs metadata. The slice is
// stored as given, without copying.
func (n *UnixfsNode) setData(data []byte) {
n.ufmt.Data = data
}

// GetDagNode serializes the node's unixfs metadata, stores the result as the
// Data of the underlying dag node and returns that dag node. If serialization
// fails, the dag node is left unchanged and the error is returned.
func (n *UnixfsNode) GetDagNode() (*dag.Node, error) {
	encoded, encErr := n.ufmt.GetBytes()
	if encErr != nil {
		return nil, encErr
	}

	dagnode := n.node
	dagnode.Data = encoded
	return dagnode, nil
}
Loading

0 comments on commit adb7ad9

Please sign in to comment.