diff --git a/importer/balanced/balanced_test.go b/importer/balanced/balanced_test.go new file mode 100644 index 00000000000..0e62406e663 --- /dev/null +++ b/importer/balanced/balanced_test.go @@ -0,0 +1,443 @@ +package balanced + +import ( + "bytes" + "crypto/rand" + "fmt" + "io" + "io/ioutil" + mrand "math/rand" + "os" + "testing" + + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + chunk "github.com/jbenet/go-ipfs/importer/chunk" + h "github.com/jbenet/go-ipfs/importer/helpers" + merkledag "github.com/jbenet/go-ipfs/merkledag" + mdtest "github.com/jbenet/go-ipfs/merkledag/test" + pin "github.com/jbenet/go-ipfs/pin" + uio "github.com/jbenet/go-ipfs/unixfs/io" + u "github.com/jbenet/go-ipfs/util" +) + +func buildTestDag(r io.Reader, ds merkledag.DAGService, spl chunk.BlockSplitter) (*merkledag.Node, error) { + // Start the splitter + blkch := spl.Split(r) + + dbp := h.DagBuilderParams{ + Dagserv: ds, + Maxlinks: h.DefaultLinksPerBlock, + } + + return BalancedLayout(dbp.New(blkch)) +} + +//Test where calls to read are smaller than the chunk size +func TestSizeBasedSplit(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + bs := &chunk.SizeSplitter{Size: 512} + testFileConsistency(t, bs, 32*512) + bs = &chunk.SizeSplitter{Size: 4096} + testFileConsistency(t, bs, 32*4096) + + // Uneven offset + testFileConsistency(t, bs, 31*4095) +} + +func dup(b []byte) []byte { + o := make([]byte, len(b)) + copy(o, b) + return o +} + +func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) { + should := make([]byte, nbytes) + u.NewTimeSeededRand().Read(should) + + read := bytes.NewReader(should) + ds := mdtest.Mock(t) + nd, err := buildTestDag(read, ds, bs) + if err != nil { + t.Fatal(err) + } + + r, err := uio.NewDagReader(context.Background(), nd, ds) + if err != nil { + t.Fatal(err) + } + + out, err := ioutil.ReadAll(r) + if err != nil { + t.Fatal(err) + } + + err = arrComp(out, should) + if err != nil { + t.Fatal(err) + } +} + +func TestBuilderConsistency(t *testing.T) { + nbytes := 100000 + buf := new(bytes.Buffer) + io.CopyN(buf, u.NewTimeSeededRand(), int64(nbytes)) + should := dup(buf.Bytes()) + dagserv := mdtest.Mock(t) + nd, err := buildTestDag(buf, dagserv, chunk.DefaultSplitter) + if err != nil { + t.Fatal(err) + } + r, err := uio.NewDagReader(context.Background(), nd, dagserv) + if err != nil { + t.Fatal(err) + } + + out, err := ioutil.ReadAll(r) + if err != nil { + t.Fatal(err) + } + + err = arrComp(out, should) + if err != nil { + t.Fatal(err) + } +} + +func arrComp(a, b []byte) error { + if len(a) != len(b) { + return fmt.Errorf("Arrays differ in length. %d != %d", len(a), len(b)) + } + for i, v := range a { + if v != b[i] { + return fmt.Errorf("Arrays differ at index: %d", i) + } + } + return nil +} + +func TestMaybeRabinConsistency(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + testFileConsistency(t, chunk.NewMaybeRabin(4096), 256*4096) +} + +func TestRabinBlockSize(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + buf := new(bytes.Buffer) + nbytes := 1024 * 1024 + io.CopyN(buf, rand.Reader, int64(nbytes)) + rab := chunk.NewMaybeRabin(4096) + blkch := rab.Split(buf) + + var blocks [][]byte + for b := range blkch { + blocks = append(blocks, b) + } + + fmt.Printf("Avg block size: %d\n", nbytes/len(blocks)) + +} + +type dagservAndPinner struct { + ds merkledag.DAGService + mp pin.ManualPinner +} + +func TestIndirectBlocks(t *testing.T) { + splitter := &chunk.SizeSplitter{512} + nbytes := 1024 * 1024 + buf := make([]byte, nbytes) + u.NewTimeSeededRand().Read(buf) + + read := bytes.NewReader(buf) + + ds := mdtest.Mock(t) + dag, err := buildTestDag(read, ds, splitter) + if err != nil { + t.Fatal(err) + } + + reader, err := uio.NewDagReader(context.Background(), dag, ds) + if err != nil { + t.Fatal(err) + } + + out, err := ioutil.ReadAll(reader) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(out, buf) { + t.Fatal("Not equal!") + } +} + +func TestSeekingBasic(t *testing.T) { + nbytes := int64(10 * 1024) + should := make([]byte, nbytes) + u.NewTimeSeededRand().Read(should) + + read := bytes.NewReader(should) + ds := mdtest.Mock(t) + nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) + if err != nil { + t.Fatal(err) + } + + rs, err := uio.NewDagReader(context.Background(), nd, ds) + if err != nil { + t.Fatal(err) + } + + start := int64(4000) + n, err := rs.Seek(start, os.SEEK_SET) + if err != nil { + t.Fatal(err) + } + if n != start { + t.Fatal("Failed to seek to correct offset") + } + + out, err := ioutil.ReadAll(rs) + if err != nil { + t.Fatal(err) + } + + err = arrComp(out, should[start:]) + if err != nil { + t.Fatal(err) + } +} + +func TestSeekToBegin(t *testing.T) { + nbytes := int64(10 * 1024) + should := make([]byte, nbytes) + u.NewTimeSeededRand().Read(should) + + read := bytes.NewReader(should) + ds := mdtest.Mock(t) + nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) + if err != nil { + t.Fatal(err) + } + + rs, err := uio.NewDagReader(context.Background(), nd, ds) + if err != nil { + t.Fatal(err) + } + + n, err := io.CopyN(ioutil.Discard, rs, 1024*4) + if err != nil { + t.Fatal(err) + } + if n != 4096 { + t.Fatal("Copy didnt copy enough bytes") + } + + seeked, err := rs.Seek(0, os.SEEK_SET) + if err != nil { + t.Fatal(err) + } + if seeked != 0 { + t.Fatal("Failed to seek to beginning") + } + + out, err := ioutil.ReadAll(rs) + if err != nil { + t.Fatal(err) + } + + err = arrComp(out, should) + if err != nil { + t.Fatal(err) + } +} + +func TestSeekToAlmostBegin(t *testing.T) { + nbytes := int64(10 * 1024) + should := make([]byte, nbytes) + u.NewTimeSeededRand().Read(should) + + read := bytes.NewReader(should) + ds := mdtest.Mock(t) + nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) + if err != nil { + t.Fatal(err) + } + + rs, err := uio.NewDagReader(context.Background(), nd, ds) + if err != nil { + t.Fatal(err) + } + + n, err := io.CopyN(ioutil.Discard, rs, 1024*4) + if err != nil { + t.Fatal(err) + } + if n != 4096 { + t.Fatal("Copy didnt copy enough bytes") + } + + seeked, err := rs.Seek(1, os.SEEK_SET) + if err != nil { + t.Fatal(err) + } + if seeked != 1 { + t.Fatal("Failed to seek to almost beginning") + } + + out, err := ioutil.ReadAll(rs) + if err != nil { + t.Fatal(err) + } + + err = arrComp(out, should[1:]) + if err != nil { + t.Fatal(err) + } +} + +func TestSeekEnd(t *testing.T) { + nbytes := int64(50 * 1024) + should := make([]byte, nbytes) + u.NewTimeSeededRand().Read(should) + + read := bytes.NewReader(should) + ds := mdtest.Mock(t) + nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) + if err != nil { + t.Fatal(err) + } + + rs, err := uio.NewDagReader(context.Background(), nd, ds) + if err != nil { + t.Fatal(err) + } + + seeked, err := rs.Seek(0, os.SEEK_END) + if err != nil { + t.Fatal(err) + } + if seeked != nbytes { + t.Fatal("Failed to seek to end") + } +} + +func TestSeekEndSingleBlockFile(t *testing.T) { + nbytes := int64(100) + should := make([]byte, nbytes) + u.NewTimeSeededRand().Read(should) + + read := bytes.NewReader(should) + ds := mdtest.Mock(t) + nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{5000}) + if err != nil { + t.Fatal(err) + } + + rs, err := uio.NewDagReader(context.Background(), nd, ds) + if err != nil { + t.Fatal(err) + } + + seeked, err := rs.Seek(0, os.SEEK_END) + if err != nil { + t.Fatal(err) + } + if seeked != nbytes { + t.Fatal("Failed to seek to end") + } +} + +func TestSeekingStress(t *testing.T) { + nbytes := int64(1024 * 1024) + should := make([]byte, nbytes) + u.NewTimeSeededRand().Read(should) + + read := bytes.NewReader(should) + ds := mdtest.Mock(t) + nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{1000}) + if err != nil { + t.Fatal(err) + } + + rs, err := uio.NewDagReader(context.Background(), nd, ds) + if err != nil { + t.Fatal(err) + } + + testbuf := make([]byte, nbytes) + for i := 0; i < 50; i++ { + offset := mrand.Intn(int(nbytes)) + l := int(nbytes) - offset + n, err := rs.Seek(int64(offset), os.SEEK_SET) + if err != nil { + t.Fatal(err) + } + if n != int64(offset) { + t.Fatal("Seek failed to move to correct position") + } + + nread, err := rs.Read(testbuf[:l]) + if err != nil { + t.Fatal(err) + } + if nread != l { + t.Fatal("Failed to read enough bytes") + } + + err = arrComp(testbuf[:l], should[offset:offset+l]) + if err != nil { + t.Fatal(err) + } + } + +} + +func TestSeekingConsistency(t *testing.T) { + nbytes := int64(128 * 1024) + should := make([]byte, nbytes) + u.NewTimeSeededRand().Read(should) + + read := bytes.NewReader(should) + ds := mdtest.Mock(t) + nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) + if err != nil { + t.Fatal(err) + } + + rs, err := uio.NewDagReader(context.Background(), nd, ds) + if err != nil { + t.Fatal(err) + } + + out := make([]byte, nbytes) + + for coff := nbytes - 4096; coff >= 0; coff -= 4096 { + t.Log(coff) + n, err := rs.Seek(coff, os.SEEK_SET) + if err != nil { + t.Fatal(err) + } + if n != coff { + t.Fatal("wasnt able to seek to the right position") + } + nread, err := rs.Read(out[coff : coff+4096]) + if err != nil { + t.Fatal(err) + } + if nread != 4096 { + t.Fatal("didnt read the correct number of bytes") + } + } + + err = arrComp(out, should) + if err != nil { + t.Fatal(err) + } +} diff --git a/importer/balanced/builder.go b/importer/balanced/builder.go new file mode 100644 index 00000000000..550f669f2bc --- /dev/null +++ b/importer/balanced/builder.go @@ -0,0 +1,66 @@ +package balanced + +import ( + "errors" + + h "github.com/jbenet/go-ipfs/importer/helpers" + dag "github.com/jbenet/go-ipfs/merkledag" +) + +func BalancedLayout(db *h.DagBuilderHelper) (*dag.Node, error) { + var root *h.UnixfsNode + for level := 0; !db.Done(); level++ { + + nroot := h.NewUnixfsNode() + + // add our old root as a child of the new root. + if root != nil { // nil if it's the first node. + if err := nroot.AddChild(root, db); err != nil { + return nil, err + } + } + + // fill it up. + if err := fillNodeRec(db, nroot, level); err != nil { + return nil, err + } + + root = nroot + } + if root == nil { + root = h.NewUnixfsNode() + } + + return db.Add(root) +} + +// fillNodeRec will fill the given node with data from the dagBuilders input +// source down to an indirection depth as specified by 'depth' +// it returns the total dataSize of the node, and a potential error +// +// warning: **children** pinned indirectly, but input node IS NOT pinned. +func fillNodeRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error { + if depth < 0 { + return errors.New("attempt to fillNode at depth < 0") + } + + // Base case + if depth <= 0 { // catch accidental -1's in case error above is removed. + return db.FillNodeWithData(node) + } + + // while we have room AND we're not done + for node.NumChildren() < db.Maxlinks() && !db.Done() { + child := h.NewUnixfsNode() + + if err := fillNodeRec(db, child, depth-1); err != nil { + return err + } + + if err := node.AddChild(child, db); err != nil { + return err + } + } + + return nil +} diff --git a/importer/calc_test.go b/importer/calc_test.go deleted file mode 100644 index ef16fc85e42..00000000000 --- a/importer/calc_test.go +++ /dev/null @@ -1,50 +0,0 @@ -package importer - -import ( - "math" - "testing" - - humanize "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/dustin/go-humanize" -) - -func TestCalculateSizes(t *testing.T) { - - // d := ((lbs/271) ^ layer) * dbs - - increments := func(a, b int) []int { - ints := []int{} - for ; a <= b; a *= 2 { - ints = append(ints, a) - } - return ints - } - - layers := 7 - roughLinkSize := roughLinkSize // from importer pkg - dataBlockSizes := increments(1<<12, 1<<18) - linkBlockSizes := increments(1<<12, 1<<14) - - t.Logf("rough link size: %d", roughLinkSize) - t.Logf("data block sizes: %v", dataBlockSizes) - t.Logf("link block sizes: %v", linkBlockSizes) - for _, dbs := range dataBlockSizes { - t.Logf("") - t.Logf("with data block size: %d", dbs) - for _, lbs := range linkBlockSizes { - t.Logf("") - t.Logf("\twith data block size: %d", dbs) - t.Logf("\twith link block size: %d", lbs) - - lpb := lbs / roughLinkSize - t.Logf("\tlinks per block: %d", lpb) - - for l := 1; l < layers; l++ { - total := int(math.Pow(float64(lpb), float64(l))) * dbs - htotal := humanize.Bytes(uint64(total)) - t.Logf("\t\t\tlayer %d: %s\t%d", l, htotal, total) - } - - } - } - -} diff --git a/importer/helpers/dagbuilder.go b/importer/helpers/dagbuilder.go new file mode 100644 index 00000000000..f391495871a --- /dev/null +++ b/importer/helpers/dagbuilder.go @@ -0,0 +1,141 @@ +package helpers + +import ( + dag "github.com/jbenet/go-ipfs/merkledag" + "github.com/jbenet/go-ipfs/pin" +) + +// DagBuilderHelper wraps together a bunch of objects needed to +// efficiently create unixfs dag trees +type DagBuilderHelper struct { + dserv dag.DAGService + mp pin.ManualPinner + in <-chan []byte + nextData []byte // the next item to return. + maxlinks int +} + +type DagBuilderParams struct { + // Maximum number of links per intermediate node + Maxlinks int + + // DAGService to write blocks to (required) + Dagserv dag.DAGService + + // Pinner to use for pinning files (optionally nil) + Pinner pin.ManualPinner +} + +// Generate a new DagBuilderHelper from the given params, using 'in' as a +// data source +func (dbp *DagBuilderParams) New(in <-chan []byte) *DagBuilderHelper { + return &DagBuilderHelper{ + dserv: dbp.Dagserv, + mp: dbp.Pinner, + in: in, + maxlinks: dbp.Maxlinks, + } +} + +// prepareNext consumes the next item from the channel and puts it +// in the nextData field. it is idempotent-- if nextData is full +// it will do nothing. +// +// i realized that building the dag becomes _a lot_ easier if we can +// "peek" the "are done yet?" (i.e. not consume it from the channel) +func (db *DagBuilderHelper) prepareNext() { + if db.in == nil { + // if our input is nil, there is "nothing to do". we're done. + // as if there was no data at all. (a sort of zero-value) + return + } + + // if we already have data waiting to be consumed, we're ready. + if db.nextData != nil { + return + } + + // if it's closed, nextData will be correctly set to nil, signaling + // that we're done consuming from the channel. + db.nextData = <-db.in +} + +// Done returns whether or not we're done consuming the incoming data. +func (db *DagBuilderHelper) Done() bool { + // ensure we have an accurate perspective on data + // as `done` this may be called before `next`. + db.prepareNext() // idempotent + return db.nextData == nil +} + +// Next returns the next chunk of data to be inserted into the dag +// if it returns nil, that signifies that the stream is at an end, and +// that the current building operation should finish +func (db *DagBuilderHelper) Next() []byte { + db.prepareNext() // idempotent + d := db.nextData + db.nextData = nil // signal we've consumed it + return d +} + +// FillNodeLayer will add datanodes as children to the give node until +// at most db.indirSize ndoes are added +// +// warning: **children** pinned indirectly, but input node IS NOT pinned. +func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error { + + // while we have room AND we're not done + for node.NumChildren() < db.maxlinks && !db.Done() { + child := NewUnixfsNode() + + if err := db.FillNodeWithData(child); err != nil { + return err + } + + if err := node.AddChild(child, db); err != nil { + return err + } + } + + return nil +} + +func (db *DagBuilderHelper) FillNodeWithData(node *UnixfsNode) error { + data := db.Next() + if data == nil { // we're done! + return nil + } + + if len(data) > BlockSizeLimit { + return ErrSizeLimitExceeded + } + + node.setData(data) + return nil +} + +func (db *DagBuilderHelper) Add(node *UnixfsNode) (*dag.Node, error) { + dn, err := node.GetDagNode() + if err != nil { + return nil, err + } + + key, err := db.dserv.Add(dn) + if err != nil { + return nil, err + } + + if db.mp != nil { + db.mp.PinWithMode(key, pin.Recursive) + err := db.mp.Flush() + if err != nil { + return nil, err + } + } + + return dn, nil +} + +func (db *DagBuilderHelper) Maxlinks() int { + return db.maxlinks +} diff --git a/importer/helpers/helpers.go b/importer/helpers/helpers.go new file mode 100644 index 00000000000..7e4a55f2792 --- /dev/null +++ b/importer/helpers/helpers.go @@ -0,0 +1,99 @@ +package helpers + +import ( + "fmt" + + chunk "github.com/jbenet/go-ipfs/importer/chunk" + dag "github.com/jbenet/go-ipfs/merkledag" + "github.com/jbenet/go-ipfs/pin" + ft "github.com/jbenet/go-ipfs/unixfs" +) + +// BlockSizeLimit specifies the maximum size an imported block can have. +var BlockSizeLimit = 1048576 // 1 MB + +// rough estimates on expected sizes +var roughDataBlockSize = chunk.DefaultBlockSize +var roughLinkBlockSize = 1 << 13 // 8KB +var roughLinkSize = 258 + 8 + 5 // sha256 multihash + size + no name + protobuf framing + +// DefaultLinksPerBlock governs how the importer decides how many links there +// will be per block. This calculation is based on expected distributions of: +// * the expected distribution of block sizes +// * the expected distribution of link sizes +// * desired access speed +// For now, we use: +// +// var roughLinkBlockSize = 1 << 13 // 8KB +// var roughLinkSize = 288 // sha256 + framing + name +// var DefaultLinksPerBlock = (roughLinkBlockSize / roughLinkSize) +// +// See calc_test.go +var DefaultLinksPerBlock = (roughLinkBlockSize / roughLinkSize) + +// ErrSizeLimitExceeded signals that a block is larger than BlockSizeLimit. +var ErrSizeLimitExceeded = fmt.Errorf("object size limit exceeded") + +// UnixfsNode is a struct created to aid in the generation +// of unixfs DAG trees +type UnixfsNode struct { + node *dag.Node + ufmt *ft.MultiBlock +} + +func NewUnixfsNode() *UnixfsNode { + return &UnixfsNode{ + node: new(dag.Node), + ufmt: new(ft.MultiBlock), + } +} + +func (n *UnixfsNode) NumChildren() int { + return n.ufmt.NumChildren() +} + +// addChild will add the given UnixfsNode as a child of the receiver. +// the passed in DagBuilderHelper is used to store the child node an +// pin it locally so it doesnt get lost +func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error { + n.ufmt.AddBlockSize(child.ufmt.FileSize()) + + childnode, err := child.GetDagNode() + if err != nil { + return err + } + + // Add a link to this node without storing a reference to the memory + // This way, we avoid nodes building up and consuming all of our RAM + err = n.node.AddNodeLinkClean("", childnode) + if err != nil { + return err + } + + childkey, err := db.dserv.Add(childnode) + if err != nil { + return err + } + + // Pin the child node indirectly + if db.mp != nil { + db.mp.PinWithMode(childkey, pin.Indirect) + } + + return nil +} + +func (n *UnixfsNode) setData(data []byte) { + n.ufmt.Data = data +} + +// getDagNode fills out the proper formatting for the unixfs node +// inside of a DAG node and returns the dag node +func (n *UnixfsNode) GetDagNode() (*dag.Node, error) { + data, err := n.ufmt.GetBytes() + if err != nil { + return nil, err + } + n.node.Data = data + return n.node, nil +} diff --git a/importer/importer.go b/importer/importer.go index f5f7ce7aefc..481609956e6 100644 --- a/importer/importer.go +++ b/importer/importer.go @@ -3,70 +3,21 @@ package importer import ( - "errors" "fmt" "io" "os" + bal "github.com/jbenet/go-ipfs/importer/balanced" "github.com/jbenet/go-ipfs/importer/chunk" + h "github.com/jbenet/go-ipfs/importer/helpers" + trickle "github.com/jbenet/go-ipfs/importer/trickle" dag "github.com/jbenet/go-ipfs/merkledag" "github.com/jbenet/go-ipfs/pin" - ft "github.com/jbenet/go-ipfs/unixfs" "github.com/jbenet/go-ipfs/util" ) var log = util.Logger("importer") -// BlockSizeLimit specifies the maximum size an imported block can have. -var BlockSizeLimit = 1048576 // 1 MB - -// rough estimates on expected sizes -var roughDataBlockSize = chunk.DefaultBlockSize -var roughLinkBlockSize = 1 << 13 // 8KB -var roughLinkSize = 258 + 8 + 5 // sha256 multihash + size + no name + protobuf framing - -// DefaultLinksPerBlock governs how the importer decides how many links there -// will be per block. This calculation is based on expected distributions of: -// * the expected distribution of block sizes -// * the expected distribution of link sizes -// * desired access speed -// For now, we use: -// -// var roughLinkBlockSize = 1 << 13 // 8KB -// var roughLinkSize = 288 // sha256 + framing + name -// var DefaultLinksPerBlock = (roughLinkBlockSize / roughLinkSize) -// -// See calc_test.go -var DefaultLinksPerBlock = (roughLinkBlockSize / roughLinkSize) - -// ErrSizeLimitExceeded signals that a block is larger than BlockSizeLimit. -var ErrSizeLimitExceeded = fmt.Errorf("object size limit exceeded") - -// IndirectBlocksCopyData governs whether indirect blocks should copy over -// data from their first child, and how much. If this is 0, indirect blocks -// have no data, only links. If this is larger, Indirect blocks will copy -// as much as (maybe less than) this many bytes. -// -// This number should be <= (BlockSizeLimit - (DefaultLinksPerBlock * LinkSize)) -// Note that it is not known here what the LinkSize is, because the hash function -// could vary wildly in size. Exercise caution when setting this option. For -// safety, it will be clipped to (BlockSizeLimit - (DefaultLinksPerBlock * 256)) -var IndirectBlockDataSize = 0 - -// this check is here to ensure the conditions on IndirectBlockDataSize hold. -// returns int because it will be used as an input to `make()` later on. if -// `int` will flip over to negative, better know here. -func defaultIndirectBlockDataSize() int { - max := BlockSizeLimit - (DefaultLinksPerBlock * 256) - if IndirectBlockDataSize < max { - max = IndirectBlockDataSize - } - if max < 0 { - return 0 - } - return max -} - // Builds a DAG from the given file, writing created blocks to disk as they are // created func BuildDagFromFile(fpath string, ds dag.DAGService, mp pin.ManualPinner) (*dag.Node, error) { @@ -88,228 +39,28 @@ func BuildDagFromFile(fpath string, ds dag.DAGService, mp pin.ManualPinner) (*da return BuildDagFromReader(f, ds, mp, chunk.DefaultSplitter) } -// unixfsNode is a struct created to aid in the generation -// of unixfs DAG trees -type unixfsNode struct { - node *dag.Node - ufmt *ft.MultiBlock -} - -func newUnixfsNode() *unixfsNode { - return &unixfsNode{ - node: new(dag.Node), - ufmt: new(ft.MultiBlock), - } -} - -func (n *unixfsNode) numChildren() int { - return n.ufmt.NumChildren() -} - -// addChild will add the given unixfsNode as a child of the receiver. -// the passed in dagBuilderHelper is used to store the child node an -// pin it locally so it doesnt get lost -func (n *unixfsNode) addChild(child *unixfsNode, db *dagBuilderHelper) error { - n.ufmt.AddBlockSize(child.ufmt.FileSize()) - - childnode, err := child.getDagNode() - if err != nil { - return err - } - - // Add a link to this node without storing a reference to the memory - // This way, we avoid nodes building up and consuming all of our RAM - err = n.node.AddNodeLinkClean("", childnode) - if err != nil { - return err - } - - childkey, err := db.dserv.Add(childnode) - if err != nil { - return err - } - - // Pin the child node indirectly - if db.mp != nil { - db.mp.PinWithMode(childkey, pin.Indirect) - } - - return nil -} - -func (n *unixfsNode) setData(data []byte) { - n.ufmt.Data = data -} - -// getDagNode fills out the proper formatting for the unixfs node -// inside of a DAG node and returns the dag node -func (n *unixfsNode) getDagNode() (*dag.Node, error) { - data, err := n.ufmt.GetBytes() - if err != nil { - return nil, err - } - n.node.Data = data - return n.node, nil -} - func BuildDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) { // Start the splitter blkch := spl.Split(r) - // Create our builder helper - db := &dagBuilderHelper{ - dserv: ds, - mp: mp, - in: blkch, - maxlinks: DefaultLinksPerBlock, - indrSize: defaultIndirectBlockDataSize(), - } - - var root *unixfsNode - for level := 0; !db.done(); level++ { - - nroot := newUnixfsNode() - - // add our old root as a child of the new root. - if root != nil { // nil if it's the first node. - if err := nroot.addChild(root, db); err != nil { - return nil, err - } - } - - // fill it up. - if err := db.fillNodeRec(nroot, level); err != nil { - return nil, err - } - - root = nroot - } - if root == nil { - root = newUnixfsNode() - } - - rootnode, err := root.getDagNode() - if err != nil { - return nil, err - } - - rootkey, err := ds.Add(rootnode) - if err != nil { - return nil, err - } - - if mp != nil { - mp.PinWithMode(rootkey, pin.Recursive) - err := mp.Flush() - if err != nil { - return nil, err - } - } - - return root.getDagNode() -} - -// dagBuilderHelper wraps together a bunch of objects needed to -// efficiently create unixfs dag trees -type dagBuilderHelper struct { - dserv dag.DAGService - mp pin.ManualPinner - in <-chan []byte - nextData []byte // the next item to return. - maxlinks int - indrSize int // see IndirectBlockData -} - -// prepareNext consumes the next item from the channel and puts it -// in the nextData field. it is idempotent-- if nextData is full -// it will do nothing. -// -// i realized that building the dag becomes _a lot_ easier if we can -// "peek" the "are done yet?" (i.e. not consume it from the channel) -func (db *dagBuilderHelper) prepareNext() { - if db.in == nil { - // if our input is nil, there is "nothing to do". we're done. - // as if there was no data at all. (a sort of zero-value) - return - } - - // if we already have data waiting to be consumed, we're ready. - if db.nextData != nil { - return + dbp := h.DagBuilderParams{ + Dagserv: ds, + Maxlinks: h.DefaultLinksPerBlock, + Pinner: mp, } - // if it's closed, nextData will be correctly set to nil, signaling - // that we're done consuming from the channel. - db.nextData = <-db.in + return bal.BalancedLayout(dbp.New(blkch)) } -// done returns whether or not we're done consuming the incoming data. -func (db *dagBuilderHelper) done() bool { - // ensure we have an accurate perspective on data - // as `done` this may be called before `next`. - db.prepareNext() // idempotent - return db.nextData == nil -} - -// next returns the next chunk of data to be inserted into the dag -// if it returns nil, that signifies that the stream is at an end, and -// that the current building operation should finish -func (db *dagBuilderHelper) next() []byte { - db.prepareNext() // idempotent - d := db.nextData - db.nextData = nil // signal we've consumed it - return d -} - -// fillNodeRec will fill the given node with data from the dagBuilders input -// source down to an indirection depth as specified by 'depth' -// it returns the total dataSize of the node, and a potential error -// -// warning: **children** pinned indirectly, but input node IS NOT pinned. -func (db *dagBuilderHelper) fillNodeRec(node *unixfsNode, depth int) error { - if depth < 0 { - return errors.New("attempt to fillNode at depth < 0") - } - - // Base case - if depth <= 0 { // catch accidental -1's in case error above is removed. - return db.fillNodeWithData(node) - } - - // while we have room AND we're not done - for node.numChildren() < db.maxlinks && !db.done() { - child := newUnixfsNode() - - if err := db.fillNodeRec(child, depth-1); err != nil { - return err - } - - if err := node.addChild(child, db); err != nil { - return err - } - } - - return nil -} - -func (db *dagBuilderHelper) fillNodeWithData(node *unixfsNode) error { - data := db.next() - if data == nil { // we're done! - return nil - } +func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, mp pin.ManualPinner, spl chunk.BlockSplitter) (*dag.Node, error) { + // Start the splitter + blkch := spl.Split(r) - if len(data) > BlockSizeLimit { - return ErrSizeLimitExceeded + dbp := h.DagBuilderParams{ + Dagserv: ds, + Maxlinks: h.DefaultLinksPerBlock, + Pinner: mp, } - node.setData(data) - return nil -} - -// why is intmin not in math? -func min(a, b int) int { - if a > b { - return a - } - return b + return trickle.TrickleLayout(dbp.New(blkch)) } diff --git a/importer/importer_test.go b/importer/importer_test.go index 8a70cec9392..daaa9940db9 100644 --- a/importer/importer_test.go +++ b/importer/importer_test.go @@ -2,448 +2,116 @@ package importer import ( "bytes" - "crypto/rand" - "fmt" "io" "io/ioutil" - mrand "math/rand" - "os" "testing" - "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" - dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" - bstore "github.com/jbenet/go-ipfs/blocks/blockstore" - bserv "github.com/jbenet/go-ipfs/blockservice" - offline "github.com/jbenet/go-ipfs/exchange/offline" + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" chunk "github.com/jbenet/go-ipfs/importer/chunk" - merkledag "github.com/jbenet/go-ipfs/merkledag" - pin "github.com/jbenet/go-ipfs/pin" + dag "github.com/jbenet/go-ipfs/merkledag" + mdtest "github.com/jbenet/go-ipfs/merkledag/test" uio "github.com/jbenet/go-ipfs/unixfs/io" u "github.com/jbenet/go-ipfs/util" ) -//Test where calls to read are smaller than the chunk size -func TestSizeBasedSplit(t *testing.T) { - if testing.Short() { - t.SkipNow() - } - bs := &chunk.SizeSplitter{Size: 512} - testFileConsistency(t, bs, 32*512) - bs = &chunk.SizeSplitter{Size: 4096} - testFileConsistency(t, bs, 32*4096) - - // Uneven offset - testFileConsistency(t, bs, 31*4095) -} - -func dup(b []byte) []byte { - o := make([]byte, len(b)) - copy(o, b) - return o -} - -func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) { - should := make([]byte, nbytes) - u.NewTimeSeededRand().Read(should) - - read := bytes.NewReader(should) - dnp := getDagservAndPinner(t) - nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, bs) - if err != nil { - t.Fatal(err) - } - - r, err := uio.NewDagReader(context.Background(), nd, dnp.ds) - if err != nil { - t.Fatal(err) - } - - out, err := ioutil.ReadAll(r) - if err != nil { - t.Fatal(err) - } - - err = arrComp(out, should) +func getBalancedDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) { + ds := mdtest.Mock(t) + r := io.LimitReader(u.NewTimeSeededRand(), size) + nd, err := BuildDagFromReader(r, ds, nil, &chunk.SizeSplitter{blksize}) if err != nil { t.Fatal(err) } + return nd, ds } -func TestBuilderConsistency(t *testing.T) { - nbytes := 100000 - buf := new(bytes.Buffer) - io.CopyN(buf, u.NewTimeSeededRand(), int64(nbytes)) - should := dup(buf.Bytes()) - dagserv := merkledag.Mock(t) - nd, err := BuildDagFromReader(buf, dagserv, nil, chunk.DefaultSplitter) - if err != nil { - t.Fatal(err) - } - r, err := uio.NewDagReader(context.Background(), nd, dagserv) - if err != nil { - t.Fatal(err) - } - - out, err := ioutil.ReadAll(r) - if err != nil { - t.Fatal(err) - } - - err = arrComp(out, should) +func getTrickleDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) { + ds := mdtest.Mock(t) + r := io.LimitReader(u.NewTimeSeededRand(), size) + nd, err := BuildTrickleDagFromReader(r, ds, nil, &chunk.SizeSplitter{blksize}) if err != nil { t.Fatal(err) } + return nd, ds } -func arrComp(a, b []byte) error { - if len(a) != len(b) { - return fmt.Errorf("Arrays differ in length. %d != %d", len(a), len(b)) - } - for i, v := range a { - if v != b[i] { - return fmt.Errorf("Arrays differ at index: %d", i) - } - } - return nil -} - -func TestMaybeRabinConsistency(t *testing.T) { - if testing.Short() { - t.SkipNow() - } - testFileConsistency(t, chunk.NewMaybeRabin(4096), 256*4096) -} - -func TestRabinBlockSize(t *testing.T) { - if testing.Short() { - t.SkipNow() - } - buf := new(bytes.Buffer) - nbytes := 1024 * 1024 - io.CopyN(buf, rand.Reader, int64(nbytes)) - rab := chunk.NewMaybeRabin(4096) - blkch := rab.Split(buf) - - var blocks [][]byte - for b := range blkch { - blocks = append(blocks, b) - } - - fmt.Printf("Avg block size: %d\n", nbytes/len(blocks)) - -} - -type dagservAndPinner struct { - ds merkledag.DAGService - mp pin.ManualPinner -} - -func getDagservAndPinner(t *testing.T) dagservAndPinner { - db := dssync.MutexWrap(ds.NewMapDatastore()) - bs := bstore.NewBlockstore(db) - blockserv, err := bserv.New(bs, offline.Exchange(bs)) - if err != nil { - t.Fatal(err) - } - dserv := merkledag.NewDAGService(blockserv) - mpin := pin.NewPinner(db, dserv).GetManual() - return dagservAndPinner{ - ds: dserv, - mp: mpin, - } -} - -func TestIndirectBlocks(t *testing.T) { - splitter := &chunk.SizeSplitter{512} - nbytes := 1024 * 1024 - buf := make([]byte, nbytes) +func TestBalancedDag(t *testing.T) { + ds := mdtest.Mock(t) + buf := make([]byte, 10000) u.NewTimeSeededRand().Read(buf) + r := bytes.NewReader(buf) - read := bytes.NewReader(buf) - - dnp := getDagservAndPinner(t) - dag, err := BuildDagFromReader(read, dnp.ds, dnp.mp, splitter) + nd, err := BuildDagFromReader(r, ds, nil, chunk.DefaultSplitter) if err != nil { t.Fatal(err) } - reader, err := uio.NewDagReader(context.Background(), dag, dnp.ds) + dr, err := uio.NewDagReader(context.TODO(), nd, ds) if err != nil { t.Fatal(err) } - out, err := ioutil.ReadAll(reader) + out, err := ioutil.ReadAll(dr) if err != nil { t.Fatal(err) } if !bytes.Equal(out, buf) { - t.Fatal("Not equal!") + t.Fatal("bad read") } } -func TestSeekingBasic(t *testing.T) { - nbytes := int64(10 * 1024) - should := make([]byte, nbytes) - u.NewTimeSeededRand().Read(should) - - read := bytes.NewReader(should) - dnp := getDagservAndPinner(t) - nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{500}) - if err != nil { - t.Fatal(err) - } - - rs, err := uio.NewDagReader(context.Background(), nd, dnp.ds) - if err != nil { - t.Fatal(err) - } - - start := int64(4000) - n, err := rs.Seek(start, os.SEEK_SET) - if err != nil { - t.Fatal(err) - } - if n != start { - t.Fatal("Failed to seek to correct offset") - } - - out, err := ioutil.ReadAll(rs) - if err != nil { - t.Fatal(err) - } - - err = arrComp(out, should[start:]) - if err != nil { - t.Fatal(err) - } -} - -func TestSeekToBegin(t *testing.T) { - nbytes := int64(10 * 1024) - should := make([]byte, nbytes) - u.NewTimeSeededRand().Read(should) - - read := bytes.NewReader(should) - dnp := getDagservAndPinner(t) - nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{500}) - if err != nil { - t.Fatal(err) - } - - rs, err := uio.NewDagReader(context.Background(), nd, dnp.ds) - if err != nil { - t.Fatal(err) - } - - n, err := io.CopyN(ioutil.Discard, rs, 1024*4) - if err != nil { - t.Fatal(err) - } - if n != 4096 { - t.Fatal("Copy didnt copy enough bytes") - } - - seeked, err := rs.Seek(0, os.SEEK_SET) - if err != nil { - t.Fatal(err) - } - if seeked != 0 { - t.Fatal("Failed to seek to beginning") - } - - out, err := ioutil.ReadAll(rs) - if err != nil { - t.Fatal(err) - } +func BenchmarkBalancedReadSmallBlock(b *testing.B) { + b.StopTimer() + nbytes := int64(10000000) + nd, ds := getBalancedDag(b, nbytes, 4096) - err = arrComp(out, should) - if err != nil { - t.Fatal(err) - } + b.SetBytes(nbytes) + b.StartTimer() + runReadBench(b, nd, ds) } -func TestSeekToAlmostBegin(t *testing.T) { - nbytes := int64(10 * 1024) - should := make([]byte, nbytes) - u.NewTimeSeededRand().Read(should) - - read := bytes.NewReader(should) - dnp := getDagservAndPinner(t) - nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{500}) - if err != nil { - t.Fatal(err) - } - - rs, err := uio.NewDagReader(context.Background(), nd, dnp.ds) - if err != nil { - t.Fatal(err) - } - - n, err := io.CopyN(ioutil.Discard, rs, 1024*4) - if err != nil { - t.Fatal(err) - } - if n != 4096 { - t.Fatal("Copy didnt copy enough bytes") - } - - seeked, err := rs.Seek(1, os.SEEK_SET) - if err != nil { - t.Fatal(err) - } - if seeked != 1 { - t.Fatal("Failed to seek to almost beginning") - } - - out, err := ioutil.ReadAll(rs) - if err != nil { - t.Fatal(err) - } +func BenchmarkTrickleReadSmallBlock(b *testing.B) { + b.StopTimer() + nbytes := int64(10000000) + nd, ds := getTrickleDag(b, nbytes, 4096) - err = arrComp(out, should[1:]) - if err != nil { - t.Fatal(err) - } + b.SetBytes(nbytes) + b.StartTimer() + runReadBench(b, nd, ds) } -func TestSeekEnd(t *testing.T) { - nbytes := int64(50 * 1024) - should := make([]byte, nbytes) - u.NewTimeSeededRand().Read(should) - - read := bytes.NewReader(should) - dnp := getDagservAndPinner(t) - nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{500}) - if err != nil { - t.Fatal(err) - } - - rs, err := uio.NewDagReader(context.Background(), nd, dnp.ds) - if err != nil { - t.Fatal(err) - } +func BenchmarkBalancedReadFull(b *testing.B) { + b.StopTimer() + nbytes := int64(10000000) + nd, ds := getBalancedDag(b, nbytes, chunk.DefaultBlockSize) - seeked, err := rs.Seek(0, os.SEEK_END) - if err != nil { - t.Fatal(err) - } - if seeked != nbytes { - t.Fatal("Failed to seek to end") - } + b.SetBytes(nbytes) + b.StartTimer() + runReadBench(b, nd, ds) } -func TestSeekEndSingleBlockFile(t *testing.T) { - nbytes := int64(100) - should := make([]byte, nbytes) - u.NewTimeSeededRand().Read(should) - - read := bytes.NewReader(should) - dnp := getDagservAndPinner(t) - nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{5000}) - if err != nil { - t.Fatal(err) - } +func BenchmarkTrickleReadFull(b *testing.B) { + b.StopTimer() + nbytes := int64(10000000) + nd, ds := getTrickleDag(b, nbytes, chunk.DefaultBlockSize) - rs, err := uio.NewDagReader(context.Background(), nd, dnp.ds) - if err != nil { - t.Fatal(err) - } - - seeked, err := rs.Seek(0, os.SEEK_END) - if err != nil { - t.Fatal(err) - } - if seeked != nbytes { - t.Fatal("Failed to seek to end") - } + b.SetBytes(nbytes) + b.StartTimer() + runReadBench(b, nd, ds) } -func TestSeekingStress(t *testing.T) { - nbytes := int64(1024 * 1024) - should := make([]byte, nbytes) - u.NewTimeSeededRand().Read(should) - - read := bytes.NewReader(should) - dnp := getDagservAndPinner(t) - nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{1000}) - if err != nil { - t.Fatal(err) - } - - rs, err := uio.NewDagReader(context.Background(), nd, dnp.ds) - if err != nil { - t.Fatal(err) - } - - testbuf := make([]byte, nbytes) - for i := 0; i < 50; i++ { - offset := mrand.Intn(int(nbytes)) - l := int(nbytes) - offset - n, err := rs.Seek(int64(offset), os.SEEK_SET) - if err != nil { - t.Fatal(err) - } - if n != int64(offset) { - t.Fatal("Seek failed to move to correct position") - } - - nread, err := rs.Read(testbuf[:l]) +func runReadBench(b *testing.B, nd *dag.Node, ds dag.DAGService) { + for i := 0; i < b.N; i++ { + ctx, cancel := context.WithCancel(context.TODO()) + read, err := uio.NewDagReader(ctx, nd, ds) if err != nil { - t.Fatal(err) - } - if nread != l { - t.Fatal("Failed to read enough bytes") + b.Fatal(err) } - err = arrComp(testbuf[:l], should[offset:offset+l]) - if err != nil { - t.Fatal(err) + _, err = read.WriteTo(ioutil.Discard) + if err != nil && err != io.EOF { + b.Fatal(err) } - } - -} - -func TestSeekingConsistency(t *testing.T) { - nbytes := int64(128 * 1024) - should := make([]byte, nbytes) - u.NewTimeSeededRand().Read(should) - - read := bytes.NewReader(should) - dnp := getDagservAndPinner(t) - nd, err := BuildDagFromReader(read, dnp.ds, dnp.mp, &chunk.SizeSplitter{500}) - if err != nil { - t.Fatal(err) - } - - rs, err := uio.NewDagReader(context.Background(), nd, dnp.ds) - if err != nil { - t.Fatal(err) - } - - out := make([]byte, nbytes) - - for coff := nbytes - 4096; coff >= 0; coff -= 4096 { - t.Log(coff) - n, err := rs.Seek(coff, os.SEEK_SET) - if err != nil { - t.Fatal(err) - } - if n != coff { - t.Fatal("wasnt able to seek to the right position") - } - nread, err := rs.Read(out[coff : coff+4096]) - if err != nil { - t.Fatal(err) - } - if nread != 4096 { - t.Fatal("didnt read the correct number of bytes") - } - } - - err = arrComp(out, should) - if err != nil { - t.Fatal(err) + cancel() } } diff --git a/importer/trickle/trickle_test.go b/importer/trickle/trickle_test.go new file mode 100644 index 00000000000..79d728c5ea7 --- /dev/null +++ b/importer/trickle/trickle_test.go @@ -0,0 +1,443 @@ +package trickle + +import ( + "bytes" + "crypto/rand" + "fmt" + "io" + "io/ioutil" + mrand "math/rand" + "os" + "testing" + + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + chunk "github.com/jbenet/go-ipfs/importer/chunk" + h "github.com/jbenet/go-ipfs/importer/helpers" + merkledag "github.com/jbenet/go-ipfs/merkledag" + mdtest "github.com/jbenet/go-ipfs/merkledag/test" + pin "github.com/jbenet/go-ipfs/pin" + uio "github.com/jbenet/go-ipfs/unixfs/io" + u "github.com/jbenet/go-ipfs/util" +) + +func buildTestDag(r io.Reader, ds merkledag.DAGService, spl chunk.BlockSplitter) (*merkledag.Node, error) { + // Start the splitter + blkch := spl.Split(r) + + dbp := h.DagBuilderParams{ + Dagserv: ds, + Maxlinks: h.DefaultLinksPerBlock, + } + + return TrickleLayout(dbp.New(blkch)) +} + +//Test where calls to read are smaller than the chunk size +func TestSizeBasedSplit(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + bs := &chunk.SizeSplitter{Size: 512} + testFileConsistency(t, bs, 32*512) + bs = &chunk.SizeSplitter{Size: 4096} + testFileConsistency(t, bs, 32*4096) + + // Uneven offset + testFileConsistency(t, bs, 31*4095) +} + +func dup(b []byte) []byte { + o := make([]byte, len(b)) + copy(o, b) + return o +} + +func testFileConsistency(t *testing.T, bs chunk.BlockSplitter, nbytes int) { + should := make([]byte, nbytes) + u.NewTimeSeededRand().Read(should) + + read := bytes.NewReader(should) + ds := mdtest.Mock(t) + nd, err := buildTestDag(read, ds, bs) + if err != nil { + t.Fatal(err) + } + + r, err := uio.NewDagReader(context.Background(), nd, ds) + if err != nil { + t.Fatal(err) + } + + out, err := ioutil.ReadAll(r) + if err != nil { + t.Fatal(err) + } + + err = arrComp(out, should) + if err != nil { + t.Fatal(err) + } +} + +func TestBuilderConsistency(t *testing.T) { + nbytes := 100000 + buf := new(bytes.Buffer) + io.CopyN(buf, u.NewTimeSeededRand(), int64(nbytes)) + should := dup(buf.Bytes()) + dagserv := mdtest.Mock(t) + nd, err := buildTestDag(buf, dagserv, chunk.DefaultSplitter) + if err != nil { + t.Fatal(err) + } + r, err := uio.NewDagReader(context.Background(), nd, dagserv) + if err != nil { + t.Fatal(err) + } + + out, err := ioutil.ReadAll(r) + if err != nil { + t.Fatal(err) + } + + err = arrComp(out, should) + if err != nil { + t.Fatal(err) + } +} + +func arrComp(a, b []byte) error { + if len(a) != len(b) { + return fmt.Errorf("Arrays differ in length. %d != %d", len(a), len(b)) + } + for i, v := range a { + if v != b[i] { + return fmt.Errorf("Arrays differ at index: %d", i) + } + } + return nil +} + +func TestMaybeRabinConsistency(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + testFileConsistency(t, chunk.NewMaybeRabin(4096), 256*4096) +} + +func TestRabinBlockSize(t *testing.T) { + if testing.Short() { + t.SkipNow() + } + buf := new(bytes.Buffer) + nbytes := 1024 * 1024 + io.CopyN(buf, rand.Reader, int64(nbytes)) + rab := chunk.NewMaybeRabin(4096) + blkch := rab.Split(buf) + + var blocks [][]byte + for b := range blkch { + blocks = append(blocks, b) + } + + fmt.Printf("Avg block size: %d\n", nbytes/len(blocks)) + +} + +type dagservAndPinner struct { + ds merkledag.DAGService + mp pin.ManualPinner +} + +func TestIndirectBlocks(t *testing.T) { + splitter := &chunk.SizeSplitter{512} + nbytes := 1024 * 1024 + buf := make([]byte, nbytes) + u.NewTimeSeededRand().Read(buf) + + read := bytes.NewReader(buf) + + ds := mdtest.Mock(t) + dag, err := buildTestDag(read, ds, splitter) + if err != nil { + t.Fatal(err) + } + + reader, err := uio.NewDagReader(context.Background(), dag, ds) + if err != nil { + t.Fatal(err) + } + + out, err := ioutil.ReadAll(reader) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(out, buf) { + t.Fatal("Not equal!") + } +} + +func TestSeekingBasic(t *testing.T) { + nbytes := int64(10 * 1024) + should := make([]byte, nbytes) + u.NewTimeSeededRand().Read(should) + + read := bytes.NewReader(should) + ds := mdtest.Mock(t) + nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) + if err != nil { + t.Fatal(err) + } + + rs, err := uio.NewDagReader(context.Background(), nd, ds) + if err != nil { + t.Fatal(err) + } + + start := int64(4000) + n, err := rs.Seek(start, os.SEEK_SET) + if err != nil { + t.Fatal(err) + } + if n != start { + t.Fatal("Failed to seek to correct offset") + } + + out, err := ioutil.ReadAll(rs) + if err != nil { + t.Fatal(err) + } + + err = arrComp(out, should[start:]) + if err != nil { + t.Fatal(err) + } +} + +func TestSeekToBegin(t *testing.T) { + nbytes := int64(10 * 1024) + should := make([]byte, nbytes) + u.NewTimeSeededRand().Read(should) + + read := bytes.NewReader(should) + ds := mdtest.Mock(t) + nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) + if err != nil { + t.Fatal(err) + } + + rs, err := uio.NewDagReader(context.Background(), nd, ds) + if err != nil { + t.Fatal(err) + } + + n, err := io.CopyN(ioutil.Discard, rs, 1024*4) + if err != nil { + t.Fatal(err) + } + if n != 4096 { + t.Fatal("Copy didnt copy enough bytes") + } + + seeked, err := rs.Seek(0, os.SEEK_SET) + if err != nil { + t.Fatal(err) + } + if seeked != 0 { + t.Fatal("Failed to seek to beginning") + } + + out, err := ioutil.ReadAll(rs) + if err != nil { + t.Fatal(err) + } + + err = arrComp(out, should) + if err != nil { + t.Fatal(err) + } +} + +func TestSeekToAlmostBegin(t *testing.T) { + nbytes := int64(10 * 1024) + should := make([]byte, nbytes) + u.NewTimeSeededRand().Read(should) + + read := bytes.NewReader(should) + ds := mdtest.Mock(t) + nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) + if err != nil { + t.Fatal(err) + } + + rs, err := uio.NewDagReader(context.Background(), nd, ds) + if err != nil { + t.Fatal(err) + } + + n, err := io.CopyN(ioutil.Discard, rs, 1024*4) + if err != nil { + t.Fatal(err) + } + if n != 4096 { + t.Fatal("Copy didnt copy enough bytes") + } + + seeked, err := rs.Seek(1, os.SEEK_SET) + if err != nil { + t.Fatal(err) + } + if seeked != 1 { + t.Fatal("Failed to seek to almost beginning") + } + + out, err := ioutil.ReadAll(rs) + if err != nil { + t.Fatal(err) + } + + err = arrComp(out, should[1:]) + if err != nil { + t.Fatal(err) + } +} + +func TestSeekEnd(t *testing.T) { + nbytes := int64(50 * 1024) + should := make([]byte, nbytes) + u.NewTimeSeededRand().Read(should) + + read := bytes.NewReader(should) + ds := mdtest.Mock(t) + nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) + if err != nil { + t.Fatal(err) + } + + rs, err := uio.NewDagReader(context.Background(), nd, ds) + if err != nil { + t.Fatal(err) + } + + seeked, err := rs.Seek(0, os.SEEK_END) + if err != nil { + t.Fatal(err) + } + if seeked != nbytes { + t.Fatal("Failed to seek to end") + } +} + +func TestSeekEndSingleBlockFile(t *testing.T) { + nbytes := int64(100) + should := make([]byte, nbytes) + u.NewTimeSeededRand().Read(should) + + read := bytes.NewReader(should) + ds := mdtest.Mock(t) + nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{5000}) + if err != nil { + t.Fatal(err) + } + + rs, err := uio.NewDagReader(context.Background(), nd, ds) + if err != nil { + t.Fatal(err) + } + + seeked, err := rs.Seek(0, os.SEEK_END) + if err != nil { + t.Fatal(err) + } + if seeked != nbytes { + t.Fatal("Failed to seek to end") + } +} + +func TestSeekingStress(t *testing.T) { + nbytes := int64(1024 * 1024) + should := make([]byte, nbytes) + u.NewTimeSeededRand().Read(should) + + read := bytes.NewReader(should) + ds := mdtest.Mock(t) + nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{1000}) + if err != nil { + t.Fatal(err) + } + + rs, err := uio.NewDagReader(context.Background(), nd, ds) + if err != nil { + t.Fatal(err) + } + + testbuf := make([]byte, nbytes) + for i := 0; i < 50; i++ { + offset := mrand.Intn(int(nbytes)) + l := int(nbytes) - offset + n, err := rs.Seek(int64(offset), os.SEEK_SET) + if err != nil { + t.Fatal(err) + } + if n != int64(offset) { + t.Fatal("Seek failed to move to correct position") + } + + nread, err := rs.Read(testbuf[:l]) + if err != nil { + t.Fatal(err) + } + if nread != l { + t.Fatal("Failed to read enough bytes") + } + + err = arrComp(testbuf[:l], should[offset:offset+l]) + if err != nil { + t.Fatal(err) + } + } + +} + +func TestSeekingConsistency(t *testing.T) { + nbytes := int64(128 * 1024) + should := make([]byte, nbytes) + u.NewTimeSeededRand().Read(should) + + read := bytes.NewReader(should) + ds := mdtest.Mock(t) + nd, err := buildTestDag(read, ds, &chunk.SizeSplitter{500}) + if err != nil { + t.Fatal(err) + } + + rs, err := uio.NewDagReader(context.Background(), nd, ds) + if err != nil { + t.Fatal(err) + } + + out := make([]byte, nbytes) + + for coff := nbytes - 4096; coff >= 0; coff -= 4096 { + t.Log(coff) + n, err := rs.Seek(coff, os.SEEK_SET) + if err != nil { + t.Fatal(err) + } + if n != coff { + t.Fatal("wasnt able to seek to the right position") + } + nread, err := rs.Read(out[coff : coff+4096]) + if err != nil { + t.Fatal(err) + } + if nread != 4096 { + t.Fatal("didnt read the correct number of bytes") + } + } + + err = arrComp(out, should) + if err != nil { + t.Fatal(err) + } +} diff --git a/importer/trickle/trickledag.go b/importer/trickle/trickledag.go new file mode 100644 index 00000000000..9577a832178 --- /dev/null +++ b/importer/trickle/trickledag.go @@ -0,0 +1,58 @@ +package trickle + +import ( + h "github.com/jbenet/go-ipfs/importer/helpers" + dag "github.com/jbenet/go-ipfs/merkledag" +) + +// layerRepeat specifies how many times to append a child tree of a +// given depth. Higher values increase the width of a given node, which +// improves seek speeds. +const layerRepeat = 4 + +func TrickleLayout(db *h.DagBuilderHelper) (*dag.Node, error) { + root := h.NewUnixfsNode() + err := db.FillNodeLayer(root) + if err != nil { + return nil, err + } + for level := 1; !db.Done(); level++ { + for i := 0; i < layerRepeat && !db.Done(); i++ { + next := h.NewUnixfsNode() + err := fillTrickleRec(db, next, level) + if err != nil { + return nil, err + } + err = root.AddChild(next, db) + if err != nil { + return nil, err + } + } + } + + return db.Add(root) +} + +func fillTrickleRec(db *h.DagBuilderHelper, node *h.UnixfsNode, depth int) error { + // Always do this, even in the base case + err := db.FillNodeLayer(node) + if err != nil { + return err + } + + for i := 1; i < depth && !db.Done(); i++ { + for j := 0; j < layerRepeat; j++ { + next := h.NewUnixfsNode() + err := fillTrickleRec(db, next, i) + if err != nil { + return err + } + + err = node.AddChild(next, db) + if err != nil { + return err + } + } + } + return nil +} diff --git a/merkledag/mock.go b/merkledag/test/utils.go similarity index 79% rename from merkledag/mock.go rename to merkledag/test/utils.go index ea3737f5807..cc373a8fdad 100644 --- a/merkledag/mock.go +++ b/merkledag/test/utils.go @@ -1,4 +1,4 @@ -package merkledag +package mdutils import ( "testing" @@ -8,13 +8,14 @@ import ( "github.com/jbenet/go-ipfs/blocks/blockstore" bsrv "github.com/jbenet/go-ipfs/blockservice" "github.com/jbenet/go-ipfs/exchange/offline" + dag "github.com/jbenet/go-ipfs/merkledag" ) -func Mock(t testing.TB) DAGService { +func Mock(t testing.TB) dag.DAGService { bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore())) bserv, err := bsrv.New(bstore, offline.Exchange(bstore)) if err != nil { t.Fatal(err) } - return NewDAGService(bserv) + return dag.NewDAGService(bserv) } diff --git a/unixfs/io/dagreader.go b/unixfs/io/dagreader.go index 7262daf684e..4d3f37b7e98 100644 --- a/unixfs/io/dagreader.go +++ b/unixfs/io/dagreader.go @@ -50,6 +50,7 @@ type ReadSeekCloser interface { io.Reader io.Seeker io.Closer + io.WriterTo } // NewDagReader creates a new reader object that reads the data represented by the given @@ -68,22 +69,26 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*Dag case ftpb.Data_Raw: fallthrough case ftpb.Data_File: - fctx, cancel := context.WithCancel(ctx) - promises := serv.GetDAG(fctx, n) - return &DagReader{ - node: n, - serv: serv, - buf: NewRSNCFromBytes(pb.GetData()), - promises: promises, - ctx: fctx, - cancel: cancel, - pbdata: pb, - }, nil + return newDataFileReader(ctx, n, pb, serv), nil default: return nil, ft.ErrUnrecognizedType } } +func newDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader { + fctx, cancel := context.WithCancel(ctx) + promises := serv.GetDAG(fctx, n) + return &DagReader{ + node: n, + serv: serv, + buf: NewRSNCFromBytes(pb.GetData()), + promises: promises, + ctx: fctx, + cancel: cancel, + pbdata: pb, + } +} + // precalcNextBuf follows the next link in line and loads it from the DAGService, // setting the next buffer to read from func (dr *DagReader) precalcNextBuf() error { @@ -108,11 +113,7 @@ func (dr *DagReader) precalcNextBuf() error { // A directory should not exist within a file return ft.ErrInvalidDirLocation case ftpb.Data_File: - subr, err := NewDagReader(dr.ctx, nxt, dr.serv) - if err != nil { - return err - } - dr.buf = subr + dr.buf = newDataFileReader(dr.ctx, nxt, pb, dr.serv) return nil case ftpb.Data_Raw: dr.buf = NewRSNCFromBytes(pb.GetData()) @@ -156,6 +157,31 @@ func (dr *DagReader) Read(b []byte) (int, error) { } } +func (dr *DagReader) WriteTo(w io.Writer) (int64, error) { + // If no cached buffer, load one + total := int64(0) + for { + // Attempt to write bytes from cached buffer + n, err := dr.buf.WriteTo(w) + total += n + dr.offset += n + if err != nil { + if err != io.EOF { + return total, err + } + } + + // Otherwise, load up the next block + err = dr.precalcNextBuf() + if err != nil { + if err == io.EOF { + return total, nil + } + return total, err + } + } +} + func (dr *DagReader) Close() error { dr.cancel() return nil @@ -163,6 +189,8 @@ func (dr *DagReader) Close() error { // Seek implements io.Seeker, and will seek to a given offset in the file // interface matches standard unix seek +// TODO: check if we can do relative seeks, to reduce the amount of dagreader +// recreations that need to happen. func (dr *DagReader) Seek(offset int64, whence int) (int64, error) { switch whence { case os.SEEK_SET: