Skip to content

Commit

Permalink
storage: suggest compaction when replicas are deleted
Browse files Browse the repository at this point in the history
In aggregate, if enough contiguous replicas are migrated away from a
range, we would like to compact the underlying RocksDB storage engine.
To that end, on clear a replica's data, we suggest a compaction.

Fixed various bugs with the compactor queue - in particular, the call
in the main processing loop to stop the timer was misplaced and caused
the timer to be permanently disabled. Also, fixed an indexing error
in log messages and updated comments.

Release note: None
  • Loading branch information
spencerkimball committed Jan 3, 2018
1 parent 09234bf commit 8862b5d
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 32 deletions.
58 changes: 58 additions & 0 deletions pkg/storage/client_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3858,3 +3858,61 @@ func TestFailedConfChange(t *testing.T) {
t.Fatal(err)
}
}

// TestStoreRangeRemovalCompactionSuggestion verifies that if a replica
// is removed from a store, a compaction suggestion is made to the
// compactor queue.
func TestStoreRangeRemovalCompactionSuggestion(t *testing.T) {
defer leaktest.AfterTest(t)()
sc := storage.TestStoreConfig(nil)
mtc := &multiTestContext{storeConfig: &sc}
defer mtc.Stop()
mtc.Start(t, 3)

const rangeID = roachpb.RangeID(1)
mtc.replicateRange(rangeID, 1, 2)

repl, err := mtc.stores[0].GetReplica(rangeID)
if err != nil {
t.Fatal(err)
}
ctx := repl.AnnotateCtx(context.Background())

deleteStore := mtc.stores[2]
if err := repl.ChangeReplicas(
ctx,
roachpb.REMOVE_REPLICA,
roachpb.ReplicationTarget{
NodeID: deleteStore.Ident.NodeID,
StoreID: deleteStore.Ident.StoreID,
},
repl.Desc(),
storage.ReasonRebalance,
"",
); err != nil {
t.Fatal(err)
}

testutils.SucceedsSoon(t, func() error {
// Function to check compaction metrics indicating a suggestion
// was queued or a compaction was processed or skipped.
haveCompaction := func(s *storage.Store, exp bool) error {
queued := s.Compactor().Metrics.BytesQueued.Value()
comps := s.Compactor().Metrics.BytesCompacted.Count()
skipped := s.Compactor().Metrics.BytesSkipped.Count()
if exp != (queued > 0 || comps > 0 || skipped > 0) {
return errors.Errorf("%s: expected non-zero compaction metrics? %t; got queued=%d, compactions=%d, skipped=%d",
s, exp, queued, comps, skipped)
}
return nil
}
// Verify that no compaction metrics are showing non-zero bytes in the
// other stores.
for _, s := range mtc.stores {
if err := haveCompaction(s, s == deleteStore); err != nil {
return err
}
}
return nil
})
}
65 changes: 39 additions & 26 deletions pkg/storage/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,21 +112,27 @@ func NewCompactor(eng engine.WithSSTables, capFn storeCapacityFunc) *Compactor {
// provided stopper indicates. Processing is done with a periodicity of
// compactionMinInterval, but only if there are compactions pending.
func (c *Compactor) Start(ctx context.Context, tracer opentracing.Tracer, stopper *stop.Stopper) {
if empty, err := c.isSpanEmpty(
ctx, keys.LocalStoreSuggestedCompactionsMin, keys.LocalStoreSuggestedCompactionsMax,
); err != nil {
log.Warningf(ctx, "failed check whether compaction suggestions exist: %s", err)
} else if !empty {
log.Eventf(ctx, "compactor starting in %s as there are suggested compactions pending", c.opts.CompactionMinInterval)
c.ch <- struct{}{} // wake up the goroutine immediately
}
// Wake up immediately to examine the queue and set the bytes queued metric.
c.ch <- struct{}{}

stopper.RunWorker(ctx, func(ctx context.Context) {
var timer timeutil.Timer
defer timer.Stop()
var timerSet bool

for {
select {
case <-c.ch:
// A new suggestion was made. Examine the compaction queue,
// which returns the number of bytes queued.
if bytesQueued, err := c.examineQueue(ctx); err != nil {
log.Warningf(ctx, "failed check whether compaction suggestions exist: %s", err)
} else if bytesQueued > 0 {
log.Eventf(ctx, "compactor starting in %s as there are suggested compactions pending", c.opts.CompactionMinInterval)
} else {
// Queue is empty, don't set the timer. This can happen only at startup.
break
}
// Set the wait timer if not already set.
if !timerSet {
timer.Reset(c.opts.CompactionMinInterval)
Expand All @@ -135,16 +141,15 @@ func (c *Compactor) Start(ctx context.Context, tracer opentracing.Tracer, stoppe

case <-timer.C:
timer.Read = true
timer.Stop()
spanCtx, cleanup := tracing.EnsureContext(ctx, tracer, "process suggested compactions")
ok, err := c.processSuggestions(spanCtx)
if err != nil {
log.Warningf(spanCtx, "failed processing suggested compactions: %s", err)
}
cleanup()
if ok {
// Everything has been processed. Wait for the next
// suggested compaction before resetting the timer.
// The queue was processed. Wait for the next suggested
// compaction before resetting timer.
timerSet = false
break
}
Expand Down Expand Up @@ -185,7 +190,7 @@ func (aggr aggregatedCompaction) String() string {
if len(aggr.suggestions) == 1 {
seqFmt = fmt.Sprintf("#%d/%d", aggr.startIdx+1, aggr.total)
} else {
seqFmt = fmt.Sprintf("#%d-%d/%d", aggr.startIdx+1, aggr.startIdx+len(aggr.suggestions)+1, aggr.total)
seqFmt = fmt.Sprintf("#%d-%d/%d", aggr.startIdx+1, aggr.startIdx+len(aggr.suggestions), aggr.total)
}
return fmt.Sprintf("%s (%s-%s) for %s", seqFmt, aggr.StartKey, aggr.EndKey, humanizeutil.IBytes(aggr.Bytes))
}
Expand All @@ -195,7 +200,7 @@ func (aggr aggregatedCompaction) String() string {
// exceed the absolute or fractional size thresholds. If suggested
// compactions don't meet thresholds, they're discarded if they're
// older than maxSuggestedCompactionRecordAge. Returns a boolean
// indicating whether the processing occurred.
// indicating whether the queue was successfully processed.
func (c *Compactor) processSuggestions(ctx context.Context) (bool, error) {
// Collect all suggestions.
var suggestions []storagebase.SuggestedCompaction
Expand Down Expand Up @@ -332,7 +337,11 @@ func (c *Compactor) processCompaction(
log.Fatal(ctx, err) // should never happen on a batch
}
}
return aggr.Bytes, nil

if shouldProcess {
return aggr.Bytes, nil
}
return 0, nil
}

// aggregateCompaction merges sc into aggr, to create a new suggested
Expand Down Expand Up @@ -369,22 +378,26 @@ func (c *Compactor) aggregateCompaction(
return false // aggregated successfully
}

// isSpanEmpty returns whether the specified key span is empty (true)
// or contains keys (false).
func (c *Compactor) isSpanEmpty(ctx context.Context, start, end roachpb.Key) (bool, error) {
// If there are any suggested compactions, start the compaction timer.
var empty = true
// examineQueue returns the total number of bytes queued and updates the
// BytesQueued gauge.
func (c *Compactor) examineQueue(ctx context.Context) (int64, error) {
var totalBytes int64
if err := c.eng.Iterate(
engine.MVCCKey{Key: start},
engine.MVCCKey{Key: end},
func(_ engine.MVCCKeyValue) (bool, error) {
empty = false
return true, nil // don't continue iteration
engine.MVCCKey{Key: keys.LocalStoreSuggestedCompactionsMin},
engine.MVCCKey{Key: keys.LocalStoreSuggestedCompactionsMax},
func(kv engine.MVCCKeyValue) (bool, error) {
var c storagebase.Compaction
if err := protoutil.Unmarshal(kv.Value, &c); err != nil {
return false, err
}
totalBytes += c.Bytes
return false, nil // continue iteration
},
); err != nil {
return false, err
return 0, err
}
return empty, nil
c.Metrics.BytesQueued.Update(totalBytes)
return totalBytes, nil
}

// SuggestCompaction writes the specified compaction to persistent
Expand Down
8 changes: 3 additions & 5 deletions pkg/storage/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ func TestCompactorProcessingInitialization(t *testing.T) {
defer cleanup()

// Add a suggested compaction -- this won't get processed by this
// compactor for two minutes.
// compactor for an hour.
compactor.opts.CompactionMinInterval = time.Hour
compactor.SuggestCompaction(context.Background(), storagebase.SuggestedCompaction{
StartKey: key("a"), EndKey: key("b"),
Expand Down Expand Up @@ -567,10 +567,8 @@ func TestCompactorCleansUpOldRecords(t *testing.T) {
return fmt.Errorf("expected skipped bytes %d; got %d", e, a)
}
// Verify compaction queue is empty.
if empty, err := compactor.isSpanEmpty(
context.Background(), keys.LocalStoreSuggestedCompactionsMin, keys.LocalStoreSuggestedCompactionsMax,
); err != nil || !empty {
return fmt.Errorf("compaction queue not empty or err: %t, %v", empty, err)
if bytesQueued, err := compactor.examineQueue(context.Background()); err != nil || bytesQueued > 0 {
return fmt.Errorf("compaction queue not empty (%d bytes) or err %v", bytesQueued, err)
}
return nil
})
Expand Down
13 changes: 12 additions & 1 deletion pkg/storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -775,11 +775,22 @@ func (r *Replica) destroyDataRaftMuLocked(

// NB: this uses the local descriptor instead of the consistent one to match
// the data on disk.
if err := clearRangeData(ctx, r.Desc(), ms.KeyCount, r.store.Engine(), batch); err != nil {
desc := r.Desc()
if err := clearRangeData(ctx, desc, ms.KeyCount, r.store.Engine(), batch); err != nil {
return err
}
clearTime := timeutil.Now()

// Suggest the cleared range to the compactor queue.
r.store.compactor.SuggestCompaction(ctx, storagebase.SuggestedCompaction{
StartKey: roachpb.Key(desc.StartKey),
EndKey: roachpb.Key(desc.EndKey),
Compaction: storagebase.Compaction{
Bytes: ms.Total(),
SuggestedAtNanos: clearTime.UnixNano(),
},
})

// Save a tombstone to ensure that replica IDs never get reused.
if err := r.setTombstoneKey(ctx, batch, &consistentDesc); err != nil {
return err
Expand Down
3 changes: 3 additions & 0 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -1771,6 +1771,9 @@ func (s *Store) DB() *client.DB { return s.cfg.DB }
// Gossip accessor.
func (s *Store) Gossip() *gossip.Gossip { return s.cfg.Gossip }

// Compactor accessor.
func (s *Store) Compactor() *compactor.Compactor { return s.compactor }

// Stopper accessor.
func (s *Store) Stopper() *stop.Stopper { return s.stopper }

Expand Down

0 comments on commit 8862b5d

Please sign in to comment.