storage: suggest compaction when replicas are deleted
In aggregate, if enough contiguous replicas are migrated away from a
range, we would like to compact the underlying RocksDB storage engine.
To that end, on clearing a replica's data, we suggest a compaction.

Release note: None
spencerkimball committed Jan 3, 2018
1 parent 09234bf commit aa1a097
Showing 5 changed files with 103 additions and 25 deletions.
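
In outline, the change works like this: when a replica's data is cleared, the store persists a suggested-compaction record covering the replica's key span, and a background compactor examines the queue of records and compacts once enough contiguous bytes have accumulated. Below is a minimal, illustrative sketch of that flow in Go, with simplified stand-in types (the real code persists storagebase.SuggestedCompaction protos under store-local keys); it is not the implementation itself.

package main

import "fmt"

// suggestion is a simplified stand-in for storagebase.SuggestedCompaction:
// the key span of a cleared replica plus the bytes that span covered.
type suggestion struct {
	start, end string
	bytes      int64
}

// suggest queues a record; the real compactor persists it under
// store-local keys so suggestions survive restarts.
func suggest(queue []suggestion, s suggestion) []suggestion {
	return append(queue, s)
}

// examine totals the queued bytes, mirroring Compactor.examineQueue below.
func examine(queue []suggestion) int64 {
	var total int64
	for _, s := range queue {
		total += s.bytes
	}
	return total
}

func main() {
	var queue []suggestion
	// Two adjacent replicas are cleared; their spans queue up and could
	// later be compacted as one contiguous range.
	queue = suggest(queue, suggestion{start: "a", end: "c", bytes: 64 << 20})
	queue = suggest(queue, suggestion{start: "c", end: "e", bytes: 32 << 20})
	fmt.Printf("queued %d bytes across %d suggestions\n", examine(queue), len(queue))
}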
58 changes: 58 additions & 0 deletions pkg/storage/client_raft_test.go
@@ -3858,3 +3858,61 @@ func TestFailedConfChange(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+// TestStoreRangeRemovalCompactionSuggestion verifies that if a replica
+// is removed from a store, a compaction suggestion is made to the
+// compactor queue.
+func TestStoreRangeRemovalCompactionSuggestion(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+	sc := storage.TestStoreConfig(nil)
+	mtc := &multiTestContext{storeConfig: &sc}
+	defer mtc.Stop()
+	mtc.Start(t, 3)
+
+	const rangeID = roachpb.RangeID(1)
+	mtc.replicateRange(rangeID, 1, 2)
+
+	repl, err := mtc.stores[0].GetReplica(rangeID)
+	if err != nil {
+		t.Fatal(err)
+	}
+	ctx := repl.AnnotateCtx(context.Background())
+
+	deleteStore := mtc.stores[2]
+	if err := repl.ChangeReplicas(
+		ctx,
+		roachpb.REMOVE_REPLICA,
+		roachpb.ReplicationTarget{
+			NodeID:  deleteStore.Ident.NodeID,
+			StoreID: deleteStore.Ident.StoreID,
+		},
+		repl.Desc(),
+		storage.ReasonRebalance,
+		"",
+	); err != nil {
+		t.Fatal(err)
+	}
+
+	testutils.SucceedsSoon(t, func() error {
+		// Check the compaction metrics for evidence that a suggestion was
+		// queued, or that a compaction was processed or skipped.
+		haveCompaction := func(s *storage.Store, exp bool) error {
+			queued := s.Compactor().Metrics.BytesQueued.Value()
+			comps := s.Compactor().Metrics.BytesCompacted.Count()
+			skipped := s.Compactor().Metrics.BytesSkipped.Count()
+			if exp != (queued > 0 || comps > 0 || skipped > 0) {
+				return errors.Errorf("%s: expected non-zero compaction metrics? %t; got queued=%d, compactions=%d, skipped=%d",
+					s, exp, queued, comps, skipped)
+			}
+			return nil
+		}
+		// Verify that only the store which lost the replica reports
+		// non-zero compaction metrics.
+		for _, s := range mtc.stores {
+			if err := haveCompaction(s, s == deleteStore); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
+}
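
The test wraps its metric checks in testutils.SucceedsSoon because the removed store clears its replica asynchronously, so the suggestion may land after ChangeReplicas returns. A rough, hypothetical stand-in for that helper, assuming the standard time package (the real helper adds exponential backoff):

// succeedsSoon polls fn until it returns nil or the deadline passes.
func succeedsSoon(timeout time.Duration, fn func() error) error {
	deadline := time.Now().Add(timeout)
	for {
		err := fn()
		if err == nil || time.Now().After(deadline) {
			return err
		}
		time.Sleep(10 * time.Millisecond) // fixed, short retry interval
	}
}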
48 changes: 28 additions & 20 deletions pkg/storage/compactor/compactor.go
@@ -112,21 +112,25 @@ func NewCompactor(eng engine.WithSSTables, capFn storeCapacityFunc) *Compactor {
 // provided stopper indicates. Processing is done with a periodicity of
 // compactionMinInterval, but only if there are compactions pending.
 func (c *Compactor) Start(ctx context.Context, tracer opentracing.Tracer, stopper *stop.Stopper) {
-	if empty, err := c.isSpanEmpty(
-		ctx, keys.LocalStoreSuggestedCompactionsMin, keys.LocalStoreSuggestedCompactionsMax,
-	); err != nil {
-		log.Warningf(ctx, "failed check whether compaction suggestions exist: %s", err)
-	} else if !empty {
-		log.Eventf(ctx, "compactor starting in %s as there are suggested compactions pending", c.opts.CompactionMinInterval)
-		c.ch <- struct{}{} // wake up the goroutine immediately
-	}
+	// Wake up the goroutine immediately.
+	c.ch <- struct{}{}

 	stopper.RunWorker(ctx, func(ctx context.Context) {
 		var timer timeutil.Timer
 		var timerSet bool
 		for {
 			select {
 			case <-c.ch:
+				// A new suggestion was made. Examine the compaction queue,
+				// which updates the bytes-queued stat in a timely fashion.
+				if bytesQueued, err := c.examineQueue(ctx); err != nil {
+					log.Warningf(ctx, "failed to check whether compaction suggestions exist: %s", err)
+				} else if bytesQueued > 0 {
+					log.Eventf(ctx, "compactor starting in %s as there are suggested compactions pending", c.opts.CompactionMinInterval)
+				} else {
+					// The queue is empty; don't set the timer. This can happen only at startup.
+					break
+				}
 				// Set the wait timer if not already set.
 				if !timerSet {
 					timer.Reset(c.opts.CompactionMinInterval)
@@ -369,22 +373,26 @@ func (c *Compactor) aggregateCompaction(
 	return false // aggregated successfully
 }

-// isSpanEmpty returns whether the specified key span is empty (true)
-// or contains keys (false).
-func (c *Compactor) isSpanEmpty(ctx context.Context, start, end roachpb.Key) (bool, error) {
-	// If there are any suggested compactions, start the compaction timer.
-	var empty = true
+// examineQueue returns the total number of bytes queued and updates the
+// BytesQueued gauge.
+func (c *Compactor) examineQueue(ctx context.Context) (int64, error) {
+	var totalBytes int64
 	if err := c.eng.Iterate(
-		engine.MVCCKey{Key: start},
-		engine.MVCCKey{Key: end},
-		func(_ engine.MVCCKeyValue) (bool, error) {
-			empty = false
-			return true, nil // don't continue iteration
+		engine.MVCCKey{Key: keys.LocalStoreSuggestedCompactionsMin},
+		engine.MVCCKey{Key: keys.LocalStoreSuggestedCompactionsMax},
+		func(kv engine.MVCCKeyValue) (bool, error) {
+			var c storagebase.Compaction
+			if err := protoutil.Unmarshal(kv.Value, &c); err != nil {
+				return false, err
+			}
+			totalBytes += c.Bytes
+			return false, nil // continue iteration
 		},
 	); err != nil {
-		return false, err
+		return 0, err
 	}
-	return empty, nil
+	c.Metrics.BytesQueued.Update(totalBytes)
+	return totalBytes, nil
 }

 // SuggestCompaction writes the specified compaction to persistent
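
For context, the aggregateCompaction helper named in the hunk header above merges adjacent suggestions so that several small cleared replicas can be compacted in one pass. A hedged sketch of that idea, assuming simplified string keys and suggestions already sorted by start key (the real merge rules and key types differ):

// span is a hypothetical [start, end) pair standing in for roachpb keys.
type span struct{ start, end string }

// aggregate merges suggestions whose spans touch or overlap into one
// contiguous span, leaving disjoint suggestions separate.
func aggregate(suggestions []span) []span {
	var out []span
	for _, s := range suggestions {
		if n := len(out); n > 0 && out[n-1].end >= s.start {
			if s.end > out[n-1].end {
				out[n-1].end = s.end // extend the previous aggregate
			}
			continue
		}
		out = append(out, s)
	}
	return out
}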
6 changes: 2 additions & 4 deletions pkg/storage/compactor/compactor_test.go
@@ -567,10 +567,8 @@ func TestCompactorCleansUpOldRecords(t *testing.T) {
return fmt.Errorf("expected skipped bytes %d; got %d", e, a)
}
// Verify compaction queue is empty.
if empty, err := compactor.isSpanEmpty(
context.Background(), keys.LocalStoreSuggestedCompactionsMin, keys.LocalStoreSuggestedCompactionsMax,
); err != nil || !empty {
return fmt.Errorf("compaction queue not empty or err: %t, %v", empty, err)
if bytesQueued, err := compactor.examineQueue(context.Background()); err != nil || bytesQueued > 0 {
return fmt.Errorf("compaction queue not empty (%d bytes) or err %v", bytesQueued, err)
}
return nil
})
13 changes: 12 additions & 1 deletion pkg/storage/replica.go
@@ -775,11 +775,22 @@ func (r *Replica) destroyDataRaftMuLocked(

 	// NB: this uses the local descriptor instead of the consistent one to match
 	// the data on disk.
-	if err := clearRangeData(ctx, r.Desc(), ms.KeyCount, r.store.Engine(), batch); err != nil {
+	desc := r.Desc()
+	if err := clearRangeData(ctx, desc, ms.KeyCount, r.store.Engine(), batch); err != nil {
 		return err
 	}
 	clearTime := timeutil.Now()

+	// Suggest the cleared range to the compactor queue.
+	r.store.compactor.SuggestCompaction(ctx, storagebase.SuggestedCompaction{
+		StartKey: roachpb.Key(desc.StartKey),
+		EndKey:   roachpb.Key(desc.EndKey),
+		Compaction: storagebase.Compaction{
+			Bytes:            ms.Total(),
+			SuggestedAtNanos: clearTime.UnixNano(),
+		},
+	})
+
 	// Save a tombstone to ensure that replica IDs never get reused.
 	if err := r.setTombstoneKey(ctx, batch, &consistentDesc); err != nil {
 		return err
3 changes: 3 additions & 0 deletions pkg/storage/store.go
@@ -1771,6 +1771,9 @@ func (s *Store) DB() *client.DB { return s.cfg.DB }
 // Gossip accessor.
 func (s *Store) Gossip() *gossip.Gossip { return s.cfg.Gossip }

+// Compactor accessor.
+func (s *Store) Compactor() *compactor.Compactor { return s.compactor }
+
 // Stopper accessor.
 func (s *Store) Stopper() *stop.Stopper { return s.stopper }
