Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement a faster dag structure for unixfs #687

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 102 additions & 0 deletions importer/builddag_fast.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
package importer

import (
"io"

"github.com/jbenet/go-ipfs/importer/chunk"
dag "github.com/jbenet/go-ipfs/merkledag"
"github.com/jbenet/go-ipfs/pin"
ft "github.com/jbenet/go-ipfs/unixfs"
u "github.com/jbenet/go-ipfs/util"
)

// BuildFastDagFromReader chunks r with spl and assembles the chunks into a
// unixfs DAG, growing the tree one level at a time.
//
// On every pass the current root is first rotated (see rotate) so that it
// has a single child, and is then refilled one level deeper via
// fillStreamNodeRec. The loop stops once the builder reports that the
// splitter output is exhausted.
//
// The finished root is added to ds. If mp is non-nil the root key is pinned
// recursively and the pinner is flushed. Any error from rotation, filling,
// node serialization, storage or flushing aborts the build and is returned.
func BuildFastDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) {
	// Start the splitter
	blkch := spl.Split(r)

	// Create our builder helper
	db := &dagBuilderHelper{
		dserv:    ds,
		mp:       mp,
		in:       blkch,
		maxlinks: DefaultLinksPerBlock,
		indrSize: defaultIndirectBlockDataSize(),
	}

	root := newUnixfsNode()

	for level := 1; !db.done(); level++ {
		// A failed rotation leaves the tree in an inconsistent state, so
		// its error must stop the build rather than be dropped.
		if err := rotate(db, root); err != nil {
			return nil, err
		}
		if err := db.fillStreamNodeRec(root, level); err != nil {
			return nil, err
		}
	}

	rootnode, err := root.getDagNode()
	if err != nil {
		return nil, err
	}

	rootkey, err := ds.Add(rootnode)
	if err != nil {
		return nil, err
	}

	if mp != nil {
		mp.PinWithMode(rootkey, pin.Recursive)
		if err := mp.Flush(); err != nil {
			return nil, err
		}
	}

	return root.getDagNode()
}

// rotate performs a recursive tree rotation down the leftmost child.
// This attains a moderately balanced tree while retaining node ordering.
//
// The leftmost child is loaded from db.dserv, rotated itself, and then
// receives all of node's remaining links (and their block sizes) appended
// after its own. node is then reset and the merged child is re-added as its
// only child.
//
// Postconditions: the given node will have a single child, and a
// correctly adjusted MultiBlock object. A node with no children is left
// untouched.
//
// NOTE(review): assumes node.ufmt's block sizes are index-aligned with
// node.node.Links (entry k describes link k) — confirm against addChild.
func rotate(db *dagBuilderHelper, node *unixfsNode) error {
	// base case
	if node.numChildren() == 0 {
		return nil
	}

	// Grab our links
	links := node.node.Links

	// Get our first child (have to reload for modification)
	leftnode, err := db.dserv.Get(u.Key(links[0].Hash))
	if err != nil {
		return err
	}

	lfsnode, err := unixfsNodeFromDagNode(leftnode)
	if err != nil {
		return err
	}

	// rotate our first child tree
	if err := rotate(db, lfsnode); err != nil {
		return err
	}

	// Move every sibling of the left child underneath it. The range index i
	// is relative to links[1:], so the size belonging to that link lives at
	// i+1 in the parent's block-size list; using i would attach the size of
	// the preceding link instead.
	sizes := node.ufmt.GetBlocksizes()
	for i, l := range links[1:] {
		lfsnode.node.Links = append(lfsnode.node.Links, l)
		lfsnode.ufmt.AddBlockSize(sizes[i+1])
	}

	// Reset our current node, and re-add our lone child
	node.ufmt = new(ft.MultiBlock)
	node.node.Links = nil
	return node.addChild(lfsnode, db)
}
43 changes: 43 additions & 0 deletions importer/importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,17 @@ func newUnixfsNode() *unixfsNode {
}
}

// unixfsNodeFromDagNode wraps an existing dag node in a unixfsNode, decoding
// the node's Data into the MultiBlock format. It returns the decode error,
// and no node, if the data cannot be parsed.
func unixfsNodeFromDagNode(nd *dag.Node) (*unixfsNode, error) {
	format, err := ft.BytesToMultiblock(nd.Data)
	if err != nil {
		return nil, err
	}

	return &unixfsNode{
		node: nd,
		ufmt: format,
	}, nil
}

// numChildren reports the child count recorded in the node's unixfs format
// object.
func (n *unixfsNode) numChildren() int {
	count := n.ufmt.NumChildren()
	return count
}
Expand Down Expand Up @@ -306,6 +317,38 @@ func (db *dagBuilderHelper) fillNodeWithData(node *unixfsNode) error {
return nil
}

