storage: recompute raft log size in Raft log queue when zero
When a node starts, it has no knowledge of the true size of its
replicas' Raft logs. However, without this knowledge, log truncations on
replicas that have no regular write activity will not take place (unless
the log is too long).

This doesn't matter in most circumstances (though it wastes a bit of
space), but when it does matter (a situation users reliably hit) it can
be problematic, since snapshots contain the historical Raft log and are
refused when that log is too large. In particular, in conjunction with
IMPORT/RESTORE (which reliably creates log entries weighing in at 25mb
each) this often resulted in ranges that were unable to rebalance until
somehow convinced to truncate their logs. More generally, any workload
in which ~100 entries could combine to a size of more than 16mb would
trigger this problem if a node was restarted and became leaseholder
after the restart while retaining such a log.

We are planning to omit the historical log from snapshots in the near
future (solving this problem), but in the interim, add a mechanism that
recomputes the raft log size if it is zero (and thus definitely
unknown).
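
For illustration, a minimal, self-contained sketch of that trigger
(paraphrasing the shouldQueueImpl logic added to
pkg/storage/raft_log_queue.go below; the truncateInput type and the
main function are simplified stand-ins for this example only, not part
of the commit):

package main

import "fmt"

// truncateInput is a pared-down stand-in for the queue's decision input;
// only the fields relevant to the recomputation trigger are included.
type truncateInput struct {
	LogSize    int64  // cached (approximate) raft log size in bytes
	MaxLogSize int64  // truncation threshold in bytes
	FirstIndex uint64 // first raft log index; equal to LastIndex means empty log
	LastIndex  uint64 // last raft log index
}

// shouldRecompute mirrors the new trigger: a log that is known to be
// nonempty (first index != last index) but whose cached size is zero must
// have an unknown size, so the replica is queued for a recomputation with
// a priority halfway to the truncation threshold.
func shouldRecompute(in truncateInput) (shouldQueue bool, priority float64) {
	if in.LogSize > 0 || in.LastIndex == in.FirstIndex {
		return false, 0
	}
	return true, 1.0 + float64(in.MaxLogSize)/2.0
}

func main() {
	// Freshly restarted node: nonempty log, cached size still zero.
	fmt.Println(shouldRecompute(truncateInput{MaxLogSize: 1 << 20, FirstIndex: 10, LastIndex: 20}))
	// Known (nonzero) size: left to the ordinary truncation decision.
	fmt.Println(shouldRecompute(truncateInput{LogSize: 4096, MaxLogSize: 1 << 20, FirstIndex: 10, LastIndex: 20}))
}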

Fixes #33071.

Release note (bug fix): Prevent a situation in which snapshots would be
refused repeatedly over long periods of time, with error messages such
as "aborting snapshot because raft log is too large" appearing in the
logs, and often accompanied by under-replicated ranges in the UI.
tbg committed Feb 27, 2019
1 parent aa0943d commit ffe2d1d
Showing 7 changed files with 231 additions and 42 deletions.
2 changes: 1 addition & 1 deletion pkg/storage/client_raft_test.go
@@ -666,7 +666,7 @@ func TestRaftLogSizeAfterTruncation(t *testing.T) {
// compute its size.
repl.RaftLock()
realSize, err := storage.ComputeRaftLogSize(
repl.RangeID, repl.Engine(), repl.SideloadedRaftMuLocked(),
context.Background(), repl.RangeID, repl.Engine(), repl.SideloadedRaftMuLocked(),
)
size := repl.GetRaftLogSize()
repl.RaftUnlock()
1 change: 1 addition & 0 deletions pkg/storage/client_replica_gc_test.go
@@ -127,6 +127,7 @@ func TestReplicaGCQueueDropReplicaDirect(t *testing.T) {
dir := repl1.SideloadedRaftMuLocked().Dir()
repl1.RaftUnlock()
_, err := os.Stat(dir)

if os.IsNotExist(err) {
return nil
}
3 changes: 2 additions & 1 deletion pkg/storage/helpers_test.go
@@ -322,7 +322,8 @@ func (r *Replica) ShouldBackpressureWrites() bool {
return r.shouldBackpressureWrites()
}

// GetRaftLogSize returns the raft log size.
// GetRaftLogSize returns the approximate raft log size. See r.mu.raftLogSize
// for details.
func (r *Replica) GetRaftLogSize() int64 {
r.mu.RLock()
defer r.mu.RUnlock()
66 changes: 59 additions & 7 deletions pkg/storage/raft_log_queue.go
@@ -342,12 +342,7 @@ func (td *truncateDecision) String() string {

func (td *truncateDecision) NumTruncatableIndexes() int {
if td.NewFirstIndex < td.Input.FirstIndex {
log.Fatalf(
context.Background(),
"invalid truncate decision: first index would move from %d to %d",
td.Input.FirstIndex,
td.NewFirstIndex,
)
return 0
}
return int(td.NewFirstIndex - td.Input.FirstIndex)
}
@@ -490,7 +485,35 @@ func (rlq *raftLogQueue) shouldQueue(
log.Warning(ctx, err)
return false, 0
}
return decision.ShouldTruncate(), float64(decision.Input.LogSize)

shouldQ, _, prio := rlq.shouldQueueImpl(ctx, decision)
return shouldQ, prio
}

// shouldQueueImpl returns whether the given truncate decision should lead to
a log truncation. This is either the case if the decision says so or if
// we want to recompute the log size (in which case `recomputeRaftLogSize` and
// `shouldQ` are both true and a reasonable priority is returned).
func (rlq *raftLogQueue) shouldQueueImpl(
ctx context.Context, decision truncateDecision,
) (shouldQ bool, recomputeRaftLogSize bool, priority float64) {
if decision.ShouldTruncate() {
return true, false, float64(decision.Input.LogSize)
}
if decision.Input.LogSize > 0 ||
decision.Input.LastIndex == decision.Input.FirstIndex {

return false, false, 0
}
// We have a nonempty log (first index != last index) and think that its size is
// zero, which can't be true (even an empty entry's Size() is nonzero). We queue
// the replica; processing it will force a recomputation. For the priority, we
// have to pick one as we usually use the log size which is not available here.
// Going half-way between zero and the MaxLogSize should give a good tradeoff
// between processing the recomputation quickly, and not starving replicas which
// see a significant amount of write traffic until they run over and truncate
// more aggressively than they need to.
return true, true, 1.0 + float64(decision.Input.MaxLogSize)/2.0
}

// process truncates the raft log of the range if the replica is the raft
@@ -502,6 +525,35 @@ func (rlq *raftLogQueue) process(ctx context.Context, r *Replica, _ *config.Syst
return err
}

if _, recompute, _ := rlq.shouldQueueImpl(ctx, decision); recompute {
log.VEventf(ctx, 2, "recomputing raft log based on decision %+v", decision)

// We need to hold raftMu both to access the sideloaded storage and to
// make sure concurrent Raft activity doesn't foul up our update to the
// cached in-memory values.
r.raftMu.Lock()
n, err := ComputeRaftLogSize(ctx, r.RangeID, r.Engine(), r.raftMu.sideloaded)
if err == nil {
r.mu.Lock()
r.mu.raftLogSize = n
r.mu.raftLogLastCheckSize = n
r.mu.Unlock()
}
r.raftMu.Unlock()

if err != nil {
return errors.Wrap(err, "recomputing raft log size")
}

log.VEventf(ctx, 2, "recomputed raft log size to %s", humanizeutil.IBytes(n))

// Override the decision, now that an accurate log size is available.
decision, err = newTruncateDecision(ctx, r)
if err != nil {
return err
}
}

// Can and should the raft logs be truncated?
if decision.ShouldTruncate() {
if n := decision.NumNewRaftSnapshots(); log.V(1) || n > 0 && rlq.logSnapshots.ShouldProcess(timeutil.Now()) {
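As a concrete data point for the priority heuristic in shouldQueueImpl
above: with the 1000-byte MaxLogSize used by the tests in the next file
(a test-only value, not a production default), a nonempty log with an
unknown (zero) cached size is enqueued at priority 1 + 1000/2 = 501.
That is, it sorts roughly like a replica whose known log is already half
the maximum size: ahead of idle replicas, but behind any replica whose
measured log size exceeds that value.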
164 changes: 147 additions & 17 deletions pkg/storage/raft_log_queue_test.go
@@ -15,6 +15,7 @@
package storage

import (
"bytes"
"context"
"fmt"
"math"
@@ -23,7 +24,9 @@ import (
"time"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
@@ -111,6 +114,7 @@ func TestGetQuorumIndex(t *testing.T) {

func TestComputeTruncateDecision(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

const targetSize = 1000

@@ -209,23 +213,48 @@
"should truncate: false [truncate 0 entries to first index 2 (chosen via: first index)]",
}}
for i, c := range testCases {
status := raft.Status{
Progress: make(map[uint64]raft.Progress),
}
for j, v := range c.progress {
status.Progress[uint64(j)] = raft.Progress{RecentActive: true, State: raft.ProgressStateReplicate, Match: v, Next: v + 1}
}
decision := computeTruncateDecision(truncateDecisionInput{
RaftStatus: status,
LogSize: c.raftLogSize,
MaxLogSize: targetSize,
FirstIndex: c.firstIndex,
LastIndex: c.lastIndex,
PendingPreemptiveSnapshotIndex: c.pendingSnapshot,
t.Run(fmt.Sprintf("%+v", c), func(t *testing.T) {
status := raft.Status{
Progress: make(map[uint64]raft.Progress),
}
for j, v := range c.progress {
status.Progress[uint64(j)] = raft.Progress{RecentActive: true, State: raft.ProgressStateReplicate, Match: v, Next: v + 1}
}
input := truncateDecisionInput{
RaftStatus: status,
LogSize: c.raftLogSize,
MaxLogSize: targetSize,
FirstIndex: c.firstIndex,
LastIndex: c.lastIndex,
PendingPreemptiveSnapshotIndex: c.pendingSnapshot,
}
decision := computeTruncateDecision(input)
if act, exp := decision.String(), c.exp; act != exp {
t.Errorf("%d: got:\n%s\nwanted:\n%s", i, act, exp)
}

// Verify the triggers that queue a range for recomputation. In essence,
// when the raft log size is zero we'll want to suggest a truncation and
// also a recomputation. Before the log size is zero, we'll just see the
// decision play out as before. With a zero log size, we always want to
// truncate and recompute.
// The real tests for this are in TestRaftLogQueueShouldQueue, but this is
// some nice extra coverage.
should, recompute, prio := (*raftLogQueue)(nil).shouldQueueImpl(ctx, decision)
assert.Equal(t, decision.ShouldTruncate(), should)
assert.False(t, recompute)
assert.Equal(t, decision.ShouldTruncate(), prio != 0)
input.LogSize = 0
input.RaftStatus.RaftState = raft.StateLeader
if input.LastIndex <= input.FirstIndex {
input.LastIndex = input.FirstIndex + 1
}
decision = computeTruncateDecision(input)
should, recompute, prio = (*raftLogQueue)(nil).shouldQueueImpl(ctx, decision)
assert.True(t, should)
assert.True(t, prio > 0)
assert.True(t, recompute)
})
if act, exp := decision.String(), c.exp; act != exp {
t.Errorf("%d: got:\n%s\nwanted:\n%s", i, act, exp)
}
}
}

@@ -334,7 +363,7 @@ func verifyLogSizeInSync(t *testing.T, r *Replica) {
r.mu.Lock()
raftLogSize := r.mu.raftLogSize
r.mu.Unlock()
actualRaftLogSize, err := ComputeRaftLogSize(r.RangeID, r.Engine(), r.SideloadedRaftMuLocked())
actualRaftLogSize, err := ComputeRaftLogSize(context.Background(), r.RangeID, r.Engine(), r.SideloadedRaftMuLocked())
if err != nil {
t.Fatal(err)
}
@@ -783,3 +812,104 @@ func TestTruncateLog(t *testing.T) {
t.Errorf("invalid term 0 for truncated entry")
}
}

func TestRaftLogQueueShouldQueueRecompute(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
var rlq *raftLogQueue

_ = ctx
_ = rlq

// NB: Cases for which decision.ShouldTruncate() is true are tested in
// TestComputeTruncateDecision, so here the decision itself is never
// positive.
var decision truncateDecision
decision.Input.MaxLogSize = 1000

verify := func(shouldQ bool, recompute bool, prio float64) {
t.Helper()
isQ, isR, isP := rlq.shouldQueueImpl(ctx, decision)
assert.Equal(t, shouldQ, isQ)
assert.Equal(t, recompute, isR)
assert.Equal(t, prio, isP)
}

verify(false, false, 0)

// Check all the boxes: unknown log size, leader, and non-empty log.
decision.Input.LogSize = 0
decision.Input.FirstIndex = 10
decision.Input.LastIndex = 20

verify(true, true, 1+float64(decision.Input.MaxLogSize)/2)

golden := decision

// Check all boxes except that log is not empty.
decision.Input.LogSize = 1
verify(false, false, 0)

// Check all boxes except that log is empty.
decision = golden
decision.Input.LastIndex = decision.Input.FirstIndex
verify(false, false, 0)
}

// TestTruncateLogRecompute checks that if raftLogSize is zero, the raft log
// queue picks up the replica, recomputes the log size, and considers a
// truncation.
func TestTruncateLogRecompute(t *testing.T) {
defer leaktest.AfterTest(t)()

ctx := context.Background()
dir, cleanup := testutils.TempDir(t)
defer cleanup()

cache := engine.NewRocksDBCache(1 << 20)
defer cache.Release()
eng, err := engine.NewRocksDB(engine.RocksDBConfig{Dir: dir}, cache)
if err != nil {
t.Fatal(err)
}
defer eng.Close()

tc := testContext{
engine: eng,
}
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
tc.Start(t, stopper)
tc.repl.store.SetRaftLogQueueActive(false)

key := roachpb.Key("a")
repl := tc.store.LookupReplica(keys.MustAddr(key))

var v roachpb.Value
v.SetBytes(bytes.Repeat([]byte("x"), RaftLogQueueStaleSize*5))
put := roachpb.NewPut(key, v)
var ba roachpb.BatchRequest
ba.Add(put)
ba.RangeID = repl.RangeID

if _, pErr := tc.store.Send(ctx, ba); pErr != nil {
t.Fatal(pErr)
}

decision, err := newTruncateDecision(ctx, repl)
assert.NoError(t, err)
assert.True(t, decision.ShouldTruncate())

repl.mu.Lock()
repl.mu.raftLogSize = 0
repl.mu.raftLogLastCheckSize = 0
repl.mu.Unlock()

// Force a raft log queue run. The result should be a nonzero Raft log of
// size below the threshold (though we won't check that since it could have
// grown over the threshold again; we instead verify that its size is correct).
tc.store.SetRaftLogQueueActive(true)
tc.store.MustForceRaftLogScanAndProcess()
verifyLogSizeInSync(t, repl)
}
18 changes: 10 additions & 8 deletions pkg/storage/replica.go
@@ -250,14 +250,16 @@ type Replica struct {
// already finished snapshot "pending" for extended periods of time
// (preventing log truncation).
snapshotLogTruncationConstraints map[uuid.UUID]snapTruncationInfo
// raftLogSize is the approximate size in bytes of the persisted raft log.
// The value is not persisted and is computed lazily, paced by the raft
// log truncation queue which will recompute the log size when it finds
// it uninitialized. This recomputation mechanism isn't relevant for
// ranges which see regular write activity (for those the log size will
// deviate from zero quickly, and so it won't be recomputed but will
// undercount until the first truncation is carried out), but it prevents
// a large dormant Raft log from sitting around forever.
// raftLogSize is the approximate size in bytes of the persisted raft
// log, including sideloaded entries' payloads. The value itself is not
// persisted and is computed lazily, paced by the raft log truncation
// queue which will recompute the log size when it finds it
// uninitialized. This recomputation mechanism isn't relevant for ranges
// which see regular write activity (for those the log size will deviate
// from zero quickly, and so it won't be recomputed but will undercount
// until the first truncation is carried out), but it prevents a large
// dormant Raft log from sitting around forever, which has caused problems
// in the past.
raftLogSize int64
// raftLogLastCheckSize is the value of raftLogSize the last time the Raft
// log was checked for truncation or at the time of the last Raft log
19 changes: 11 additions & 8 deletions pkg/storage/replica_raft.go
@@ -2461,31 +2461,34 @@ func handleTruncatedStateBelowRaft(
return true, nil
}

// ComputeRaftLogSize computes the size of the Raft log from the storage engine.
// This will iterate over the Raft log and sideloaded files, so depending on the
// size of these it can be mildly to extremely expensive and thus should not be
// called frequently.
// ComputeRaftLogSize computes the size (in bytes) of the Raft log from the
// storage engine. This will iterate over the Raft log and sideloaded files, so
// depending on the size of these it can be mildly to extremely expensive and
// thus should not be called frequently.
//
// The sideloaded storage may be nil, in which case it is treated as empty.
func ComputeRaftLogSize(
rangeID roachpb.RangeID, reader engine.Reader, sideloaded SideloadStorage,
ctx context.Context, rangeID roachpb.RangeID, reader engine.Reader, sideloaded SideloadStorage,
) (int64, error) {
prefix := keys.RaftLogPrefix(rangeID)
prefixEnd := prefix.PrefixEnd()
iter := reader.NewIterator(engine.IterOptions{
LowerBound: prefix,
UpperBound: prefix.PrefixEnd(),
UpperBound: prefixEnd,
})
defer iter.Close()
from := engine.MakeMVCCMetadataKey(prefix)
to := engine.MakeMVCCMetadataKey(prefix.PrefixEnd())
to := engine.MakeMVCCMetadataKey(prefixEnd)
ms, err := iter.ComputeStats(from, to, 0 /* nowNanos */)
if err != nil {
return 0, err
}
var totalSideloaded int64
if sideloaded != nil {
var err error
_, totalSideloaded, err = sideloaded.TruncateTo(context.TODO(), 1)
// Truncating all indexes strictly smaller than zero is a no-op, but it
// returns the number of bytes held in the sideloaded storage.
_, totalSideloaded, err = sideloaded.TruncateTo(ctx, 0)
if err != nil {
return 0, err
}
