Remove chunker channel #1973

Merged · 2 commits · Dec 2, 2015
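This PR removes the channel layer between the chunker and the dag builders. Previously every caller drained a chunk.Splitter into a block channel and an error channel via chunk.Chan, then handed both channels to DagBuilderParams.New; now the caller passes the Splitter itself and the builder pulls blocks on demand. Judging from the new prepareNext code below, the interface the builder consumes is essentially the following (a sketch inferred from this diff, not quoted from the chunk package):

type Splitter interface {
	// NextBytes returns the next chunk of input, or a nil slice
	// plus an error once the input is exhausted. The exact error
	// contract is not shown in this diff.
	NextBytes() ([]byte, error)
}

The calling pattern changes accordingly:

// Before: start the splitter, then hand its channels to the builder.
blkch, errs := chunk.Chan(spl)
nd, err := bal.BalancedLayout(dbp.New(blkch, errs))

// After: hand the splitter itself to the builder, which calls
// spl.NextBytes() whenever it needs the next block.
nd, err := bal.BalancedLayout(dbp.New(spl))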
93 changes: 22 additions & 71 deletions importer/balanced/balanced_test.go
@@ -22,15 +22,12 @@ import (
// TODO: extract these tests and more as a generic layout test suite

func buildTestDag(ds dag.DAGService, spl chunk.Splitter) (*dag.Node, error) {
// Start the splitter
blkch, errs := chunk.Chan(spl)

dbp := h.DagBuilderParams{
Dagserv: ds,
Maxlinks: h.DefaultLinksPerBlock,
}

return BalancedLayout(dbp.New(blkch, errs))
return BalancedLayout(dbp.New(spl))
}

func getTestDag(t *testing.T, ds dag.DAGService, size int64, blksize int64) (*dag.Node, []byte) {
@@ -52,13 +49,11 @@ func TestSizeBasedSplit(t *testing.T) {
t.SkipNow()
}

bs := chunk.SizeSplitterGen(512)
testFileConsistency(t, bs, 32*512)
bs = chunk.SizeSplitterGen(4096)
testFileConsistency(t, bs, 32*4096)
testFileConsistency(t, 32*512, 512)
testFileConsistency(t, 32*4096, 4096)

// Uneven offset
testFileConsistency(t, bs, 31*4095)
testFileConsistency(t, 31*4095, 4096)
}

func dup(b []byte) []byte {
@@ -67,51 +62,20 @@ func dup(b []byte) []byte {
return o
}

func testFileConsistency(t *testing.T, bs chunk.SplitterGen, nbytes int) {
should := make([]byte, nbytes)
u.NewTimeSeededRand().Read(should)

read := bytes.NewReader(should)
func testFileConsistency(t *testing.T, nbytes int64, blksize int64) {
ds := mdtest.Mock()
nd, err := buildTestDag(ds, bs(read))
if err != nil {
t.Fatal(err)
}
nd, should := getTestDag(t, ds, nbytes, blksize)

r, err := uio.NewDagReader(context.Background(), nd, ds)
if err != nil {
t.Fatal(err)
}

out, err := ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}

err = arrComp(out, should)
if err != nil {
t.Fatal(err)
}
dagrArrComp(t, r, should)
}

func TestBuilderConsistency(t *testing.T) {
dagserv := mdtest.Mock()
nd, should := getTestDag(t, dagserv, 100000, chunk.DefaultBlockSize)

r, err := uio.NewDagReader(context.Background(), nd, dagserv)
if err != nil {
t.Fatal(err)
}

out, err := ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}

err = arrComp(out, should)
if err != nil {
t.Fatal(err)
}
testFileConsistency(t, 100000, chunk.DefaultBlockSize)
}

func arrComp(a, b []byte) error {
@@ -126,6 +90,17 @@ func arrComp(a, b []byte) error {
return nil
}

func dagrArrComp(t *testing.T, r io.Reader, should []byte) {
out, err := ioutil.ReadAll(r)
if err != nil {
t.Fatal(err)
}

if err := arrComp(out, should); err != nil {
t.Fatal(err)
}
}

type dagservAndPinner struct {
ds dag.DAGService
mp pin.Pinner
@@ -169,15 +144,7 @@ func TestSeekingBasic(t *testing.T) {
t.Fatal("Failed to seek to correct offset")
}

out, err := ioutil.ReadAll(rs)
if err != nil {
t.Fatal(err)
}

err = arrComp(out, should[start:])
if err != nil {
t.Fatal(err)
}
dagrArrComp(t, rs, should[start:])
}

func TestSeekToBegin(t *testing.T) {
@@ -205,15 +172,7 @@ func TestSeekToBegin(t *testing.T) {
t.Fatal("Failed to seek to beginning")
}

out, err := ioutil.ReadAll(rs)
if err != nil {
t.Fatal(err)
}

err = arrComp(out, should)
if err != nil {
t.Fatal(err)
}
dagrArrComp(t, rs, should)
}

func TestSeekToAlmostBegin(t *testing.T) {
@@ -241,15 +200,7 @@ func TestSeekToAlmostBegin(t *testing.T) {
t.Fatal("Failed to seek to almost beginning")
}

out, err := ioutil.ReadAll(rs)
if err != nil {
t.Fatal(err)
}

err = arrComp(out, should[1:])
if err != nil {
t.Fatal(err)
}
dagrArrComp(t, rs, should[1:])
}

func TestSeekEnd(t *testing.T) {
31 changes: 10 additions & 21 deletions importer/helpers/dagbuilder.go
@@ -1,15 +1,15 @@
package helpers

import (
"github.com/ipfs/go-ipfs/importer/chunk"
dag "github.com/ipfs/go-ipfs/merkledag"
)

// DagBuilderHelper wraps together a bunch of objects needed to
// efficiently create unixfs dag trees
type DagBuilderHelper struct {
dserv dag.DAGService
in <-chan []byte
errs <-chan error
spl chunk.Splitter
recvdErr error
nextData []byte // the next item to return.
maxlinks int
@@ -24,39 +24,28 @@ type DagBuilderParams struct {
Dagserv dag.DAGService
}

// Generate a new DagBuilderHelper from the given params, using 'in' as a
// data source
func (dbp *DagBuilderParams) New(in <-chan []byte, errs <-chan error) *DagBuilderHelper {
// Generate a new DagBuilderHelper from the given params, using the given
// splitter as the data source
func (dbp *DagBuilderParams) New(spl chunk.Splitter) *DagBuilderHelper {
return &DagBuilderHelper{
dserv: dbp.Dagserv,
in: in,
errs: errs,
spl: spl,
maxlinks: dbp.Maxlinks,
batch: dbp.Dagserv.Batch(),
}
}

// prepareNext consumes the next item from the channel and puts it
// prepareNext consumes the next item from the splitter and puts it
// in the nextData field. it is idempotent-- if nextData is full
// it will do nothing.
//
// i realized that building the dag becomes _a lot_ easier if we can
// "peek" the "are done yet?" (i.e. not consume it from the channel)
func (db *DagBuilderHelper) prepareNext() {
if db.in == nil {
// if our input is nil, there is "nothing to do". we're done.
// as if there was no data at all. (a sort of zero-value)
return
}

// if we already have data waiting to be consumed, we're ready.
// if we already have data waiting to be consumed, we're ready
if db.nextData != nil {
return
}

// if it's closed, nextData will be correctly set to nil, signaling
// that we're done consuming from the channel.
db.nextData = <-db.in
// TODO: handle err (which wasn't handled either when the splitter was channeled)
db.nextData, _ = db.spl.NextBytes()
}

// Done returns whether or not we're done consuming the incoming data.
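The TODO above notes that the splitter's error is still discarded, just as it was in the channel version. Since the helper struct keeps a recvdErr field, one way to stop dropping it would be a change along these lines (a hypothetical sketch, not part of this PR):

// prepareNext, with the splitter error recorded instead of discarded.
func (db *DagBuilderHelper) prepareNext() {
	// if we already have data waiting to be consumed, we're ready
	if db.nextData != nil {
		return
	}
	// store the error so callers can inspect db.recvdErr
	// once the builder reports that it is done
	db.nextData, db.recvdErr = db.spl.NextBytes()
}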
10 changes: 2 additions & 8 deletions importer/importer.go
@@ -39,25 +39,19 @@ func BuildDagFromFile(fpath string, ds dag.DAGService) (*dag.Node, error) {
}

func BuildDagFromReader(ds dag.DAGService, spl chunk.Splitter) (*dag.Node, error) {
// Start the splitter
blkch, errch := chunk.Chan(spl)

dbp := h.DagBuilderParams{
Dagserv: ds,
Maxlinks: h.DefaultLinksPerBlock,
}

return bal.BalancedLayout(dbp.New(blkch, errch))
return bal.BalancedLayout(dbp.New(spl))
}

func BuildTrickleDagFromReader(ds dag.DAGService, spl chunk.Splitter) (*dag.Node, error) {
// Start the splitter
blkch, errch := chunk.Chan(spl)

dbp := h.DagBuilderParams{
Dagserv: ds,
Maxlinks: h.DefaultLinksPerBlock,
}

return trickle.TrickleLayout(dbp.New(blkch, errch))
return trickle.TrickleLayout(dbp.New(spl))
}
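With the channels gone, importing from any io.Reader reduces to building a splitter and passing it in. A minimal usage sketch; the mdtest mock and the import paths are borrowed from the test files in this PR, so treat them as assumptions outside it:

package main

import (
	"bytes"
	"fmt"

	"github.com/ipfs/go-ipfs/importer"
	"github.com/ipfs/go-ipfs/importer/chunk"
	mdtest "github.com/ipfs/go-ipfs/merkledag/test"
)

func main() {
	ds := mdtest.Mock()
	data := bytes.NewReader(make([]byte, 100000))

	// The caller owns splitter construction; the importer
	// consumes it directly, no chunk.Chan required.
	spl := chunk.NewSizeSplitter(data, 4096)
	nd, err := importer.BuildDagFromReader(ds, spl)
	if err != nil {
		panic(err)
	}
	fmt.Println(nd)
}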
19 changes: 5 additions & 14 deletions importer/trickle/trickle_test.go
@@ -21,15 +21,12 @@ import (
)

func buildTestDag(ds merkledag.DAGService, spl chunk.Splitter) (*merkledag.Node, error) {
// Start the splitter
blkch, errs := chunk.Chan(spl)

dbp := h.DagBuilderParams{
Dagserv: ds,
Maxlinks: h.DefaultLinksPerBlock,
}

nd, err := TrickleLayout(dbp.New(blkch, errs))
nd, err := TrickleLayout(dbp.New(spl))
if err != nil {
return nil, err
}
@@ -441,10 +438,9 @@ func TestAppend(t *testing.T) {
}

r := bytes.NewReader(should[nbytes/2:])
blks, errs := chunk.Chan(chunk.NewSizeSplitter(r, 500))

ctx := context.Background()
nnode, err := TrickleAppend(ctx, nd, dbp.New(blks, errs))
nnode, err := TrickleAppend(ctx, nd, dbp.New(chunk.NewSizeSplitter(r, 500)))
if err != nil {
t.Fatal(err)
}
@@ -494,9 +490,8 @@ func TestMultipleAppends(t *testing.T) {

ctx := context.Background()
for i := 0; i < len(should); i++ {
blks, errs := chunk.Chan(spl(bytes.NewReader(should[i : i+1])))

nnode, err := TrickleAppend(ctx, nd, dbp.New(blks, errs))
nnode, err := TrickleAppend(ctx, nd, dbp.New(spl(bytes.NewReader(should[i:i+1]))))
if err != nil {
t.Fatal(err)
}
@@ -538,17 +533,13 @@ func TestAppendSingleBytesToEmpty(t *testing.T) {

spl := chunk.SizeSplitterGen(500)

blks, errs := chunk.Chan(spl(bytes.NewReader(data[:1])))

ctx := context.Background()
nnode, err := TrickleAppend(ctx, nd, dbp.New(blks, errs))
nnode, err := TrickleAppend(ctx, nd, dbp.New(spl(bytes.NewReader(data[:1]))))
if err != nil {
t.Fatal(err)
}

blks, errs = chunk.Chan(spl(bytes.NewReader(data[1:])))

nnode, err = TrickleAppend(ctx, nnode, dbp.New(blks, errs))
nnode, err = TrickleAppend(ctx, nnode, dbp.New(spl(bytes.NewReader(data[1:]))))
if err != nil {
t.Fatal(err)
}
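These tests also lean on chunk.SizeSplitterGen, a generator that closes over a block size and wraps each reader in a fresh splitter. Its presumable shape, written out as an assumption (the chunk package's actual definition is not part of this diff):

// A splitter generator pairs a fixed block size with any reader,
// which is why the tests can write spl(bytes.NewReader(...)).
func sizeSplitterGen(size int64) func(r io.Reader) chunk.Splitter {
	return func(r io.Reader) chunk.Splitter {
		return chunk.NewSizeSplitter(r, size)
	}
}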
10 changes: 4 additions & 6 deletions unixfs/mod/dagmodifier.go
@@ -103,8 +103,7 @@ func (zr zeroReader) Read(b []byte) (int, error) {
func (dm *DagModifier) expandSparse(size int64) error {
r := io.LimitReader(zeroReader{}, size)
spl := chunk.NewSizeSplitter(r, 4096)
blks, errs := chunk.Chan(spl)
nnode, err := dm.appendData(dm.curNode, blks, errs)
nnode, err := dm.appendData(dm.curNode, spl)
if err != nil {
return err
}
@@ -191,8 +190,7 @@ func (dm *DagModifier) Sync() error {

// need to write past end of current dag
if !done {
blks, errs := chunk.Chan(dm.splitter(dm.wrBuf))
nd, err = dm.appendData(dm.curNode, blks, errs)
nd, err = dm.appendData(dm.curNode, dm.splitter(dm.wrBuf))
if err != nil {
return err
}
@@ -286,13 +284,13 @@ func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader)
}

// appendData appends the blocks from the given splitter to the end of this dag
func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte, errs <-chan error) (*mdag.Node, error) {
func (dm *DagModifier) appendData(node *mdag.Node, spl chunk.Splitter) (*mdag.Node, error) {
dbp := &help.DagBuilderParams{
Dagserv: dm.dagserv,
Maxlinks: help.DefaultLinksPerBlock,
}

return trickle.TrickleAppend(dm.ctx, node, dbp.New(blks, errs))
return trickle.TrickleAppend(dm.ctx, node, dbp.New(spl))
}

// Read data from this dag starting at the current offset
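One last detail from expandSparse: growing a sparse file needs no special plumbing either, since a zero-filled reader is chunked like any other input. The diff only shows zeroReader's Read signature, so the body below is an assumed, obvious implementation:

type zeroReader struct{}

// Read fills b with zeros and never fails, so
// io.LimitReader(zeroReader{}, size) yields exactly size zero bytes.
func (zr zeroReader) Read(b []byte) (int, error) {
	for i := range b {
		b[i] = 0
	}
	return len(b), nil
}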