Skip to content

Commit

Permalink
fix view waiting issues with the WaitGroup
Browse files Browse the repository at this point in the history
    
Views can call Add after Wait has been called, which is problematic with WaitGroups.
This commit instead uses a mutex/cond combo and waits while the count is > 0.
The only downside is that we might needlessly wait for (a bunch of) views
that started while the txn is active, but we can live with that.
  • Loading branch information
vyzo committed Jul 13, 2021
1 parent 04abd19 commit 257423e
Showing 1 changed file with 40 additions and 21 deletions.
61 changes: 40 additions & 21 deletions blockstore/splitstore/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,15 @@ type SplitStore struct {
debug *debugLog

// transactional protection for concurrent read/writes during compaction
txnLk sync.RWMutex
txnActive bool
txnViews sync.WaitGroup
txnProtect MarkSet
txnRefsMx sync.Mutex
txnRefs map[cid.Cid]struct{}
txnMissing map[cid.Cid]struct{}
txnLk sync.RWMutex
txnViewsMx sync.Mutex
txnViewsCond sync.Cond
txnViews int
txnActive bool
txnProtect MarkSet
txnRefsMx sync.Mutex
txnRefs map[cid.Cid]struct{}
txnMissing map[cid.Cid]struct{}
}

var _ bstore.Blockstore = (*SplitStore)(nil)
Expand Down Expand Up @@ -199,6 +201,7 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
coldPurgeSize: defaultColdPurgeSize,
}

ss.txnViewsCond.L = &ss.txnViewsMx
ss.ctx, ss.cancel = context.WithCancel(context.Background())

if enableDebugLog {
Expand Down Expand Up @@ -444,10 +447,8 @@ func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error {
// view can't have its data pointer deleted, which would be catastrophic.
// Note that we can't just RLock for the duration of the view, as this could
// lead to deadlock with recursive views.
wg := s.protectView(cid)
if wg != nil {
defer wg.Done()
}
s.protectView(cid)
defer s.viewDone()

err := s.hot.View(cid, cb)
switch err {
Expand Down Expand Up @@ -594,15 +595,15 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {

if epoch-s.baseEpoch > CompactionThreshold {
// it's time to compact -- prepare the transaction and go!
wg := s.beginTxnProtect()
s.beginTxnProtect()
go func() {
defer atomic.StoreInt32(&s.compacting, 0)
defer s.endTxnProtect()

log.Info("compacting splitstore")
start := time.Now()

s.compact(curTs, wg)
s.compact(curTs)

log.Infow("compaction done", "took", time.Since(start))
}()
Expand Down Expand Up @@ -632,16 +633,36 @@ func (s *SplitStore) protectTipSets(apply []*types.TipSet) {
}

// transactionally protect a view
func (s *SplitStore) protectView(c cid.Cid) *sync.WaitGroup {
func (s *SplitStore) protectView(c cid.Cid) {
s.txnLk.RLock()
defer s.txnLk.RUnlock()

s.txnViews.Add(1)
if s.txnActive {
s.trackTxnRef(c)
}

return &s.txnViews
s.txnViewsMx.Lock()
s.txnViews++
s.txnViewsMx.Unlock()
}

// viewDone marks one protected view as finished. It decrements the count of
// in-flight views while holding txnViewsMx and, when the count reaches zero,
// wakes the goroutines blocked on txnViewsCond.
func (s *SplitStore) viewDone() {
	s.txnViewsMx.Lock()
	defer s.txnViewsMx.Unlock()

	s.txnViews--
	if s.txnViews == 0 {
		// Broadcast rather than Signal: Signal wakes at most one waiter, so
		// any additional goroutine blocked in Wait would stay asleep even
		// though the count is zero. Waiters re-check the count in a loop,
		// so waking all of them is safe.
		s.txnViewsCond.Broadcast()
	}
}

// viewWait blocks the caller until the count of in-flight views is no longer
// positive. The count is examined while holding txnViewsMx; each time the
// goroutine is woken from txnViewsCond it checks the count again before
// returning, so a wakeup with views still active just goes back to waiting.
func (s *SplitStore) viewWait() {
	s.txnViewsMx.Lock()
	defer s.txnViewsMx.Unlock()

	for {
		if s.txnViews <= 0 {
			// no views outstanding; nothing (more) to wait for
			return
		}
		s.txnViewsCond.Wait()
	}
}

// transactionally protect a reference to an object
Expand Down Expand Up @@ -933,10 +954,10 @@ func (s *SplitStore) doWarmup(curTs *types.TipSet) error {
// - We sort cold objects heaviest first, so as to never delete the constituents of a DAG before the DAG itself (which would leave dangling references)
// - We delete in small batches taking a lock; each batch is checked again for marks, from the concurrent transactional mark, so as to never delete anything live
// - We then end the transaction and compact/gc the hotstore.
func (s *SplitStore) compact(curTs *types.TipSet, wg *sync.WaitGroup) {
func (s *SplitStore) compact(curTs *types.TipSet) {
log.Info("waiting for active views to complete")
start := time.Now()
wg.Wait()
s.viewWait()
log.Infow("waiting for active views done", "took", time.Since(start))

start = time.Now()
Expand Down Expand Up @@ -1126,7 +1147,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return nil
}

func (s *SplitStore) beginTxnProtect() *sync.WaitGroup {
func (s *SplitStore) beginTxnProtect() {
log.Info("preparing compaction transaction")

s.txnLk.Lock()
Expand All @@ -1135,8 +1156,6 @@ func (s *SplitStore) beginTxnProtect() *sync.WaitGroup {
s.txnActive = true
s.txnRefs = make(map[cid.Cid]struct{})
s.txnMissing = make(map[cid.Cid]struct{})

return &s.txnViews
}

func (s *SplitStore) beginTxnMarking(markSet MarkSet) {
Expand Down

0 comments on commit 257423e

Please sign in to comment.