Skip to content
This repository has been archived by the owner on Jun 19, 2023. It is now read-only.

Commit

Permalink
fixup! fix(arc): Per-CID locking. Map CID to lock.
Browse files Browse the repository at this point in the history
  • Loading branch information
frrist committed Apr 8, 2021
1 parent 1d67be7 commit 3c80623
Showing 1 changed file with 14 additions and 11 deletions.
25 changes: 14 additions & 11 deletions arc_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type arccache struct {
arc *lru.TwoQueueCache

arcLks map[cid.Cid]*sync.Mutex
arcLksMu sync.Mutex
arcLksMu sync.RWMutex

blockstore Blockstore

Expand Down Expand Up @@ -74,12 +74,12 @@ func (b *arccache) hasCachedSync(k cid.Cid) (has bool, size int, ok bool, releas
}
}

// lock the map of cid->locks.
b.arcLksMu.Lock()
defer b.arcLksMu.Unlock()

// check if we have a lock for this content.
// read lock the map of cid->locks.
// This ensures other CID's can be locked when more than one lock/waiting is held on the same CID.
b.arcLksMu.RLock()
lk, hasLk := b.arcLks[k]
b.arcLksMu.RUnlock()
// check if a lock exists for content `k`.
if has && hasLk {
// cache and lock hit.
lk.Lock()
Expand All @@ -89,13 +89,18 @@ func (b *arccache) hasCachedSync(k cid.Cid) (has bool, size int, ok bool, releas
} else if has && !hasLk {
// cache hit and lock miss, create the lock, lock it, and add it to the lockMap
lk = new(sync.Mutex)

b.arcLksMu.Lock()
b.arcLks[k] = lk
b.arcLksMu.Unlock()

lk.Lock()
release = func() { lk.Unlock() }
} else if !has && hasLk {
// cache miss and lock hit, remove lock from map
b.arcLksMu.Lock()
delete(b.arcLks, k)
b.arcLksMu.Unlock()
}
// else cache miss and lock miss, noop
return
Expand Down Expand Up @@ -211,17 +216,15 @@ func (b *arccache) Put(bl blocks.Block) error {
}

func (b *arccache) PutMany(bs []blocks.Block) error {
// all put, get, and delete operations block on this lock, take it here to avoid lock for each key in `bs`.
b.arcLksMu.Lock()
defer b.arcLksMu.Unlock()

var good []blocks.Block
for _, block := range bs {
// call put on block if result is inconclusive or we are sure that
// the block isn't in storage
if has, _, ok := b.hasCached(block.Cid()); !ok || (ok && !has) {
has, _, ok, release := b.hasCachedSync(block.Cid())
if !ok || (ok && !has) {
good = append(good, block)
}
defer release()
}
err := b.blockstore.PutMany(good)
if err != nil {
Expand Down

0 comments on commit 3c80623

Please sign in to comment.