storage: suggest compaction when replicas are deleted #21178

Merged
2 changes: 1 addition & 1 deletion pkg/server/node_test.go
@@ -392,7 +392,7 @@ func TestCorruptedClusterID(t *testing.T) {

engines := []engine.Engine{e}
_, serverAddr, cfg, node, stopper := createTestNode(util.TestAddr, engines, nil, t)
stopper.Stop(context.TODO())
defer stopper.Stop(context.TODO())
bootstrappedEngines, newEngines, cv, err := inspectEngines(
context.TODO(), engines, cfg.Settings.Version.MinSupportedVersion,
cfg.Settings.Version.ServerVersion, node.clusterID)
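The only change in node_test.go turns an immediate stopper.Stop into a deferred one, so the engines and node created by createTestNode are still usable when inspectEngines runs. A minimal, self-contained sketch of the pattern (generic names, not the actual test scaffolding):

package main

import "fmt"

// teardown stands in for stopper.Stop(context.TODO()); in the original test,
// calling it before the inspection tore down state the inspection still needed.
func teardown() { fmt.Println("teardown") }

func main() {
	defer teardown()                  // deferred: runs after the work below, not before it
	fmt.Println("inspecting engines") // stands in for the inspectEngines call
}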
58 changes: 58 additions & 0 deletions pkg/storage/client_raft_test.go
@@ -3858,3 +3858,61 @@ func TestFailedConfChange(t *testing.T) {
t.Fatal(err)
}
}

// TestStoreRangeRemovalCompactionSuggestion verifies that if a replica
// is removed from a store, a compaction suggestion is made to the
// compactor queue.
func TestStoreRangeRemovalCompactionSuggestion(t *testing.T) {
defer leaktest.AfterTest(t)()
sc := storage.TestStoreConfig(nil)
mtc := &multiTestContext{storeConfig: &sc}
defer mtc.Stop()
mtc.Start(t, 3)

const rangeID = roachpb.RangeID(1)
mtc.replicateRange(rangeID, 1, 2)

repl, err := mtc.stores[0].GetReplica(rangeID)
if err != nil {
t.Fatal(err)
}
ctx := repl.AnnotateCtx(context.Background())

deleteStore := mtc.stores[2]
if err := repl.ChangeReplicas(
ctx,
roachpb.REMOVE_REPLICA,
roachpb.ReplicationTarget{
NodeID: deleteStore.Ident.NodeID,
StoreID: deleteStore.Ident.StoreID,
},
repl.Desc(),
storage.ReasonRebalance,
"",
); err != nil {
t.Fatal(err)
}

testutils.SucceedsSoon(t, func() error {
// Function to check compaction metrics indicating a suggestion
// was queued or a compaction was processed or skipped.
haveCompaction := func(s *storage.Store, exp bool) error {
queued := s.Compactor().Metrics.BytesQueued.Value()
comps := s.Compactor().Metrics.BytesCompacted.Count()
skipped := s.Compactor().Metrics.BytesSkipped.Count()
if exp != (queued > 0 || comps > 0 || skipped > 0) {
return errors.Errorf("%s: expected non-zero compaction metrics? %t; got queued=%d, compactions=%d, skipped=%d",
s, exp, queued, comps, skipped)
}
return nil
}
// Verify that compaction metrics show non-zero bytes only on the
// store from which the replica was removed.
for _, s := range mtc.stores {
if err := haveCompaction(s, s == deleteStore); err != nil {
return err
}
}
return nil
})
}
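The test above reads three compactor metrics through two different accessors: BytesQueued is a gauge (Value()), while BytesCompacted and BytesSkipped are counters (Count()). A hypothetical helper in the same test package could make that distinction explicit when polling a single store; the helper name is invented, but the accessor and metric fields are the ones exercised above.

// compactionActivity is not part of the change; it is a sketch that assumes
// the Compactor() accessor and the metric fields used by the test above.
func compactionActivity(s *storage.Store) (queued, compacted, skipped int64) {
	m := s.Compactor().Metrics
	return m.BytesQueued.Value(), m.BytesCompacted.Count(), m.BytesSkipped.Count()
}

With such a helper, the SucceedsSoon closure reduces to comparing compactionActivity(s) against zero for every store except the one that lost the replica.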
71 changes: 42 additions & 29 deletions pkg/storage/compactor/compactor.go
@@ -112,21 +112,30 @@ func NewCompactor(eng engine.WithSSTables, capFn storeCapacityFunc) *Compactor {
// provided stopper indicates. Processing is done with a periodicity of
// compactionMinInterval, but only if there are compactions pending.
func (c *Compactor) Start(ctx context.Context, tracer opentracing.Tracer, stopper *stop.Stopper) {
if empty, err := c.isSpanEmpty(
ctx, keys.LocalStoreSuggestedCompactionsMin, keys.LocalStoreSuggestedCompactionsMax,
); err != nil {
log.Warningf(ctx, "failed check whether compaction suggestions exist: %s", err)
} else if !empty {
log.Eventf(ctx, "compactor starting in %s as there are suggested compactions pending", c.opts.CompactionMinInterval)
c.ch <- struct{}{} // wake up the goroutine immediately
}
// Wake up immediately to examine the queue and set the bytes queued metric.
c.ch <- struct{}{}

stopper.RunWorker(ctx, func(ctx context.Context) {
var timer timeutil.Timer
defer timer.Stop()
var timerSet bool

for {
select {
case <-stopper.ShouldStop():
return

case <-c.ch:
// A new suggestion was made. Examine the compaction queue,
// which returns the number of bytes queued.
if bytesQueued, err := c.examineQueue(ctx); err != nil {
log.Warningf(ctx, "failed check whether compaction suggestions exist: %s", err)
} else if bytesQueued > 0 {
log.Eventf(ctx, "compactor starting in %s as there are suggested compactions pending", c.opts.CompactionMinInterval)
} else {
// Queue is empty, don't set the timer. This can happen only at startup.
break
}
// Set the wait timer if not already set.
if !timerSet {
timer.Reset(c.opts.CompactionMinInterval)
@@ -135,26 +144,22 @@ func (c *Compactor) Start(ctx context.Context, tracer opentracing.Tracer, stopper *stop.Stopper) {

case <-timer.C:
timer.Read = true
timer.Stop()
spanCtx, cleanup := tracing.EnsureContext(ctx, tracer, "process suggested compactions")
ok, err := c.processSuggestions(spanCtx)
if err != nil {
log.Warningf(spanCtx, "failed processing suggested compactions: %s", err)
}
cleanup()
if ok {
// Everything has been processed. Wait for the next
// suggested compaction before resetting the timer.
// The queue was processed. Wait for the next suggested
// compaction before resetting the timer.
timerSet = false
break
}
// Reset the timer to re-attempt processing after the minimum
// compaction interval.
timer.Reset(c.opts.CompactionMinInterval)
timerSet = true

case <-stopper.ShouldStop():
return
}
}
})
@@ -185,7 +190,7 @@ func (aggr aggregatedCompaction) String() string {
if len(aggr.suggestions) == 1 {
seqFmt = fmt.Sprintf("#%d/%d", aggr.startIdx+1, aggr.total)
} else {
seqFmt = fmt.Sprintf("#%d-%d/%d", aggr.startIdx+1, aggr.startIdx+len(aggr.suggestions)+1, aggr.total)
seqFmt = fmt.Sprintf("#%d-%d/%d", aggr.startIdx+1, aggr.startIdx+len(aggr.suggestions), aggr.total)
}
return fmt.Sprintf("%s (%s-%s) for %s", seqFmt, aggr.StartKey, aggr.EndKey, humanizeutil.IBytes(aggr.Bytes))
}
@@ -195,7 +200,7 @@ func (aggr aggregatedCompaction) String() string {
// exceed the absolute or fractional size thresholds. If suggested
// compactions don't meet thresholds, they're discarded if they're
// older than maxSuggestedCompactionRecordAge. Returns a boolean
// indicating whether the processing occurred.
// indicating whether the queue was successfully processed.
func (c *Compactor) processSuggestions(ctx context.Context) (bool, error) {
// Collect all suggestions.
var suggestions []storagebase.SuggestedCompaction
@@ -332,7 +337,11 @@ func (c *Compactor) processCompaction(
log.Fatal(ctx, err) // should never happen on a batch
}
}
return aggr.Bytes, nil

if shouldProcess {
return aggr.Bytes, nil
}
return 0, nil
}

// aggregateCompaction merges sc into aggr, to create a new suggested
@@ -369,22 +378,26 @@ func (c *Compactor) aggregateCompaction(
return false // aggregated successfully
}

// isSpanEmpty returns whether the specified key span is empty (true)
// or contains keys (false).
func (c *Compactor) isSpanEmpty(ctx context.Context, start, end roachpb.Key) (bool, error) {
// If there are any suggested compactions, start the compaction timer.
var empty = true
// examineQueue returns the total number of bytes queued and updates the
// BytesQueued gauge.
func (c *Compactor) examineQueue(ctx context.Context) (int64, error) {
var totalBytes int64
if err := c.eng.Iterate(
engine.MVCCKey{Key: start},
engine.MVCCKey{Key: end},
func(_ engine.MVCCKeyValue) (bool, error) {
empty = false
return true, nil // don't continue iteration
engine.MVCCKey{Key: keys.LocalStoreSuggestedCompactionsMin},
engine.MVCCKey{Key: keys.LocalStoreSuggestedCompactionsMax},
func(kv engine.MVCCKeyValue) (bool, error) {
var c storagebase.Compaction
if err := protoutil.Unmarshal(kv.Value, &c); err != nil {
return false, err
}
totalBytes += c.Bytes
return false, nil // continue iteration
},
); err != nil {
return false, err
return 0, err
}
return empty, nil
c.Metrics.BytesQueued.Update(totalBytes)
return totalBytes, nil
}

// SuggestCompaction writes the specified compaction to persistent
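The net effect of the compactor.go changes: the old startup-only isSpanEmpty probe becomes examineQueue, which both sums the queued bytes (keeping the BytesQueued gauge current) and decides whether the wait timer needs to be armed. A simplified sketch of the resulting loop, with tracing, logging, and error handling stripped out; this is a paraphrase of the diff above, not the verbatim implementation.

for {
	select {
	case <-c.ch: // poked by SuggestCompaction, and once unconditionally at startup
		bytesQueued, err := c.examineQueue(ctx)
		if err == nil && bytesQueued == 0 {
			break // queue is empty; don't arm the timer (only happens at startup)
		}
		if !timerSet { // on error or a non-empty queue, make sure the timer is armed
			timer.Reset(c.opts.CompactionMinInterval)
			timerSet = true
		}
	case <-timer.C:
		timer.Read = true
		timer.Stop()
		if ok, _ := c.processSuggestions(ctx); ok {
			timerSet = false // queue drained; wait for the next suggestion
			break
		}
		timer.Reset(c.opts.CompactionMinInterval) // retry after the minimum interval
		timerSet = true
	case <-stopper.ShouldStop():
		return
	}
}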
8 changes: 3 additions & 5 deletions pkg/storage/compactor/compactor_test.go
@@ -505,7 +505,7 @@ func TestCompactorProcessingInitialization(t *testing.T) {
defer cleanup()

// Add a suggested compaction -- this won't get processed by this
// compactor for two minutes.
// compactor for an hour.
compactor.opts.CompactionMinInterval = time.Hour
compactor.SuggestCompaction(context.Background(), storagebase.SuggestedCompaction{
StartKey: key("a"), EndKey: key("b"),
@@ -567,10 +567,8 @@ func TestCompactorCleansUpOldRecords(t *testing.T) {
return fmt.Errorf("expected skipped bytes %d; got %d", e, a)
}
// Verify compaction queue is empty.
if empty, err := compactor.isSpanEmpty(
context.Background(), keys.LocalStoreSuggestedCompactionsMin, keys.LocalStoreSuggestedCompactionsMax,
); err != nil || !empty {
return fmt.Errorf("compaction queue not empty or err: %t, %v", empty, err)
if bytesQueued, err := compactor.examineQueue(context.Background()); err != nil || bytesQueued > 0 {
return fmt.Errorf("compaction queue not empty (%d bytes) or err %v", bytesQueued, err)
}
return nil
})
13 changes: 12 additions & 1 deletion pkg/storage/replica.go
@@ -775,11 +775,22 @@ func (r *Replica) destroyDataRaftMuLocked(

// NB: this uses the local descriptor instead of the consistent one to match
// the data on disk.
if err := clearRangeData(ctx, r.Desc(), ms.KeyCount, r.store.Engine(), batch); err != nil {
desc := r.Desc()
if err := clearRangeData(ctx, desc, ms.KeyCount, r.store.Engine(), batch); err != nil {
return err
}
clearTime := timeutil.Now()

// Suggest the cleared range to the compactor queue.
r.store.compactor.SuggestCompaction(ctx, storagebase.SuggestedCompaction{
StartKey: roachpb.Key(desc.StartKey),
EndKey: roachpb.Key(desc.EndKey),
Compaction: storagebase.Compaction{
Bytes: ms.Total(),
SuggestedAtNanos: clearTime.UnixNano(),
},
})

// Save a tombstone to ensure that replica IDs never get reused.
if err := r.setTombstoneKey(ctx, batch, &consistentDesc); err != nil {
return err
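The suggestion recorded here ties the cleared key span to the size of the removed data and the time of removal, which is what examineQueue and the age-based cleanup in the compactor later consume. A sketch of the storagebase types as they are used across this diff; the field set is inferred from usage only, and the real protobuf-generated definitions may carry more fields.

// Inferred from usage: Bytes is what examineQueue sums into the BytesQueued
// gauge, and SuggestedAtNanos is what lets old, never-processed suggestions
// be discarded.
type Compaction struct {
	Bytes            int64
	SuggestedAtNanos int64
}

// SuggestedCompaction is shown with Compaction embedded, which is consistent
// with both the literal above and the aggregation code in the compactor;
// treat the exact layout as an assumption.
type SuggestedCompaction struct {
	StartKey, EndKey roachpb.Key
	Compaction
}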
3 changes: 3 additions & 0 deletions pkg/storage/store.go
@@ -1771,6 +1771,9 @@ func (s *Store) DB() *client.DB { return s.cfg.DB }
// Gossip accessor.
func (s *Store) Gossip() *gossip.Gossip { return s.cfg.Gossip }

// Compactor accessor.
func (s *Store) Compactor() *compactor.Compactor { return s.compactor }

// Stopper accessor.
func (s *Store) Stopper() *stop.Stopper { return s.stopper }

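The Compactor accessor mirrors the neighboring Gossip and Stopper accessors and exists so code outside the storage package, in particular the new test in client_raft_test.go, can reach the compactor's metrics. Hypothetical test-side usage, reusing the multiTestContext shown earlier:

// Hypothetical usage; mtc and t come from the surrounding test.
if queued := mtc.stores[2].Compactor().Metrics.BytesQueued.Value(); queued == 0 {
	t.Fatal("expected bytes queued with the compactor after replica removal")
}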