Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

fix(arc): Per-CID locking. Map CID to lock. #66

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 101 additions & 7 deletions arc_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package blockstore

import (
"context"
"sync"

lru "github.com/hashicorp/golang-lru"
blocks "github.com/ipfs/go-block-format"
Expand All @@ -17,7 +18,11 @@ type cacheSize int
// size. This provides block access-time improvements, allowing
// to short-cut many searches without querying the underlying datastore.
type arccache struct {
cache *lru.TwoQueueCache
cache *lru.TwoQueueCache

arcLks map[cid.Cid]*sync.Mutex
arcLksMu sync.RWMutex

blockstore Blockstore
viewer Viewer

Expand All @@ -33,7 +38,8 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache,
if err != nil {
return nil, err
}
c := &arccache{cache: cache, blockstore: bs}
arcLks := make(map[cid.Cid]*sync.Mutex)
c := &arccache{cache: cache, arcLks: arcLks, blockstore: bs}
c.hits = metrics.NewCtx(ctx, "arc.hits_total", "Number of ARC cache hits").Counter()
c.total = metrics.NewCtx(ctx, "arc_total", "Total number of ARC cache requests").Counter()
if v, ok := bs.(Viewer); ok {
Expand All @@ -43,7 +49,9 @@ func newARCCachedBS(ctx context.Context, bs Blockstore, lruSize int) (*arccache,
}

func (b *arccache) DeleteBlock(k cid.Cid) error {
if has, _, ok := b.queryCache(k); ok && !has {
has, _, ok, release := b.queryCacheSync(k)
defer release()
if ok && !has {
return nil
}

Expand All @@ -68,7 +76,9 @@ func (b *arccache) Has(k cid.Cid) (bool, error) {
}

func (b *arccache) GetSize(k cid.Cid) (int, error) {
if has, blockSize, ok := b.queryCache(k); ok {
has, blockSize, ok, release := b.queryCacheSync(k)
defer release()
if ok {
if !has {
// don't have it, return
return -1, ErrNotFound
Expand Down Expand Up @@ -119,7 +129,9 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
return nil, ErrNotFound
}

if has, _, ok := b.queryCache(k); ok && !has {
has, _, ok, release := b.queryCacheSync(k)
defer release()
if ok && !has {
return nil, ErrNotFound
}

Expand All @@ -133,7 +145,9 @@ func (b *arccache) Get(k cid.Cid) (blocks.Block, error) {
}

func (b *arccache) Put(bl blocks.Block) error {
if has, _, ok := b.queryCache(bl.Cid()); ok && has {
has, _, ok, release := b.queryCacheSync(bl.Cid())
defer release()
if ok && has {
return nil
}

Expand All @@ -146,13 +160,22 @@ func (b *arccache) Put(bl blocks.Block) error {

func (b *arccache) PutMany(bs []blocks.Block) error {
var good []blocks.Block
var releases []func()
for _, block := range bs {
// call put on block if result is inconclusive or we are sure that
// the block isn't in storage
if has, _, ok := b.queryCache(block.Cid()); !ok || (ok && !has) {
if has, _, ok, release := b.queryCacheSync(block.Cid()); !ok || (ok && !has) {
good = append(good, block)
releases = append(releases, release)
}
}

defer func() {
for _, release := range releases {
release()
}
}()

err := b.blockstore.PutMany(good)
if err != nil {
return err
Expand Down Expand Up @@ -208,6 +231,77 @@ func (b *arccache) queryCache(k cid.Cid) (exists bool, size int, ok bool) {
return false, -1, false
}

// queryCacheSync checks if the CID is in the cache and, on a positive hit,
// acquires the per-CID mutex for `k`. It returns:
//
//  * exists (bool): whether the CID is known to exist or not.
//  * size (int): the size if cached, or -1 if not cached.
//  * ok (bool): whether the cache held a usable answer for the CID.
//  * release (func): must be called by the caller; it unlocks the per-CID
//    mutex when one was acquired and is a no-op otherwise, so it is always
//    safe to `defer release()`.
//
// When ok is false, the answer is inconclusive and the caller must ignore
// exists and size. Querying the underlying store is necessary.
//
// When ok is true, exists carries the correct answer, and size carries the
// size, if known, or -1 if not.
//
// A per-CID mutex is only acquired when exists is true. On any other outcome
// no lock is held, and a lock previously registered for `k` is dropped from
// the lock map.
//
// NOTE(review): dropping the map entry does not wait for a goroutine that is
// still holding that mutex; a later caller would then create a fresh mutex
// and not be excluded by the earlier holder. Confirm whether that is
// acceptable or whether entries need reference counting.
//
// NOTE(review): the cache is keyed by string(k.Hash()) while the lock map is
// keyed by the full cid.Cid, so CIDs sharing a multihash share a cache entry
// but not a lock. Confirm this is intended.
func (b *arccache) queryCacheSync(k cid.Cid) (exists bool, size int, ok bool, release func()) {
	// Start from "inconclusive, nothing to release".
	exists, size, ok = false, -1, false
	release = func() {}

	b.total.Inc()
	if !k.Defined() {
		log.Error("undefined cid in arccache")
		// Return cache invalid so the call to blockstore happens
		// in case of invalid key and correct error is created.
		return exists, size, ok, release
	}

	// Use a separate `found` flag so that a cached value of an unexpected
	// type leaves ok == false (inconclusive) instead of being reported as a
	// conclusive "does not exist".
	if h, found := b.cache.Get(string(k.Hash())); found {
		b.hits.Inc()
		switch h := h.(type) {
		case cacheHave:
			exists, ok = bool(h), true
		case cacheSize:
			exists, size, ok = true, int(h), true
		}
	}

	// Fast path: look the lock up under the read lock so that lookups for
	// other CIDs are not serialized behind the write lock.
	b.arcLksMu.RLock()
	lk, hasLk := b.arcLks[k]
	b.arcLksMu.RUnlock()

	if !exists {
		// Cache miss (or known-absent): no per-CID lock is taken. If a lock
		// is still registered for this CID, remove it from the map.
		if hasLk {
			b.arcLksMu.Lock()
			delete(b.arcLks, k)
			b.arcLksMu.Unlock()
		}
		return exists, size, ok, release
	}

	if !hasLk {
		// Cache hit and lock miss: create the lock. Re-check under the write
		// lock, because another goroutine may have registered one between
		// the RUnlock above and this Lock; both must end up sharing the same
		// mutex or they would not exclude each other.
		b.arcLksMu.Lock()
		if lk, hasLk = b.arcLks[k]; !hasLk {
			lk = new(sync.Mutex)
			b.arcLks[k] = lk
		}
		b.arcLksMu.Unlock()
	}

	// Acquire the per-CID lock outside of arcLksMu so that waiting on one CID
	// never blocks lock lookups for other CIDs.
	lk.Lock()
	release = lk.Unlock
	return exists, size, ok, release
}

// AllKeysChan forwards the request to the wrapped blockstore and returns its
// result unchanged; the ARC cache is not consulted.
func (b *arccache) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
	keys, err := b.blockstore.AllKeysChan(ctx)
	return keys, err
}
Expand Down