Skip to content

Commit

Permalink
clean up benchmarks, implement WriterTo on DAGReader, and optimize DagReader
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed Feb 4, 2015
1 parent 414bdc7 commit 1e93ee0
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 66 deletions.
66 changes: 16 additions & 50 deletions importer/importer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,87 +65,53 @@ func BenchmarkBalancedReadSmallBlock(b *testing.B) {
nbytes := int64(10000000)
nd, ds := getBalancedDag(b, nbytes, 4096)

b.StartTimer()
for i := 0; i < b.N; i++ {
read, err := uio.NewDagReader(context.TODO(), nd, ds)
if err != nil {
b.Fatal(err)
}
n, err := io.Copy(ioutil.Discard, read)
if err != nil {
b.Fatal(err)
}
if n != nbytes {
b.Fatal("Failed to read correct amount")
}
}
b.SetBytes(nbytes)
b.StartTimer()
runReadBench(b, nd, ds)
}

// BenchmarkTrickleReadSmallBlock measures sequential read throughput of a
// trickle-layout DAG built from small (4096-byte) blocks. Setup is excluded
// from the timed region; SetBytes lets `go test -bench` report MB/s.
func BenchmarkTrickleReadSmallBlock(b *testing.B) {
	b.StopTimer()
	nbytes := int64(10000000)
	nd, ds := getTrickleDag(b, nbytes, 4096)

	b.SetBytes(nbytes)
	b.StartTimer()
	runReadBench(b, nd, ds)
}

// BenchmarkBalancedReadFull measures sequential read throughput of a
// balanced-layout DAG built from default-sized blocks. Setup is excluded
// from the timed region; SetBytes lets `go test -bench` report MB/s.
func BenchmarkBalancedReadFull(b *testing.B) {
	b.StopTimer()
	nbytes := int64(10000000)
	nd, ds := getBalancedDag(b, nbytes, chunk.DefaultBlockSize)

	b.SetBytes(nbytes)
	b.StartTimer()
	runReadBench(b, nd, ds)
}

// BenchmarkTrickleReadFull measures sequential read throughput of a
// trickle-layout DAG built from default-sized blocks. DAG construction is
// kept outside the timed region; SetBytes enables MB/s reporting.
func BenchmarkTrickleReadFull(b *testing.B) {
	b.StopTimer()
	size := int64(10000000)
	node, dserv := getTrickleDag(b, size, chunk.DefaultBlockSize)
	b.SetBytes(size)

	b.StartTimer()
	runReadBench(b, node, dserv)
}

func runReadBench(b *testing.B, nd *dag.Node, ds dag.DAGService) {
for i := 0; i < b.N; i++ {
read, err := uio.NewDagReader(context.TODO(), nd, ds)
ctx, cancel := context.WithCancel(context.TODO())
read, err := uio.NewDagReader(ctx, nd, ds)
if err != nil {
b.Fatal(err)
}

n, err := io.Copy(ioutil.Discard, read)
if err != nil {
_, err = read.WriteTo(ioutil.Discard)
if err != nil && err != io.EOF {
b.Fatal(err)
}
if n != nbytes {
b.Fatal("Failed to read correct amount")
}
cancel()
}
b.SetBytes(nbytes)
}
60 changes: 44 additions & 16 deletions unixfs/io/dagreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type ReadSeekCloser interface {
io.Reader
io.Seeker
io.Closer
io.WriterTo
}

// NewDagReader creates a new reader object that reads the data represented by the given
Expand All @@ -68,22 +69,26 @@ func NewDagReader(ctx context.Context, n *mdag.Node, serv mdag.DAGService) (*Dag
case ftpb.Data_Raw:
fallthrough
case ftpb.Data_File:
fctx, cancel := context.WithCancel(ctx)
promises := serv.GetDAG(fctx, n)
return &DagReader{
node: n,
serv: serv,
buf: NewRSNCFromBytes(pb.GetData()),
promises: promises,
ctx: fctx,
cancel: cancel,
pbdata: pb,
}, nil
return newDataFileReader(ctx, n, pb, serv), nil
default:
return nil, ft.ErrUnrecognizedType
}
}

// newDataFileReader constructs a DagReader over a file or raw unixfs node.
// It derives a cancellable child context from ctx and immediately calls
// GetDAG under it, so the returned reader owns both the fetch promises and
// the cancel func (invoked by Close) that tears the fetch down.
func newDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv mdag.DAGService) *DagReader {
	childCtx, cancel := context.WithCancel(ctx)
	return &DagReader{
		node:     n,
		serv:     serv,
		buf:      NewRSNCFromBytes(pb.GetData()),
		promises: serv.GetDAG(childCtx, n),
		ctx:      childCtx,
		cancel:   cancel,
		pbdata:   pb,
	}
}

// precalcNextBuf follows the next link in line and loads it from the DAGService,
// setting the next buffer to read from
func (dr *DagReader) precalcNextBuf() error {
Expand All @@ -108,11 +113,7 @@ func (dr *DagReader) precalcNextBuf() error {
// A directory should not exist within a file
return ft.ErrInvalidDirLocation
case ftpb.Data_File:
subr, err := NewDagReader(dr.ctx, nxt, dr.serv)
if err != nil {
return err
}
dr.buf = subr
dr.buf = newDataFileReader(dr.ctx, nxt, pb, dr.serv)
return nil
case ftpb.Data_Raw:
dr.buf = NewRSNCFromBytes(pb.GetData())
Expand Down Expand Up @@ -156,13 +157,40 @@ func (dr *DagReader) Read(b []byte) (int, error) {
}
}

// WriteTo implements io.WriterTo: it streams the remainder of the file into
// w, block by block, returning the total number of bytes written. A nil
// error means the whole DAG was drained.
func (dr *DagReader) WriteTo(w io.Writer) (int64, error) {
	total := int64(0)
	for {
		// Drain the currently buffered block into w. io.EOF from the
		// buffer just means it is exhausted, not a failure.
		n, err := dr.buf.WriteTo(w)
		total += n
		dr.offset += n
		if err != nil && err != io.EOF {
			return total, err
		}

		// Load the next block; io.EOF here means the DAG has no more
		// blocks and the copy finished successfully.
		err = dr.precalcNextBuf()
		if err != nil {
			if err == io.EOF {
				return total, nil
			}
			return total, err
		}
	}
}

// Close implements io.Closer. It cancels the DagReader's internal context
// and always returns nil.
func (dr *DagReader) Close() error {
	dr.cancel()
	return nil
}

// Seek implements io.Seeker, and will seek to a given offset in the file
// interface matches standard unix seek
// TODO: check if we can do relative seeks, to reduce the amount of dagreader
// recreations that need to happen.
func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
switch whence {
case os.SEEK_SET:
Expand Down

0 comments on commit 1e93ee0

Please sign in to comment.