// fillStreamNodeRec fills node with data and, when depth is positive, with
// up to db.maxlinks children that are each filled recursively at depth-1.
//
// Unlike a pure indirect node, an interior node here also receives data of
// its own before any children are attached, to lower read latency. A
// negative depth is rejected with an error; a depth of zero fills the node
// with data only.
func (db *dagBuilderHelper) fillStreamNodeRec(node *unixfsNode, depth int) error {
	switch {
	case depth < 0:
		return errors.New("attempt to fillNode at depth < 0")
	case depth == 0:
		// Leaf level: data only, no children.
		return db.fillNodeWithData(node)
	}

	// Interior node: store data alongside the links.
	if err := db.fillNodeWithData(node); err != nil {
		return err
	}

	for {
		// Stop when the node is full or the input is exhausted. The
		// capacity check comes first so done() is only consulted while
		// there is still room.
		if node.numChildren() >= db.maxlinks || db.done() {
			break
		}

		sub := newUnixfsNode()
		if err := db.fillStreamNodeRec(sub, depth-1); err != nil {
			return err
		}
		if err := node.addChild(sub, db); err != nil {
			return err
		}
	}

	return nil
}

// why is intmin not in math?
func min(a, b int) int {
if a > b {
Expand Down
26 changes: 26 additions & 0 deletions importer/importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,32 @@ func TestBuilderConsistency(t *testing.T) {
}
}

// TestFastBuilderConsistency builds a DAG from random bytes with
// BuildFastDagFromReader, reads the DAG back through a DagReader and checks
// that the bytes read match the original input exactly.
func TestFastBuilderConsistency(t *testing.T) {
	nbytes := 2000000
	buf := new(bytes.Buffer)
	// A short copy would otherwise go unnoticed and weaken the test.
	if _, err := io.CopyN(buf, u.NewTimeSeededRand(), int64(nbytes)); err != nil {
		t.Fatal(err)
	}
	// Keep a copy: the builder consumes buf while reading.
	should := dup(buf.Bytes())
	dagserv := merkledag.Mock(t)

	// Exercise the fast builder; the regular builder is already covered by
	// TestBuilderConsistency.
	nd, err := BuildFastDagFromReader(buf, dagserv, nil, &chunk.SizeSplitter{500})
	if err != nil {
		t.Fatal(err)
	}

	r, err := uio.NewDagReader(context.Background(), nd, dagserv)
	if err != nil {
		t.Fatal(err)
	}

	out, err := ioutil.ReadAll(r)
	if err != nil {
		t.Fatal(err)
	}

	if err := arrComp(out, should); err != nil {
		t.Fatal(err)
	}
}

func arrComp(a, b []byte) error {
if len(a) != len(b) {
return fmt.Errorf("Arrays differ in length. %d != %d", len(a), len(b))
Expand Down
20 changes: 20 additions & 0 deletions unixfs/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ type MultiBlock struct {
subtotal uint64
}

// BytesToMultiblock decodes a serialized unixfs pb.Data message into a
// MultiBlock, carrying over its Data and Blocksizes and recomputing the
// running subtotal as the sum of all block sizes. It returns the unmarshal
// error if b is not a valid message.
func BytesToMultiblock(b []byte) (*MultiBlock, error) {
	msg := &pb.Data{}
	if err := proto.Unmarshal(b, msg); err != nil {
		return nil, err
	}

	var total uint64
	for i := range msg.Blocksizes {
		total += msg.Blocksizes[i]
	}

	return &MultiBlock{
		Data:       msg.Data,
		blocksizes: msg.Blocksizes,
		subtotal:   total,
	}, nil
}

func (mb *MultiBlock) AddBlockSize(s uint64) {
mb.subtotal += s
mb.blocksizes = append(mb.blocksizes, s)
Expand All @@ -119,6 +135,10 @@ func (mb *MultiBlock) GetBytes() ([]byte, error) {
return proto.Marshal(pbn)
}

// GetBlocksizes returns the recorded block sizes, in the order they were
// added. The returned slice is the MultiBlock's own backing slice, not a
// copy, so callers must not modify it.
func (mb *MultiBlock) GetBlocksizes() []uint64 {
	// Return the field directly; calling the method here would recurse
	// forever.
	return mb.blocksizes
}

// FileSize returns the number of bytes held directly in Data plus the
// accumulated subtotal of all recorded block sizes.
func (mb *MultiBlock) FileSize() uint64 {
	inline := uint64(len(mb.Data))
	return inline + mb.subtotal
}
Expand Down