Skip to content

Commit

Permalink
signal chain in and out of sync to compaction workers
Browse files Browse the repository at this point in the history
  • Loading branch information
ZenGround0 committed Mar 20, 2023
1 parent 43da108 commit 4561a0e
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 14 deletions.
11 changes: 11 additions & 0 deletions blockstore/splitstore/splitstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ type SplitStore struct {
ctx context.Context
cancel func()

outOfSync int32 // for fast checking
chainSyncMx sync.RWMutex
chainSyncCond sync.Cond
chainSyncFinished bool // protected by chainSyncMx

debug *debugLog

// transactional protection for concurrent read/writes during compaction
Expand Down Expand Up @@ -261,6 +266,7 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co

ss.txnViewsCond.L = &ss.txnViewsMx
ss.txnSyncCond.L = &ss.txnSyncMx
ss.chainSyncCond.L = &ss.chainSyncMx
ss.ctx, ss.cancel = context.WithCancel(context.Background())

ss.reifyCond.L = &ss.reifyMx
Expand Down Expand Up @@ -822,6 +828,11 @@ func (s *SplitStore) Close() error {
s.txnSyncCond.Broadcast()
s.txnSyncMx.Unlock()

s.chainSyncMx.Lock()
s.chainSyncFinished = true
s.chainSyncCond.Broadcast()
s.chainSyncMx.Unlock()

log.Warn("close with ongoing compaction in progress; waiting for it to finish...")
for atomic.LoadInt32(&s.compacting) == 1 {
time.Sleep(time.Second)
Expand Down
75 changes: 61 additions & 14 deletions blockstore/splitstore/splitstore_compact.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,35 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
// Regardless, we put a mutex in HeadChange just to be safe

if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
// we are currently compacting -- protect the new tipset(s)
// we are currently compacting
// 1. Signal sync condition to yield compaction when out of sync and resume when in sync
timestamp := time.Unix(int64(curTs.MinTimestamp()), 0)
if CheckSyncGap && time.Since(timestamp) > SyncGapTime {
/* Chain out of sync */
if atomic.CompareAndSwapInt32(&s.outOfSync, 0, 1) {
// transition from in sync to out of sync
s.chainSyncMx.Lock()
s.chainSyncFinished = false
s.chainSyncMx.Unlock()
}
// already out of sync, no signaling necessary

}
// TODO: ok to use hysteresis with no transitions between 30s and 1m?
if time.Since(timestamp) < SyncWaitTime {
/* Chain in sync */
if atomic.CompareAndSwapInt32(&s.outOfSync, 0, 0) {
// already in sync, no signaling necessary
} else {
// transition from out of sync to in sync
s.chainSyncMx.Lock()
s.chainSyncFinished = true
s.chainSyncCond.Broadcast()
s.chainSyncMx.Unlock()
}

}
// 2. protect the new tipset(s)
s.protectTipSets(apply)
return nil
}
Expand Down Expand Up @@ -427,7 +455,7 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
// transactionally protect a reference by walking the object and marking.
// concurrent markings are short circuited by checking the markset.
func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) (int64, error) {
if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return 0, err
}

Expand Down Expand Up @@ -545,7 +573,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
}
defer coldSet.Close() //nolint:errcheck

if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return err
}

Expand Down Expand Up @@ -617,7 +645,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {

log.Infow("marking done", "took", time.Since(startMark), "marked", *count)

if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return err
}

Expand All @@ -627,7 +655,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
return xerrors.Errorf("error protecting transactional refs: %w", err)
}

if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return err
}

Expand Down Expand Up @@ -704,7 +732,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
stats.Record(s.ctx, metrics.SplitstoreCompactionHot.M(hotCnt))
stats.Record(s.ctx, metrics.SplitstoreCompactionCold.M(coldCnt))

if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return err
}

Expand All @@ -713,7 +741,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
// possibly delete objects we didn't have when we were collecting cold objects)
s.waitForMissingRefs(markSet)

if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return err
}

Expand All @@ -733,7 +761,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
}
log.Infow("moving done", "took", time.Since(startMove))

if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return err
}

Expand Down Expand Up @@ -764,7 +792,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
}

// wait for the head to catch up so that the current tipset is marked
s.waitForSync()
s.waitForTxnSync()

if err := s.checkClosing(); err != nil {
return err
Expand Down Expand Up @@ -865,7 +893,7 @@ func (s *SplitStore) beginCriticalSection(markSet MarkSet) error {
return nil
}

func (s *SplitStore) waitForSync() {
func (s *SplitStore) waitForTxnSync() {
log.Info("waiting for sync")
if !CheckSyncGap {
log.Warnf("If you see this outside of test it is a serious splitstore issue")
Expand All @@ -884,6 +912,25 @@ func (s *SplitStore) waitForSync() {
}
}

// Block compaction operations if chain sync has fallen behind
func (s *SplitStore) waitForSync() {
if atomic.LoadInt32(&s.outOfSync) == 0 {
return
}
s.chainSyncMx.RLock()
defer s.chainSyncMx.RUnlock()

for !s.chainSyncFinished {
s.chainSyncCond.Wait()
}
}

// Combined sync and closing check
func (s *SplitStore) checkYield() error {
s.waitForSync()
return s.checkClosing()
}

func (s *SplitStore) endTxnProtect() {
s.txnLk.Lock()
defer s.txnLk.Unlock()
Expand Down Expand Up @@ -1037,7 +1084,7 @@ func (s *SplitStore) walkChain(ts *types.TipSet, inclState, inclMsgs abi.ChainEp

for len(toWalk) > 0 {
// walking can take a while, so check this with every opportunity
if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return err
}

Expand Down Expand Up @@ -1106,7 +1153,7 @@ func (s *SplitStore) walkObject(c cid.Cid, visitor ObjectVisitor, f func(cid.Cid
}

// check this before recursing
if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return 0, err
}

Expand Down Expand Up @@ -1175,7 +1222,7 @@ func (s *SplitStore) walkObjectIncomplete(c cid.Cid, visitor ObjectVisitor, f, m
}

// check this before recursing
if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return sz, err
}

Expand Down Expand Up @@ -1262,7 +1309,7 @@ func (s *SplitStore) moveColdBlocks(coldr *ColdSetReader) error {
batch := make([]blocks.Block, 0, batchSize)

err := coldr.ForEach(func(c cid.Cid) error {
if err := s.checkClosing(); err != nil {
if err := s.checkYield(); err != nil {
return err
}
blk, err := s.hot.Get(s.ctx, c)
Expand Down

0 comments on commit 4561a0e

Please sign in to comment.