Skip to content

Commit

Permalink
Merge pull request #22235 from a-robinson/fulldisk2
Browse files Browse the repository at this point in the history
storage: More improvements for rebalancing/compacting when disks are nearly full
  • Loading branch information
a-robinson authored Feb 5, 2018
2 parents 77b8d13 + 73d9a03 commit 3581b56
Show file tree
Hide file tree
Showing 4 changed files with 127 additions and 33 deletions.
24 changes: 22 additions & 2 deletions pkg/storage/compactor/compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,14 @@ const (
// defaultThresholdBytes threshold.
defaultThresholdBytesFraction = 0.10 // more than 10% of space will trigger

// defaultThresholdBytesAvailableFraction is the fraction of remaining
// available space on a disk, which, if exceeded by the size of a suggested
// compaction, should trigger the processing of said compaction. This
// threshold is meant to make compaction more aggressive when a store is
// nearly full, since reclaiming space is much more important in such
// scenarios.
defaultThresholdBytesAvailableFraction = 0.10

// defaultMaxSuggestedCompactionRecordAge is the maximum age of a
// suggested compaction record. If not processed within this time
// interval since the compaction was suggested, it will be deleted.
Expand All @@ -73,6 +81,7 @@ type compactorOptions struct {
CompactionMinInterval time.Duration
ThresholdBytes int64
ThresholdBytesFraction float64
ThresholdBytesAvailableFraction float64
MaxSuggestedCompactionRecordAge time.Duration
}

Expand All @@ -81,27 +90,34 @@ func defaultCompactorOptions() compactorOptions {
CompactionMinInterval: defaultCompactionMinInterval,
ThresholdBytes: defaultThresholdBytes,
ThresholdBytesFraction: defaultThresholdBytesFraction,
ThresholdBytesAvailableFraction: defaultThresholdBytesAvailableFraction,
MaxSuggestedCompactionRecordAge: defaultMaxSuggestedCompactionRecordAge,
}
}

// storeCapacityFunc returns the current capacity of the store backing the
// compactor; the compactor compares suggested-compaction sizes against the
// reported logical and available bytes to decide whether to process them.
type storeCapacityFunc func() (roachpb.StoreCapacity, error)

// doneCompactingFunc is called after a suggested compaction has been
// successfully processed, e.g. so the store can re-gossip its capacity
// now that disk space has been reclaimed.
type doneCompactingFunc func(ctx context.Context)

// A Compactor records suggested compactions and periodically
// makes requests to the engine to reclaim storage space.
type Compactor struct {
	eng    engine.WithSSTables // engine against which compactions are issued
	capFn  storeCapacityFunc   // supplies store capacity for threshold checks
	doneFn doneCompactingFunc  // optional (may be nil); invoked after each processed compaction
	// ch is a size-1 buffered channel — presumably a wake-up signal for the
	// processing loop (loop not visible here; confirm against Start).
	ch      chan struct{}
	opts    compactorOptions // thresholds and processing intervals
	Metrics Metrics          // exported so the store can register them
}

