From 0c5e336ff19c0530cf48459151c5e6d2ef55ad6b Mon Sep 17 00:00:00 2001
From: vyzo
Date: Sat, 10 Jul 2021 16:30:27 +0300
Subject: [PATCH] address review comments

---
 blockstore/splitstore/splitstore.go | 206 ++++++++++++++++------------
 1 file changed, 117 insertions(+), 89 deletions(-)

diff --git a/blockstore/splitstore/splitstore.go b/blockstore/splitstore/splitstore.go
index 6a13be19cc5..16b922ab2dd 100644
--- a/blockstore/splitstore/splitstore.go
+++ b/blockstore/splitstore/splitstore.go
@@ -13,6 +13,7 @@ import (
 	"time"
 
 	"go.uber.org/multierr"
+	"golang.org/x/sync/errgroup"
 	"golang.org/x/xerrors"
 
 	blocks "github.com/ipfs/go-block-format"
@@ -110,10 +111,16 @@ type ChainAccessor interface {
 	SubscribeHeadChanges(change func(revert []*types.TipSet, apply []*types.TipSet) error)
 }
 
+// hotstore is the interface that must be satisfied by the hot blockstore; it is an extension
+// of the Blockstore interface with the traits we need for compaction.
+type hotstore interface {
+	bstore.Blockstore
+	bstore.BlockstoreIterator
+}
+
 type SplitStore struct {
-	compacting  int32 // compaction (or warmp up) in progress
-	critsection int32 // compaction critical section
-	closing     int32 // the splitstore is closing
+	compacting int32 // compaction (or warm up) in progress
+	closing    int32 // the splitstore is closing
 
 	cfg *Config
@@ -125,8 +132,8 @@ type SplitStore struct {
 	chain ChainAccessor
 	ds    dstore.Datastore
-	hot   bstore.Blockstore
 	cold  bstore.Blockstore
+	hot   hotstore
 
 	markSetEnv  MarkSetEnv
 	markSetSize int64
@@ -139,7 +146,7 @@ type SplitStore struct {
 	// transactional protection for concurrent read/writes during compaction
 	txnLk      sync.RWMutex
 	txnActive  bool
-	txnViews   *sync.WaitGroup
+	txnViews   sync.WaitGroup
 	txnProtect MarkSet
 	txnRefsMx  sync.Mutex
 	txnRefs    map[cid.Cid]struct{}
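An aside on the hotstore type above: it uses Go interface embedding, so a value satisfies the composed interface exactly when it satisfies every embedded one, and a single type assertion can check both traits at once. A self-contained sketch of the pattern (Reader, Lister, memStore, and composed are hypothetical stand-in names for illustration, not lotus types):

    package main

    import "fmt"

    type Reader interface{ Get(key string) ([]byte, error) }
    type Lister interface{ ForEachKey(f func(key string) error) error }

    // composed is the analogue of the hotstore interface: embedding means any
    // value implementing both Reader and Lister implicitly implements composed.
    type composed interface {
        Reader
        Lister
    }

    type memStore map[string][]byte

    func (m memStore) Get(key string) ([]byte, error) { return m[key], nil }

    func (m memStore) ForEachKey(f func(string) error) error {
        for k := range m {
            if err := f(k); err != nil {
                return err
            }
        }
        return nil
    }

    func main() {
        var r Reader = memStore{"a": []byte("x")}
        // a single type assertion checks both traits, as Open does below
        if c, ok := r.(composed); ok {
            _ = c.ForEachKey(func(k string) error { fmt.Println(k); return nil })
        }
    }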
@@ -162,9 +169,15 @@ func init() {
 // is backed by the provided hot and cold stores. The returned SplitStore MUST be
 // attached to the ChainStore with Start in order to trigger compaction.
 func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Config) (*SplitStore, error) {
-	// hot blockstore must support BlockstoreIterator
-	if _, ok := hot.(bstore.BlockstoreIterator); !ok {
-		return nil, xerrors.Errorf("hot blockstore does not support efficient iteration: %T", hot)
+	// hot blockstore must support the hotstore interface
+	hots, ok := hot.(hotstore)
+	if !ok {
+		// be specific about what is missing
+		if _, ok := hot.(bstore.BlockstoreIterator); !ok {
+			return nil, xerrors.Errorf("hot blockstore does not support efficient iteration: %T", hot)
+		}
+
+		return nil, xerrors.Errorf("hot blockstore does not support the necessary traits: %T", hot)
 	}
 
 	// the markset env
@@ -177,12 +190,10 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
 	ss := &SplitStore{
 		cfg:        cfg,
 		ds:         ds,
-		hot:        hot,
 		cold:       cold,
+		hot:        hots,
 		markSetEnv: markSetEnv,
 
-		txnViews: new(sync.WaitGroup),
-
 		coldPurgeSize: defaultColdPurgeSize,
 	}
@@ -252,18 +263,13 @@ func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) {
 		return blk, nil
 
 	case bstore.ErrNotFound:
-		if s.debug != nil {
-			s.mx.Lock()
-			warm := s.warmupEpoch > 0
-			s.mx.Unlock()
-			if warm {
-				s.debug.LogReadMiss(cid)
-			}
+		if s.isWarm() {
+			s.debug.LogReadMiss(cid)
 		}
 
 		blk, err = s.cold.Get(cid)
 		if err == nil {
-			stats.Record(context.Background(), metrics.SplitstoreMiss.M(1))
+			stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
 		}
 		return blk, err
@@ -294,18 +300,13 @@ func (s *SplitStore) GetSize(cid cid.Cid) (int, error) {
 		return size, nil
 
 	case bstore.ErrNotFound:
-		if s.debug != nil {
-			s.mx.Lock()
-			warm := s.warmupEpoch > 0
-			s.mx.Unlock()
-			if warm {
-				s.debug.LogReadMiss(cid)
-			}
+		if s.isWarm() {
+			s.debug.LogReadMiss(cid)
 		}
 
 		size, err = s.cold.GetSize(cid)
 		if err == nil {
-			stats.Record(context.Background(), metrics.SplitstoreMiss.M(1))
+			stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
 		}
 		return size, err
@@ -393,15 +394,21 @@ func (s *SplitStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
 		return nil, err
 	}
 
-	ch := make(chan cid.Cid)
+	seen := cid.NewSet()
+	ch := make(chan cid.Cid, 8) // buffer is arbitrary, just enough to avoid context switches
 	go func() {
 		defer cancel()
 		defer close(ch)
 
 		for _, in := range []<-chan cid.Cid{chHot, chCold} {
-			for cid := range in {
+			for c := range in {
+				// ensure we only emit each key once
+				if !seen.Visit(c) {
+					continue
+				}
+
 				select {
-				case ch <- cid:
+				case ch <- c:
 				case <-ctx.Done():
 					return
 				}
@@ -443,18 +450,13 @@ func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error {
 	err := s.hot.View(cid, cb)
 	switch err {
 	case bstore.ErrNotFound:
-		if s.debug != nil {
-			s.mx.Lock()
-			warm := s.warmupEpoch > 0
-			s.mx.Unlock()
-			if warm {
-				s.debug.LogReadMiss(cid)
-			}
+		if s.isWarm() {
+			s.debug.LogReadMiss(cid)
 		}
 
 		err = s.cold.View(cid, cb)
 		if err == nil {
-			stats.Record(context.Background(), metrics.SplitstoreMiss.M(1))
+			stats.Record(s.ctx, metrics.SplitstoreMiss.M(1))
 		}
 		return err
@@ -463,6 +465,12 @@ func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error {
 	}
 }
 
+func (s *SplitStore) isWarm() bool {
+	s.mx.Lock()
+	defer s.mx.Unlock()
+	return s.warmupEpoch > 0
+}
+
 // State tracking
 func (s *SplitStore) Start(chain ChainAccessor) error {
 	s.chain = chain
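A note on the AllKeysChan change above: cid.Set's Visit reports whether a key was newly added, which is what makes the single-pass dedup over the merged hot/cold channels work. A minimal model of the same pattern over strings (stringSet and merge are illustrative stand-ins, not the actual cid.Set):

    package main

    import "fmt"

    type stringSet map[string]struct{}

    // Visit mimics cid.Set.Visit: it reports whether k was newly added (true)
    // or had already been seen (false).
    func (s stringSet) Visit(k string) bool {
        if _, ok := s[k]; ok {
            return false
        }
        s[k] = struct{}{}
        return true
    }

    func merge(chans ...<-chan string) <-chan string {
        seen := stringSet{}
        out := make(chan string, 8) // small buffer, as in the patch
        go func() {
            defer close(out)
            for _, in := range chans {
                for k := range in {
                    if !seen.Visit(k) {
                        continue // already emitted from the other store
                    }
                    out <- k
                }
            }
        }()
        return out
    }

    func main() {
        hot := make(chan string, 2)
        cold := make(chan string, 2)
        hot <- "a"
        hot <- "b"
        close(hot)
        cold <- "b"
        cold <- "c"
        close(cold)
        for k := range merge(hot, cold) {
            fmt.Println(k) // prints a, b, c -- "b" only once
        }
    }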
@@ -527,11 +535,14 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
 }
 
 func (s *SplitStore) Close() error {
-	atomic.StoreInt32(&s.closing, 1)
+	if !atomic.CompareAndSwapInt32(&s.closing, 0, 1) {
+		// already closing
+		return nil
+	}
 
-	if atomic.LoadInt32(&s.critsection) == 1 {
-		log.Warn("ongoing compaction in critical section; waiting for it to finish...")
-		for atomic.LoadInt32(&s.critsection) == 1 {
+	if atomic.LoadInt32(&s.compacting) == 1 {
+		log.Warn("close with ongoing compaction in progress; waiting for it to finish...")
+		for atomic.LoadInt32(&s.compacting) == 1 {
 			time.Sleep(time.Second)
 		}
 	}
@@ -549,12 +560,24 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {
 	curTs := apply[len(apply)-1]
 	epoch := curTs.Height()
 
+	// NOTE: there is an implicit invariant assumption that HeadChange is invoked
+	// synchronously and that no other HeadChange can be invoked while one is in
+	// progress.
+	// This is guaranteed by the chainstore and is pervasive throughout lotus --
+	// if that ever changes, all hell will break loose in general and we will
+	// have a race to protectTipSets here.
 	if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
 		// we are currently compacting -- protect the new tipset(s)
 		s.protectTipSets(apply)
 		return nil
 	}
 
+	// check if we are actually closing first
+	if atomic.LoadInt32(&s.closing) == 1 {
+		atomic.StoreInt32(&s.compacting, 0)
+		return nil
+	}
+
 	timestamp := time.Unix(int64(curTs.MinTimestamp()), 0)
 	if time.Since(timestamp) > SyncGapTime {
 		// don't attempt compaction before we have caught up syncing
@@ -608,7 +631,7 @@ func (s *SplitStore) protectView(c cid.Cid) *sync.WaitGroup {
 
 	if !s.txnActive {
 		s.txnViews.Add(1)
-		return s.txnViews
+		return &s.txnViews
 	}
 
 	s.trackTxnRef(c)
@@ -653,6 +676,8 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) {
 	}
 
 	s.txnRefsMx.Lock()
+	defer s.txnRefsMx.Unlock()
+
 	quiet := false
 	for _, c := range cids {
 		if isUnitaryObject(c) {
@@ -676,7 +701,7 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) {
 		s.txnRefs[c] = struct{}{}
 	}
 
-	s.txnRefsMx.Unlock()
+	return
 }
@@ -714,6 +739,7 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
 			workch <- c
 			count++
 		}
+		close(workch)
 
 		if count == 0 {
 			return nil
@@ -727,31 +753,23 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
 			workers = count
 		}
 
-		close(workch)
-
-		worker := func(wg *sync.WaitGroup) {
-			if wg != nil {
-				defer wg.Done()
-			}
-
+		worker := func() error {
 			for c := range workch {
 				err := s.doTxnProtect(c, markSet)
 				if err != nil {
-					log.Warnf("error protecting transactional references: %s", err)
-					return
+					return xerrors.Errorf("error protecting transactional references to %s: %w", c, err)
 				}
 			}
+			return nil
 		}
 
-		if workers > 1 {
-			wg := new(sync.WaitGroup)
-			for i := 0; i < workers; i++ {
-				wg.Add(1)
-				go worker(wg)
-			}
-			wg.Wait()
-		} else {
-			worker(nil)
+		g := new(errgroup.Group)
+		for i := 0; i < workers; i++ {
+			g.Go(worker)
+		}
+
+		if err := g.Wait(); err != nil {
+			return err
 		}
 
 		log.Infow("protecting transactional refs done", "took", time.Since(startProtect), "protected", count)
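The worker change above swaps a hand-rolled sync.WaitGroup pool for golang.org/x/sync/errgroup, which propagates the first worker error to the caller instead of merely logging it. A minimal sketch of the same shape (the channel contents and the failure case are made up for demonstration):

    package main

    import (
        "fmt"

        "golang.org/x/sync/errgroup"
    )

    func main() {
        workch := make(chan int, 16)
        for i := 0; i < 10; i++ {
            workch <- i
        }
        close(workch) // close before starting workers, as the patch does

        worker := func() error {
            for c := range workch {
                if c == 7 { // hypothetical failure case
                    return fmt.Errorf("error processing %d", c)
                }
            }
            return nil
        }

        g := new(errgroup.Group)
        for i := 0; i < 4; i++ {
            g.Go(worker)
        }
        // Wait blocks until all workers return and yields the first non-nil error.
        if err := g.Wait(); err != nil {
            fmt.Println("first error:", err)
        }
    }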
@@ -761,6 +779,10 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
 // transactionally protect a reference by walking the object and marking.
 // concurrent markings are short circuited by checking the markset.
 func (s *SplitStore) doTxnProtect(root cid.Cid, markSet MarkSet) error {
+	if err := s.checkClosing(); err != nil {
+		return err
+	}
+
 	// Note: cold objects are deleted heaviest first, so the constituents of an object
 	// cannot be deleted before the object itself.
 	return s.walkObjectIncomplete(root, cid.NewSet(),
@@ -918,7 +940,7 @@ func (s *SplitStore) compact(curTs *types.TipSet, wg *sync.WaitGroup) {
 	start = time.Now()
 	err := s.doCompact(curTs)
 	took := time.Since(start).Milliseconds()
-	stats.Record(context.Background(), metrics.SplitstoreCompactionTimeSeconds.M(float64(took)/1e3))
+	stats.Record(s.ctx, metrics.SplitstoreCompactionTimeSeconds.M(float64(took)/1e3))
 
 	if err != nil {
 		log.Errorf("COMPACTION ERROR: %s", err)
@@ -991,7 +1013,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 	var hotCnt, coldCnt int
 
 	cold := make([]cid.Cid, 0, s.coldPurgeSize)
-	err = s.hot.(bstore.BlockstoreIterator).ForEachKey(func(c cid.Cid) error {
+	err = s.hot.ForEachKey(func(c cid.Cid) error {
 		// was it marked?
 		mark, err := markSet.Has(c)
 		if err != nil {
@@ -1021,8 +1043,8 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 	}
 
 	log.Infow("compaction stats", "hot", hotCnt, "cold", coldCnt)
-	stats.Record(context.Background(), metrics.SplitstoreCompactionHot.M(int64(hotCnt)))
-	stats.Record(context.Background(), metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
+	stats.Record(s.ctx, metrics.SplitstoreCompactionHot.M(int64(hotCnt)))
+	stats.Record(s.ctx, metrics.SplitstoreCompactionCold.M(int64(coldCnt)))
 
 	if err := s.checkClosing(); err != nil {
 		return err
@@ -1064,6 +1086,9 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 	log.Infow("sorting done", "took", time.Since(startSort))
 
 	// 4.1 protect transactional refs once more
+	// strictly speaking, this is not necessary as purge will do it before deleting each
+	// batch; however, a largish number of references has likely accumulated during
+	// the sort, and this protects them before entering the purge context.
 	err = s.protectTxnRefs(markSet)
 	if err != nil {
 		return xerrors.Errorf("error protecting transactional refs: %w", err)
@@ -1073,16 +1098,6 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
 		return err
 	}
 
-	// Enter critical section
-	log.Info("entering critical section")
-	atomic.StoreInt32(&s.critsection, 1)
-	defer atomic.StoreInt32(&s.critsection, 0)
-
-	// check to see if we are closing first; if that's the case just return
-	if err := s.checkClosing(); err != nil {
-		return err
-	}
-
 	// 5. purge cold objects from the hotstore, taking protected references into account
 	log.Info("purging cold objects from the hotstore")
 	startPurge := time.Now()
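With the critical section gone, shutdown safety rests on the atomic closing flag: Close is made idempotent via compare-and-swap, and long-running work polls a checkClosing helper between batches instead of holding a critical section. A reduced sketch of the idiom (store, errClosing, and purgeBatches are illustrative stand-ins, not the lotus names):

    package main

    import (
        "errors"
        "fmt"
        "sync/atomic"
    )

    var errClosing = errors.New("closing")

    type store struct{ closing int32 }

    func (s *store) Close() error {
        // CAS makes a second Close a no-op instead of re-running shutdown
        if !atomic.CompareAndSwapInt32(&s.closing, 0, 1) {
            return nil
        }
        // ... wait for in-flight compaction, flush state, etc.
        return nil
    }

    func (s *store) checkClosing() error {
        if atomic.LoadInt32(&s.closing) == 1 {
            return errClosing
        }
        return nil
    }

    func (s *store) purgeBatches(batches [][]string) error {
        for _, b := range batches {
            // abort between batches rather than inside a critical section
            if err := s.checkClosing(); err != nil {
                return err
            }
            fmt.Println("purging", len(b), "keys")
        }
        return nil
    }

    func main() {
        s := &store{}
        _ = s.purgeBatches([][]string{{"a"}, {"b"}})
        _ = s.Close()
        _ = s.Close()                                  // no-op
        fmt.Println(s.purgeBatches([][]string{{"c"}})) // closing
    }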
@@ -1119,10 +1134,7 @@ func (s *SplitStore) beginTxnProtect() *sync.WaitGroup {
 	s.txnRefs = make(map[cid.Cid]struct{})
 	s.txnMissing = make(map[cid.Cid]struct{})
 
-	wg := s.txnViews
-	s.txnViews = nil
-
-	return wg
+	return &s.txnViews
 }
 
 func (s *SplitStore) beginTxnMarking(markSet MarkSet) {
@@ -1141,11 +1153,13 @@ func (s *SplitStore) endTxnProtect() {
 		return
 	}
 
+	// release markset memory
+	s.txnProtect.Close()
+
 	s.txnActive = false
 	s.txnProtect = nil
 	s.txnRefs = nil
 	s.txnMissing = nil
-	s.txnViews = new(sync.WaitGroup)
 }
 
 func (s *SplitStore) walkChain(ts *types.TipSet, boundary abi.ChainEpoch, inclMsgs bool,
@@ -1238,6 +1252,11 @@ func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) erro
 		return nil
 	}
 
+	// check this before recursing
+	if err := s.checkClosing(); err != nil {
+		return err
+	}
+
 	var links []cid.Cid
 	err := s.view(c, func(data []byte) error {
 		return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
@@ -1294,6 +1313,11 @@ func (s *SplitStore) walkObjectIncomplete(c cid.Cid, walked *cid.Set, f, missing
 		return nil
 	}
 
+	// check this before recursing
+	if err := s.checkClosing(); err != nil {
+		return err
+	}
+
 	var links []cid.Cid
 	err := s.view(c, func(data []byte) error {
 		return cbg.ScanForLinks(bytes.NewReader(data), func(c cid.Cid) {
@@ -1522,24 +1546,28 @@ func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSet) error {
 		func(cids []cid.Cid) error {
 			deadCids := deadCids[:0]
 
-		again:
-			if err := s.checkClosing(); err != nil {
-				return err
-			}
+			for {
+				if err := s.checkClosing(); err != nil {
+					return err
+				}
 
-			s.txnLk.Lock()
-			if len(s.txnRefs) > 0 {
+				s.txnLk.Lock()
+				if len(s.txnRefs) == 0 {
+					// keep the lock!
+					break
+				}
+
+				// unlock and protect
 				s.txnLk.Unlock()
+
 				err := s.protectTxnRefs(markSet)
 				if err != nil {
 					return xerrors.Errorf("error protecting transactional refs: %w", err)
 				}
-
-				goto again
 			}
 			defer s.txnLk.Unlock()
+
 			for _, c := range cids {
 				live, err := markSet.Has(c)
 				if err != nil {
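For reference, the shape of the purge loop that replaces the goto: keep retrying protection until no transactional refs are pending, and break out of the loop still holding the lock so the deletions below run with writers excluded. A stand-alone sketch with stand-in names (purger, pending, protect, purgeBatch are not the lotus identifiers):

    package main

    import (
        "fmt"
        "sync"
    )

    type purger struct {
        mu      sync.RWMutex
        pending []string
    }

    func (p *purger) protect(refs []string) {
        fmt.Println("protecting", len(refs), "refs")
    }

    func (p *purger) purgeBatch(batch []string) {
        for {
            p.mu.Lock()
            if len(p.pending) == 0 {
                break // keep the lock!
            }
            // unlock while protecting, then re-check from the top
            refs := p.pending
            p.pending = nil
            p.mu.Unlock()
            p.protect(refs)
        }
        defer p.mu.Unlock() // released when the function returns

        fmt.Println("deleting", len(batch), "keys under lock")
    }

    func main() {
        p := &purger{pending: []string{"r1", "r2"}}
        p.purgeBatch([]string{"a", "b"})
    }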