diff --git a/pkg/storage/client_raft_test.go b/pkg/storage/client_raft_test.go index 77589504936b..688bab6d029b 100644 --- a/pkg/storage/client_raft_test.go +++ b/pkg/storage/client_raft_test.go @@ -666,7 +666,7 @@ func TestRaftLogSizeAfterTruncation(t *testing.T) { // compute its size. repl.RaftLock() realSize, err := storage.ComputeRaftLogSize( - repl.RangeID, repl.Engine(), repl.SideloadedRaftMuLocked(), + context.Background(), repl.RangeID, repl.Engine(), repl.SideloadedRaftMuLocked(), ) size := repl.GetRaftLogSize() repl.RaftUnlock() diff --git a/pkg/storage/client_replica_gc_test.go b/pkg/storage/client_replica_gc_test.go index 52cf092af50b..5ec980a4a9e0 100644 --- a/pkg/storage/client_replica_gc_test.go +++ b/pkg/storage/client_replica_gc_test.go @@ -127,6 +127,7 @@ func TestReplicaGCQueueDropReplicaDirect(t *testing.T) { dir := repl1.SideloadedRaftMuLocked().Dir() repl1.RaftUnlock() _, err := os.Stat(dir) + if os.IsNotExist(err) { return nil } diff --git a/pkg/storage/helpers_test.go b/pkg/storage/helpers_test.go index faf95fab10d4..f198836a3237 100644 --- a/pkg/storage/helpers_test.go +++ b/pkg/storage/helpers_test.go @@ -322,7 +322,8 @@ func (r *Replica) ShouldBackpressureWrites() bool { return r.shouldBackpressureWrites() } -// GetRaftLogSize returns the raft log size. +// GetRaftLogSize returns the approximate raft log size. See r.mu.raftLogSize +// for details. func (r *Replica) GetRaftLogSize() int64 { r.mu.RLock() defer r.mu.RUnlock() diff --git a/pkg/storage/raft_log_queue.go b/pkg/storage/raft_log_queue.go index 1dcedd3dc092..6c889832b150 100644 --- a/pkg/storage/raft_log_queue.go +++ b/pkg/storage/raft_log_queue.go @@ -342,12 +342,7 @@ func (td *truncateDecision) String() string { func (td *truncateDecision) NumTruncatableIndexes() int { if td.NewFirstIndex < td.Input.FirstIndex { - log.Fatalf( - context.Background(), - "invalid truncate decision: first index would move from %d to %d", - td.Input.FirstIndex, - td.NewFirstIndex, - ) + return 0 } return int(td.NewFirstIndex - td.Input.FirstIndex) } @@ -490,7 +485,35 @@ func (rlq *raftLogQueue) shouldQueue( log.Warning(ctx, err) return false, 0 } - return decision.ShouldTruncate(), float64(decision.Input.LogSize) + + shouldQ, _, prio := rlq.shouldQueueImpl(ctx, decision) + return shouldQ, prio +} + +// shouldQueueImpl returns whether the given truncate decision should lead to +// a log truncation. This is either the case if the decision says so or of +// we want to recompute the log size (in which case `recomputeRaftLogSize` and +// `shouldQ` are both true and a reasonable priority is returned). +func (rlq *raftLogQueue) shouldQueueImpl( + ctx context.Context, decision truncateDecision, +) (shouldQ bool, recomputeRaftLogSize bool, priority float64) { + if decision.ShouldTruncate() { + return true, false, float64(decision.Input.LogSize) + } + if decision.Input.LogSize > 0 || + decision.Input.LastIndex == decision.Input.FirstIndex { + + return false, false, 0 + } + // We have a nonempty log (first index != last index) and think that its size is + // zero, which can't be true (even an empty entry's Size() is nonzero). We queue + // the replica; processing it will force a recomputation. For the priority, we + // have to pick one as we usually use the log size which is not available here. + // Going half-way between zero and the MaxLogSize should give a good tradeoff + // between processing the recomputation quickly, and not starving replicas which + // see a significant amount of write traffic until they run over and truncate + // more aggressively than they need to. + return true, true, 1.0 + float64(decision.Input.MaxLogSize)/2.0 } // process truncates the raft log of the range if the replica is the raft @@ -502,6 +525,35 @@ func (rlq *raftLogQueue) process(ctx context.Context, r *Replica, _ *config.Syst return err } + if _, recompute, _ := rlq.shouldQueueImpl(ctx, decision); recompute { + log.VEventf(ctx, 2, "recomputing raft log based on decision %+v", decision) + + // We need to hold raftMu both to access the sideloaded storage and to + // make sure concurrent Raft activity doesn't foul up our update to the + // cached in-memory values. + r.raftMu.Lock() + n, err := ComputeRaftLogSize(ctx, r.RangeID, r.Engine(), r.raftMu.sideloaded) + if err == nil { + r.mu.Lock() + r.mu.raftLogSize = n + r.mu.raftLogLastCheckSize = n + r.mu.Unlock() + } + r.raftMu.Unlock() + + if err != nil { + return errors.Wrap(err, "recomputing raft log size") + } + + log.VEventf(ctx, 2, "recomputed raft log size to %s", humanizeutil.IBytes(n)) + + // Override the decision, now that an accurate log size is available. + decision, err = newTruncateDecision(ctx, r) + if err != nil { + return err + } + } + // Can and should the raft logs be truncated? if decision.ShouldTruncate() { if n := decision.NumNewRaftSnapshots(); log.V(1) || n > 0 && rlq.logSnapshots.ShouldProcess(timeutil.Now()) { diff --git a/pkg/storage/raft_log_queue_test.go b/pkg/storage/raft_log_queue_test.go index 9c2ca5191032..bc68c4011ab1 100644 --- a/pkg/storage/raft_log_queue_test.go +++ b/pkg/storage/raft_log_queue_test.go @@ -15,6 +15,7 @@ package storage import ( + "bytes" "context" "fmt" "math" @@ -23,7 +24,9 @@ import ( "time" "github.com/cockroachdb/cockroach/pkg/internal/client" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -111,6 +114,7 @@ func TestGetQuorumIndex(t *testing.T) { func TestComputeTruncateDecision(t *testing.T) { defer leaktest.AfterTest(t)() + ctx := context.Background() const targetSize = 1000 @@ -209,23 +213,48 @@ func TestComputeTruncateDecision(t *testing.T) { "should truncate: false [truncate 0 entries to first index 2 (chosen via: first index)]", }} for i, c := range testCases { - status := raft.Status{ - Progress: make(map[uint64]raft.Progress), - } - for j, v := range c.progress { - status.Progress[uint64(j)] = raft.Progress{RecentActive: true, State: raft.ProgressStateReplicate, Match: v, Next: v + 1} - } - decision := computeTruncateDecision(truncateDecisionInput{ - RaftStatus: status, - LogSize: c.raftLogSize, - MaxLogSize: targetSize, - FirstIndex: c.firstIndex, - LastIndex: c.lastIndex, - PendingPreemptiveSnapshotIndex: c.pendingSnapshot, + t.Run(fmt.Sprintf("%+v", c), func(t *testing.T) { + status := raft.Status{ + Progress: make(map[uint64]raft.Progress), + } + for j, v := range c.progress { + status.Progress[uint64(j)] = raft.Progress{RecentActive: true, State: raft.ProgressStateReplicate, Match: v, Next: v + 1} + } + input := truncateDecisionInput{ + RaftStatus: status, + LogSize: c.raftLogSize, + MaxLogSize: targetSize, + FirstIndex: c.firstIndex, + LastIndex: c.lastIndex, + PendingPreemptiveSnapshotIndex: c.pendingSnapshot, + } + decision := computeTruncateDecision(input) + if act, exp := decision.String(), c.exp; act != exp { + t.Errorf("%d: got:\n%s\nwanted:\n%s", i, act, exp) + } + + // Verify the triggers that queue a range for recomputation. In essence, + // when the raft log size is zero we'll want to suggest a truncation and + // also a recomputation. Before the log size is zero, we'll just see the + // decision play out as before. With a zero log size, we always want to + // truncate and recompute. + // The real tests for this are in TestRaftLogQueueShouldQueue, but this is + // some nice extra coverage. + should, recompute, prio := (*raftLogQueue)(nil).shouldQueueImpl(ctx, decision) + assert.Equal(t, decision.ShouldTruncate(), should) + assert.False(t, recompute) + assert.Equal(t, decision.ShouldTruncate(), prio != 0) + input.LogSize = 0 + input.RaftStatus.RaftState = raft.StateLeader + if input.LastIndex <= input.FirstIndex { + input.LastIndex = input.FirstIndex + 1 + } + decision = computeTruncateDecision(input) + should, recompute, prio = (*raftLogQueue)(nil).shouldQueueImpl(ctx, decision) + assert.True(t, should) + assert.True(t, prio > 0) + assert.True(t, recompute) }) - if act, exp := decision.String(), c.exp; act != exp { - t.Errorf("%d: got:\n%s\nwanted:\n%s", i, act, exp) - } } } @@ -334,7 +363,7 @@ func verifyLogSizeInSync(t *testing.T, r *Replica) { r.mu.Lock() raftLogSize := r.mu.raftLogSize r.mu.Unlock() - actualRaftLogSize, err := ComputeRaftLogSize(r.RangeID, r.Engine(), r.SideloadedRaftMuLocked()) + actualRaftLogSize, err := ComputeRaftLogSize(context.Background(), r.RangeID, r.Engine(), r.SideloadedRaftMuLocked()) if err != nil { t.Fatal(err) } @@ -783,3 +812,104 @@ func TestTruncateLog(t *testing.T) { t.Errorf("invalid term 0 for truncated entry") } } + +func TestRaftLogQueueShouldQueueRecompute(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + var rlq *raftLogQueue + + _ = ctx + _ = rlq + + // NB: Cases for which decision.ShouldTruncate() is true are tested in + // TestComputeTruncateDecision, so here the decision itself is never + // positive. + var decision truncateDecision + decision.Input.MaxLogSize = 1000 + + verify := func(shouldQ bool, recompute bool, prio float64) { + t.Helper() + isQ, isR, isP := rlq.shouldQueueImpl(ctx, decision) + assert.Equal(t, shouldQ, isQ) + assert.Equal(t, recompute, isR) + assert.Equal(t, prio, isP) + } + + verify(false, false, 0) + + // Check all the boxes: unknown log size, leader, and non-empty log. + decision.Input.LogSize = 0 + decision.Input.FirstIndex = 10 + decision.Input.LastIndex = 20 + + verify(true, true, 1+float64(decision.Input.MaxLogSize)/2) + + golden := decision + + // Check all boxes except that log is not empty. + decision.Input.LogSize = 1 + verify(false, false, 0) + + // Check all boxes except that log is empty. + decision = golden + decision.Input.LastIndex = decision.Input.FirstIndex + verify(false, false, 0) +} + +// TestTruncateLogRecompute checks that if raftLogSize is zero, the raft log +// queue picks up the replica, recomputes the log size, and considers a +// truncation. +func TestTruncateLogRecompute(t *testing.T) { + defer leaktest.AfterTest(t)() + + ctx := context.Background() + dir, cleanup := testutils.TempDir(t) + defer cleanup() + + cache := engine.NewRocksDBCache(1 << 20) + defer cache.Release() + eng, err := engine.NewRocksDB(engine.RocksDBConfig{Dir: dir}, cache) + if err != nil { + t.Fatal(err) + } + defer eng.Close() + + tc := testContext{ + engine: eng, + } + stopper := stop.NewStopper() + defer stopper.Stop(context.TODO()) + tc.Start(t, stopper) + tc.repl.store.SetRaftLogQueueActive(false) + + key := roachpb.Key("a") + repl := tc.store.LookupReplica(keys.MustAddr(key)) + + var v roachpb.Value + v.SetBytes(bytes.Repeat([]byte("x"), RaftLogQueueStaleSize*5)) + put := roachpb.NewPut(key, v) + var ba roachpb.BatchRequest + ba.Add(put) + ba.RangeID = repl.RangeID + + if _, pErr := tc.store.Send(ctx, ba); pErr != nil { + t.Fatal(pErr) + } + + decision, err := newTruncateDecision(ctx, repl) + assert.NoError(t, err) + assert.True(t, decision.ShouldTruncate()) + + repl.mu.Lock() + repl.mu.raftLogSize = 0 + repl.mu.raftLogLastCheckSize = 0 + repl.mu.Unlock() + + // Force a raft log queue run. The result should be a nonzero Raft log of + // size below the threshold (though we won't check that since it could have + // grown over threshold again; we compute instead that its size is correct). + tc.store.SetRaftLogQueueActive(true) + tc.store.MustForceRaftLogScanAndProcess() + verifyLogSizeInSync(t, repl) +} diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index abeb4504036f..85044f138fc3 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -250,14 +250,16 @@ type Replica struct { // already finished snapshot "pending" for extended periods of time // (preventing log truncation). snapshotLogTruncationConstraints map[uuid.UUID]snapTruncationInfo - // raftLogSize is the approximate size in bytes of the persisted raft log. - // The value is not persisted and is computed lazily, paced by the raft - // log truncation queue which will recompute the log size when it finds - // it uninitialized. This recomputation mechanism isn't relevant for - // ranges which see regular write activity (for those the log size will - // deviate from zero quickly, and so it won't be recomputed but will - // undercount until the first truncation is carried out), but it prevents - // a large dormant Raft log from sitting around forever. + // raftLogSize is the approximate size in bytes of the persisted raft + // log, including sideloaded entries' payloads. The value itself is not + // persisted and is computed lazily, paced by the raft log truncation + // queue which will recompute the log size when it finds it + // uninitialized. This recomputation mechanism isn't relevant for ranges + // which see regular write activity (for those the log size will deviate + // from zero quickly, and so it won't be recomputed but will undercount + // until the first truncation is carried out), but it prevents a large + // dormant Raft log from sitting around forever, which has caused problems + // in the past. raftLogSize int64 // raftLogLastCheckSize is the value of raftLogSize the last time the Raft // log was checked for truncation or at the time of the last Raft log diff --git a/pkg/storage/replica_raft.go b/pkg/storage/replica_raft.go index 5032e801e306..f7c481d5607f 100644 --- a/pkg/storage/replica_raft.go +++ b/pkg/storage/replica_raft.go @@ -2461,23 +2461,24 @@ func handleTruncatedStateBelowRaft( return true, nil } -// ComputeRaftLogSize computes the size of the Raft log from the storage engine. -// This will iterate over the Raft log and sideloaded files, so depending on the -// size of these it can be mildly to extremely expensive and thus should not be -// called frequently. +// ComputeRaftLogSize computes the size (in bytes) of the Raft log from the +// storage engine. This will iterate over the Raft log and sideloaded files, so +// depending on the size of these it can be mildly to extremely expensive and +// thus should not be called frequently. // // The sideloaded storage may be nil, in which case it is treated as empty. func ComputeRaftLogSize( - rangeID roachpb.RangeID, reader engine.Reader, sideloaded SideloadStorage, + ctx context.Context, rangeID roachpb.RangeID, reader engine.Reader, sideloaded SideloadStorage, ) (int64, error) { prefix := keys.RaftLogPrefix(rangeID) + prefixEnd := prefix.PrefixEnd() iter := reader.NewIterator(engine.IterOptions{ LowerBound: prefix, - UpperBound: prefix.PrefixEnd(), + UpperBound: prefixEnd, }) defer iter.Close() from := engine.MakeMVCCMetadataKey(prefix) - to := engine.MakeMVCCMetadataKey(prefix.PrefixEnd()) + to := engine.MakeMVCCMetadataKey(prefixEnd) ms, err := iter.ComputeStats(from, to, 0 /* nowNanos */) if err != nil { return 0, err @@ -2485,7 +2486,9 @@ func ComputeRaftLogSize( var totalSideloaded int64 if sideloaded != nil { var err error - _, totalSideloaded, err = sideloaded.TruncateTo(context.TODO(), 1) + // Truncating all indexes strictly smaller than zero is a no-op but + // gives us the number of bytes in the storage back. + _, totalSideloaded, err = sideloaded.TruncateTo(ctx, 0) if err != nil { return 0, err }