// NewCompactor returns a compactor for the specified storage engine.
func NewCompactor(eng engine.WithSSTables, capFn storeCapacityFunc) *Compactor {
func NewCompactor(
eng engine.WithSSTables, capFn storeCapacityFunc, doneFn doneCompactingFunc,
) *Compactor {
return &Compactor{
eng: eng,
capFn: capFn,
doneFn: doneFn,
ch: make(chan struct{}, 1),
opts: defaultCompactorOptions(),
Metrics: makeMetrics(),
Expand Down Expand Up @@ -301,7 +317,8 @@ func (c *Compactor) processCompaction(
delBatch engine.Batch,
) (int64, error) {
shouldProcess := aggr.Bytes >= c.opts.ThresholdBytes ||
aggr.Bytes >= int64(float64(capacity.LogicalBytes)*c.opts.ThresholdBytesFraction)
aggr.Bytes >= int64(float64(capacity.LogicalBytes)*c.opts.ThresholdBytesFraction) ||
aggr.Bytes >= int64(float64(capacity.Available)*c.opts.ThresholdBytesAvailableFraction)

if shouldProcess {
startTime := timeutil.Now()
Expand All @@ -314,6 +331,9 @@ func (c *Compactor) processCompaction(
c.Metrics.CompactionSuccesses.Inc(1)
duration := timeutil.Since(startTime)
c.Metrics.CompactingNanos.Inc(int64(duration))
if c.doneFn != nil {
c.doneFn(ctx)
}
log.Infof(ctx, "processed compaction %s in %s", aggr, duration)
} else {
log.VEventf(ctx, 2, "skipping compaction(s) %s", aggr)
Expand Down
66 changes: 58 additions & 8 deletions pkg/storage/compactor/compactor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"fmt"
"reflect"
"sort"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -87,13 +88,15 @@ func (we *wrappedEngine) GetCompactions() []roachpb.Span {
return append([]roachpb.Span(nil), we.mu.compactions...)
}

func testSetup(capFn storeCapacityFunc) (*Compactor, *wrappedEngine, func()) {
func testSetup(capFn storeCapacityFunc) (*Compactor, *wrappedEngine, *int32, func()) {
stopper := stop.NewStopper()
eng := newWrappedEngine()
stopper.AddCloser(eng)
compactor := NewCompactor(eng, capFn)
compactionCount := new(int32)
doneFn := func(_ context.Context) { atomic.AddInt32(compactionCount, 1) }
compactor := NewCompactor(eng, capFn, doneFn)
compactor.Start(context.Background(), tracing.NewTracer(), stopper)
return compactor, eng, func() {
return compactor, eng, compactionCount, func() {
stopper.Stop(context.Background())
}
}
Expand All @@ -107,11 +110,13 @@ func TestCompactorThresholds(t *testing.T) {
defer leaktest.AfterTest(t)()

fractionThresh := defaultThresholdBytesFraction*defaultThresholdBytes + 1
fractionAvailableThresh := defaultThresholdBytesAvailableFraction*defaultThresholdBytes + 1
nowNanos := timeutil.Now().UnixNano()
testCases := []struct {
name string
suggestions []storagebase.SuggestedCompaction
logicalBytes int64 // logical byte count to return with store capacity
availableBytes int64 // available byte count to return with store capacity
expBytesCompacted int64
expCompactions []roachpb.Span
expUncompacted []roachpb.Span
Expand All @@ -129,6 +134,7 @@ func TestCompactorThresholds(t *testing.T) {
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: 0,
expCompactions: nil,
expUncompacted: []roachpb.Span{
Expand All @@ -148,6 +154,7 @@ func TestCompactorThresholds(t *testing.T) {
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: defaultThresholdBytes,
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("b")},
Expand All @@ -166,11 +173,31 @@ func TestCompactorThresholds(t *testing.T) {
},
},
logicalBytes: defaultThresholdBytes,
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: int64(fractionThresh),
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("b")},
},
},
// Single suggestion over the fractional bytes available threshold.
{
name: "single suggestion over fractional bytes available threshold",
suggestions: []storagebase.SuggestedCompaction{
{
StartKey: key("a"), EndKey: key("b"),
Compaction: storagebase.Compaction{
Bytes: int64(fractionAvailableThresh),
SuggestedAtNanos: nowNanos,
},
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes,
expBytesCompacted: int64(fractionAvailableThresh),
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("b")},
},
},
// Double suggestion which in aggregate exceed absolute bytes threshold.
{
name: "double suggestion over absolute threshold",
Expand All @@ -191,6 +218,7 @@ func TestCompactorThresholds(t *testing.T) {
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: defaultThresholdBytes,
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("c")},
Expand All @@ -216,6 +244,7 @@ func TestCompactorThresholds(t *testing.T) {
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: defaultThresholdBytes,
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("b")},
Expand All @@ -241,6 +270,7 @@ func TestCompactorThresholds(t *testing.T) {
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: defaultThresholdBytes,
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("d")},
Expand All @@ -266,6 +296,7 @@ func TestCompactorThresholds(t *testing.T) {
},
},
logicalBytes: defaultThresholdBytes,
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: int64(fractionThresh),
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("c")},
Expand All @@ -292,6 +323,7 @@ func TestCompactorThresholds(t *testing.T) {
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: defaultThresholdBytes,
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("f")},
Expand All @@ -318,6 +350,7 @@ func TestCompactorThresholds(t *testing.T) {
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: 0,
expCompactions: nil,
expUncompacted: []roachpb.Span{
Expand Down Expand Up @@ -346,6 +379,7 @@ func TestCompactorThresholds(t *testing.T) {
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: defaultThresholdBytes * 2,
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("b")},
Expand Down Expand Up @@ -373,6 +407,7 @@ func TestCompactorThresholds(t *testing.T) {
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: defaultThresholdBytes,
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("b")},
Expand Down Expand Up @@ -415,6 +450,7 @@ func TestCompactorThresholds(t *testing.T) {
},
},
logicalBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
availableBytes: defaultThresholdBytes * 100, // not going to trigger fractional threshold
expBytesCompacted: defaultThresholdBytes,
expCompactions: []roachpb.Span{
{Key: key("a"), EndKey: key("zzz")},
Expand All @@ -427,9 +463,10 @@ func TestCompactorThresholds(t *testing.T) {
capacityFn := func() (roachpb.StoreCapacity, error) {
return roachpb.StoreCapacity{
LogicalBytes: test.logicalBytes,
Available: test.availableBytes,
}, nil
}
compactor, we, cleanup := testSetup(capacityFn)
compactor, we, compactionCount, cleanup := testSetup(capacityFn)
defer cleanup()
// Shorten wait times for compactor processing.
compactor.opts.CompactionMinInterval = time.Millisecond
Expand All @@ -456,6 +493,9 @@ func TestCompactorThresholds(t *testing.T) {
if a, e := compactor.Metrics.CompactionSuccesses.Count(), int64(len(test.expCompactions)); a != e {
return fmt.Errorf("expected compactions %d; got %d", e, a)
}
if a, e := atomic.LoadInt32(compactionCount), int32(len(test.expCompactions)); a != e {
return fmt.Errorf("expected compactions %d; got %d", e, a)
}
if len(test.expCompactions) == 0 {
if cn := compactor.Metrics.CompactingNanos.Count(); cn > 0 {
return fmt.Errorf("expected compaction time to be 0; got %d", cn)
Expand Down Expand Up @@ -501,7 +541,7 @@ func TestCompactorProcessingInitialization(t *testing.T) {
capacityFn := func() (roachpb.StoreCapacity, error) {
return roachpb.StoreCapacity{LogicalBytes: 100 * defaultThresholdBytes}, nil
}
compactor, we, cleanup := testSetup(capacityFn)
compactor, we, compactionCount, cleanup := testSetup(capacityFn)
defer cleanup()

// Add a suggested compaction -- this won't get processed by this
Expand All @@ -518,7 +558,8 @@ func TestCompactorProcessingInitialization(t *testing.T) {
// Create a new fast compactor with a short wait time for processing,
// using the same engine so that it sees a non-empty queue.
stopper := stop.NewStopper()
fastCompactor := NewCompactor(we, capacityFn)
doneFn := func(_ context.Context) { atomic.AddInt32(compactionCount, 1) }
fastCompactor := NewCompactor(we, capacityFn, doneFn)
fastCompactor.opts.CompactionMinInterval = time.Millisecond
fastCompactor.Start(context.Background(), tracing.NewTracer(), stopper)
defer stopper.Stop(context.Background())
Expand All @@ -529,6 +570,9 @@ func TestCompactorProcessingInitialization(t *testing.T) {
if !reflect.DeepEqual(expComps, comps) {
return fmt.Errorf("expected %+v; got %+v", expComps, comps)
}
if a, e := atomic.LoadInt32(compactionCount), int32(1); a != e {
return fmt.Errorf("expected %d; got %d", e, a)
}
return nil
})
}
Expand All @@ -539,9 +583,12 @@ func TestCompactorCleansUpOldRecords(t *testing.T) {
defer leaktest.AfterTest(t)()

capacityFn := func() (roachpb.StoreCapacity, error) {
return roachpb.StoreCapacity{LogicalBytes: 100 * defaultThresholdBytes}, nil
return roachpb.StoreCapacity{
LogicalBytes: 100 * defaultThresholdBytes,
Available: 100 * defaultThresholdBytes,
}, nil
}
compactor, we, cleanup := testSetup(capacityFn)
compactor, we, compactionCount, cleanup := testSetup(capacityFn)
compactor.opts.CompactionMinInterval = time.Millisecond
compactor.opts.MaxSuggestedCompactionRecordAge = 1 * time.Millisecond
defer cleanup()
Expand All @@ -566,6 +613,9 @@ func TestCompactorCleansUpOldRecords(t *testing.T) {
if a, e := compactor.Metrics.BytesSkipped.Count(), compactor.opts.ThresholdBytes-1; a != e {
return fmt.Errorf("expected skipped bytes %d; got %d", e, a)
}
if a, e := atomic.LoadInt32(compactionCount), int32(0); a != e {
return fmt.Errorf("expected compactions processed %d; got %d", e, a)
}
// Verify compaction queue is empty.
if bytesQueued, err := compactor.examineQueue(context.Background()); err != nil || bytesQueued > 0 {
return fmt.Errorf("compaction queue not empty (%d bytes) or err %v", bytesQueued, err)
Expand Down
42 changes: 20 additions & 22 deletions pkg/storage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -855,7 +855,11 @@ func NewStore(cfg StoreConfig, eng engine.Engine, nodeDesc *roachpb.NodeDescript
s.tsCache = tscache.New(cfg.Clock, cfg.TimestampCachePageSize, tsCacheMetrics)
s.metrics.registry.AddMetricStruct(tsCacheMetrics)

s.compactor = compactor.NewCompactor(s.engine.(engine.WithSSTables), s.Capacity)
s.compactor = compactor.NewCompactor(
s.engine.(engine.WithSSTables),
s.Capacity,
func(ctx context.Context) { s.asyncGossipStore(ctx, "compactor-initiated rocksdb compaction") },
)
s.metrics.registry.AddMetricStruct(s.compactor.Metrics)

s.snapshotApplySem = make(chan struct{}, cfg.concurrentSnapshotApplyLimit)
Expand Down Expand Up @@ -1427,6 +1431,18 @@ func (s *Store) systemGossipUpdate(cfg config.SystemConfig) {
})
}

// asyncGossipStore gossips the store descriptor from an async task, since
// GossipStore needs the store mutex which the caller may be holding. The
// reason string is woven into the task name and any warning logs; failures
// to gossip (or to even launch the task) are logged, never returned.
func (s *Store) asyncGossipStore(ctx context.Context, reason string) {
	gossip := func(ctx context.Context) {
		if err := s.GossipStore(ctx); err != nil {
			log.Warningf(ctx, "error gossiping on %s: %s", reason, err)
		}
	}
	taskName := fmt.Sprintf("storage.Store: gossip on %s", reason)
	if err := s.stopper.RunAsyncTask(ctx, taskName, gossip); err != nil {
		log.Warningf(ctx, "unable to gossip on %s: %s", reason, err)
	}
}

// GossipStore broadcasts the store on the gossip network.
func (s *Store) GossipStore(ctx context.Context) error {
// This should always return immediately and acts as a sanity check that we
Expand Down Expand Up @@ -1511,16 +1527,7 @@ func (s *Store) maybeGossipOnCapacityChange(ctx context.Context, cce capacityCha
// Reset countdowns to avoid unnecessary gossiping.
atomic.StoreInt32(&s.gossipRangeCountdown, 0)
atomic.StoreInt32(&s.gossipLeaseCountdown, 0)
// Send using an async task because GossipStore needs the store mutex.
if err := s.stopper.RunAsyncTask(
ctx, "storage.Store: gossip on capacity change",
func(ctx context.Context) {
if err := s.GossipStore(ctx); err != nil {
log.Warningf(ctx, "error gossiping on capacity change: %s", err)
}
}); err != nil {
log.Warningf(ctx, "unable to gossip on capacity change: %s", err)
}
s.asyncGossipStore(ctx, "capacity change")
}
}

Expand All @@ -1534,16 +1541,7 @@ func (s *Store) recordNewWritesPerSecond(newVal float64) {
return
}
if newVal < oldVal*.5 || newVal > oldVal*1.5 {
ctx := s.AnnotateCtx(context.TODO())
if err := s.stopper.RunAsyncTask(
ctx, "storage.Store: gossip on writes-per-second change",
func(ctx context.Context) {
if err := s.GossipStore(ctx); err != nil {
log.Warningf(ctx, "error gossiping on writes-per-second change: %s", err)
}
}); err != nil {
log.Warningf(ctx, "unable to gossip on writes-per-second change: %s", err)
}
s.asyncGossipStore(context.TODO(), "writes-per-second change")
}
}

Expand Down Expand Up @@ -2818,7 +2816,7 @@ func (s *Store) reserveSnapshot(
// getting stuck behind large snapshots managed by the replicate queue.
} else if header.CanDecline {
storeDesc, ok := s.cfg.StorePool.getStoreDescriptor(s.StoreID())
if ok && !maxCapacityCheck(storeDesc) {
if ok && (!maxCapacityCheck(storeDesc) || header.RangeSize > storeDesc.Capacity.Available) {
return nil, snapshotStoreTooFullMsg, nil
}
select {
Expand Down
Loading

0 comments on commit 3581b56

Please sign in to comment.