From 3035aa893543a0c1b72d3bc17f2d9c251544537f Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Mon, 4 Jul 2016 20:08:18 +0200 Subject: [PATCH 1/7] blocks/blockstore: add CacheOpts - structure of cache config License: MIT Signed-off-by: Jakub Sztandera --- blocks/blockstore/caching.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) create mode 100644 blocks/blockstore/caching.go diff --git a/blocks/blockstore/caching.go b/blocks/blockstore/caching.go new file mode 100644 index 00000000000..5657059d89b --- /dev/null +++ b/blocks/blockstore/caching.go @@ -0,0 +1,16 @@ +package blockstore + +// Next to each option is it aproximate memory usage per unit +type CacheOpts struct { + HasBloomFilterSize int // 1 bit + HasBloomFilterHashes int // No size, 7 is usually best, consult bloom papers + HasARCCacheSize int // 32 bytes +} + +func DefaultCacheOpts() CacheOpts { + return CacheOpts{ + HasBloomFilterSize: 512 * 8 * 1024, + HasBloomFilterHashes: 7, + HasARCCacheSize: 64 * 1024, + } +} From 016d3d97ef5ef7963b320278fea19c593b4d6cf4 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Mon, 4 Jul 2016 23:02:29 +0200 Subject: [PATCH 2/7] blocks/blockstore: introduce context passing to blockstore License: MIT Signed-off-by: Jakub Sztandera --- blocks/blockstore/bloom_cache.go | 26 ++++++++++++++++++-------- blocks/blockstore/bloom_cache_test.go | 24 ++++++++++++++++++------ blocks/blockstore/caching.go | 26 ++++++++++++++++++++++++++ core/builder.go | 2 +- exchange/bitswap/testutils.go | 3 ++- 5 files changed, 65 insertions(+), 16 deletions(-) diff --git a/blocks/blockstore/bloom_cache.go b/blocks/blockstore/bloom_cache.go index 3e4038869a9..d9f93b3e855 100644 --- a/blocks/blockstore/bloom_cache.go +++ b/blocks/blockstore/bloom_cache.go @@ -11,10 +11,10 @@ import ( "sync/atomic" ) -// BloomCached returns Blockstore that caches Has requests using Bloom filter +// bloomCached returns Blockstore that caches Has requests using Bloom filter // Size is size of bloom filter in bytes -func BloomCached(bs Blockstore, bloomSize, lruSize int) (*bloomcache, error) { - bl, err := bloom.New(float64(bloomSize), float64(7)) +func bloomCached(bs Blockstore, ctx context.Context, bloomSize, hashCount, lruSize int) (*bloomcache, error) { + bl, err := bloom.New(float64(bloomSize), float64(hashCount)) if err != nil { return nil, err } @@ -24,7 +24,7 @@ func BloomCached(bs Blockstore, bloomSize, lruSize int) (*bloomcache, error) { } bc := &bloomcache{blockstore: bs, bloom: bl, arc: arc} bc.Invalidate() - go bc.Rebuild() + go bc.Rebuild(ctx) return bc, nil } @@ -52,8 +52,7 @@ func (b *bloomcache) BloomActive() bool { return atomic.LoadInt32(&b.active) != 0 } -func (b *bloomcache) Rebuild() { - ctx := context.TODO() +func (b *bloomcache) Rebuild(ctx context.Context) { evt := log.EventBegin(ctx, "bloomcache.Rebuild") defer evt.Done() @@ -62,8 +61,19 @@ func (b *bloomcache) Rebuild() { log.Errorf("AllKeysChan failed in bloomcache rebuild with: %v", err) return } - for key := range ch { - b.bloom.AddTS([]byte(key)) // Use binary key, the more compact the better + finish := false + for !finish { + select { + case key, ok := <-ch: + if ok { + b.bloom.AddTS([]byte(key)) // Use binary key, the more compact the better + } else { + finish = true + } + case <-ctx.Done(): + log.Warning("Cache rebuild closed by context finishing.") + return + } } close(b.rebuildChan) atomic.StoreInt32(&b.active, 1) diff --git a/blocks/blockstore/bloom_cache_test.go b/blocks/blockstore/bloom_cache_test.go index 3b8bb8b9116..768bca750f3 100644 --- a/blocks/blockstore/bloom_cache_test.go +++ b/blocks/blockstore/bloom_cache_test.go @@ -8,18 +8,29 @@ import ( "github.com/ipfs/go-ipfs/blocks" + context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" ds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore" dsq "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/query" syncds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/sync" ) +func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error) { + opts := DefaultCacheOpts() + bbs, err := CachedBlockstore(bs, ctx, opts) + if err == nil { + return bbs.(*bloomcache), nil + } else { + return nil, err + } +} + func TestReturnsErrorWhenSizeNegative(t *testing.T) { bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore())) - _, err := BloomCached(bs, 100, -1) + _, err := bloomCached(bs, nil, 100, 1, -1) if err == nil { t.Fail() } - _, err = BloomCached(bs, -1, 100) + _, err = bloomCached(bs, nil, -1, 1, 100) if err == nil { t.Fail() } @@ -29,7 +40,7 @@ func TestRemoveCacheEntryOnDelete(t *testing.T) { b := blocks.NewBlock([]byte("foo")) cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()} bs := NewBlockstore(syncds.MutexWrap(cd)) - cachedbs, err := BloomCached(bs, 1, 1) + cachedbs, err := testBloomCached(bs, nil) if err != nil { t.Fatal(err) } @@ -53,7 +64,7 @@ func TestRemoveCacheEntryOnDelete(t *testing.T) { func TestElideDuplicateWrite(t *testing.T) { cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()} bs := NewBlockstore(syncds.MutexWrap(cd)) - cachedbs, err := BloomCached(bs, 1, 1) + cachedbs, err := testBloomCached(bs, nil) if err != nil { t.Fatal(err) } @@ -73,14 +84,15 @@ func TestHasIsBloomCached(t *testing.T) { for i := 0; i < 1000; i++ { bs.Put(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i)))) } - cachedbs, err := BloomCached(bs, 256*1024, 128) + ctx, _ := context.WithTimeout(context.Background(), 1*time.Second) + cachedbs, err := testBloomCached(bs, ctx) if err != nil { t.Fatal(err) } select { case <-cachedbs.rebuildChan: - case <-time.After(1 * time.Second): + case <-ctx.Done(): t.Fatalf("Timeout wating for rebuild: %d", cachedbs.bloom.ElementsAdded()) } diff --git a/blocks/blockstore/caching.go b/blocks/blockstore/caching.go index 5657059d89b..b1121f6b178 100644 --- a/blocks/blockstore/caching.go +++ b/blocks/blockstore/caching.go @@ -1,5 +1,11 @@ package blockstore +import ( + "errors" + + context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" +) + // Next to each option is it aproximate memory usage per unit type CacheOpts struct { HasBloomFilterSize int // 1 bit @@ -14,3 +20,23 @@ func DefaultCacheOpts() CacheOpts { HasARCCacheSize: 64 * 1024, } } + +func CachedBlockstore(bs GCBlockstore, + ctx context.Context, opts CacheOpts) (cbs GCBlockstore, err error) { + if ctx == nil { + ctx = context.TODO() // For tests + } + + if opts.HasBloomFilterSize < 0 || opts.HasBloomFilterHashes < 0 || + opts.HasARCCacheSize < 0 { + return nil, errors.New("all options for cache need to be greater than zero") + } + + if opts.HasBloomFilterSize != 0 && opts.HasBloomFilterHashes == 0 { + return nil, errors.New("bloom filter hash count can't be 0 when there is size set") + } + cbs, err = bloomCached(bs, ctx, opts.HasBloomFilterSize, opts.HasBloomFilterHashes, + opts.HasARCCacheSize) + + return cbs, err +} diff --git a/core/builder.go b/core/builder.go index f602db1d6b1..6095e2c2487 100644 --- a/core/builder.go +++ b/core/builder.go @@ -131,7 +131,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { var err error bs := bstore.NewBlockstore(n.Repo.Datastore()) - n.Blockstore, err = bstore.BloomCached(bs, 256*1024, kSizeBlockstoreWriteCache) + n.Blockstore, err = bstore.CachedBlockstore(bs, ctx, bstore.DefaultCacheOpts()) if err != nil { return err } diff --git a/exchange/bitswap/testutils.go b/exchange/bitswap/testutils.go index d42278e710c..a4be8d06fa3 100644 --- a/exchange/bitswap/testutils.go +++ b/exchange/bitswap/testutils.go @@ -93,7 +93,8 @@ func Session(ctx context.Context, net tn.Network, p testutil.Identity) Instance adapter := net.Adapter(p) dstore := ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), bsdelay)) - bstore, err := blockstore.BloomCached(blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)), bloomSize, writeCacheElems) + bstore, err := blockstore.CachedBlockstore(blockstore.NewBlockstore( + ds_sync.MutexWrap(dstore)), ctx, blockstore.DefaultCacheOpts()) if err != nil { panic(err.Error()) // FIXME perhaps change signature and return error. } From 61a3d127e715af7d06d359fba117f340c3c46086 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 5 Jul 2016 15:17:52 +0200 Subject: [PATCH 3/7] blocks/blockstore: improve logic a bit License: MIT Signed-off-by: Jakub Sztandera --- blocks/blockstore/caching.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/blocks/blockstore/caching.go b/blocks/blockstore/caching.go index b1121f6b178..3decc9e8d3e 100644 --- a/blocks/blockstore/caching.go +++ b/blocks/blockstore/caching.go @@ -26,6 +26,7 @@ func CachedBlockstore(bs GCBlockstore, if ctx == nil { ctx = context.TODO() // For tests } + cbs = bs if opts.HasBloomFilterSize < 0 || opts.HasBloomFilterHashes < 0 || opts.HasARCCacheSize < 0 { @@ -35,8 +36,10 @@ func CachedBlockstore(bs GCBlockstore, if opts.HasBloomFilterSize != 0 && opts.HasBloomFilterHashes == 0 { return nil, errors.New("bloom filter hash count can't be 0 when there is size set") } - cbs, err = bloomCached(bs, ctx, opts.HasBloomFilterSize, opts.HasBloomFilterHashes, - opts.HasARCCacheSize) + if opts.HasBloomFilterSize != 0 { + cbs, err = bloomCached(cbs, ctx, opts.HasBloomFilterSize, opts.HasBloomFilterHashes, + opts.HasARCCacheSize) + } return cbs, err } From e85a39c61092dad7c294a209585b3ddd83efd387 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Fri, 8 Jul 2016 19:15:14 +0200 Subject: [PATCH 4/7] core: do not run bloom filter if in a temoporary node mode License: MIT Signed-off-by: Jakub Sztandera --- cmd/ipfs/daemon.go | 3 ++- core/builder.go | 11 ++++++++++- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index 5eea1595b82..6ef0b1def45 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -229,7 +229,8 @@ func daemonFunc(req cmds.Request, res cmds.Response) { // Start assembling node config ncfg := &core.BuildCfg{ - Repo: repo, + Repo: repo, + Permament: true, } offline, _, _ := req.Option(offlineKwd).Bool() ncfg.Online = !offline diff --git a/core/builder.go b/core/builder.go index 6095e2c2487..62e9319b947 100644 --- a/core/builder.go +++ b/core/builder.go @@ -27,6 +27,10 @@ type BuildCfg struct { // If online is set, the node will have networking enabled Online bool + // If permament then node should run more expensive processes + // that will improve performance in long run + Permament bool + // If NilRepo is set, a repo backed by a nil datastore will be constructed NilRepo bool @@ -131,7 +135,12 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { var err error bs := bstore.NewBlockstore(n.Repo.Datastore()) - n.Blockstore, err = bstore.CachedBlockstore(bs, ctx, bstore.DefaultCacheOpts()) + opts := bstore.DefaultCacheOpts() + if !cfg.Permament { + opts.HasBloomFilterSize = 0 + } + + n.Blockstore, err = bstore.CachedBlockstore(bs, ctx, opts) if err != nil { return err } From af7778213c6200229a80bc1610499d4ba3685f73 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Tue, 5 Jul 2016 16:59:23 +0200 Subject: [PATCH 5/7] block/blockstore: bloomcache PutMany logic was not adding to ARC License: MIT Signed-off-by: Jakub Sztandera --- blocks/blockstore/bloom_cache.go | 1 + 1 file changed, 1 insertion(+) diff --git a/blocks/blockstore/bloom_cache.go b/blocks/blockstore/bloom_cache.go index d9f93b3e855..1a4c57ac47e 100644 --- a/blocks/blockstore/bloom_cache.go +++ b/blocks/blockstore/bloom_cache.go @@ -169,6 +169,7 @@ func (b *bloomcache) PutMany(bs []blocks.Block) error { if err == nil { for _, block := range bs { b.bloom.AddTS([]byte(block.Key())) + b.arc.Add(block.Key(), true) } } return err From 58526b25b52275faf29fba4f035a1de6a73187dc Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Sun, 10 Jul 2016 15:16:42 +0200 Subject: [PATCH 6/7] blocks/blockstore: shift insertion of TODO context to tests License: MIT Signed-off-by: Jakub Sztandera --- blocks/blockstore/bloom_cache_test.go | 7 +++++-- blocks/blockstore/caching.go | 3 --- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/blocks/blockstore/bloom_cache_test.go b/blocks/blockstore/bloom_cache_test.go index 768bca750f3..bddcee56c69 100644 --- a/blocks/blockstore/bloom_cache_test.go +++ b/blocks/blockstore/bloom_cache_test.go @@ -15,6 +15,9 @@ import ( ) func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error) { + if ctx == nil { + ctx = context.TODO() + } opts := DefaultCacheOpts() bbs, err := CachedBlockstore(bs, ctx, opts) if err == nil { @@ -26,11 +29,11 @@ func testBloomCached(bs GCBlockstore, ctx context.Context) (*bloomcache, error) func TestReturnsErrorWhenSizeNegative(t *testing.T) { bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore())) - _, err := bloomCached(bs, nil, 100, 1, -1) + _, err := bloomCached(bs, context.TODO(), 100, 1, -1) if err == nil { t.Fail() } - _, err = bloomCached(bs, nil, -1, 1, 100) + _, err = bloomCached(bs, context.TODO(), -1, 1, 100) if err == nil { t.Fail() } diff --git a/blocks/blockstore/caching.go b/blocks/blockstore/caching.go index 3decc9e8d3e..689a9b5fc12 100644 --- a/blocks/blockstore/caching.go +++ b/blocks/blockstore/caching.go @@ -23,9 +23,6 @@ func DefaultCacheOpts() CacheOpts { func CachedBlockstore(bs GCBlockstore, ctx context.Context, opts CacheOpts) (cbs GCBlockstore, err error) { - if ctx == nil { - ctx = context.TODO() // For tests - } cbs = bs if opts.HasBloomFilterSize < 0 || opts.HasBloomFilterHashes < 0 || From d9bafb688d77adfc371f8fee03248b6905f5d268 Mon Sep 17 00:00:00 2001 From: Jakub Sztandera Date: Sun, 10 Jul 2016 15:19:28 +0200 Subject: [PATCH 7/7] cmd/ipfs: add note about the plan for refactor License: MIT Signed-off-by: Jakub Sztandera --- cmd/ipfs/daemon.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index 6ef0b1def45..52264715fb4 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -230,7 +230,8 @@ func daemonFunc(req cmds.Request, res cmds.Response) { // Start assembling node config ncfg := &core.BuildCfg{ Repo: repo, - Permament: true, + Permament: true, // It is temporary way to signify that node is permament + //TODO(Kubuxu): refactor Online vs Offline by adding Permement vs Epthemeral } offline, _, _ := req.Option(offlineKwd).Bool() ncfg.Online = !offline