compactor: Gossip store capacity after each compaction run
This helps all nodes' allocators have more up-to-date capacity
information sooner after a significant change.

Touches #21400

Release note: None
a-robinson committed Feb 5, 2018
1 parent 88cf20c commit 9ac0083
Showing 3 changed files with 49 additions and 29 deletions.
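
In outline, the commit threads a completion callback through the compactor and has the store use it to re-gossip its capacity. The sketch below is illustrative only: it uses simplified stand-in types rather than the real pkg/storage APIs, and the real code runs the gossip as an async task under the node's stopper via the store's GossipStore method.

// Illustrative sketch only -- simplified stand-ins, not the CockroachDB types.
// The compactor invokes an optional doneFn after each successful compaction
// run; the store supplies a closure that re-gossips its capacity.
package main

import (
	"context"
	"fmt"
)

type doneCompactingFunc func(ctx context.Context)

// Compactor is a stand-in for pkg/storage/compactor.Compactor.
type Compactor struct {
	doneFn doneCompactingFunc
}

// processCompaction mimics the hook added by this commit: after the engine
// finishes compacting, notify the owner so capacity can be re-gossiped soon.
func (c *Compactor) processCompaction(ctx context.Context) {
	// ... ask the engine to compact the suggested spans ...
	if c.doneFn != nil {
		c.doneFn(ctx)
	}
}

// Store is a stand-in for pkg/storage.Store.
type Store struct{}

// asyncGossipStore mirrors the helper added to store.go; the real version
// runs GossipStore as an async task under the stopper and logs failures.
func (s *Store) asyncGossipStore(ctx context.Context, reason string) {
	fmt.Printf("gossiping store capacity on %s\n", reason)
}

func main() {
	s := &Store{}
	c := &Compactor{doneFn: func(ctx context.Context) {
		s.asyncGossipStore(ctx, "compactor-initiated rocksdb compaction")
	}}
	c.processCompaction(context.Background())
}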
11 changes: 10 additions & 1 deletion pkg/storage/compactor/compactor.go
@@ -97,21 +97,27 @@ func defaultCompactorOptions() compactorOptions {
 
 type storeCapacityFunc func() (roachpb.StoreCapacity, error)
 
+type doneCompactingFunc func(ctx context.Context)
+
 // A Compactor records suggested compactions and periodically
 // makes requests to the engine to reclaim storage space.
 type Compactor struct {
 	eng     engine.WithSSTables
 	capFn   storeCapacityFunc
+	doneFn  doneCompactingFunc
 	ch      chan struct{}
 	opts    compactorOptions
 	Metrics Metrics
 }
 
 // NewCompactor returns a compactor for the specified storage engine.
-func NewCompactor(eng engine.WithSSTables, capFn storeCapacityFunc) *Compactor {
+func NewCompactor(
+	eng engine.WithSSTables, capFn storeCapacityFunc, doneFn doneCompactingFunc,
+) *Compactor {
 	return &Compactor{
 		eng:     eng,
 		capFn:   capFn,
+		doneFn:  doneFn,
 		ch:      make(chan struct{}, 1),
 		opts:    defaultCompactorOptions(),
 		Metrics: makeMetrics(),
@@ -325,6 +331,9 @@ func (c *Compactor) processCompaction(
 		c.Metrics.CompactionSuccesses.Inc(1)
 		duration := timeutil.Since(startTime)
 		c.Metrics.CompactingNanos.Inc(int64(duration))
+		if c.doneFn != nil {
+			c.doneFn(ctx)
+		}
 		log.Infof(ctx, "processed compaction %s in %s", aggr, duration)
 	} else {
 		log.VEventf(ctx, 2, "skipping compaction(s) %s", aggr)
27 changes: 20 additions & 7 deletions pkg/storage/compactor/compactor_test.go
@@ -19,6 +19,7 @@ import (
 	"fmt"
 	"reflect"
 	"sort"
+	"sync/atomic"
 	"testing"
 	"time"

@@ -87,13 +88,15 @@ func (we *wrappedEngine) GetCompactions() []roachpb.Span {
 	return append([]roachpb.Span(nil), we.mu.compactions...)
 }
 
-func testSetup(capFn storeCapacityFunc) (*Compactor, *wrappedEngine, func()) {
+func testSetup(capFn storeCapacityFunc) (*Compactor, *wrappedEngine, *int32, func()) {
 	stopper := stop.NewStopper()
 	eng := newWrappedEngine()
 	stopper.AddCloser(eng)
-	compactor := NewCompactor(eng, capFn)
+	compactionCount := new(int32)
+	doneFn := func(_ context.Context) { atomic.AddInt32(compactionCount, 1) }
+	compactor := NewCompactor(eng, capFn, doneFn)
 	compactor.Start(context.Background(), tracing.NewTracer(), stopper)
-	return compactor, eng, func() {
+	return compactor, eng, compactionCount, func() {
 		stopper.Stop(context.Background())
 	}
 }
@@ -463,7 +466,7 @@ func TestCompactorThresholds(t *testing.T) {
 				Available:    test.availableBytes,
 			}, nil
 		}
-		compactor, we, cleanup := testSetup(capacityFn)
+		compactor, we, compactionCount, cleanup := testSetup(capacityFn)
 		defer cleanup()
 		// Shorten wait times for compactor processing.
 		compactor.opts.CompactionMinInterval = time.Millisecond
@@ -490,6 +493,9 @@
 			if a, e := compactor.Metrics.CompactionSuccesses.Count(), int64(len(test.expCompactions)); a != e {
 				return fmt.Errorf("expected compactions %d; got %d", e, a)
 			}
+			if a, e := atomic.LoadInt32(compactionCount), int32(len(test.expCompactions)); a != e {
+				return fmt.Errorf("expected compactions %d; got %d", e, a)
+			}
 			if len(test.expCompactions) == 0 {
 				if cn := compactor.Metrics.CompactingNanos.Count(); cn > 0 {
 					return fmt.Errorf("expected compaction time to be 0; got %d", cn)
@@ -535,7 +541,7 @@ func TestCompactorProcessingInitialization(t *testing.T) {
 	capacityFn := func() (roachpb.StoreCapacity, error) {
 		return roachpb.StoreCapacity{LogicalBytes: 100 * defaultThresholdBytes}, nil
 	}
-	compactor, we, cleanup := testSetup(capacityFn)
+	compactor, we, compactionCount, cleanup := testSetup(capacityFn)
 	defer cleanup()
 
 	// Add a suggested compaction -- this won't get processed by this
@@ -552,7 +558,8 @@
 	// Create a new fast compactor with a short wait time for processing,
 	// using the same engine so that it sees a non-empty queue.
 	stopper := stop.NewStopper()
-	fastCompactor := NewCompactor(we, capacityFn)
+	doneFn := func(_ context.Context) { atomic.AddInt32(compactionCount, 1) }
+	fastCompactor := NewCompactor(we, capacityFn, doneFn)
 	fastCompactor.opts.CompactionMinInterval = time.Millisecond
 	fastCompactor.Start(context.Background(), tracing.NewTracer(), stopper)
 	defer stopper.Stop(context.Background())
@@ -563,6 +570,9 @@ func TestCompactorProcessingInitialization(t *testing.T) {
 		if !reflect.DeepEqual(expComps, comps) {
 			return fmt.Errorf("expected %+v; got %+v", expComps, comps)
 		}
+		if a, e := atomic.LoadInt32(compactionCount), int32(1); a != e {
+			return fmt.Errorf("expected %d; got %d", e, a)
+		}
 		return nil
 	})
 }
@@ -578,7 +588,7 @@ func TestCompactorCleansUpOldRecords(t *testing.T) {
 			Available:    100 * defaultThresholdBytes,
 		}, nil
 	}
-	compactor, we, cleanup := testSetup(capacityFn)
+	compactor, we, compactionCount, cleanup := testSetup(capacityFn)
 	compactor.opts.CompactionMinInterval = time.Millisecond
 	compactor.opts.MaxSuggestedCompactionRecordAge = 1 * time.Millisecond
 	defer cleanup()
@@ -603,6 +613,9 @@ func TestCompactorCleansUpOldRecords(t *testing.T) {
 		if a, e := compactor.Metrics.BytesSkipped.Count(), compactor.opts.ThresholdBytes-1; a != e {
 			return fmt.Errorf("expected skipped bytes %d; got %d", e, a)
 		}
+		if a, e := atomic.LoadInt32(compactionCount), int32(0); a != e {
+			return fmt.Errorf("expected compactions processed %d; got %d", e, a)
+		}
 		// Verify compaction queue is empty.
 		if bytesQueued, err := compactor.examineQueue(context.Background()); err != nil || bytesQueued > 0 {
 			return fmt.Errorf("compaction queue not empty (%d bytes) or err %v", bytesQueued, err)
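
A note on the test changes above: the compactor fires doneFn from its own processing goroutine, so the tests share a *int32 counter, touch it only through sync/atomic, and re-check it inside a retry loop (testutils.SucceedsSoon in the real tests). Below is a minimal, self-contained sketch of that pattern; the names are illustrative only, not from the CockroachDB tree.

// Sketch: verify that an asynchronously invoked callback fired, using an
// atomic counter and a polling loop instead of the real test helpers.
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	compactionCount := new(int32)
	// doneFn stands in for the compactor's completion callback; it may run on
	// another goroutine, so the counter must be updated atomically.
	doneFn := func() { atomic.AddInt32(compactionCount, 1) }

	go doneFn() // simulate the compactor finishing a run asynchronously

	// Poll until the expected count is observed or a deadline passes.
	deadline := time.Now().Add(time.Second)
	for {
		if atomic.LoadInt32(compactionCount) == 1 {
			fmt.Println("callback observed")
			return
		}
		if time.Now().After(deadline) {
			fmt.Println("timed out waiting for callback")
			return
		}
		time.Sleep(time.Millisecond)
	}
}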
40 changes: 19 additions & 21 deletions pkg/storage/store.go
@@ -855,7 +855,11 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript
 	s.tsCache = tscache.New(cfg.Clock, cfg.TimestampCachePageSize, tsCacheMetrics)
 	s.metrics.registry.AddMetricStruct(tsCacheMetrics)
 
-	s.compactor = compactor.NewCompactor(s.engine.(engine.WithSSTables), s.Capacity)
+	s.compactor = compactor.NewCompactor(
+		s.engine.(engine.WithSSTables),
+		s.Capacity,
+		func(ctx context.Context) { s.asyncGossipStore(ctx, "compactor-initiated rocksdb compaction") },
+	)
 	s.metrics.registry.AddMetricStruct(s.compactor.Metrics)
 
 	s.snapshotApplySem = make(chan struct{}, cfg.concurrentSnapshotApplyLimit)
@@ -1427,6 +1431,18 @@ func (s *Store) systemGossipUpdate(cfg config.SystemConfig) {
 	})
 }
 
+func (s *Store) asyncGossipStore(ctx context.Context, reason string) {
+	if err := s.stopper.RunAsyncTask(
+		ctx, fmt.Sprintf("storage.Store: gossip on %s", reason),
+		func(ctx context.Context) {
+			if err := s.GossipStore(ctx); err != nil {
+				log.Warningf(ctx, "error gossiping on %s: %s", reason, err)
+			}
+		}); err != nil {
+		log.Warningf(ctx, "unable to gossip on %s: %s", reason, err)
+	}
+}
+
 // GossipStore broadcasts the store on the gossip network.
 func (s *Store) GossipStore(ctx context.Context) error {
 	// This should always return immediately and acts as a sanity check that we
@@ -1511,16 +1527,7 @@ func (s *Store) maybeGossipOnCapacityChange(ctx context.Context, cce capacityCha
 		// Reset countdowns to avoid unnecessary gossiping.
 		atomic.StoreInt32(&s.gossipRangeCountdown, 0)
 		atomic.StoreInt32(&s.gossipLeaseCountdown, 0)
-		// Send using an async task because GossipStore needs the store mutex.
-		if err := s.stopper.RunAsyncTask(
-			ctx, "storage.Store: gossip on capacity change",
-			func(ctx context.Context) {
-				if err := s.GossipStore(ctx); err != nil {
-					log.Warningf(ctx, "error gossiping on capacity change: %s", err)
-				}
-			}); err != nil {
-			log.Warningf(ctx, "unable to gossip on capacity change: %s", err)
-		}
+		s.asyncGossipStore(ctx, "capacity change")
 	}
 }

@@ -1534,16 +1541,7 @@ func (s *Store) recordNewWritesPerSecond(newVal float64) {
 		return
 	}
 	if newVal < oldVal*.5 || newVal > oldVal*1.5 {
-		ctx := s.AnnotateCtx(context.TODO())
-		if err := s.stopper.RunAsyncTask(
-			ctx, "storage.Store: gossip on writes-per-second change",
-			func(ctx context.Context) {
-				if err := s.GossipStore(ctx); err != nil {
-					log.Warningf(ctx, "error gossiping on writes-per-second change: %s", err)
-				}
-			}); err != nil {
-			log.Warningf(ctx, "unable to gossip on writes-per-second change: %s", err)
-		}
+		s.asyncGossipStore(context.TODO(), "writes-per-second change")
 	}
 }

