diff --git a/importer/importer_test.go b/importer/importer_test.go index 6aa1dcf8599..daaa9940db9 100644 --- a/importer/importer_test.go +++ b/importer/importer_test.go @@ -65,21 +65,9 @@ func BenchmarkBalancedReadSmallBlock(b *testing.B) { nbytes := int64(10000000) nd, ds := getBalancedDag(b, nbytes, 4096) - b.StartTimer() - for i := 0; i < b.N; i++ { - read, err := uio.NewDagReader(context.TODO(), nd, ds) - if err != nil { - b.Fatal(err) - } - n, err := io.Copy(ioutil.Discard, read) - if err != nil { - b.Fatal(err) - } - if n != nbytes { - b.Fatal("Failed to read correct amount") - } - } b.SetBytes(nbytes) + b.StartTimer() + runReadBench(b, nd, ds) } func BenchmarkTrickleReadSmallBlock(b *testing.B) { @@ -87,22 +75,9 @@ func BenchmarkTrickleReadSmallBlock(b *testing.B) { nbytes := int64(10000000) nd, ds := getTrickleDag(b, nbytes, 4096) - b.StartTimer() - for i := 0; i < b.N; i++ { - read, err := uio.NewDagReader(context.TODO(), nd, ds) - if err != nil { - b.Fatal(err) - } - - n, err := io.Copy(ioutil.Discard, read) - if err != nil { - b.Fatal(err) - } - if n != nbytes { - b.Fatal("Failed to read correct amount") - } - } b.SetBytes(nbytes) + b.StartTimer() + runReadBench(b, nd, ds) } func BenchmarkBalancedReadFull(b *testing.B) { @@ -110,21 +85,9 @@ func BenchmarkBalancedReadFull(b *testing.B) { nbytes := int64(10000000) nd, ds := getBalancedDag(b, nbytes, chunk.DefaultBlockSize) - b.StartTimer() - for i := 0; i < b.N; i++ { - read, err := uio.NewDagReader(context.TODO(), nd, ds) - if err != nil { - b.Fatal(err) - } - n, err := io.Copy(ioutil.Discard, read) - if err != nil { - b.Fatal(err) - } - if n != nbytes { - b.Fatal("Failed to read correct amount") - } - } b.SetBytes(nbytes) + b.StartTimer() + runReadBench(b, nd, ds) } func BenchmarkTrickleReadFull(b *testing.B) { @@ -132,20 +95,23 @@ func BenchmarkTrickleReadFull(b *testing.B) { nbytes := int64(10000000) nd, ds := getTrickleDag(b, nbytes, chunk.DefaultBlockSize) + b.SetBytes(nbytes) b.StartTimer() + runReadBench(b, nd, ds) +} + +func runReadBench(b *testing.B, nd *dag.Node, ds dag.DAGService) { for i := 0; i < b.N; i++ { - read, err := uio.NewDagReader(context.TODO(), nd, ds) + ctx, cancel := context.WithCancel(context.TODO()) + read, err := uio.NewDagReader(ctx, nd, ds) if err != nil { b.Fatal(err) } - n, err := io.Copy(ioutil.Discard, read) - if err != nil { + _, err = read.WriteTo(ioutil.Discard) + if err != nil && err != io.EOF { b.Fatal(err) } - if n != nbytes { - b.Fatal("Failed to read correct amount") - } + cancel() } - b.SetBytes(nbytes) } diff --git a/unixfs/io/dagreader.go b/unixfs/io/dagreader.go index 7262daf684e..4d3f37b7e98 100644 --- a/unixfs/io/dagreader.go +++ b/unixfs/io/dagreader.go @@ -50,6 +50,7 @@ type ReadSeekCloser interface { io.Reader io.Seeker io.Closer + io.WriterTo } // NewDagReader creates a new reader object that reads the data represented by the given @@ -68,22 +69,26 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*Dag case ftpb.Data_Raw: fallthrough case ftpb.Data_File: - fctx, cancel := context.WithCancel(ctx) - promises := serv.GetDAG(fctx, n) - return &DagReader{ - node: n, - serv: serv, - buf: NewRSNCFromBytes(pb.GetData()), - promises: promises, - ctx: fctx, - cancel: cancel, - pbdata: pb, - }, nil + return newDataFileReader(ctx, n, pb, serv), nil default: return nil, ft.ErrUnrecognizedType } } +func newDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader { + fctx, cancel := context.WithCancel(ctx) + promises := serv.GetDAG(fctx, n) + return &DagReader{ + node: n, + serv: serv, + buf: NewRSNCFromBytes(pb.GetData()), + promises: promises, + ctx: fctx, + cancel: cancel, + pbdata: pb, + } +} + // precalcNextBuf follows the next link in line and loads it from the DAGService, // setting the next buffer to read from func (dr *DagReader) precalcNextBuf() error { @@ -108,11 +113,7 @@ func (dr *DagReader) precalcNextBuf() error { // A directory should not exist within a file return ft.ErrInvalidDirLocation case ftpb.Data_File: - subr, err := NewDagReader(dr.ctx, nxt, dr.serv) - if err != nil { - return err - } - dr.buf = subr + dr.buf = newDataFileReader(dr.ctx, nxt, pb, dr.serv) return nil case ftpb.Data_Raw: dr.buf = NewRSNCFromBytes(pb.GetData()) @@ -156,6 +157,31 @@ func (dr *DagReader) Read(b []byte) (int, error) { } } +func (dr *DagReader) WriteTo(w io.Writer) (int64, error) { + // If no cached buffer, load one + total := int64(0) + for { + // Attempt to write bytes from cached buffer + n, err := dr.buf.WriteTo(w) + total += n + dr.offset += n + if err != nil { + if err != io.EOF { + return total, err + } + } + + // Otherwise, load up the next block + err = dr.precalcNextBuf() + if err != nil { + if err == io.EOF { + return total, nil + } + return total, err + } + } +} + func (dr *DagReader) Close() error { dr.cancel() return nil @@ -163,6 +189,8 @@ func (dr *DagReader) Close() error { // Seek implements io.Seeker, and will seek to a given offset in the file // interface matches standard unix seek +// TODO: check if we can do relative seeks, to reduce the amount of dagreader +// recreations that need to happen. func (dr *DagReader) Seek(offset int64, whence int) (int64, error) { switch whence { case os.SEEK_SET: