Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ARC caching and bloom filter for blockstorage #2885

Merged
merged 4 commits into from
Jul 4, 2016
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 175 additions & 0 deletions blocks/blockstore/bloom_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
package blockstore

import (
"github.com/ipfs/go-ipfs/blocks"
key "github.com/ipfs/go-ipfs/blocks/key"
lru "gx/ipfs/QmVYxfoJQiZijTgPNHCHgHELvQpbsJNTg6Crmc3dQkj3yy/golang-lru"
bloom "gx/ipfs/QmWQ2SJisXwcCLsUXLwYCKSfyExXjFRW2WbBH5sqCUnwX5/bbloom"
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
ds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore"
)

// BloomCached returns Blockstore that caches Has requests using Bloom filter
// Size is size of bloom filter in bytes
func BloomCached(bs Blockstore, bloomSize, lruSize int) (*bloomcache, error) {
bl, err := bloom.New(float64(bloomSize), float64(7))
if err != nil {
return nil, err
}
arc, err := lru.NewARC(lruSize)
if err != nil {
return nil, err
}
bc := &bloomcache{blockstore: bs, bloom: bl, arc: arc}
bc.Invalidate()
go bc.Rebuild()

return bc, nil
}

type bloomcache struct {
bloom *bloom.Bloom
active bool

arc *lru.ARCCache
// This chan is only used for testing to wait for bloom to enable
rebuildChan chan struct{}
blockstore Blockstore

// Statistics
hits uint64
misses uint64
}

func (b *bloomcache) Invalidate() {
b.rebuildChan = make(chan struct{})
b.active = false
}

func (b *bloomcache) BloomActive() bool {
return b.active
}

func (b *bloomcache) Rebuild() {
ctx := context.TODO()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should try and get a context passed in, maybe from the constructor?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like a good idea.

evt := log.EventBegin(ctx, "bloomcache.Rebuild")
defer evt.Done()

ch, err := b.blockstore.AllKeysChan(ctx)
if err != nil {
log.Errorf("AllKeysChan failed in bloomcache rebuild with: %v", err)
return
}
for key := range ch {
b.bloom.AddTS([]byte(key)) // Use binary key, the more compact the better
}
close(b.rebuildChan)
b.active = true
}

func (b *bloomcache) DeleteBlock(k key.Key) error {
if has, ok := b.hasCached(k); ok && !has {
return ErrNotFound
}

b.arc.Remove(k) // Invalidate cache before deleting.
err := b.blockstore.DeleteBlock(k)
if err == nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

switch err {
case nil:
//
case ds.ErrNotFound, ErrNotFound:
//
default:
return err
}

b.arc.Add(k, false)
} else if err == ds.ErrNotFound || err == ErrNotFound {
b.arc.Add(k, false)
return ErrNotFound
}
return err
}

