Skip to content

Commit

Permalink
Merge pull request #2953 from ipfs/feature/blocks-bloom-no-rebuild
Browse files Browse the repository at this point in the history
core: do not run bloom in case of ephemeral node
  • Loading branch information
whyrusleeping committed Jul 10, 2016
2 parents 94e35db + d9bafb6 commit 4e67003
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 17 deletions.
27 changes: 19 additions & 8 deletions blocks/blockstore/bloom_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,10 @@ import (
"sync/atomic"
)

// BloomCached returns Blockstore that caches Has requests using Bloom filter
// bloomCached returns Blockstore that caches Has requests using Bloom filter
// Size is size of bloom filter in bytes
func BloomCached(bs Blockstore, bloomSize, lruSize int) (*bloomcache, error) {
bl, err := bloom.New(float64(bloomSize), float64(7))
func bloomCached(bs Blockstore, ctx context.Context, bloomSize, hashCount, lruSize int) (*bloomcache, error) {
bl, err := bloom.New(float64(bloomSize), float64(hashCount))
if err != nil {
return nil, err
}
Expand All @@ -24,7 +24,7 @@ func BloomCached(bs Blockstore, bloomSize, lruSize int) (*bloomcache, error) {
}
bc := &bloomcache{blockstore: bs, bloom: bl, arc: arc}
bc.Invalidate()
go bc.Rebuild()
go bc.Rebuild(ctx)

return bc, nil
}
Expand Down Expand Up @@ -52,8 +52,7 @@ func (b *bloomcache) BloomActive() bool {
return atomic.LoadInt32(&b.active) != 0
}

func (b *bloomcache) Rebuild() {
ctx := context.TODO()
func (b *bloomcache) Rebuild(ctx context.Context) {
evt := log.EventBegin(ctx, "bloomcache.Rebuild")
defer evt.Done()

Expand All @@ -62,8 +61,19 @@ func (b *bloomcache) Rebuild() {
log.Errorf("AllKeysChan failed in bloomcache rebuild with: %v", err)
return
}
for key := range ch {
b.bloom.AddTS([]byte(key)) // Use binary key, the more compact the better
finish := false
for !finish {
select {
case key, ok := <-ch:
if ok {
b.bloom.AddTS([]byte(key)) // Use binary key, the more compact the better
} else {
finish = true
}
case <-ctx.Done():
log.Warning("Cache rebuild closed by context finishing.")
return
}
}
close(b.rebuildChan)
atomic.StoreInt32(&b.active, 1)
Expand Down Expand Up @@ -159,6 +169,7 @@ func (b *bloomcache) PutMany(bs []blocks.Block) error {
if err == nil {
for _, block := range bs {
b.bloom.AddTS([]byte(block.Key()))
b.arc.Add(block.Key(), true)
}
}
return err
Expand Down
27 changes: 21 additions & 6 deletions blocks/blockstore/bloom_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,32 @@ import (

"github.com/ipfs/go-ipfs/blocks"

context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
ds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore"
dsq "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/query"
syncds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/sync"
)

func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error) {
if ctx == nil {
ctx = context.TODO()
}
opts := DefaultCacheOpts()
bbs, err := CachedBlockstore(bs, ctx, opts)
if err == nil {
return bbs.(*bloomcache), nil
} else {
return nil, err
}
}

func TestReturnsErrorWhenSizeNegative(t *testing.T) {
bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
_, err := BloomCached(bs, 100, -1)
_, err := bloomCached(bs, context.TODO(), 100, 1, -1)
if err == nil {
t.Fail()
}
_, err = BloomCached(bs, -1, 100)
_, err = bloomCached(bs, context.TODO(), -1, 1, 100)
if err == nil {
t.Fail()
}
Expand All @@ -29,7 +43,7 @@ func TestRemoveCacheEntryOnDelete(t *testing.T) {
b := blocks.NewBlock([]byte("foo"))
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
bs := NewBlockstore(syncds.MutexWrap(cd))
cachedbs, err := BloomCached(bs, 1, 1)
cachedbs, err := testBloomCached(bs, nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -53,7 +67,7 @@ func TestRemoveCacheEntryOnDelete(t *testing.T) {
func TestElideDuplicateWrite(t *testing.T) {
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
bs := NewBlockstore(syncds.MutexWrap(cd))
cachedbs, err := BloomCached(bs, 1, 1)
cachedbs, err := testBloomCached(bs, nil)
if err != nil {
t.Fatal(err)
}
Expand All @@ -73,14 +87,15 @@ func TestHasIsBloomCached(t *testing.T) {
for i := 0; i < 1000; i++ {
bs.Put(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i))))
}
cachedbs, err := BloomCached(bs, 256*1024, 128)
ctx, _ := context.WithTimeout(context.Background(), 1*time.Second)
cachedbs, err := testBloomCached(bs, ctx)
if err != nil {
t.Fatal(err)
}

select {
case <-cachedbs.rebuildChan:
case <-time.After(1 * time.Second):
case <-ctx.Done():
t.Fatalf("Timeout wating for rebuild: %d", cachedbs.bloom.ElementsAdded())
}

Expand Down
42 changes: 42 additions & 0 deletions blocks/blockstore/caching.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package blockstore

import (
"errors"

context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
)

// Next to each option is it aproximate memory usage per unit
type CacheOpts struct {
HasBloomFilterSize int // 1 bit
HasBloomFilterHashes int // No size, 7 is usually best, consult bloom papers
HasARCCacheSize int // 32 bytes
}

func DefaultCacheOpts() CacheOpts {
return CacheOpts{
HasBloomFilterSize: 512 * 8 * 1024,
HasBloomFilterHashes: 7,
HasARCCacheSize: 64 * 1024,
}
}

func CachedBlockstore(bs GCBlockstore,
ctx context.Context, opts CacheOpts) (cbs GCBlockstore, err error) {
cbs = bs

if opts.HasBloomFilterSize < 0 || opts.HasBloomFilterHashes < 0 ||
opts.HasARCCacheSize < 0 {
return nil, errors.New("all options for cache need to be greater than zero")
}

if opts.HasBloomFilterSize != 0 && opts.HasBloomFilterHashes == 0 {
return nil, errors.New("bloom filter hash count can't be 0 when there is size set")
}
if opts.HasBloomFilterSize != 0 {
cbs, err = bloomCached(cbs, ctx, opts.HasBloomFilterSize, opts.HasBloomFilterHashes,
opts.HasARCCacheSize)
}

return cbs, err
}
4 changes: 3 additions & 1 deletion cmd/ipfs/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,9 @@ func daemonFunc(req cmds.Request, res cmds.Response) {

// Start assembling node config
ncfg := &core.BuildCfg{
Repo: repo,
Repo: repo,
Permament: true, // It is temporary way to signify that node is permament
//TODO(Kubuxu): refactor Online vs Offline by adding Permement vs Epthemeral
}
offline, _, _ := req.Option(offlineKwd).Bool()
ncfg.Online = !offline
Expand Down
11 changes: 10 additions & 1 deletion core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ type BuildCfg struct {
// If online is set, the node will have networking enabled
Online bool

// If permament then node should run more expensive processes
// that will improve performance in long run
Permament bool

// If NilRepo is set, a repo backed by a nil datastore will be constructed
NilRepo bool

Expand Down Expand Up @@ -131,7 +135,12 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {

var err error
bs := bstore.NewBlockstore(n.Repo.Datastore())
n.Blockstore, err = bstore.BloomCached(bs, 256*1024, kSizeBlockstoreWriteCache)
opts := bstore.DefaultCacheOpts()
if !cfg.Permament {
opts.HasBloomFilterSize = 0
}

n.Blockstore, err = bstore.CachedBlockstore(bs, ctx, opts)
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion exchange/bitswap/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,8 @@ func Session(ctx context.Context, net tn.Network, p testutil.Identity) Instance
adapter := net.Adapter(p)
dstore := ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), bsdelay))

bstore, err := blockstore.BloomCached(blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)), bloomSize, writeCacheElems)
bstore, err := blockstore.CachedBlockstore(blockstore.NewBlockstore(
ds_sync.MutexWrap(dstore)), ctx, blockstore.DefaultCacheOpts())
if err != nil {
panic(err.Error()) // FIXME perhaps change signature and return error.
}
Expand Down

0 comments on commit 4e67003

Please sign in to comment.