storage: More improvements for rebalancing/compacting when disks are nearly full #22235

Merged 3 commits on Feb 5, 2018
24 changes: 22 additions & 2 deletions pkg/storage/compactor/compactor.go
@@ -61,6 +61,14 @@ const (
// defaultThresholdBytes threshold.
defaultThresholdBytesFraction = 0.10 // more than 10% of space will trigger

// defaultThresholdBytesAvailableFraction is the fraction of remaining
// available space on a disk, which, if exceeded by the size of a suggested
// compaction, should trigger the processing of said compaction. This
// threshold is meant to make compaction more aggressive when a store is
// nearly full, since reclaiming space is much more important in such
// scenarios.
defaultThresholdBytesAvailableFraction = 0.10

// defaultMaxSuggestedCompactionRecordAge is the maximum age of a
// suggested compaction record. If not processed within this time
// interval since the compaction was suggested, it will be deleted.
@@ -73,6 +81,7 @@ type compactorOptions struct {
CompactionMinInterval time.Duration
ThresholdBytes int64
ThresholdBytesFraction float64
ThresholdBytesAvailableFraction float64
MaxSuggestedCompactionRecordAge time.Duration
}

@@ -81,27 +90,34 @@ func defaultCompactorOptions() compactorOptions {
CompactionMinInterval: defaultCompactionMinInterval,
ThresholdBytes: defaultThresholdBytes,
ThresholdBytesFraction: defaultThresholdBytesFraction,
ThresholdBytesAvailableFraction: defaultThresholdBytesAvailableFraction,
MaxSuggestedCompactionRecordAge: defaultMaxSuggestedCompactionRecordAge,
}
}

type storeCapacityFunc func() (roachpb.StoreCapacity, error)

type doneCompactingFunc func(ctx context.Context)

// A Compactor records suggested compactions and periodically
// makes requests to the engine to reclaim storage space.
type Compactor struct {
eng engine.WithSSTables
capFn storeCapacityFunc
doneFn doneCompactingFunc
ch chan struct{}
opts compactorOptions
Metrics Metrics
}

// NewCompactor returns a compactor for the specified storage engine.
func NewCompactor(eng engine.WithSSTables, capFn storeCapacityFunc) *Compactor {
func NewCompactor(
eng engine.WithSSTables, capFn storeCapacityFunc, doneFn doneCompactingFunc,
) *Compactor {
return &Compactor{
eng: eng,
capFn: capFn,
doneFn: doneFn,
ch: make(chan struct{}, 1),
opts: defaultCompactorOptions(),
Metrics: makeMetrics(),
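For callers, the new third constructor argument is purely a notification hook; judging by the nil check added to processCompaction below, passing nil is legal. A toy sketch of that optional-callback shape (the names here are invented; the real type is doneCompactingFunc):

```go
package main

import (
	"context"
	"fmt"
)

// toyCompactor mimics the shape of the change: an optional callback that
// fires after a compaction has been processed.
type toyCompactor struct {
	doneFn func(ctx context.Context)
}

func (c *toyCompactor) process(ctx context.Context) {
	// ... compaction work would go here ...
	if c.doneFn != nil { // same guard as in processCompaction below
		c.doneFn(ctx)
	}
}

func main() {
	ctx := context.Background()
	withHook := &toyCompactor{doneFn: func(context.Context) {
		fmt.Println("compaction done; the store would re-gossip capacity here")
	}}
	withHook.process(ctx)

	var silent toyCompactor // nil doneFn is fine
	silent.process(ctx)
}
```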
@@ -301,7 +317,8 @@ func (c *Compactor) processCompaction(
delBatch engine.Batch,
) (int64, error) {
shouldProcess := aggr.Bytes >= c.opts.ThresholdBytes ||
aggr.Bytes >= int64(float64(capacity.LogicalBytes)*c.opts.ThresholdBytesFraction)
aggr.Bytes >= int64(float64(capacity.LogicalBytes)*c.opts.ThresholdBytesFraction) ||
aggr.Bytes >= int64(float64(capacity.Available)*c.opts.ThresholdBytesAvailableFraction)

if shouldProcess {
startTime := timeutil.Now()
@@ -314,6 +331,9 @@
c.Metrics.CompactionSuccesses.Inc(1)
duration := timeutil.Since(startTime)
c.Metrics.CompactingNanos.Inc(int64(duration))
if c.doneFn != nil {
c.doneFn(ctx)
}
log.Infof(ctx, "processed compaction %s in %s", aggr, duration)
} else {
log.VEventf(ctx, 2, "skipping compaction(s) %s", aggr)
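Putting the compactor.go changes together: a suggested compaction is now processed if it clears any of three bars: the absolute byte threshold, a fraction of the store's logical bytes, or, new here, a fraction of the bytes still available. A self-contained sketch of that decision; the two fractions match the diff, while thresholdBytes is a stand-in since the real default is defined outside this excerpt:

```go
package main

import "fmt"

const (
	thresholdBytes                  = 2 << 20 // stand-in value; real default not shown in this diff
	thresholdBytesFraction          = 0.10
	thresholdBytesAvailableFraction = 0.10
)

// shouldProcess mirrors the three-way check in processCompaction.
func shouldProcess(aggrBytes, logicalBytes, availableBytes int64) bool {
	return aggrBytes >= thresholdBytes ||
		aggrBytes >= int64(float64(logicalBytes)*thresholdBytesFraction) ||
		aggrBytes >= int64(float64(availableBytes)*thresholdBytesAvailableFraction)
}

func main() {
	// A 512 KiB suggestion on a roomy 1 GiB store: nothing fires.
	fmt.Println(shouldProcess(512<<10, 1<<30, 100<<30)) // false
	// The same suggestion when only 4 MiB remain free: the new rule fires.
	fmt.Println(shouldProcess(512<<10, 1<<30, 4<<20)) // true
}
```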
66 changes: 58 additions & 8 deletions pkg/storage/compactor/compactor_test.go
@@ -19,6 +19,7 @@ import (
"fmt"
"reflect"
"sort"
"sync/atomic"
"testing"
"time"

@@ -87,13 +88,15 @@ func (we *wrappedEngine) GetCompactions() []roachpb.Span {
return append([]roachpb.Span(nil), we.mu.compactions...)
}

func testSetup(capFn storeCapacityFunc) (*Compactor, *wrappedEngine, func()) {
func testSetup(capFn storeCapacityFunc) (*Compactor, *wrappedEngine, *int32, func()) {
stopper := stop.NewStopper()
eng := newWrappedEngine()
stopper.AddCloser(eng)
compactor := NewCompactor(eng, capFn)
compactionCount := new(int32)
doneFn := func(_ context.Context) { atomic.AddInt32(compactionCount, 1) }
compactor := NewCompactor(eng, capFn, doneFn)
compactor.Start(context.Background(), tracing.NewTracer(), stopper)
return compactor, eng, func() {
return compactor, eng, compactionCount, func() {
stopper.Stop(context.Background())
}
}
@@ -107,11 +110,13 @@ func TestCompactorThresholds(t *testing.T) {
defer leaktest.AfterTest(t)()

fractionThresh := defaultThresholdBytesFraction*defaultThresholdBytes + 1
fractionAvailableThresh := defaultThresholdBytesAvailableFraction*defaultThresholdBytes + 1
nowNanos := timeutil.Now().UnixNano()
testCases := []struct {
name string
suggestions []storagebase.SuggestedCompaction
logicalBytes int64 // logical byte count to return with store capacity
availableBytes int64 // available byte count to return with store capacity
expBytesCompacted int64
expCompactions []roachpb.Span
expUncompacted []roachpb.Span
@@ -129,6 +134,7 @@
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: 0,
expCompactions: nil,
expUncompacted: []roachpb.Span{
@@ -148,6 +154,7 @@
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: defaultThresholdBytes,
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("b")},
@@ -166,11 +173,31 @@
},
},
logicalBytes: defaultThresholdBytes,
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: int64(fractionThresh),
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("b")},
},
},
// Single suggestion over the fractional bytes available threshold.
{
name: "single suggestion over fractional bytes available threshold",
suggestions: []storagebase.SuggestedCompaction{
{
StartKey: key("a"), EndKey: key("b"),
Compaction: storagebase.Compaction{
Bytes: int64(fractionAvailableThresh),
SuggestedAtNanos: nowNanos,
},
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes,
expBytesCompacted: int64(fractionAvailableThresh),
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("b")},
},
},
// Double suggestion which in aggregate exceed absolute bytes threshold.
{
name: "double suggestion over absolute threshold",
@@ -191,6 +218,7 @@
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: defaultThresholdBytes,
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("c")},
@@ -216,6 +244,7 @@
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: defaultThresholdBytes,
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("b")},
@@ -241,6 +270,7 @@
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: defaultThresholdBytes,
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("d")},
@@ -266,6 +296,7 @@
},
},
logicalBytes: defaultThresholdBytes,
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: int64(fractionThresh),
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("c")},
@@ -292,6 +323,7 @@
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: defaultThresholdBytes,
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("f")},
@@ -318,6 +350,7 @@
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: 0,
expCompactions: nil,
expUncompacted: []roachpb.Span{
@@ -346,6 +379,7 @@
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: defaultThresholdBytes * 2,
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("b")},
@@ -373,6 +407,7 @@
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: defaultThresholdBytes,
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("b")},
@@ -415,6 +450,7 @@
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: defaultThresholdBytes,
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("zzz")},
@@ -427,9 +463,10 @@
capacityFn := func() (roachpb.StoreCapacity, error) {
return roachpb.StoreCapacity{
LogicalBytes: test.logicalBytes,
Available: test.availableBytes,
}, nil
}
compactor, we, cleanup := testSetup(capacityFn)
compactor, we, compactionCount, cleanup := testSetup(capacityFn)
defer cleanup()
// Shorten wait times for compactor processing.
compactor.opts.CompactionMinInterval = time.Millisecond
@@ -456,6 +493,9 @@
if a, e := compactor.Metrics.CompactionSuccesses.Count(), int64(len(test.expCompactions)); a != e {
return fmt.Errorf("expected compactions %d; got %d", e, a)
}
if a, e := atomic.LoadInt32(compactionCount), int32(len(test.expCompactions)); a != e {
return fmt.Errorf("expected compactions %d; got %d", e, a)
}
if len(test.expCompactions) == 0 {
if cn := compactor.Metrics.CompactingNanos.Count(); cn > 0 {
return fmt.Errorf("expected compaction time to be 0; got %d", cn)
@@ -501,7 +541,7 @@ func TestCompactorProcessingInitialization(t *testing.T) {
capacityFn := func() (roachpb.StoreCapacity, error) {
return roachpb.StoreCapacity{LogicalBytes: 100 * defaultThresholdBytes}, nil
}
compactor, we, cleanup := testSetup(capacityFn)
compactor, we, compactionCount, cleanup := testSetup(capacityFn)
defer cleanup()

// Add a suggested compaction -- this won't get processed by this
@@ -518,7 +558,8 @@
// Create a new fast compactor with a short wait time for processing,
// using the same engine so that it sees a non-empty queue.
stopper := stop.NewStopper()
fastCompactor := NewCompactor(we, capacityFn)
doneFn := func(_ context.Context) { atomic.AddInt32(compactionCount, 1) }
fastCompactor := NewCompactor(we, capacityFn, doneFn)
fastCompactor.opts.CompactionMinInterval = time.Millisecond
fastCompactor.Start(context.Background(), tracing.NewTracer(), stopper)
defer stopper.Stop(context.Background())
@@ -529,6 +570,9 @@
if !reflect.DeepEqual(expComps, comps) {
return fmt.Errorf("expected %+v; got %+v", expComps, comps)
}
if a, e := atomic.LoadInt32(compactionCount), int32(1); a != e {
return fmt.Errorf("expected %d; got %d", e, a)
}
return nil
})
}
@@ -539,9 +583,12 @@ func TestCompactorCleansUpOldRecords(t *testing.T) {
defer leaktest.AfterTest(t)()

capacityFn := func() (roachpb.StoreCapacity, error) {
return roachpb.StoreCapacity{LogicalBytes: 100 * defaultThresholdBytes}, nil
return roachpb.StoreCapacity{
LogicalBytes: 100 * defaultThresholdBytes,
Available: 100 * defaultThresholdBytes,
}, nil
}
compactor, we, cleanup := testSetup(capacityFn)
compactor, we, compactionCount, cleanup := testSetup(capacityFn)
compactor.opts.CompactionMinInterval = time.Millisecond
compactor.opts.MaxSuggestedCompactionRecordAge = 1 * time.Millisecond
defer cleanup()
@@ -566,6 +613,9 @@
if a, e := compactor.Metrics.BytesSkipped.Count(), compactor.opts.ThresholdBytes-1; a != e {
return fmt.Errorf("expected skipped bytes %d; got %d", e, a)
}
if a, e := atomic.LoadInt32(compactionCount), int32(0); a != e {
return fmt.Errorf("expected compactions processed %d; got %d", e, a)
}
// Verify compaction queue is empty.
if bytesQueued, err := compactor.examineQueue(context.Background()); err != nil || bytesQueued > 0 {
return fmt.Errorf("compaction queue not empty (%d bytes) or err %v", bytesQueued, err)
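A note on the boundary arithmetic these tests rely on: the new case sets Available to defaultThresholdBytes and suggests fractionAvailableThresh = 0.10*defaultThresholdBytes + 1 bytes, one byte past ten percent of what is free, while logical bytes are inflated 100x so only the available-space rule can fire. A sketch of that edge, with an assumed placeholder value for defaultThresholdBytes:

```go
package main

import "fmt"

const defaultThresholdBytes = 256 << 10 // assumed placeholder, not the real default

func main() {
	available := int64(defaultThresholdBytes)
	suggestion := int64(0.10*float64(defaultThresholdBytes)) + 1 // fractionAvailableThresh

	trigger := int64(float64(available) * 0.10)
	fmt.Println(suggestion >= trigger)   // true: one byte past the line
	fmt.Println(suggestion-1 >= trigger) // still true: >= makes the line itself trigger
	fmt.Println(suggestion-2 >= trigger) // false: safely under
}
```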
42 changes: 20 additions & 22 deletions pkg/storage/store.go
@@ -855,7 +855,11 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescriptor) *Store {
s.tsCache = tscache.New(cfg.Clock, cfg.TimestampCachePageSize, tsCacheMetrics)
s.metrics.registry.AddMetricStruct(tsCacheMetrics)

s.compactor = compactor.NewCompactor(s.engine.(engine.WithSSTables), s.Capacity)
s.compactor = compactor.NewCompactor(
s.engine.(engine.WithSSTables),
s.Capacity,
func(ctx context.Context) { s.asyncGossipStore(ctx, "compactor-initiated rocksdb compaction") },
)
s.metrics.registry.AddMetricStruct(s.compactor.Metrics)

s.snapshotApplySem = make(chan struct{}, cfg.concurrentSnapshotApplyLimit)
@@ -1427,6 +1431,18 @@ func (s *Store) systemGossipUpdate(cfg config.SystemConfig) {
})
}

func (s *Store) asyncGossipStore(ctx context.Context, reason string) {
if err := s.stopper.RunAsyncTask(
ctx, fmt.Sprintf("storage.Store: gossip on %s", reason),
func(ctx context.Context) {
if err := s.GossipStore(ctx); err != nil {
log.Warningf(ctx, "error gossiping on %s: %s", reason, err)
}
}); err != nil {
log.Warningf(ctx, "unable to gossip on %s: %s", reason, err)
}
}

// GossipStore broadcasts the store on the gossip network.
func (s *Store) GossipStore(ctx context.Context) error {
// This should always return immediately and acts as a sanity check that we
@@ -1511,16 +1527,7 @@ func (s *Store) maybeGossipOnCapacityChange(ctx context.Context, cce capacityChangeEvent) {
// Reset countdowns to avoid unnecessary gossiping.
atomic.StoreInt32(&s.gossipRangeCountdown, 0)
atomic.StoreInt32(&s.gossipLeaseCountdown, 0)
// Send using an async task because GossipStore needs the store mutex.
if err := s.stopper.RunAsyncTask(
ctx, "storage.Store: gossip on capacity change",
func(ctx context.Context) {
if err := s.GossipStore(ctx); err != nil {
log.Warningf(ctx, "error gossiping on capacity change: %s", err)
}
}); err != nil {
log.Warningf(ctx, "unable to gossip on capacity change: %s", err)
}
s.asyncGossipStore(ctx, "capacity change")
}
}

@@ -1534,16 +1541,7 @@ func (s *Store) recordNewWritesPerSecond(newVal float64) {
return
}
if newVal < oldVal*.5 || newVal > oldVal*1.5 {
ctx := s.AnnotateCtx(context.TODO())
if err := s.stopper.RunAsyncTask(
ctx, "storage.Store: gossip on writes-per-second change",
func(ctx context.Context) {
if err := s.GossipStore(ctx); err != nil {
log.Warningf(ctx, "error gossiping on writes-per-second change: %s", err)
}
}); err != nil {
log.Warningf(ctx, "unable to gossip on writes-per-second change: %s", err)
}