final review
You know, I'm actually pretty confident in this code.
Stebalien committed Jul 10, 2021
1 parent 870a47f commit 723cfee
Showing 1 changed file with 90 additions and 20 deletions.
110 changes: 90 additions & 20 deletions blockstore/splitstore/splitstore.go
@@ -163,6 +163,12 @@ func init() {
// attached to the ChainStore with Start in order to trigger compaction.
func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Config) (*SplitStore, error) {
// hot blockstore must support BlockstoreIterator
// nit: can we just define a local type?
// So we can store this instead of casting multiple times?
type hotStore interface {
bstore.Blockstore
bstore.BlockstoreIterator
}
if _, ok := hot.(bstore.BlockstoreIterator); !ok {
return nil, xerrors.Errorf("hot blockstore does not support efficient iteration: %T", hot)
}
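For illustration, the store-once approach the nit describes could look roughly like this (a sketch; keeping the asserted value in a field is an assumption, not part of this diff):

    hots, ok := hot.(hotStore)
    if !ok {
        return nil, xerrors.Errorf("hot blockstore does not support efficient iteration: %T", hot)
    }
    // assign hots to a SplitStore field typed as hotStore so call sites
    // can iterate without repeating the type assertion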
@@ -181,6 +187,8 @@ func Open(path string, ds dstore.Datastore, hot, cold bstore.Blockstore, cfg *Co
cold: cold,
markSetEnv: markSetEnv,

// nit: this doesn't need to be allocated as long as we don't move it.
// fyi, you _can_ return &s.txnViews.
txnViews: new(sync.WaitGroup),

coldPurgeSize: defaultColdPurgeSize,
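A minimal sketch of the value-field alternative mentioned above (field placement is an assumption):

    type SplitStore struct {
        txnViews sync.WaitGroup // by value: no separate allocation needed
        // ...
    }

    // a pointer can still be handed out safely, provided the SplitStore
    // itself is never copied:
    func (s *SplitStore) viewsGroup() *sync.WaitGroup { return &s.txnViews }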
@@ -263,6 +271,7 @@ func (s *SplitStore) Get(cid cid.Cid) (blocks.Block, error) {

blk, err = s.cold.Get(cid)
if err == nil {
// nit: use the stored context?
stats.Record(context.Background(), metrics.SplitstoreMiss.M(1))

}
@@ -393,20 +402,46 @@ func (s *SplitStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, err
}

ch := make(chan cid.Cid)
// arbitrary but >= 1 helps avoid context switching in this case
ch := make(chan cid.Cid, 8)
go func() {
defer cancel()
defer close(ch)

for _, in := range []<-chan cid.Cid{chHot, chCold} {
for cid := range in {
select {
case ch <- cid:
case <-ctx.Done():
return
// uber nit, unless we have a reason to do this in order?
// Of course, this code is more complicated and also probably overkill.
// I actually kind of prefer the old code.
// But I'll just leave this here because I already wrote it.
for chHot != nil && chCold != nil {
var c cid.Cid
var ok bool
select {
case c, ok = <-chHot:
if !ok {
chHot = nil
continue
}
case c, ok = <-chCold:
if !ok {
chCold = nil
continue
}
case <-ctx.Done():
return
}

select {
case ch <- c:
case <-ctx.Done():
return
}
}

// Also note: We're _supposed_ to guarantee that CIDs are only returned once. We
// _probably_ need to put the CIDs in a set to check that.
//
// That might actually be a reason to read through the hotstore before the
// coldstore?
}()

return ch, nil
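On the only-return-once guarantee noted above, a dedup sketch using go-cid's Set (Visit reports whether the CID was newly added):

    seen := cid.NewSet()
    // inside the forwarding loop, before sending:
    if !seen.Visit(c) {
        continue // already emitted by the other store
    }

Either read order still needs a set like this once a CID can live in both stores.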
Expand Down Expand Up @@ -435,14 +470,18 @@ func (s *SplitStore) View(cid cid.Cid, cb func([]byte) error) error {
// view can't have its data pointer deleted, which would be catastrophic.
// Note that we can't just RLock for the duration of the view, as this could
// lead to deadlock with recursive views.
wg := s.protectView(cid)
if wg != nil {
defer wg.Done()
}

// What happens if the transaction ends while a view is open? What happens if a transaction
// ends then re-opens while a view is open? Highly unlikely... but I wonder if we should
// just use the wait-group regardless?

// ultra-nit: you _can_ make this _slightly_ simpler by returning a possibly no-op function:
defer s.protectView(cid)()
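A sketch of the possibly-no-op shape (the body is an assumption about protectView's internals, using fields that appear elsewhere in this file):

    func (s *SplitStore) protectView(c cid.Cid) func() {
        s.txnLk.RLock()
        defer s.txnLk.RUnlock()

        if !s.txnActive {
            return func() {} // no active transaction: nothing to release
        }

        s.trackTxnRef(c)
        s.txnViews.Add(1)
        return s.txnViews.Done
    }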

err := s.hot.View(cid, cb)
switch err {
case bstore.ErrNotFound:
// I see we repeat this pattern a lot. Can we put it in a helper?
if s.debug != nil {
s.mx.Lock()
warm := s.warmupEpoch > 0
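If the repeated debug pattern were factored out, the locked check might become a small helper (sketch):

    func (s *SplitStore) isWarm() bool {
        s.mx.Lock()
        defer s.mx.Unlock()
        return s.warmupEpoch > 0
    }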
@@ -529,6 +568,7 @@ func (s *SplitStore) Start(chain ChainAccessor) error {
func (s *SplitStore) Close() error {
atomic.StoreInt32(&s.closing, 1)

// I'm _pretty_ sure we can just use a lock here now. Doesn't even need to be a rwmutex.
if atomic.LoadInt32(&s.critsection) == 1 {
log.Warn("ongoing compaction in critical section; waiting for it to finish...")
for atomic.LoadInt32(&s.critsection) == 1 {
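For reference, the plain-lock shape might be (a sketch; critLk is hypothetical and would be held by compaction for the duration of the critical section):

    func (s *SplitStore) Close() error {
        atomic.StoreInt32(&s.closing, 1)

        // blocks until any in-flight critical section finishes,
        // replacing the busy-wait on the atomic flag
        s.critLk.Lock()
        defer s.critLk.Unlock()

        return nil
    }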
@@ -551,6 +591,15 @@ func (s *SplitStore) HeadChange(_, apply []*types.TipSet) error {

if !atomic.CompareAndSwapInt32(&s.compacting, 0, 1) {
// we are currently compacting -- protect the new tipset(s)

// race: we call beginTxnProtect _after_ setting compacting.
// 1. HeadChange triggers a compaction but stops immediately after setting this
// flag.
// 2. HeadChange tries to trigger another compaction, sees one is already in
// progress, and calls protectTipSets.
// 3. protectTipSets doesn't do anything because txnActive is still false.
//
// Fix: use a lock and only release it once everything is "setup" correctly.
s.protectTipSets(apply)
return nil
}
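A sketch of the lock-based fix (the mutex and plain bool are assumptions replacing the atomic CAS):

    s.compactingMx.Lock()
    if s.compacting {
        // safe: whoever set compacting held this lock until
        // beginTxnProtect had completed
        s.protectTipSets(apply)
        s.compactingMx.Unlock()
        return nil
    }
    s.compacting = true
    s.beginTxnProtect() // finish setup before releasing the lock
    s.compactingMx.Unlock()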
@@ -676,6 +725,7 @@ func (s *SplitStore) trackTxnRefMany(cids []cid.Cid) {

s.txnRefs[c] = struct{}{}
}
// defer?
s.txnRefsMx.Unlock()
return
}
@@ -729,6 +779,8 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {

close(workch)

// FYI, https://pkg.go.dev/golang.org/x/sync/errgroup is a wonderful package.
// Example: https://github.com/filecoin-project/specs-actors/blob/ab02f795d994cd9105eefe2760a1846cfcfef192/actors/migration/nv13/top.go#L62-L221
worker := func(wg *sync.WaitGroup) {
if wg != nil {
defer wg.Done()
@@ -737,6 +789,9 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
for c := range workch {
err := s.doTxnProtect(c, markSet)
if err != nil {
// I would _NOT_ proceed if we fail here. Any errors like
// this in GC should cause us to abort, to avoid
// deleting anything.
log.Warnf("error protecting transactional references: %s", err)
return
}
@@ -751,6 +806,7 @@ func (s *SplitStore) protectTxnRefs(markSet MarkSet) error {
}
wg.Wait()
} else {
// This really isn't a necessary optimization.
worker(nil)
}
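For comparison, an errgroup version of this worker pool might look like (a sketch; the worker count is arbitrary here):

    g := new(errgroup.Group) // golang.org/x/sync/errgroup
    for i := 0; i < runtime.NumCPU(); i++ {
        g.Go(func() error {
            for c := range workch {
                if err := s.doTxnProtect(c, markSet); err != nil {
                    return err // first error surfaces from Wait
                }
            }
            return nil
        })
    }
    if err := g.Wait(); err != nil {
        return xerrors.Errorf("error protecting transactional refs: %w", err)
    }

This shape also addresses the abort-on-error point above: Wait returns the first worker error instead of merely logging it.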

@@ -1063,6 +1119,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
}
log.Infow("sorting done", "took", time.Since(startSort))

// Can we add a comment explaining why we might want to do this here instead of just waiting till we purge?
// 4.1 protect transactional refs once more
err = s.protectTxnRefs(markSet)
if err != nil {
@@ -1093,6 +1150,7 @@ func (s *SplitStore) doCompact(curTs *types.TipSet) error {
log.Infow("purging cold objects from hotstore done", "took", time.Since(startPurge))

// we are done; do some housekeeping
// We have a bunch of ways to return early without invoking this. Maybe we need to defer it? Or set some kind of dirty bit?
s.endTxnProtect()
s.gcHotstore()
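One way to guarantee the cleanup runs on early returns too (sketch):

    defer s.endTxnProtect() // every exit path clears txn protection

with success-only steps like gcHotstore kept behind a flag set just before the normal return.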

@@ -1126,6 +1184,7 @@ func (s *SplitStore) beginTxnProtect() *sync.WaitGroup {
}

func (s *SplitStore) beginTxnMarking(markSet MarkSet) {
// No lock? Have we tested this with the golang race detector?
markSet.SetConcurrent()

s.txnLk.Lock()
@@ -1259,6 +1318,7 @@ func (s *SplitStore) walkObject(c cid.Cid, walked *cid.Set, f func(cid.Cid) erro
return nil
}

// Maybe check closing in here? This could run for a while.
// like walkObject, but the object may be potentially incomplete (references missing)
func (s *SplitStore) walkObjectIncomplete(c cid.Cid, walked *cid.Set, f, missing func(cid.Cid) error) error {
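    // sketch of the closing check suggested above; checkClosing is
    // already used elsewhere in this file:
    if err := s.checkClosing(); err != nil {
        return err
    }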
if !walked.Visit(c) {
@@ -1522,24 +1582,31 @@ func (s *SplitStore) purge(cids []cid.Cid, markSet MarkSet) error {
func(cids []cid.Cid) error {
deadCids := deadCids[:0]

again:
if err := s.checkClosing(); err != nil {
return err
}
for {
if err := s.checkClosing(); err != nil {
return err
}

s.txnLk.Lock()
if len(s.txnRefs) > 0 {
s.txnLk.Lock()
if len(s.txnRefs) == 0 {
// keep the lock when we hit the desired condition.
break
}
s.txnLk.Unlock()

// Doesn't really matter but...
//
// It's slightly strange that we're effectively looping _twice_.
// Maybe break protectTxnRefs into two functions? One for the loop,
// and one to do the actual thing?
//
err := s.protectTxnRefs(markSet)
if err != nil {
return xerrors.Errorf("error protecting transactional refs: %w", err)
}

goto again
}

defer s.txnLk.Unlock()
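One shape for the two-function split floated above (a sketch; the name is made up):

    // loops until there are no pending txn refs, returning with txnLk held
    func (s *SplitStore) lockQuiescent(markSet MarkSet) error {
        for {
            if err := s.checkClosing(); err != nil {
                return err
            }
            s.txnLk.Lock()
            if len(s.txnRefs) == 0 {
                return nil // caller is responsible for unlocking
            }
            s.txnLk.Unlock()
            if err := s.protectTxnRefs(markSet); err != nil {
                return xerrors.Errorf("error protecting transactional refs: %w", err)
            }
        }
    }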

for _, c := range cids {
live, err := markSet.Has(c)
if err != nil {
@@ -1635,6 +1702,8 @@ func (s *SplitStore) waitForMissingRefs(markSet MarkSet) {
}
}

// Do we really want to proceed here? Maybe we should just abort and try again later?
// Also, aren't these errors?
if len(missing) > 0 {
log.Warnf("still missing %d references", len(missing))
for c := range missing {
@@ -1699,6 +1768,7 @@ func bytesToUint64(buf []byte) uint64 {
return i
}

// So, I get the name... I think? But maybe flip it and say `isStateObject` (or `notStateObject`)?
func isUnitaryObject(c cid.Cid) bool {
pre := c.Prefix()
switch pre.Codec {
