Skip to content

Commit

Permalink
Merge pull request #21178 from spencerkimball/migration-compaction-su…
Browse files Browse the repository at this point in the history
…ggestion

storage: suggest compaction when replicas are deleted
  • Loading branch information
spencerkimball authored Jan 4, 2018
2 parents 99d39b3 + 25744b2 commit 17ae6e7
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 36 deletions.
2 changes: 1 addition & 1 deletion pkg/server/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,7 +392,7 @@ func TestCorruptedClusterID(t *testing.T) {

engines := []engine.Engine{e}
_, serverAddr, cfg, node, stopper := createTestNode(util.TestAddr, engines, nil, t)
stopper.Stop(context.TODO())
defer stopper.Stop(context.TODO())
bootstrappedEngines, newEngines, cv, err := inspectEngines(
context.TODO(), engines, cfg.Settings.Version.MinSupportedVersion,
cfg.Settings.Version.ServerVersion, node.clusterID)
Expand Down
58 changes: 58 additions & 0 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3858,3 +3858,61 @@ func TestFailedConfChange(t *testing.T) {
t.Fatal(err)
}
}

// TestStoreRangeRemovalCompactionSuggestion verifies that if a replica
// is removed from a store, a compaction suggestion is made to the
// compactor queue.
func TestStoreRangeRemovalCompactionSuggestion(t *testing.T) {
defer leaktest.AfterTest(t)()
sc := storage.TestStoreConfig(nil)
mtc := &multiTestContext{storeConfig: &sc}
defer mtc.Stop()
mtc.Start(t, 3)

const rangeID = roachpb.RangeID(1)
mtc.replicateRange(rangeID, 1, 2)

repl, err := mtc.stores[0].GetReplica(rangeID)
if err != nil {
t.Fatal(err)
}
ctx := repl.AnnotateCtx(context.Background())

deleteStore := mtc.stores[2]
if err := repl.ChangeReplicas(
ctx,
roachpb.REMOVE_REPLICA,
roachpb.ReplicationTarget{
NodeID: deleteStore.Ident.NodeID,
StoreID: deleteStore.Ident.StoreID,
},
repl.Desc(),
storage.ReasonRebalance,
"",
); err != nil {
t.Fatal(err)
}

testutils.SucceedsSoon(t, func() error {
// Function to check compaction metrics indicating a suggestion
// was queued or a compaction was processed or skipped.
haveCompaction := func(s *storage.Store, exp bool) error {
queued := s.Compactor().Metrics.BytesQueued.Value()
comps := s.Compactor().Metrics.BytesCompacted.Count()
skipped := s.Compactor().Metrics.BytesSkipped.Count()
if exp != (queued > 0 || comps > 0 || skipped > 0) {
return errors.Errorf("%s: expected non-zero compaction metrics? %t; got queued=%d, compactions=%d, skipped=%d",
s, exp, queued, comps, skipped)
}
return nil
}
// Verify that no compaction metrics are showing non-zero bytes in the
// other stores.
for _, s := range mtc.stores {
if err := haveCompaction(s, s == deleteStore); err != nil {
return err
}
}
return nil
})
}
71 changes: 42 additions & 29 deletions pkg/storage/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,30 @@ func NewCompactor(eng engine.WithSSTables, capFn storeCapacityFunc) *Compactor {
// provided stopper indicates. Processing is done with a periodicity of
// compactionMinInterval, but only if there are compactions pending.
func (c *Compactor) Start(ctx context.Context, tracer opentracing.Tracer, stopper *stop.Stopper) {
if empty, err := c.isSpanEmpty(
ctx, keys.LocalStoreSuggestedCompactionsMin, keys.LocalStoreSuggestedCompactionsMax,
); err != nil {
log.Warningf(ctx, "failed check whether compaction suggestions exist: %s", err)
} else if !empty {
log.Eventf(ctx, "compactor starting in %s as there are suggested compactions pending", c.opts.CompactionMinInterval)
c.ch <- struct{}{} // wake up the goroutine immediately
}
// Wake up immediately to examine the queue and set the bytes queued metric.
c.ch <- struct{}{}

stopper.RunWorker(ctx, func(ctx context.Context) {
var timer timeutil.Timer
defer timer.Stop()
var timerSet bool

for {
select {
case <-stopper.ShouldStop():
return

case <-c.ch:
// A new suggestion was made. Examine the compaction queue,
// which returns the number of bytes queued.
if bytesQueued, err := c.examineQueue(ctx); err != nil {
log.Warningf(ctx, "failed check whether compaction suggestions exist: %s", err)
} else if bytesQueued > 0 {
log.Eventf(ctx, "compactor starting in %s as there are suggested compactions pending", c.opts.CompactionMinInterval)
} else {
// Queue is empty, don't set the timer. This can happen only at startup.
break
}
// Set the wait timer if not already set.
if !timerSet {
timer.Reset(c.opts.CompactionMinInterval)
Expand All @@ -135,26 +144,22 @@ func (c *Compactor) Start(ctx context.Context, tracer opentracing.Tracer, stoppe

case <-timer.C:
timer.Read = true
timer.Stop()
spanCtx, cleanup := tracing.EnsureContext(ctx, tracer, "process suggested compactions")
ok, err := c.processSuggestions(spanCtx)
if err != nil {
log.Warningf(spanCtx, "failed processing suggested compactions: %s", err)
}
cleanup()
if ok {
// Everything has been processed. Wait for the next
// suggested compaction before resetting the timer.
// The queue was processed. Wait for the next suggested
// compaction before resetting timer.
timerSet = false
break
}
// Reset the timer to re-attempt processing after the minimum
// compaction interval.
timer.Reset(c.opts.CompactionMinInterval)
timerSet = true

case <-stopper.ShouldStop():
return
}
}
})
Expand Down Expand Up @@ -185,7 +190,7 @@ func (aggr aggregatedCompaction) String() string {
if len(aggr.suggestions) == 1 {
seqFmt = fmt.Sprintf("#%d/%d", aggr.startIdx+1, aggr.total)
} else {
seqFmt = fmt.Sprintf("#%d-%d/%d", aggr.startIdx+1, aggr.startIdx+len(aggr.suggestions)+1, aggr.total)
seqFmt = fmt.Sprintf("#%d-%d/%d", aggr.startIdx+1, aggr.startIdx+len(aggr.suggestions), aggr.total)
}
return fmt.Sprintf("%s (%s-%s) for %s", seqFmt, aggr.StartKey, aggr.EndKey, humanizeutil.IBytes(aggr.Bytes))
}
Expand All @@ -195,7 +200,7 @@ func (aggr aggregatedCompaction) String() string {
// exceed the absolute or fractional size thresholds. If suggested
// compactions don't meet thresholds, they're discarded if they're
// older than maxSuggestedCompactionRecordAge. Returns a boolean
// indicating whether the processing occurred.
// indicating whether the queue was successfully processed.
func (c *Compactor) processSuggestions(ctx context.Context) (bool, error) {
// Collect all suggestions.
var suggestions []storagebase.SuggestedCompaction
Expand Down Expand Up @@ -332,7 +337,11 @@ func (c *Compactor) processCompaction(
log.Fatal(ctx, err) // should never happen on a batch
}
}
return aggr.Bytes, nil

if shouldProcess {
return aggr.Bytes, nil
}
return 0, nil
}

// aggregateCompaction merges sc into aggr, to create a new suggested
Expand Down Expand Up @@ -369,22 +378,26 @@ func (c *Compactor) aggregateCompaction(
return false // aggregated successfully
}

// isSpanEmpty returns whether the specified key span is empty (true)
// or contains keys (false).
func (c *Compactor) isSpanEmpty(ctx context.Context, start, end roachpb.Key) (bool, error) {
// If there are any suggested compactions, start the compaction timer.
var empty = true
// examineQueue returns the total number of bytes queued and updates the
// BytesQueued gauge.
func (c *Compactor) examineQueue(ctx context.Context) (int64, error) {
var totalBytes int64
if err := c.eng.Iterate(
engine.MVCCKey{Key: start},
engine.MVCCKey{Key: end},
func(_ engine.MVCCKeyValue) (bool, error) {
empty = false
return true, nil // don't continue iteration
engine.MVCCKey{Key: keys.LocalStoreSuggestedCompactionsMin},
engine.MVCCKey{Key: keys.LocalStoreSuggestedCompactionsMax},
func(kv engine.MVCCKeyValue) (bool, error) {
var c storagebase.Compaction
if err := protoutil.Unmarshal(kv.Value, &c); err != nil {
return false, err
}
totalBytes += c.Bytes
return false, nil // continue iteration
},
); err != nil {
return false, err
return 0, err
}
return empty, nil
c.Metrics.BytesQueued.Update(totalBytes)
return totalBytes, nil
}

// SuggestCompaction writes the specified compaction to persistent
Expand Down
8 changes: 3 additions & 5 deletions pkg/storage/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func TestCompactorProcessingInitialization(t *testing.T) {
defer cleanup()

// Add a suggested compaction -- this won't get processed by this
// compactor for two minutes.
// compactor for an hour.
compactor.opts.CompactionMinInterval = time.Hour
compactor.SuggestCompaction(context.Background(), storagebase.SuggestedCompaction{
StartKey: key("a"), EndKey: key("b"),
Expand Down Expand Up @@ -567,10 +567,8 @@ func TestCompactorCleansUpOldRecords(t *testing.T) {
return fmt.Errorf("expected skipped bytes %d; got %d", e, a)
}
// Verify compaction queue is empty.
if empty, err := compactor.isSpanEmpty(
context.Background(), keys.LocalStoreSuggestedCompactionsMin, keys.LocalStoreSuggestedCompactionsMax,
); err != nil || !empty {
return fmt.Errorf("compaction queue not empty or err: %t, %v", empty, err)
if bytesQueued, err := compactor.examineQueue(context.Background()); err != nil || bytesQueued > 0 {
return fmt.Errorf("compaction queue not empty (%d bytes) or err %v", bytesQueued, err)
}
return nil
})
Expand Down
13 changes: 12 additions & 1 deletion pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,11 +775,22 @@ func (r *Replica) destroyDataRaftMuLocked(

// NB: this uses the local descriptor instead of the consistent one to match
// the data on disk.
if err := clearRangeData(ctx, r.Desc(), ms.KeyCount, r.store.Engine(), batch); err != nil {
desc := r.Desc()
if err := clearRangeData(ctx, desc, ms.KeyCount, r.store.Engine(), batch); err != nil {
return err
}
clearTime := timeutil.Now()

// Suggest the cleared range to the compactor queue.
r.store.compactor.SuggestCompaction(ctx, storagebase.SuggestedCompaction{
StartKey: roachpb.Key(desc.StartKey),
EndKey: roachpb.Key(desc.EndKey),
Compaction: storagebase.Compaction{
Bytes: ms.Total(),
SuggestedAtNanos: clearTime.UnixNano(),
},
})

// Save a tombstone to ensure that replica IDs never get reused.
if err := r.setTombstoneKey(ctx, batch, &consistentDesc); err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1772,6 +1772,9 @@ func (s *Store) DB() *client.DB { return s.cfg.DB }
// Gossip accessor.
func (s *Store) Gossip() *gossip.Gossip { return s.cfg.Gossip }

// Compactor accessor.
func (s *Store) Compactor() *compactor.Compactor { return s.compactor }

// Stopper accessor.
func (s *Store) Stopper() *stop.Stopper { return s.stopper }

Expand Down

0 comments on commit 17ae6e7

Please sign in to comment.