diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go index 14c8b4f52943..de2c6002e036 100644 --- a/pkg/storage/client_raft_test.go +++ b/pkg/storage/client_raft_test.go @@ -3858,3 +3858,61 @@ func TestFailedConfChange(t *testing.T) { t.Fatal(err) } } + +// TestStoreRangeRemovalCompactionSuggestion verifies that if a replica +// is removed from a store, a compaction suggestion is made to the +// compactor queue. +func TestStoreRangeRemovalCompactionSuggestion(t *testing.T) { + defer leaktest.AfterTest(t)() + sc := storage.TestStoreConfig(nil) + mtc := &multiTestContext{storeConfig: &sc} + defer mtc.Stop() + mtc.Start(t, 3) + + const rangeID = roachpb.RangeID(1) + mtc.replicateRange(rangeID, 1, 2) + + repl, err := mtc.stores[0].GetReplica(rangeID) + if err != nil { + t.Fatal(err) + } + ctx := repl.AnnotateCtx(context.Background()) + + deleteStore := mtc.stores[2] + if err := repl.ChangeReplicas( + ctx, + roachpb.REMOVE_REPLICA, + roachpb.ReplicationTarget{ + NodeID: deleteStore.Ident.NodeID, + StoreID: deleteStore.Ident.StoreID, + }, + repl.Desc(), + storage.ReasonRebalance, + "", + ); err != nil { + t.Fatal(err) + } + + testutils.SucceedsSoon(t, func() error { + // Function to check compaction metrics indicating a suggestion + // was queued or a compaction was processed or skipped. + haveCompaction := func(s *storage.Store, exp bool) error { + queued := s.Compactor().Metrics.BytesQueued.Value() + comps := s.Compactor().Metrics.BytesCompacted.Count() + skipped := s.Compactor().Metrics.BytesSkipped.Count() + if exp != (queued > 0 || comps > 0 || skipped > 0) { + return errors.Errorf("%s: expected non-zero compaction metrics? %t; got queued=%d, compactions=%d, skipped=%d", + s, exp, queued, comps, skipped) + } + return nil + } + // Verify that no compaction metrics are showing non-zero bytes in the + // other stores. + for _, s := range mtc.stores { + if err := haveCompaction(s, s == deleteStore); err != nil { + return err + } + } + return nil + }) +} diff --git a/pkg/storage/compactor/compactor.go b/pkg/storage/compactor/compactor.go index a62123e6e770..5f63d6beabc8 100644 --- a/pkg/storage/compactor/compactor.go +++ b/pkg/storage/compactor/compactor.go @@ -112,21 +112,27 @@ func NewCompactor(eng engine.WithSSTables, capFn storeCapacityFunc) *Compactor { // provided stopper indicates. Processing is done with a periodicity of // compactionMinInterval, but only if there are compactions pending. func (c *Compactor) Start(ctx context.Context, tracer opentracing.Tracer, stopper *stop.Stopper) { - if empty, err := c.isSpanEmpty( - ctx, keys.LocalStoreSuggestedCompactionsMin, keys.LocalStoreSuggestedCompactionsMax, - ); err != nil { - log.Warningf(ctx, "failed check whether compaction suggestions exist: %s", err) - } else if !empty { - log.Eventf(ctx, "compactor starting in %s as there are suggested compactions pending", c.opts.CompactionMinInterval) - c.ch <- struct{}{} // wake up the goroutine immediately - } + // Wake up immediately to examine the queue and set the bytes queued metric. + c.ch <- struct{}{} stopper.RunWorker(ctx, func(ctx context.Context) { var timer timeutil.Timer + defer timer.Stop() var timerSet bool + for { select { case <-c.ch: + // A new suggestion was made. Examine the compaction queue, + // which returns the number of bytes queued. + if bytesQueued, err := c.examineQueue(ctx); err != nil { + log.Warningf(ctx, "failed check whether compaction suggestions exist: %s", err) + } else if bytesQueued > 0 { + log.Eventf(ctx, "compactor starting in %s as there are suggested compactions pending", c.opts.CompactionMinInterval) + } else { + // Queue is empty, don't set the timer. This can happen only at startup. + break + } // Set the wait timer if not already set. if !timerSet { timer.Reset(c.opts.CompactionMinInterval) @@ -135,7 +141,6 @@ func (c *Compactor) Start(ctx context.Context, tracer opentracing.Tracer, stoppe case <-timer.C: timer.Read = true - timer.Stop() spanCtx, cleanup := tracing.EnsureContext(ctx, tracer, "process suggested compactions") ok, err := c.processSuggestions(spanCtx) if err != nil { @@ -143,8 +148,8 @@ func (c *Compactor) Start(ctx context.Context, tracer opentracing.Tracer, stoppe } cleanup() if ok { - // Everything has been processed. Wait for the next - // suggested compaction before resetting the timer. + // The queue was processed. Wait for the next suggested + // compaction before resetting timer. timerSet = false break } @@ -185,7 +190,7 @@ func (aggr aggregatedCompaction) String() string { if len(aggr.suggestions) == 1 { seqFmt = fmt.Sprintf("#%d/%d", aggr.startIdx+1, aggr.total) } else { - seqFmt = fmt.Sprintf("#%d-%d/%d", aggr.startIdx+1, aggr.startIdx+len(aggr.suggestions)+1, aggr.total) + seqFmt = fmt.Sprintf("#%d-%d/%d", aggr.startIdx+1, aggr.startIdx+len(aggr.suggestions), aggr.total) } return fmt.Sprintf("%s (%s-%s) for %s", seqFmt, aggr.StartKey, aggr.EndKey, humanizeutil.IBytes(aggr.Bytes)) } @@ -195,7 +200,7 @@ func (aggr aggregatedCompaction) String() string { // exceed the absolute or fractional size thresholds. If suggested // compactions don't meet thresholds, they're discarded if they're // older than maxSuggestedCompactionRecordAge. Returns a boolean -// indicating whether the processing occurred. +// indicating whether the queue was successfully processed. func (c *Compactor) processSuggestions(ctx context.Context) (bool, error) { // Collect all suggestions. var suggestions []storagebase.SuggestedCompaction @@ -332,7 +337,11 @@ func (c *Compactor) processCompaction( log.Fatal(ctx, err) // should never happen on a batch } } - return aggr.Bytes, nil + + if shouldProcess { + return aggr.Bytes, nil + } + return 0, nil } // aggregateCompaction merges sc into aggr, to create a new suggested @@ -369,22 +378,26 @@ func (c *Compactor) aggregateCompaction( return false // aggregated successfully } -// isSpanEmpty returns whether the specified key span is empty (true) -// or contains keys (false). -func (c *Compactor) isSpanEmpty(ctx context.Context, start, end roachpb.Key) (bool, error) { - // If there are any suggested compactions, start the compaction timer. - var empty = true +// examineQueue returns the total number of bytes queued and updates the +// BytesQueued gauge. +func (c *Compactor) examineQueue(ctx context.Context) (int64, error) { + var totalBytes int64 if err := c.eng.Iterate( - engine.MVCCKey{Key: start}, - engine.MVCCKey{Key: end}, - func(_ engine.MVCCKeyValue) (bool, error) { - empty = false - return true, nil // don't continue iteration + engine.MVCCKey{Key: keys.LocalStoreSuggestedCompactionsMin}, + engine.MVCCKey{Key: keys.LocalStoreSuggestedCompactionsMax}, + func(kv engine.MVCCKeyValue) (bool, error) { + var c storagebase.Compaction + if err := protoutil.Unmarshal(kv.Value, &c); err != nil { + return false, err + } + totalBytes += c.Bytes + return false, nil // continue iteration }, ); err != nil { - return false, err + return 0, err } - return empty, nil + c.Metrics.BytesQueued.Update(totalBytes) + return totalBytes, nil } // SuggestCompaction writes the specified compaction to persistent diff --git a/pkg/storage/compactor/compactor_test.go b/pkg/storage/compactor/compactor_test.go index f4430e18ee0c..09d4a7cd2930 100644 --- a/pkg/storage/compactor/compactor_test.go +++ b/pkg/storage/compactor/compactor_test.go @@ -505,7 +505,7 @@ func TestCompactorProcessingInitialization(t *testing.T) { defer cleanup() // Add a suggested compaction -- this won't get processed by this - // compactor for two minutes. + // compactor for an hour. compactor.opts.CompactionMinInterval = time.Hour compactor.SuggestCompaction(context.Background(), storagebase.SuggestedCompaction{ StartKey: key("a"), EndKey: key("b"), @@ -567,10 +567,8 @@ func TestCompactorCleansUpOldRecords(t *testing.T) { return fmt.Errorf("expected skipped bytes %d; got %d", e, a) } // Verify compaction queue is empty. - if empty, err := compactor.isSpanEmpty( - context.Background(), keys.LocalStoreSuggestedCompactionsMin, keys.LocalStoreSuggestedCompactionsMax, - ); err != nil || !empty { - return fmt.Errorf("compaction queue not empty or err: %t, %v", empty, err) + if bytesQueued, err := compactor.examineQueue(context.Background()); err != nil || bytesQueued > 0 { + return fmt.Errorf("compaction queue not empty (%d bytes) or err %v", bytesQueued, err) } return nil }) diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index 0ee414527321..6cafe0c588d4 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -775,11 +775,22 @@ func (r *Replica) destroyDataRaftMuLocked( // NB: this uses the local descriptor instead of the consistent one to match // the data on disk. - if err := clearRangeData(ctx, r.Desc(), ms.KeyCount, r.store.Engine(), batch); err != nil { + desc := r.Desc() + if err := clearRangeData(ctx, desc, ms.KeyCount, r.store.Engine(), batch); err != nil { return err } clearTime := timeutil.Now() + // Suggest the cleared range to the compactor queue. + r.store.compactor.SuggestCompaction(ctx, storagebase.SuggestedCompaction{ + StartKey: roachpb.Key(desc.StartKey), + EndKey: roachpb.Key(desc.EndKey), + Compaction: storagebase.Compaction{ + Bytes: ms.Total(), + SuggestedAtNanos: clearTime.UnixNano(), + }, + }) + // Save a tombstone to ensure that replica IDs never get reused. if err := r.setTombstoneKey(ctx, batch, &consistentDesc); err != nil { return err diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 8a61847eee9f..db5886bc3e22 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -1771,6 +1771,9 @@ func (s *Store) DB() *client.DB { return s.cfg.DB } // Gossip accessor. func (s *Store) Gossip() *gossip.Gossip { return s.cfg.Gossip } +// Compactor accessor. +func (s *Store) Compactor() *compactor.Compactor { return s.compactor } + // Stopper accessor. func (s *Store) Stopper() *stop.Stopper { return s.stopper }