diff --git a/blocks/blockstore/bloom_cache.go b/blocks/blockstore/bloom_cache.go new file mode 100644 index 00000000000..3e4038869a9 --- /dev/null +++ b/blocks/blockstore/bloom_cache.go @@ -0,0 +1,181 @@ +package blockstore + +import ( + "github.com/ipfs/go-ipfs/blocks" + key "github.com/ipfs/go-ipfs/blocks/key" + lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru" + bloom "gx/ipfs/QmWQ2SJisXwcCLsUXLwYCKSfyExXjFRW2WbBH5sqCUnwX5/bbloom" + context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" + ds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore" + + "sync/atomic" +) + +// BloomCached returns Blockstore that caches Has requests using Bloom filter +// Size is size of bloom filter in bytes +func BloomCached(bs Blockstore, bloomSize, lruSize int) (*bloomcache, error) { + bl, err := bloom.New(float64(bloomSize), float64(7)) + if err != nil { + return nil, err + } + arc, err := lru.NewARC(lruSize) + if err != nil { + return nil, err + } + bc := &bloomcache{blockstore: bs, bloom: bl, arc: arc} + bc.Invalidate() + go bc.Rebuild() + + return bc, nil +} + +type bloomcache struct { + bloom *bloom.Bloom + active int32 + + arc *lru.ARCCache + // This chan is only used for testing to wait for bloom to enable + rebuildChan chan struct{} + blockstore Blockstore + + // Statistics + hits uint64 + misses uint64 +} + +func (b *bloomcache) Invalidate() { + b.rebuildChan = make(chan struct{}) + atomic.StoreInt32(&b.active, 0) +} + +func (b *bloomcache) BloomActive() bool { + return atomic.LoadInt32(&b.active) != 0 +} + +func (b *bloomcache) Rebuild() { + ctx := context.TODO() + evt := log.EventBegin(ctx, "bloomcache.Rebuild") + defer evt.Done() + + ch, err := b.blockstore.AllKeysChan(ctx) + if err != nil { + log.Errorf("AllKeysChan failed in bloomcache rebuild with: %v", err) + return + } + for key := range ch { + b.bloom.AddTS([]byte(key)) // Use binary key, the more compact the better + } + close(b.rebuildChan) + atomic.StoreInt32(&b.active, 1) +} + +func (b *bloomcache) DeleteBlock(k key.Key) error { + if has, ok := b.hasCached(k); ok && !has { + return ErrNotFound + } + + b.arc.Remove(k) // Invalidate cache before deleting. + err := b.blockstore.DeleteBlock(k) + switch err { + case nil: + b.arc.Add(k, false) + case ds.ErrNotFound, ErrNotFound: + b.arc.Add(k, false) + default: + return err + } + return nil +} + +// if ok == false has is inconclusive +// if ok == true then has respons to question: is it contained +func (b *bloomcache) hasCached(k key.Key) (has bool, ok bool) { + if k == "" { + // Return cache invalid so call to blockstore + // in case of invalid key is forwarded deeper + return false, false + } + if b.BloomActive() { + blr := b.bloom.HasTS([]byte(k)) + if blr == false { // not contained in bloom is only conclusive answer bloom gives + return false, true + } + } + h, ok := b.arc.Get(k) + if ok { + return h.(bool), ok + } else { + return false, false + } +} + +func (b *bloomcache) Has(k key.Key) (bool, error) { + if has, ok := b.hasCached(k); ok { + return has, nil + } + + res, err := b.blockstore.Has(k) + if err == nil { + b.arc.Add(k, res) + } + return res, err +} + +func (b *bloomcache) Get(k key.Key) (blocks.Block, error) { + if has, ok := b.hasCached(k); ok && !has { + return nil, ErrNotFound + } + + bl, err := b.blockstore.Get(k) + if bl == nil && err == ErrNotFound { + b.arc.Add(k, false) + } else if bl != nil { + b.arc.Add(k, true) + } + return bl, err +} + +func (b *bloomcache) Put(bl blocks.Block) error { + if has, ok := b.hasCached(bl.Key()); ok && has { + return nil + } + + err := b.blockstore.Put(bl) + if err == nil { + b.bloom.AddTS([]byte(bl.Key())) + b.arc.Add(bl.Key(), true) + } + return err +} + +func (b *bloomcache) PutMany(bs []blocks.Block) error { + var good []blocks.Block + for _, block := range bs { + if has, ok := b.hasCached(block.Key()); !ok || (ok && !has) { + good = append(good, block) + } + } + err := b.blockstore.PutMany(bs) + if err == nil { + for _, block := range bs { + b.bloom.AddTS([]byte(block.Key())) + } + } + return err +} + +func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) { + return b.blockstore.AllKeysChan(ctx) +} + +func (b *bloomcache) GCLock() Unlocker { + return b.blockstore.(GCBlockstore).GCLock() +} + +func (b *bloomcache) PinLock() Unlocker { + return b.blockstore.(GCBlockstore).PinLock() +} + +func (b *bloomcache) GCRequested() bool { + return b.blockstore.(GCBlockstore).GCRequested() +} diff --git a/blocks/blockstore/write_cache_test.go b/blocks/blockstore/bloom_cache_test.go similarity index 59% rename from blocks/blockstore/write_cache_test.go rename to blocks/blockstore/bloom_cache_test.go index 966ff061094..3b8bb8b9116 100644 --- a/blocks/blockstore/write_cache_test.go +++ b/blocks/blockstore/bloom_cache_test.go @@ -1,9 +1,13 @@ package blockstore import ( + "fmt" + "sync" "testing" + "time" "github.com/ipfs/go-ipfs/blocks" + ds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore" dsq "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/query" syncds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/sync" @@ -11,24 +15,30 @@ import ( func TestReturnsErrorWhenSizeNegative(t *testing.T) { bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore())) - _, err := WriteCached(bs, -1) - if err != nil { - return + _, err := BloomCached(bs, 100, -1) + if err == nil { + t.Fail() + } + _, err = BloomCached(bs, -1, 100) + if err == nil { + t.Fail() } - t.Fail() } func TestRemoveCacheEntryOnDelete(t *testing.T) { b := blocks.NewBlock([]byte("foo")) cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()} bs := NewBlockstore(syncds.MutexWrap(cd)) - cachedbs, err := WriteCached(bs, 1) + cachedbs, err := BloomCached(bs, 1, 1) if err != nil { t.Fatal(err) } cachedbs.Put(b) + cd.Lock() writeHitTheDatastore := false + cd.Unlock() + cd.SetFunc(func() { writeHitTheDatastore = true }) @@ -43,7 +53,7 @@ func TestRemoveCacheEntryOnDelete(t *testing.T) { func TestElideDuplicateWrite(t *testing.T) { cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()} bs := NewBlockstore(syncds.MutexWrap(cd)) - cachedbs, err := WriteCached(bs, 1) + cachedbs, err := BloomCached(bs, 1, 1) if err != nil { t.Fatal(err) } @@ -56,36 +66,78 @@ func TestElideDuplicateWrite(t *testing.T) { }) cachedbs.Put(b1) } +func TestHasIsBloomCached(t *testing.T) { + cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()} + bs := NewBlockstore(syncds.MutexWrap(cd)) + + for i := 0; i < 1000; i++ { + bs.Put(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i)))) + } + cachedbs, err := BloomCached(bs, 256*1024, 128) + if err != nil { + t.Fatal(err) + } + + select { + case <-cachedbs.rebuildChan: + case <-time.After(1 * time.Second): + t.Fatalf("Timeout wating for rebuild: %d", cachedbs.bloom.ElementsAdded()) + } + + cacheFails := 0 + cd.SetFunc(func() { + cacheFails++ + }) + + for i := 0; i < 1000; i++ { + cachedbs.Has(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i+2000))).Key()) + } + + if float64(cacheFails)/float64(1000) > float64(0.05) { + t.Fatal("Bloom filter has cache miss rate of more than 5%") + } +} type callbackDatastore struct { + sync.Mutex f func() ds ds.Datastore } -func (c *callbackDatastore) SetFunc(f func()) { c.f = f } +func (c *callbackDatastore) SetFunc(f func()) { + c.Lock() + defer c.Unlock() + c.f = f +} + +func (c *callbackDatastore) CallF() { + c.Lock() + defer c.Unlock() + c.f() +} func (c *callbackDatastore) Put(key ds.Key, value interface{}) (err error) { - c.f() + c.CallF() return c.ds.Put(key, value) } func (c *callbackDatastore) Get(key ds.Key) (value interface{}, err error) { - c.f() + c.CallF() return c.ds.Get(key) } func (c *callbackDatastore) Has(key ds.Key) (exists bool, err error) { - c.f() + c.CallF() return c.ds.Has(key) } func (c *callbackDatastore) Delete(key ds.Key) (err error) { - c.f() + c.CallF() return c.ds.Delete(key) } func (c *callbackDatastore) Query(q dsq.Query) (dsq.Results, error) { - c.f() + c.CallF() return c.ds.Query(q) } diff --git a/blocks/blockstore/write_cache.go b/blocks/blockstore/write_cache.go deleted file mode 100644 index fbeee25ba78..00000000000 --- a/blocks/blockstore/write_cache.go +++ /dev/null @@ -1,78 +0,0 @@ -package blockstore - -import ( - "github.com/ipfs/go-ipfs/blocks" - key "github.com/ipfs/go-ipfs/blocks/key" - "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru" - context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" -) - -// WriteCached returns a blockstore that caches up to |size| unique writes (bs.Put). -func WriteCached(bs Blockstore, size int) (*writecache, error) { - c, err := lru.New(size) - if err != nil { - return nil, err - } - return &writecache{blockstore: bs, cache: c}, nil -} - -type writecache struct { - cache *lru.Cache // pointer b/c Cache contains a Mutex as value (complicates copying) - blockstore Blockstore -} - -func (w *writecache) DeleteBlock(k key.Key) error { - defer log.EventBegin(context.TODO(), "writecache.BlockRemoved", &k).Done() - w.cache.Remove(k) - return w.blockstore.DeleteBlock(k) -} - -func (w *writecache) Has(k key.Key) (bool, error) { - if _, ok := w.cache.Get(k); ok { - return true, nil - } - return w.blockstore.Has(k) -} - -func (w *writecache) Get(k key.Key) (blocks.Block, error) { - return w.blockstore.Get(k) -} - -func (w *writecache) Put(b blocks.Block) error { - k := b.Key() - if _, ok := w.cache.Get(k); ok { - return nil - } - defer log.EventBegin(context.TODO(), "writecache.BlockAdded", &k).Done() - - w.cache.Add(b.Key(), struct{}{}) - return w.blockstore.Put(b) -} - -func (w *writecache) PutMany(bs []blocks.Block) error { - var good []blocks.Block - for _, b := range bs { - if _, ok := w.cache.Get(b.Key()); !ok { - good = append(good, b) - k := b.Key() - defer log.EventBegin(context.TODO(), "writecache.BlockAdded", &k).Done() - } - } - return w.blockstore.PutMany(good) -} - -func (w *writecache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) { - return w.blockstore.AllKeysChan(ctx) -} - -func (w *writecache) GCLock() Unlocker { - return w.blockstore.(GCBlockstore).GCLock() -} - -func (w *writecache) PinLock() Unlocker { - return w.blockstore.(GCBlockstore).PinLock() -} - -func (w *writecache) GCRequested() bool { - return w.blockstore.(GCBlockstore).GCRequested() -} diff --git a/core/builder.go b/core/builder.go index bb2b89b0cfe..f602db1d6b1 100644 --- a/core/builder.go +++ b/core/builder.go @@ -131,7 +131,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error { var err error bs := bstore.NewBlockstore(n.Repo.Datastore()) - n.Blockstore, err = bstore.WriteCached(bs, kSizeBlockstoreWriteCache) + n.Blockstore, err = bstore.BloomCached(bs, 256*1024, kSizeBlockstoreWriteCache) if err != nil { return err } diff --git a/exchange/bitswap/testutils.go b/exchange/bitswap/testutils.go index 6449412ba0d..4df1d4fb61b 100644 --- a/exchange/bitswap/testutils.go +++ b/exchange/bitswap/testutils.go @@ -87,12 +87,13 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration { // just a much better idea. func Session(ctx context.Context, net tn.Network, p testutil.Identity) Instance { bsdelay := delay.Fixed(0) + const bloomSize = 512 const writeCacheElems = 100 adapter := net.Adapter(p) dstore := ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), bsdelay)) - bstore, err := blockstore.WriteCached(blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)), writeCacheElems) + bstore, err := blockstore.BloomCached(blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)), bloomSize, writeCacheElems) if err != nil { panic(err.Error()) // FIXME perhaps change signature and return error. } diff --git a/package.json b/package.json index b15267fd0de..c0528df2c18 100644 --- a/package.json +++ b/package.json @@ -177,6 +177,12 @@ "hash": "Qmb1DA2A9LS2wR4FFweB4uEDomFsdmnw1VLawLE1yQzudj", "name": "base32", "version": "0.0.0" + }, + { + "author": "kubuxu", + "hash": "QmWQ2SJisXwcCLsUXLwYCKSfyExXjFRW2WbBH5sqCUnwX5", + "name": "bbloom", + "version": "0.0.2" } ], "gxVersion": "0.4.0",