// if ok == false has is inconclusive
// if ok == true then has respons to question: is it contained
func (b *bloomcache) hasCached(k key.Key) (has bool, ok bool) {
if k == "" {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what? we always have the empty key?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was there in the write_cache, I just mimicked it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh weird... i wonder what the reasoning there was

return true, true
}
if b.active {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this field needs to be locked around

Copy link
Member Author

@Kubuxu Kubuxu Jul 4, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it? I would prefer not to introduce locking here as it is quite slow when compared with speed of bloom.

It would only introduce race if bloom was being deactivated (it isn't in yet, but that doesn't matter) and the pointer to bloom was being changed. If we don't change the pointer and we acquire internal bloom's RWLock, we can then work on the internals of the bloom, otherwise I would have to introduce also separate locks around AddTS as missed adds are much, much worst.

I will leave a comment about that if it seems good to you.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rebuild never gets called outside of the constructor currently (which i'm not super keen on), but even the one call in the constructor will cause a race. What if methods are being called right as its being set to true? setting a boolean isnt guaranteed to be atomic

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but when it is set from false to true then race is that b.active is still observed as false. This has no negative side-effects apart from call skipping the bloom filter cache check as bloom filter is just being enabled.

blr := b.bloom.HasTS([]byte(k))
if blr == false { // not contained in bloom is only conclusive answer bloom gives
return blr, true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets hardcode return false, true, just for clarity

}
}
h, ok := b.arc.Get(k)
if ok {
return h.(bool), ok
} else {
return false, ok
}
}

func (b *bloomcache) Has(k key.Key) (bool, error) {
if has, ok := b.hasCached(k); ok {
return has, nil
}

res, err := b.blockstore.Has(k)
if err == nil {
b.arc.Add(k, res)
}
return res, err
}

func (b *bloomcache) Get(k key.Key) (blocks.Block, error) {
if has, ok := b.hasCached(k); ok && !has {
return nil, ErrNotFound
}

bl, err := b.blockstore.Get(k)
if bl == nil && err == ErrNotFound {
b.arc.Add(k, false)
} else if bl != nil {
b.arc.Add(k, true)
}
return bl, err
}

func (b *bloomcache) Put(bl blocks.Block) error {
if has, ok := b.hasCached(bl.Key()); ok && has {
return nil
}

err := b.blockstore.Put(bl)
if err == nil {
b.bloom.AddTS([]byte(bl.Key()))
b.arc.Add(bl.Key(), true)
}
return err
}

func (b *bloomcache) PutMany(bs []blocks.Block) error {
var good []blocks.Block
for _, block := range bs {
if has, ok := b.hasCached(block.Key()); !ok || (ok && !has) {
good = append(good, block)
}
}
err := b.blockstore.PutMany(bs)
if err == nil {
for _, block := range bs {
b.bloom.AddTS([]byte(block.Key()))
}
}
return err
}

func (b *bloomcache) AllKeysChan(ctx context.Context) (<-chan key.Key, error) {
return b.blockstore.AllKeysChan(ctx)
}

func (b *bloomcache) GCLock() Unlocker {
return b.blockstore.(GCBlockstore).GCLock()
}

func (b *bloomcache) PinLock() Unlocker {
return b.blockstore.(GCBlockstore).PinLock()
}

func (b *bloomcache) GCRequested() bool {
return b.blockstore.(GCBlockstore).GCRequested()
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,32 @@
package blockstore

import (
"testing"

"fmt"
"github.com/ipfs/go-ipfs/blocks"
ds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore"
dsq "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/query"
syncds "gx/ipfs/QmfQzVugPq1w5shWRcLWSeiHF4a2meBX7yVD8Vw7GWJM9o/go-datastore/sync"
"testing"
"time"
)

func TestReturnsErrorWhenSizeNegative(t *testing.T) {
bs := NewBlockstore(syncds.MutexWrap(ds.NewMapDatastore()))
_, err := WriteCached(bs, -1)
if err != nil {
return
_, err := BloomCached(bs, 100, -1)
if err == nil {
t.Fail()
}
_, err = BloomCached(bs, -1, 100)
if err == nil {
t.Fail()
}
t.Fail()
}

func TestRemoveCacheEntryOnDelete(t *testing.T) {
b := blocks.NewBlock([]byte("foo"))
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
bs := NewBlockstore(syncds.MutexWrap(cd))
cachedbs, err := WriteCached(bs, 1)
cachedbs, err := BloomCached(bs, 1, 1)
if err != nil {
t.Fatal(err)
}
Expand All @@ -43,7 +47,7 @@ func TestRemoveCacheEntryOnDelete(t *testing.T) {
func TestElideDuplicateWrite(t *testing.T) {
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
bs := NewBlockstore(syncds.MutexWrap(cd))
cachedbs, err := WriteCached(bs, 1)
cachedbs, err := BloomCached(bs, 1, 1)
if err != nil {
t.Fatal(err)
}
Expand All @@ -56,6 +60,37 @@ func TestElideDuplicateWrite(t *testing.T) {
})
cachedbs.Put(b1)
}
func TestHasIsBloomCached(t *testing.T) {
cd := &callbackDatastore{f: func() {}, ds: ds.NewMapDatastore()}
bs := NewBlockstore(syncds.MutexWrap(cd))

for i := 0; i < 1000; i++ {
bs.Put(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i))))
}
cachedbs, err := BloomCached(bs, 256*1024, 128)
if err != nil {
t.Fatal(err)
}

select {
case <-cachedbs.rebuildChan:
case <-time.After(1 * time.Second):
t.Fatalf("Timeout wating for rebuild: %d", cachedbs.bloom.ElementsAdded())
}

cacheFails := 0
cd.SetFunc(func() {
cacheFails++
})

for i := 0; i < 1000; i++ {
cachedbs.Has(blocks.NewBlock([]byte(fmt.Sprintf("data: %d", i+2000))).Key())
}

if float64(cacheFails)/float64(1000) > float64(0.05) {
t.Fatal("Bloom filter has cache miss rate of more than 5%")
}
}

type callbackDatastore struct {
f func()
Expand Down
78 changes: 0 additions & 78 deletions blocks/blockstore/write_cache.go

This file was deleted.

2 changes: 1 addition & 1 deletion core/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {

var err error
bs := bstore.NewBlockstore(n.Repo.Datastore())
n.Blockstore, err = bstore.WriteCached(bs, kSizeBlockstoreWriteCache)
n.Blockstore, err = bstore.BloomCached(bs, 256*1024, kSizeBlockstoreWriteCache)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets make this a constant

Copy link
Member Author

@Kubuxu Kubuxu Jul 4, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am introducing CacheOpts in the ARC caching of whole blocks I am working on right now. Would it be ok if I added it in that PR?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion exchange/bitswap/testutils.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,12 +87,13 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
// just a much better idea.
func Session(ctx context.Context, net tn.Network, p testutil.Identity) Instance {
bsdelay := delay.Fixed(0)
const bloomSize = 512
const writeCacheElems = 100

adapter := net.Adapter(p)
dstore := ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), bsdelay))

bstore, err := blockstore.WriteCached(blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)), writeCacheElems)
bstore, err := blockstore.BloomCached(blockstore.NewBlockstore(ds_sync.MutexWrap(dstore)), bloomSize, writeCacheElems)
if err != nil {
panic(err.Error()) // FIXME perhaps change signature and return error.
}
Expand Down
6 changes: 6 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@
"hash": "Qmb1DA2A9LS2wR4FFweB4uEDomFsdmnw1VLawLE1yQzudj",
"name": "base32",
"version": "0.0.0"
},
{
"author": "kubuxu",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why couldnt we use the code in blocks/bloom ?

Copy link
Member Author

@Kubuxu Kubuxu Jul 4, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I spent few hours tweaking this one around.
It has performance order of magnitude higher than others I've found.

Also original author was very smart about how he hashes and uses higher unused hashbits to expand lower bits for the multiple hash functions bloom filter requires.

"hash": "QmWQ2SJisXwcCLsUXLwYCKSfyExXjFRW2WbBH5sqCUnwX5",
"name": "bbloom",
"version": "0.0.2"
}
],
"gxVersion": "0.4.0",
Expand Down