Skip to content

Commit

Permalink
Merge pull request #3640 from ipfs/fix/pinset-obj-explosion
Browse the repository files at this point in the history
Make pinset sharding deterministic
Branch information:
whyrusleeping committed Feb 12, 2017
2 parents ac2d7a0 + 728ff6d commit 4028e89
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 33 deletions.
23 changes: 5 additions & 18 deletions pin/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package pin
import (
"bytes"
"context"
"crypto/rand"
"encoding/binary"
"errors"
"fmt"
Expand All @@ -26,14 +25,6 @@ const (
maxItems = 8192
)

// randomSeed draws four bytes from the system's cryptographically
// secure random source and packs them into a uint32 (little-endian).
// It returns a non-nil error only if the random source fails.
func randomSeed() (uint32, error) {
	seed := make([]byte, 4)
	if _, err := rand.Read(seed); err != nil {
		return 0, err
	}
	return binary.LittleEndian.Uint32(seed), nil
}

func hash(seed uint32, c *cid.Cid) uint32 {
var buf [4]byte
binary.LittleEndian.PutUint32(buf[:], seed)
Expand Down Expand Up @@ -63,11 +54,7 @@ func (s sortByHash) Swap(a, b int) {
s.links[a], s.links[b] = s.links[b], s.links[a]
}

func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint64, iter itemIterator, internalKeys keyObserver) (*merkledag.ProtoNode, error) {
seed, err := randomSeed()
if err != nil {
return nil, err
}
func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint64, depth uint32, iter itemIterator, internalKeys keyObserver) (*merkledag.ProtoNode, error) {
links := make([]*node.Link, 0, defaultFanout+maxItems)
for i := 0; i < defaultFanout; i++ {
links = append(links, &node.Link{Cid: emptyKey})
Expand All @@ -82,7 +69,7 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint
hdr := &pb.Set{
Version: proto.Uint32(1),
Fanout: proto.Uint32(defaultFanout),
Seed: proto.Uint32(seed),
Seed: proto.Uint32(depth),
}
if err := writeHdr(n, hdr); err != nil {
return nil, err
Expand Down Expand Up @@ -129,7 +116,7 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint
if !ok {
break
}
h := hash(seed, k) % defaultFanout
h := hash(depth, k) % defaultFanout
hashed[h] = append(hashed[h], k)
}

Expand All @@ -142,7 +129,7 @@ func storeItems(ctx context.Context, dag merkledag.DAGService, estimatedLen uint
childIter := getCidListIterator(items)

// recursively create a pinset from the items for this bucket index
child, err := storeItems(ctx, dag, uint64(len(items)), childIter, internalKeys)
child, err := storeItems(ctx, dag, uint64(len(items)), depth+1, childIter, internalKeys)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -296,7 +283,7 @@ func getCidListIterator(cids []*cid.Cid) itemIterator {
func storeSet(ctx context.Context, dag merkledag.DAGService, cids []*cid.Cid, internalKeys keyObserver) (*merkledag.ProtoNode, error) {
iter := getCidListIterator(cids)

n, err := storeItems(ctx, dag, uint64(len(cids)), iter, internalKeys)
n, err := storeItems(ctx, dag, uint64(len(cids)), 0, iter, internalKeys)
if err != nil {
return nil, err
}
Expand Down
65 changes: 50 additions & 15 deletions pin/set_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,40 +2,75 @@ package pin

import (
"context"
"fmt"
"os"
"encoding/binary"
"testing"

blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
bserv "github.com/ipfs/go-ipfs/blockservice"
offline "github.com/ipfs/go-ipfs/exchange/offline"
dag "github.com/ipfs/go-ipfs/merkledag"
mdtest "github.com/ipfs/go-ipfs/merkledag/test"

ds "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore"
dsq "gx/ipfs/QmRWDav6mzWseLWeYfVd5fvUKiVe9xNH29YfMF438fG364/go-datastore/query"
cid "gx/ipfs/QmcTcsTvfaeEBRFo1TkFgT8sRmgi1n1LTZpecfVP8fzpGD/go-cid"
)

func ignoreCids(_ *cid.Cid) {}

func TestSet(t *testing.T) {
ds := mdtest.Mock()
limit := 10000 // 10000 reproduces the pinloss issue fairly reliably

if os.Getenv("STRESS_IT_OUT_YO") != "" {
limit = 10000000
func objCount(d ds.Datastore) int {
q := dsq.Query{KeysOnly: true}
res, err := d.Query(q)
if err != nil {
panic(err)
}
var inputs []*cid.Cid
for i := 0; i < limit; i++ {
c, err := ds.Add(dag.NodeWithData([]byte(fmt.Sprint(i))))
if err != nil {
t.Fatal(err)

var count int
for {
_, ok := res.NextSync()
if !ok {
break
}

count++
}
return count
}

func TestSet(t *testing.T) {
dst := ds.NewMapDatastore()
bstore := blockstore.NewBlockstore(dst)
ds := dag.NewDAGService(bserv.New(bstore, offline.Exchange(bstore)))

// this value triggers the creation of a recursive shard.
// If the recursive sharding is done improperly, this will result in
// an infinite recursion and crash (OOM)
limit := uint32((defaultFanout * maxItems) + 1)

var inputs []*cid.Cid
buf := make([]byte, 4)
for i := uint32(0); i < limit; i++ {
binary.BigEndian.PutUint32(buf, i)
c := dag.NewRawNode(buf).Cid()
inputs = append(inputs, c)
}

_, err := storeSet(context.Background(), ds, inputs[:len(inputs)-1], ignoreCids)
if err != nil {
t.Fatal(err)
}

objs1 := objCount(dst)

out, err := storeSet(context.Background(), ds, inputs, ignoreCids)
if err != nil {
t.Fatal(err)
}

objs2 := objCount(dst)
if objs2-objs1 > 2 {
t.Fatal("set sharding does not appear to be deterministic")
}

// weird wrapper node because loadSet expects us to pass an
// object pointing to multiple named sets
setroot := &dag.ProtoNode{}
Expand All @@ -49,7 +84,7 @@ func TestSet(t *testing.T) {
t.Fatal(err)
}

if len(outset) != limit {
if uint32(len(outset)) != limit {
t.Fatal("got wrong number", len(outset), limit)
}

Expand Down

0 comments on commit 4028e89

Please sign in to comment.