diff --git a/pkg/kv/kvserver/logstore/BUILD.bazel b/pkg/kv/kvserver/logstore/BUILD.bazel index e3cb78453749..7e29b6803e70 100644 --- a/pkg/kv/kvserver/logstore/BUILD.bazel +++ b/pkg/kv/kvserver/logstore/BUILD.bazel @@ -8,6 +8,7 @@ go_library( "sideload.go", "sideload_disk.go", "stateloader.go", + "sync_waiter.go", ], importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore", visibility = ["//visibility:public"], @@ -28,9 +29,11 @@ go_library( "//pkg/util/log", "//pkg/util/metric", "//pkg/util/protoutil", + "//pkg/util/stop", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_errors//oserror", + "@com_github_cockroachdb_pebble//record", "@com_github_cockroachdb_redact//:redact", "@io_etcd_go_raft_v3//:raft", "@io_etcd_go_raft_v3//raftpb", @@ -43,6 +46,7 @@ go_test( srcs = [ "logstore_bench_test.go", "sideload_test.go", + "sync_waiter_test.go", ], args = ["-test.timeout=295s"], embed = [":logstore"], @@ -60,6 +64,7 @@ go_test( "//pkg/util/log", "//pkg/util/metric", "//pkg/util/protoutil", + "//pkg/util/stop", "//pkg/util/tracing", "@com_github_cockroachdb_errors//:errors", "@com_github_cockroachdb_errors//oserror", diff --git a/pkg/kv/kvserver/logstore/logstore.go b/pkg/kv/kvserver/logstore/logstore.go index f1784431b0f5..490c4bcd6f7f 100644 --- a/pkg/kv/kvserver/logstore/logstore.go +++ b/pkg/kv/kvserver/logstore/logstore.go @@ -13,6 +13,7 @@ package logstore import ( "context" + "fmt" "sync" "time" @@ -43,25 +44,24 @@ var disableSyncRaftLog = settings.RegisterBoolSetting( envutil.EnvOrDefaultBool("COCKROACH_DISABLE_RAFT_LOG_SYNCHRONIZATION_UNSAFE", false), ) -// Ready contains the log entries and state to be saved to stable storage. This -// is a subset of raft.Ready relevant to log storage. All fields are read-only. -type Ready struct { - // The current state of a replica to be saved to stable storage. Empty if - // there is no update. - raftpb.HardState +var enableNonBlockingRaftLogSync = settings.RegisterBoolSetting( + settings.TenantWritable, + "kv.raft_log.non_blocking_synchronization.enabled", + "set to true to enable non-blocking synchronization on Raft log writes to "+ + "persistent storage. Setting to true does not risk data loss or data corruption "+ + "on server crashes, but can reduce write latency.", + envutil.EnvOrDefaultBool("COCKROACH_ENABLE_RAFT_LOG_NON_BLOCKING_SYNCHRONIZATION", true), +) - // Entries specifies entries to be saved to stable storage. Empty if there is - // no update. - Entries []raftpb.Entry +// MsgStorageAppend is a raftpb.Message with type MsgStorageAppend. +type MsgStorageAppend raftpb.Message - // MustSync indicates whether the HardState and Entries must be synchronously - // written to disk, or if an asynchronous write is permissible. - MustSync bool -} - -// MakeReady constructs a Ready struct from raft.Ready. -func MakeReady(from raft.Ready) Ready { - return Ready{HardState: from.HardState, Entries: from.Entries, MustSync: from.MustSync} +// MakeMsgStorageAppend constructs a MsgStorageAppend from a raftpb.Message. +func MakeMsgStorageAppend(m raftpb.Message) MsgStorageAppend { + if m.Type != raftpb.MsgStorageAppend { + panic(fmt.Sprintf("unexpected message type %s", m.Type)) + } + return MsgStorageAppend(m) } // RaftState stores information about the last entry and the size of the log. @@ -86,6 +86,8 @@ type AppendStats struct { PebbleBytes int64 Sync bool + // If true, PebbleEnd-PebbleBegin does not include the sync time. 
+ NonBlocking bool } // Metrics contains metrics specific to the log storage. @@ -99,37 +101,70 @@ type LogStore struct { Engine storage.Engine Sideload SideloadStorage StateLoader StateLoader + SyncWaiter *SyncWaiterLoop EntryCache *raftentry.Cache Settings *cluster.Settings Metrics Metrics } +// SyncCallback is a callback that is notified when a raft log write has been +// durably committed to disk. The function is handed the response messages that +// are associated with the MsgStorageAppend that triggered the fsync. +type SyncCallback interface { + OnLogSync(context.Context, []raftpb.Message) +} + func newStoreEntriesBatch(eng storage.Engine) storage.Batch { // Use an unindexed batch because we don't need to read our writes, and // it is more efficient. return eng.NewUnindexedBatch(false /* writeOnly */) } -// StoreEntries persists newly appended Raft log Entries to the log storage. +// StoreEntries persists newly appended Raft log Entries to the log storage, +// then calls the provided callback with the input's response messages (if any) +// once the entries are durable. The durable log write may or may not be +// blocking (and therefore the callback may or may not be called synchronously), +// depending on the kv.raft_log.non_blocking_synchronization.enabled cluster +// setting. Either way, the effects of the log append will be immediately +// visible to readers of the Engine. +// // Accepts the state of the log before the operation, returns the state after. // Persists HardState atomically with, or strictly after Entries. func (s *LogStore) StoreEntries( - ctx context.Context, state RaftState, rd Ready, stats *AppendStats, + ctx context.Context, state RaftState, m MsgStorageAppend, cb SyncCallback, stats *AppendStats, ) (RaftState, error) { batch := newStoreEntriesBatch(s.Engine) - defer batch.Close() - return s.storeEntriesAndCommitBatch(ctx, state, rd, stats, batch) + return s.storeEntriesAndCommitBatch(ctx, state, m, cb, stats, batch) } +// storeEntriesAndCommitBatch is like StoreEntries, but it accepts a +// storage.Batch, which it takes responsibility for committing and closing. func (s *LogStore) storeEntriesAndCommitBatch( - ctx context.Context, state RaftState, rd Ready, stats *AppendStats, batch storage.Batch, + ctx context.Context, + state RaftState, + m MsgStorageAppend, + cb SyncCallback, + stats *AppendStats, + batch storage.Batch, ) (RaftState, error) { + // Before returning, Close the batch if we haven't handed ownership of it to a + // SyncWaiterLoop. If batch == nil, SyncWaiterLoop is responsible for closing + // it once the in-progress disk writes complete. + defer func() { + if batch != nil { + defer batch.Close() + } + }() + prevLastIndex := state.LastIndex - if len(rd.Entries) > 0 { + overwriting := false + if len(m.Entries) > 0 { + firstPurge := m.Entries[0].Index // first new entry written + overwriting = firstPurge <= prevLastIndex stats.Begin = timeutil.Now() // All of the entries are appended to distinct keys, returning a new // last index. 
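The SyncCallback contract introduced above is intentionally narrow: an implementation is only handed the response messages tied to the MsgStorageAppend once that write is durable, and (as the non-blocking path later in this file shows) it may be invoked from a different goroutine than the one that called StoreEntries. A minimal sketch of an implementation, assuming only the interface shown in this diff (the package, type, and channel names are hypothetical):

```go
package kvserverx // hypothetical package, for illustration only

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore"
	"go.etcd.io/raft/v3/raftpb"
)

// countingSyncCallback records how many response messages each durable log
// write released. Because OnLogSync can run on the SyncWaiterLoop goroutine,
// it must stay cheap and must never block.
type countingSyncCallback struct {
	synced chan int
}

func (c countingSyncCallback) OnLogSync(_ context.Context, msgs []raftpb.Message) {
	select {
	case c.synced <- len(msgs):
	default: // drop the sample rather than stall the sync loop
	}
}

// Compile-time check against the interface added in this change.
var _ logstore.SyncCallback = countingSyncCallback{}
```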
- thinEntries, numSideloaded, sideLoadedEntriesSize, otherEntriesSize, err := MaybeSideloadEntries(ctx, rd.Entries, s.Sideload) + thinEntries, numSideloaded, sideLoadedEntriesSize, otherEntriesSize, err := MaybeSideloadEntries(ctx, m.Entries, s.Sideload) if err != nil { const expl = "during sideloading" return RaftState{}, errors.Wrap(err, expl) @@ -148,16 +183,21 @@ func (s *LogStore) storeEntriesAndCommitBatch( stats.End = timeutil.Now() } - if !raft.IsEmptyHardState(rd.HardState) { + hs := raftpb.HardState{ + Term: m.Term, + Vote: m.Vote, + Commit: m.Commit, + } + if !raft.IsEmptyHardState(hs) { // NB: Note that without additional safeguards, it's incorrect to write - // the HardState before appending rd.Entries. When catching up, a follower + // the HardState before appending m.Entries. When catching up, a follower // will receive Entries that are immediately Committed in the same // Ready. If we persist the HardState but happen to lose the Entries, // assertions can be tripped. // // We have both in the same batch, so there's no problem. If that ever // changes, we must write and sync the Entries before the HardState. - if err := s.StateLoader.SetHardState(ctx, batch, rd.HardState); err != nil { + if err := s.StateLoader.SetHardState(ctx, batch, hs); err != nil { const expl = "during setHardState" return RaftState{}, errors.Wrap(err, expl) } @@ -167,9 +207,9 @@ func (s *LogStore) storeEntriesAndCommitBatch( // // Note that the data is visible to other goroutines before it is synced to // disk. This is fine. The important constraints are that these syncs happen - // before Raft messages are sent and before the call to RawNode.Advance. Our - // regular locking is sufficient for this and if other goroutines can see the - // data early, that's fine. In particular, snapshots are not a problem (I + // before the MsgStorageAppend's responses are delivered back to the RawNode. + // Our regular locking is sufficient for this and if other goroutines can see + // the data early, that's fine. In particular, snapshots are not a problem (I // think they're the only thing that might access log entries or HardState // from other goroutines). Snapshots do not include either the HardState or // uncommitted log entries, and even if they did include log entries that @@ -182,25 +222,57 @@ func (s *LogStore) storeEntriesAndCommitBatch( // (Replica), so this comment might need to move. stats.PebbleBegin = timeutil.Now() stats.PebbleBytes = int64(batch.Len()) - sync := rd.MustSync && !disableSyncRaftLog.Get(&s.Settings.SV) - if err := batch.Commit(sync); err != nil { - const expl = "while committing batch" - return RaftState{}, errors.Wrap(err, expl) - } - stats.Sync = sync - stats.PebbleEnd = timeutil.Now() - if rd.MustSync { - s.Metrics.RaftLogCommitLatency.RecordValue(stats.PebbleEnd.Sub(stats.PebbleBegin).Nanoseconds()) + wantsSync := len(m.Responses) > 0 + willSync := wantsSync && !disableSyncRaftLog.Get(&s.Settings.SV) + // Use the non-blocking log sync path if we are performing a log sync, the + // cluster setting to do so is enabled, and we are not overwriting any + // previous log entries. If we are overwriting, we may need to purge the + // sideloaded SSTables associated with overwritten entries. This must be + // performed after the corresponding entries are durably replaced and it's + // easier to ensure proper ordering using a blocking log sync. This is a + // rare case, so it's not worth optimizing for.
+ nonBlockingSync := willSync && enableNonBlockingRaftLogSync.Get(&s.Settings.SV) && !overwriting + if nonBlockingSync { + // If non-blocking synchronization is enabled, apply the batched updates to + // the engine and initiate a synchronous disk write, but don't wait for the + // write to complete. Instead, enqueue that wait on the SyncWaiterLoop, + // which will signal the callback when the write completes. + if err := batch.CommitNoSyncWait(); err != nil { + const expl = "while committing batch without sync wait" + return RaftState{}, errors.Wrap(err, expl) + } + stats.PebbleEnd = timeutil.Now() + s.SyncWaiter.enqueue(ctx, batch, func() { + // NOTE: run on the SyncWaiterLoop goroutine. + logCommitEnd := timeutil.Now() + s.Metrics.RaftLogCommitLatency.RecordValue(logCommitEnd.Sub(stats.PebbleBegin).Nanoseconds()) + cb.OnLogSync(ctx, m.Responses) + }) + // Do not Close batch on return. Will be Closed by SyncWaiterLoop. + batch = nil + } else { + if err := batch.Commit(willSync); err != nil { + const expl = "while committing batch" + return RaftState{}, errors.Wrap(err, expl) + } + stats.PebbleEnd = timeutil.Now() + if wantsSync { + logCommitEnd := stats.PebbleEnd + s.Metrics.RaftLogCommitLatency.RecordValue(logCommitEnd.Sub(stats.PebbleBegin).Nanoseconds()) + cb.OnLogSync(ctx, m.Responses) + } } + stats.Sync = wantsSync + stats.NonBlocking = nonBlockingSync - if len(rd.Entries) > 0 { + if overwriting { // We may have just overwritten parts of the log which contain // sideloaded SSTables from a previous term (and perhaps discarded some // entries that we didn't overwrite). Remove any such leftover on-disk // payloads (we can do that now because we've committed the deletion // just above). - firstPurge := rd.Entries[0].Index // first new entry written - purgeTerm := rd.Entries[0].Term - 1 + firstPurge := m.Entries[0].Index // first new entry written + purgeTerm := m.Entries[0].Term - 1 lastPurge := prevLastIndex // old end of the log, include in deletion purgedSize, err := maybePurgeSideloaded(ctx, s.Sideload, firstPurge, lastPurge, purgeTerm) if err != nil { @@ -216,7 +288,16 @@ func (s *LogStore) storeEntriesAndCommitBatch( // Update raft log entry cache. We clear any older, uncommitted log entries // and cache the latest ones. - s.EntryCache.Add(s.RangeID, rd.Entries, true /* truncate */) + // + // In the blocking log sync case, these entries are already durable. In the + // non-blocking case, these entries have been written to the pebble engine (so + // reads of the engine will see them), but they are not yet durable. This + // means that the entry cache can lead the durable log. This is allowed by + // etcd/raft, which maintains its own tracking of entry durability by + // splitting its log into an unstable portion for entries that are not known + // to be durable and a stable portion for entries that are known to be + // durable.
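The branch above reduces to a choice between two commit paths on the same storage.Batch: a blocking Commit(true) that returns only once the write is durable, or CommitNoSyncWait() that applies the batch immediately and leaves the fsync wait to someone else. A stripped-down sketch of that pattern, using only the batch methods referenced in this diff (the helper name and error handling are illustrative):

```go
package kvserverx // hypothetical package, for illustration only

import "github.com/cockroachdb/cockroach/pkg/storage"

// commitBatch shows the shape of the two commit paths: a blocking durable
// commit, or a commit whose fsync wait is handed off to another goroutine.
func commitBatch(b storage.Batch, nonBlocking bool, onDurable func()) error {
	if !nonBlocking {
		// Blocking path: Commit(true) returns only after the write is durable.
		if err := b.Commit(true /* sync */); err != nil {
			return err
		}
		onDurable()
		return nil
	}
	// Non-blocking path: the batch is applied (and visible to readers) before
	// the WAL fsync completes. Ownership of the batch moves to the waiter,
	// which must Close it after SyncWait returns.
	if err := b.CommitNoSyncWait(); err != nil {
		return err
	}
	go func() {
		if err := b.SyncWait(); err != nil { // blocks until the fsync finishes
			panic(err) // the real code treats a sync failure as fatal
		}
		b.Close()
		onDurable()
	}()
	return nil
}
```

The production code does not spawn a goroutine per batch; it funnels every wait through the single SyncWaiterLoop goroutine, which preserves completion order and bounds the number of in-flight waits.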
+ s.EntryCache.Add(s.RangeID, m.Entries, true /* truncate */) return state, nil } diff --git a/pkg/kv/kvserver/logstore/logstore_bench_test.go b/pkg/kv/kvserver/logstore/logstore_bench_test.go index 0c3f8e0473e2..ebfdfd924bf3 100644 --- a/pkg/kv/kvserver/logstore/logstore_bench_test.go +++ b/pkg/kv/kvserver/logstore/logstore_bench_test.go @@ -36,6 +36,10 @@ func (b *discardBatch) Commit(bool) error { return nil } +type noopSyncCallback struct{} + +func (noopSyncCallback) OnLogSync(context.Context, []raftpb.Message) {} + func BenchmarkLogStore_StoreEntries(b *testing.B) { defer log.Scope(b).Close(b) const kb = 1 << 10 @@ -48,23 +52,25 @@ func BenchmarkLogStore_StoreEntries(b *testing.B) { } func runBenchmarkLogStore_StoreEntries(b *testing.B, bytes int64) { + ctx := context.Background() const tenMB = 10 * 1 << 20 ec := raftentry.NewCache(tenMB) const rangeID = 1 eng := storage.NewDefaultInMemForTesting() defer eng.Close() + st := cluster.MakeTestingClusterSettings() + enableNonBlockingRaftLogSync.Override(ctx, &st.SV, false) s := LogStore{ RangeID: rangeID, Engine: eng, StateLoader: NewStateLoader(rangeID), EntryCache: ec, - Settings: cluster.MakeTestingClusterSettings(), + Settings: st, Metrics: Metrics{ RaftLogCommitLatency: metric.NewHistogram(metric.Metadata{}, 10*time.Second, metric.IOLatencyBuckets), }, } - ctx := context.Background() rs := RaftState{ LastTerm: 1, ByteSize: 0, @@ -89,17 +95,13 @@ func runBenchmarkLogStore_StoreEntries(b *testing.B, bytes int64) { batch := &discardBatch{} for i := 0; i < b.N; i++ { batch.Batch = newStoreEntriesBatch(eng) - rd := Ready{ - HardState: raftpb.HardState{}, - Entries: ents, - MustSync: true, - } + m := MsgStorageAppend{Entries: ents} + cb := noopSyncCallback{} var err error - rs, err = s.storeEntriesAndCommitBatch(ctx, rs, rd, stats, batch) + rs, err = s.storeEntriesAndCommitBatch(ctx, rs, m, cb, stats, batch) if err != nil { b.Fatal(err) } - batch.Batch.Close() ents[0].Index++ } require.EqualValues(b, b.N, rs.LastIndex) diff --git a/pkg/kv/kvserver/logstore/sync_waiter.go b/pkg/kv/kvserver/logstore/sync_waiter.go new file mode 100644 index 000000000000..e62f482b8579 --- /dev/null +++ b/pkg/kv/kvserver/logstore/sync_waiter.go @@ -0,0 +1,120 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package logstore + +import ( + "context" + "time" + + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" + "github.com/cockroachdb/pebble/record" +) + +// syncWaiter is capable of waiting for a disk write to be durably committed. +type syncWaiter interface { + // SyncWait waits for the write to be durable. + SyncWait() error + // Close closes the syncWaiter and releases associated resources. + // Must be called after SyncWait returns. + Close() +} + +var _ syncWaiter = storage.Batch(nil) + +// SyncWaiterLoop waits on a sequence of in-progress disk writes, notifying +// callbacks when their corresponding disk writes have completed. +// Invariant: The callbacks are notified in the order that they were enqueued +// and without concurrency. 
+type SyncWaiterLoop struct { + q chan syncBatch + stopped chan struct{} + + logEveryEnqueueBlocked log.EveryN +} + +type syncBatch struct { + wg syncWaiter + cb func() +} + +// NewSyncWaiterLoop constructs a SyncWaiterLoop. It must be Started before use. +func NewSyncWaiterLoop() *SyncWaiterLoop { + return &SyncWaiterLoop{ + // We size the waiter loop's queue to the same size as Pebble's sync + // concurrency. This is the maximum number of pending syncWaiter's that + // pebble allows. + q: make(chan syncBatch, record.SyncConcurrency), + stopped: make(chan struct{}), + logEveryEnqueueBlocked: log.Every(1 * time.Second), + } +} + +// Start launches the loop. +func (w *SyncWaiterLoop) Start(ctx context.Context, stopper *stop.Stopper) { + _ = stopper.RunAsyncTaskEx(ctx, + stop.TaskOpts{ + TaskName: "raft-logstore-sync-waiter-loop", + // This task doesn't reference a parent because it runs for the server's + // lifetime. + SpanOpt: stop.SterileRootSpan, + }, + func(ctx context.Context) { + w.waitLoop(ctx, stopper) + }) +} + +// waitLoop pulls off the SyncWaiterLoop's queue. For each syncWaiter, it waits +// for the sync to complete and then calls the associated callback. +func (w *SyncWaiterLoop) waitLoop(ctx context.Context, stopper *stop.Stopper) { + defer close(w.stopped) + for { + select { + case w := <-w.q: + if err := w.wg.SyncWait(); err != nil { + log.Fatalf(ctx, "SyncWait error: %+v", err) + } + w.wg.Close() + w.cb() + case <-stopper.ShouldQuiesce(): + return + } + } +} + +// enqueue registers the syncWaiter with the SyncWaiterLoop. The provided +// callback will be called once the syncWaiter's associated disk write has been +// durably committed. It may never be called in case the stopper stops. +// +// The syncWaiter will be Closed after its SyncWait method completes. It must +// not be Closed by the caller. +// +// If the SyncWaiterLoop has already been stopped, the callback will never be +// called. +func (w *SyncWaiterLoop) enqueue(ctx context.Context, wg syncWaiter, cb func()) { + b := syncBatch{wg, cb} + select { + case w.q <- b: + case <-w.stopped: + default: + if w.logEveryEnqueueBlocked.ShouldLog() { + // NOTE: we don't expect to hit this because we size the enqueue channel + // with enough capacity to hold as many in-progress sync operations as + // Pebble allows (pebble/record.SyncConcurrency). + log.Warningf(ctx, "SyncWaiterLoop.enqueue blocking due to insufficient channel capacity") + } + select { + case w.q <- b: + case <-w.stopped: + } + } +} diff --git a/pkg/kv/kvserver/logstore/sync_waiter_test.go b/pkg/kv/kvserver/logstore/sync_waiter_test.go new file mode 100644 index 000000000000..ffc12492933f --- /dev/null +++ b/pkg/kv/kvserver/logstore/sync_waiter_test.go @@ -0,0 +1,93 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
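The enqueue method above uses a familiar Go idiom: try a non-blocking send so the fast path never parks the raft scheduler goroutine, log once if the buffer is unexpectedly full, then fall back to a blocking send that still respects shutdown. A generic sketch of that idiom (the function and its names are illustrative, not part of this change):

```go
package chanutil // hypothetical package, for illustration only

// trySendThenBlock attempts a non-blocking send of v on q. If q is full, it
// calls onFull once and then blocks until the send succeeds or done is
// closed. It reports whether the value was actually sent.
func trySendThenBlock[T any](q chan<- T, done <-chan struct{}, v T, onFull func()) bool {
	select {
	case q <- v:
		return true
	case <-done:
		return false
	default:
	}
	onFull()
	select {
	case q <- v:
		return true
	case <-done:
		return false
	}
}
```

Sizing the queue to pebble's record.SyncConcurrency is what makes the slow path rare in practice.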
+ +package logstore + +import ( + "context" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/stop" +) + +func TestSyncWaiterLoop(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + stopper := stop.NewStopper() + w := NewSyncWaiterLoop() + w.Start(ctx, stopper) + + // Enqueue a waiter while the loop is running. + c := make(chan struct{}) + wg1 := make(chanSyncWaiter) + w.enqueue(ctx, wg1, func() { close(c) }) + + // Callback is not called before SyncWait completes. + select { + case <-c: + t.Fatal("callback unexpectedly called before SyncWait") + case <-time.After(5 * time.Millisecond): + } + + // Callback is called after SyncWait completes. + close(wg1) + <-c + + // Enqueue a waiter once the loop is stopped. Enqueuing should not block, + // regardless of how many times it is called. + stopper.Stop(ctx) + wg2 := make(chanSyncWaiter) + for i := 0; i < 2*cap(w.q); i++ { + w.enqueue(ctx, wg2, func() { t.Fatalf("callback unexpectedly called") }) + } + + // Callback should not be called, even after SyncWait completes. + // NB: stopper.Stop waits for the waitLoop to exit. + time.Sleep(5 * time.Millisecond) // give time to catch bugs + close(wg2) + time.Sleep(5 * time.Millisecond) // give time to catch bugs +} + +func BenchmarkSyncWaiterLoop(b *testing.B) { + ctx := context.Background() + stopper := stop.NewStopper() + defer stopper.Stop(ctx) + w := NewSyncWaiterLoop() + w.Start(ctx, stopper) + + // Pre-allocate a syncWaiter, notification channel, and callback function that + // can all be re-used across benchmark iterations so we can isolate the + // performance of operations inside the SyncWaiterLoop. + wg := make(chanSyncWaiter) + c := make(chan struct{}) + cb := func() { c <- struct{}{} } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + w.enqueue(ctx, wg, cb) + wg <- struct{}{} + <-c + } +} + +// chanSyncWaiter implements the syncWaiter interface. +type chanSyncWaiter chan struct{} + +func (c chanSyncWaiter) SyncWait() error { + <-c + return nil +} + +func (c chanSyncWaiter) Close() {} diff --git a/pkg/kv/kvserver/replica.go b/pkg/kv/kvserver/replica.go index 16ed5104010e..eb72b9c650e2 100644 --- a/pkg/kv/kvserver/replica.go +++ b/pkg/kv/kvserver/replica.go @@ -55,6 +55,7 @@ import ( "github.com/cockroachdb/redact" "github.com/kr/pretty" "go.etcd.io/raft/v3" + "go.etcd.io/raft/v3/raftpb" "go.etcd.io/raft/v3/tracker" ) @@ -260,6 +261,18 @@ type Replica struct { decoder replicaDecoder } + // localMsgs contains a collection of raftpb.Message that target the local + // RawNode. They are to be delivered on the next iteration of handleRaftReady. + // + // Locking notes: + // - Replica.localMsgs must be held to append messages to active. + // - Replica.raftMu and Replica.localMsgs must both be held to switch slices. + // - Replica.raftMu < Replica.localMsgs + localMsgs struct { + syncutil.Mutex + active, recycled []raftpb.Message + } + // The last seen replica descriptors from incoming Raft messages. These are // stored so that the replica still knows the replica descriptors for itself // and for its message recipients in the circumstances when its RangeDescriptor @@ -384,11 +397,11 @@ type Replica struct { mergeTxnID uuid.UUID // The state of the Raft state machine. state kvserverpb.ReplicaState - // Last index/term persisted to the raft log (not necessarily - // committed). 
Note that lastTerm may be 0 (and thus invalid) even when - // lastIndex is known, in which case the term will have to be retrieved - // from the Raft log entry. Use the invalidLastTerm constant for this - // case. + // Last index/term written to the raft log (not necessarily durable + // locally or committed by the group). Note that lastTerm may be 0 (and + // thus invalid) even when lastIndex is known, in which case the term + // will have to be retrieved from the Raft log entry. Use the + // invalidLastTerm constant for this case. lastIndex, lastTerm uint64 // A map of raft log index of pending snapshots to deadlines. // Used to prohibit raft log truncations that would leave a gap between diff --git a/pkg/kv/kvserver/replica_raft.go b/pkg/kv/kvserver/replica_raft.go index f09bdf33cdac..10af3c9a13c6 100644 --- a/pkg/kv/kvserver/replica_raft.go +++ b/pkg/kv/kvserver/replica_raft.go @@ -610,7 +610,11 @@ func (s handleRaftReadyStats) SafeFormat(p redact.SafePrinter, _ rune) { { var sync redact.SafeString if s.append.Sync { - sync = "-sync" + if s.append.NonBlocking { + sync = "-non-blocking-sync" + } else { + sync = "-sync" + } } p.Printf("raft ready handling: %.2fs [append=%.2fs, apply=%.2fs, commit-batch%s=%.2fs", dTotal.Seconds(), dAppend.Seconds(), dApply.Seconds(), sync, dPebble.Seconds()) @@ -625,6 +629,9 @@ func (s handleRaftReadyStats) SafeFormat(p redact.SafePrinter, _ rune) { ) if s.append.Sync { p.SafeString(" sync") + if s.append.NonBlocking { + p.SafeString("(non-blocking)") + } } p.SafeString(" [") @@ -707,7 +714,9 @@ func (r *Replica) handleRaftReadyRaftMuLocked( } var hasReady bool - var rd raft.Ready + var softState *raft.SoftState + var outboundMsgs []raftpb.Message + var msgStorageAppend, msgStorageApply raftpb.Message r.mu.Lock() state := logstore.RaftState{ // used for append below LastIndex: r.mu.lastIndex, @@ -717,12 +726,18 @@ func (r *Replica) handleRaftReadyRaftMuLocked( leaderID := r.mu.leaderID lastLeaderID := leaderID err := r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { + r.deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(ctx, raftGroup) + numFlushed, err := r.mu.proposalBuf.FlushLockedWithRaftGroup(ctx, raftGroup) if err != nil { return false, err } if hasReady = raftGroup.HasReady(); hasReady { - rd = raftGroup.Ready() + syncRd := raftGroup.Ready() + logRaftReady(ctx, syncRd) + asyncRd := makeAsyncReady(syncRd) + softState = asyncRd.SoftState + outboundMsgs, msgStorageAppend, msgStorageApply = splitLocalStorageMsgs(asyncRd.Messages) } // We unquiesce if we have a Ready (= there's work to do). We also have // to unquiesce if we just flushed some proposals but there isn't a @@ -739,7 +754,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( unquiesceAndWakeLeader := hasReady || numFlushed > 0 || len(r.mu.proposals) > 0 return unquiesceAndWakeLeader, nil }) - r.mu.applyingEntries = len(rd.CommittedEntries) > 0 + r.mu.applyingEntries = hasMsg(msgStorageApply) pausedFollowers := r.mu.pausedFollowers r.mu.Unlock() if errors.Is(err, errRemoved) { @@ -761,10 +776,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked( return stats, nil } - logRaftReady(ctx, rd) - refreshReason := noReason - if rd.SoftState != nil && leaderID != roachpb.ReplicaID(rd.SoftState.Lead) { + if softState != nil && leaderID != roachpb.ReplicaID(softState.Lead) { // Refresh pending commands if the Raft leader has changed. This is usually // the first indication we have of a new leader on a restarted node. 
// @@ -773,17 +786,68 @@ func (r *Replica) handleRaftReadyRaftMuLocked( // indicating a newly elected leader or a conf change. Replay protection // prevents any corruption, so the waste is only a performance issue. if log.V(3) { - log.Infof(ctx, "raft leader changed: %d -> %d", leaderID, rd.SoftState.Lead) + log.Infof(ctx, "raft leader changed: %d -> %d", leaderID, softState.Lead) } if !r.store.TestingKnobs().DisableRefreshReasonNewLeader { refreshReason = reasonNewLeader } - leaderID = roachpb.ReplicaID(rd.SoftState.Lead) + leaderID = roachpb.ReplicaID(softState.Lead) } - if inSnap.Desc != nil { - if !raft.IsEmptySnap(rd.Snapshot) { - snapUUID, err := uuid.FromBytes(rd.Snapshot.Data) + r.traceMessageSends(outboundMsgs, "sending messages") + r.sendRaftMessages(ctx, outboundMsgs, pausedFollowers) + + // If the ready struct includes entries that have been committed, these + // entries will be applied to the Replica's replicated state machine down + // below, after appending new entries to the raft log and sending messages + // to peers. However, the process of appending new entries to the raft log + // and then applying committed entries to the state machine can take some + // time - and these entries are already durably committed. If they have + // clients waiting on them, we'd like to acknowledge their success as soon + // as possible. To facilitate this, we take a quick pass over the committed + // entries and acknowledge as many as we can trivially prove will not be + // rejected beneath raft. + // + // Note that the CommittedEntries slice may contain entries that are also in + // the Entries slice (to be appended in this ready pass). This can happen when + // a follower is being caught up on committed commands. We could acknowledge + // these commands early even though they aren't durably in the local raft log + // yet (since they're committed via a quorum elsewhere), but we chose to be + // conservative and avoid it by passing the last Ready cycle's `lastIndex` for + // the maxIndex argument to AckCommittedEntriesBeforeApplication. + // + // TODO(nvanbenschoten): this is less important with async storage writes. + // Consider getting rid of it. + sm := r.getStateMachine() + dec := r.getDecoder() + var appTask apply.Task + if hasMsg(msgStorageApply) { + appTask = apply.MakeTask(sm, dec) + appTask.SetMaxBatchSize(r.store.TestingKnobs().MaxApplicationBatchSize) + defer appTask.Close() + if err := appTask.Decode(ctx, msgStorageApply.Entries); err != nil { + return stats, err + } + if knobs := r.store.TestingKnobs(); knobs == nil || !knobs.DisableCanAckBeforeApplication { + if err := appTask.AckCommittedEntriesBeforeApplication(ctx, state.LastIndex); err != nil { + return stats, err + } + } + } + + if hasMsg(msgStorageAppend) { + if msgStorageAppend.Snapshot != nil { + if inSnap.Desc == nil { + // If we didn't expect Raft to have a snapshot but it has one + // regardless, that is unexpected and indicates a programming + // error. 
+ return stats, errors.AssertionFailedf( + "have inSnap=nil, but raft has a snapshot %s", + raft.DescribeSnapshot(*msgStorageAppend.Snapshot), + ) + } + + snapUUID, err := uuid.FromBytes(msgStorageAppend.Snapshot.Data) if err != nil { return stats, errors.Wrap(err, "invalid snapshot id") } @@ -794,6 +858,16 @@ func (r *Replica) handleRaftReadyRaftMuLocked( log.Fatalf(ctx, "incoming snapshot id doesn't match raft snapshot id: %s != %s", snapUUID, inSnap.SnapUUID) } + snap := *msgStorageAppend.Snapshot + hs := raftpb.HardState{ + Term: msgStorageAppend.Term, + Vote: msgStorageAppend.Vote, + Commit: msgStorageAppend.Commit, + } + if len(msgStorageAppend.Entries) != 0 { + log.Fatalf(ctx, "found Entries in MsgStorageAppend with non-empty Snapshot") + } + // Applying this snapshot may require us to subsume one or more of our right // neighbors. This occurs if this replica is informed about the merges via a // Raft snapshot instead of a MsgApp containing the merge commits, e.g., @@ -803,7 +877,7 @@ func (r *Replica) handleRaftReadyRaftMuLocked( defer releaseMergeLock() stats.tSnapBegin = timeutil.Now() - if err := r.applySnapshot(ctx, inSnap, rd.Snapshot, rd.HardState, subsumedRepls); err != nil { + if err := r.applySnapshot(ctx, inSnap, snap, hs, subsumedRepls); err != nil { return stats, errors.Wrap(err, "while applying snapshot") } stats.tSnapEnd = timeutil.Now() @@ -830,129 +904,36 @@ func (r *Replica) handleRaftReadyRaftMuLocked( refreshReason == noReason { refreshReason = reasonSnapshotApplied } - } - } else if !raft.IsEmptySnap(rd.Snapshot) { - // If we didn't expect Raft to have a snapshot but it has one - // regardless, that is unexpected and indicates a programming - // error. - return stats, errors.AssertionFailedf( - "have inSnap=nil, but raft has a snapshot %s", - raft.DescribeSnapshot(rd.Snapshot), - ) - } - - // If the ready struct includes entries that have been committed, these - // entries will be applied to the Replica's replicated state machine down - // below, after appending new entries to the raft log and sending messages - // to peers. However, the process of appending new entries to the raft log - // and then applying committed entries to the state machine can take some - // time - and these entries are already durably committed. If they have - // clients waiting on them, we'd like to acknowledge their success as soon - // as possible. To facilitate this, we take a quick pass over the committed - // entries and acknowledge as many as we can trivially prove will not be - // rejected beneath raft. - // - // Note that the CommittedEntries slice may contain entries that are also in - // the Entries slice (to be appended in this ready pass). This can happen when - // a follower is being caught up on committed commands. We could acknowledge - // these commands early even though they aren't durably in the local raft log - // yet (since they're committed via a quorum elsewhere), but we chose to be - // conservative and avoid it by passing the last Ready cycle's `lastIndex` for - // the maxIndex argument to AckCommittedEntriesBeforeApplication. 
- sm := r.getStateMachine() - dec := r.getDecoder() - appTask := apply.MakeTask(sm, dec) - appTask.SetMaxBatchSize(r.store.TestingKnobs().MaxApplicationBatchSize) - defer appTask.Close() - if err := appTask.Decode(ctx, rd.CommittedEntries); err != nil { - return stats, err - } - if knobs := r.store.TestingKnobs(); knobs == nil || !knobs.DisableCanAckBeforeApplication { - if err := appTask.AckCommittedEntriesBeforeApplication(ctx, state.LastIndex); err != nil { - return stats, err - } - } - // Separate the MsgApp messages from all other Raft message types so that we - // can take advantage of the optimization discussed in the Raft thesis under - // the section: `10.2.1 Writing to the leader’s disk in parallel`. The - // optimization suggests that instead of a leader writing new log entries to - // disk before replicating them to its followers, the leader can instead - // write the entries to disk in parallel with replicating to its followers - // and them writing to their disks. - // - // Here, we invoke this optimization by: - // 1. sending all MsgApps. - // 2. syncing all entries and Raft state to disk. - // 3. sending all other messages. - // - // Since this is all handled in handleRaftReadyRaftMuLocked, we're assured - // that even though we may sync new entries to disk after sending them in - // MsgApps to followers, we'll always have them synced to disk before we - // process followers' MsgAppResps for the corresponding entries because - // Ready processing is sequential (and because a restart of the leader would - // prevent the MsgAppResp from being handled by it). This is important - // because it makes sure that the leader always has all of the entries in - // the log for its term, which is required in etcd/raft for technical - // reasons[1]. - // - // MsgApps are also used to inform followers of committed entries through - // the Commit index that they contain. Due to the optimization described - // above, a Commit index may be sent out to a follower before it is - // persisted on the leader. This is safe because the Commit index can be - // treated as volatile state, as is supported by raft.MustSync[2]. - // Additionally, the Commit index can never refer to entries from the - // current Ready (due to the MsgAppResp argument above) except in - // single-node groups, in which as a result we have to be careful to not - // persist a Commit index without the entries its commit index might refer - // to (see the HardState update below for details). - // - // [1]: the Raft thesis states that this can be made safe: - // - // > The leader may even commit an entry before it has been written to its - // > own disk, if a majority of followers have written it to their disks; - // > this is still safe. - // - // [2]: Raft thesis section: `3.8 Persisted state and server restarts`: - // - // > Other state variables are safe to lose on a restart, as they can all be - // > recreated. The most interesting example is the commit index, which can - // > safely be reinitialized to zero on a restart. - // - // Note that this will change when joint quorums are implemented, at which - // point we have to introduce coupling between the Commit index and - // persisted config changes, and also require some commit indexes to be - // durably synced. 
- // See: - // https://github.com/etcd-io/etcd/issues/7625#issuecomment-489232411 - - msgApps, otherMsgs := splitMsgApps(rd.Messages) - r.traceMessageSends(msgApps, "sending msgApp") - r.sendRaftMessages(ctx, msgApps, pausedFollowers) - - // TODO(pavelkalinnikov): find a way to move it to storeEntries. - if !raft.IsEmptyHardState(rd.HardState) { - if !r.IsInitialized() && rd.HardState.Commit != 0 { - log.Fatalf(ctx, "setting non-zero HardState.Commit on uninitialized replica %s. HS=%+v", r, rd.HardState) + // Send MsgStorageAppend's responses. + r.sendRaftMessages(ctx, msgStorageAppend.Responses, nil) + } else { + // TODO(pavelkalinnikov): find a way to move it to storeEntries. + if msgStorageAppend.Commit != 0 && !r.IsInitialized() { + log.Fatalf(ctx, "setting non-zero HardState.Commit on uninitialized replica %s", r) + } + // TODO(pavelkalinnikov): construct and store this in Replica. + // TODO(pavelkalinnikov): fields like raftEntryCache are the same across all + // ranges, so can be passed to LogStore methods instead of being stored in it. + s := logstore.LogStore{ + RangeID: r.RangeID, + Engine: r.store.engine, + Sideload: r.raftMu.sideloaded, + StateLoader: r.raftMu.stateLoader.StateLoader, + SyncWaiter: r.store.syncWaiter, + EntryCache: r.store.raftEntryCache, + Settings: r.store.cfg.Settings, + Metrics: logstore.Metrics{ + RaftLogCommitLatency: r.store.metrics.RaftLogCommitLatency, + }, + } + m := logstore.MakeMsgStorageAppend(msgStorageAppend) + cb := (*replicaSyncCallback)(r) + if state, err = s.StoreEntries(ctx, state, m, cb, &stats.append); err != nil { + return stats, errors.Wrap(err, "while storing log entries") + } } } - // TODO(pavelkalinnikov): construct and store this in Replica. - // TODO(pavelkalinnikov): fields like raftEntryCache are the same across all - // ranges, so can be passed to LogStore methods instead of being stored in it. - s := logstore.LogStore{ - RangeID: r.RangeID, - Engine: r.store.engine, - Sideload: r.raftMu.sideloaded, - StateLoader: r.raftMu.stateLoader.StateLoader, - EntryCache: r.store.raftEntryCache, - Settings: r.store.cfg.Settings, - Metrics: logstore.Metrics{ - RaftLogCommitLatency: r.store.metrics.RaftLogCommitLatency, - }, - } - if state, err = s.StoreEntries(ctx, state, logstore.MakeReady(rd), &stats.append); err != nil { - return stats, errors.Wrap(err, "while storing log entries") - } // Update protected state - last index, last term, raft log size, and raft // leader ID. @@ -977,11 +958,10 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.store.replicateQueue.MaybeAddAsync(ctx, r, r.store.Clock().NowAsClockTimestamp()) } - r.sendRaftMessages(ctx, otherMsgs, nil /* blocked */) - r.traceEntries(rd.CommittedEntries, "committed, before applying any entries") - stats.tApplicationBegin = timeutil.Now() - if len(rd.CommittedEntries) > 0 { + if hasMsg(msgStorageApply) { + r.traceEntries(msgStorageApply.Entries, "committed, before applying any entries") + err := appTask.ApplyCommittedEntries(ctx) stats.apply = sm.moveStats() if err != nil { @@ -1016,11 +996,14 @@ func (r *Replica) handleRaftReadyRaftMuLocked( refreshReason = reasonNewLeaderOrConfigChange } } + + // Send MsgStorageApply's responses. 
+ r.sendRaftMessages(ctx, msgStorageApply.Responses, nil) } stats.tApplicationEnd = timeutil.Now() applicationElapsed := stats.tApplicationEnd.Sub(stats.tApplicationBegin).Nanoseconds() r.store.metrics.RaftApplyCommittedLatency.RecordValue(applicationElapsed) - r.store.metrics.RaftCommandsApplied.Inc(int64(len(rd.CommittedEntries))) + r.store.metrics.RaftCommandsApplied.Inc(int64(len(msgStorageApply.Entries))) if r.store.TestingKnobs().EnableUnconditionalRefreshesInRaftReady { refreshReason = reasonNewLeaderOrConfigChange } @@ -1037,7 +1020,8 @@ func (r *Replica) handleRaftReadyRaftMuLocked( r.mu.Lock() err = r.withRaftGroupLocked(true, func(raftGroup *raft.RawNode) (bool, error) { - raftGroup.Advance(rd) + r.deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked(ctx, raftGroup) + if stats.apply.numConfChangeEntries > 0 { // If the raft leader got removed, campaign the first remaining voter. // @@ -1077,18 +1061,71 @@ func (r *Replica) handleRaftReadyRaftMuLocked( return stats, nil } -// splitMsgApps splits the Raft message slice into two slices, one containing -// MsgApps and one containing all other message types. Each slice retains the -// relative ordering between messages in the original slice. -func splitMsgApps(msgs []raftpb.Message) (msgApps, otherMsgs []raftpb.Message) { - splitIdx := 0 - for i, msg := range msgs { - if msg.Type == raftpb.MsgApp { - msgs[i], msgs[splitIdx] = msgs[splitIdx], msgs[i] - splitIdx++ +// asyncReady encapsulates the messages that are ready to be sent to other peers +// or to be sent to local storage routines when async storage writes are enabled. +// All fields in asyncReady are read-only. +// TODO(nvanbenschoten): move this into go.etcd.io/raft. +type asyncReady struct { + // The current volatile state of a Node. + // SoftState will be nil if there is no update. + // It is not required to consume or store SoftState. + *raft.SoftState + + // ReadStates can be used for node to serve linearizable read requests locally + // when its applied index is greater than the index in ReadState. + // Note that the readState will be returned when raft receives msgReadIndex. + // The returned is only valid for the request that requested to read. + ReadStates []raft.ReadState + + // Messages specifies outbound messages to other peers and to local storage + // threads. These messages can be sent in any order. + // + // If it contains a MsgSnap message, the application MUST report back to raft + // when the snapshot has been received or has failed by calling ReportSnapshot. + Messages []raftpb.Message +} + +// makeAsyncReady constructs an asyncReady from the provided Ready. +func makeAsyncReady(rd raft.Ready) asyncReady { + return asyncReady{ + SoftState: rd.SoftState, + ReadStates: rd.ReadStates, + Messages: rd.Messages, + } +} + +// hasMsg returns whether the provided raftpb.Message is present. +// It serves as a poor man's Optional[raftpb.Message]. +func hasMsg(m raftpb.Message) bool { return m.Type != 0 } + +// splitLocalStorageMsgs filters out local storage messages from the provided +// message slice and returns them separately. 
+func splitLocalStorageMsgs( + msgs []raftpb.Message, +) (otherMsgs []raftpb.Message, msgStorageAppend, msgStorageApply raftpb.Message) { + for i := len(msgs) - 1; i >= 0; i-- { + switch msgs[i].Type { + case raftpb.MsgStorageAppend: + if hasMsg(msgStorageAppend) { + panic("two MsgStorageAppend") + } + msgStorageAppend = msgs[i] + case raftpb.MsgStorageApply: + if hasMsg(msgStorageApply) { + panic("two MsgStorageApply") + } + msgStorageApply = msgs[i] + default: + // Local storage messages will always be at the end of the messages slice, + // so we can terminate iteration as soon as we reach any other message + // type. This is leaking an implementation detail from etcd/raft which may + // not always hold, but while it does, we use it for convenience and + // assert against it changing in sendRaftMessages. + return msgs[:i+1], msgStorageAppend, msgStorageApply } } - return msgs[:splitIdx], msgs[splitIdx:] + // Only local storage messages. + return nil, msgStorageAppend, msgStorageApply } // maybeFatalOnRaftReadyErr will fatal if err is neither nil nor @@ -1414,87 +1451,160 @@ func (r *Replica) maybeCoalesceHeartbeat( return true } +// replicaSyncCallback implements the logstore.SyncCallback interface. +type replicaSyncCallback Replica + +func (r *replicaSyncCallback) OnLogSync(ctx context.Context, msgs []raftpb.Message) { + // Send MsgStorageAppend's responses. + (*Replica)(r).sendRaftMessages(ctx, msgs, nil /* blocked */) +} + func (r *Replica) sendRaftMessages( ctx context.Context, messages []raftpb.Message, blocked map[roachpb.ReplicaID]struct{}, ) { var lastAppResp raftpb.Message for _, message := range messages { - _, drop := blocked[roachpb.ReplicaID(message.To)] - if drop { - r.store.Metrics().RaftPausedFollowerDroppedMsgs.Inc(1) - } - switch message.Type { - case raftpb.MsgApp: - if util.RaceEnabled { - // Iterate over the entries to assert that all sideloaded commands - // are already inlined. replicaRaftStorage.Entries already performs - // the sideload inlining for stable entries and raft.unstable always - // contain fat entries. Since these are the only two sources that - // raft.sendAppend gathers entries from to populate MsgApps, we - // should never see thin entries here. + switch message.To { + case raft.LocalAppendThread: + // To local append thread. + // NOTE: we don't currently split append work off into an async goroutine. + // Instead, we handle messages to LocalAppendThread inline on the raft + // scheduler goroutine, so this code path is unused. + panic("unsupported, currently processed inline on raft scheduler goroutine") + case raft.LocalApplyThread: + // To local apply thread. + // NOTE: we don't currently split apply work off into an async goroutine. + // Instead, we handle messages to LocalApplyThread inline on the raft + // scheduler goroutine, so this code path is unused. + panic("unsupported, currently processed inline on raft scheduler goroutine") + case uint64(r.ReplicaID()): + // To local raft state machine, from local storage append and apply work. + // NOTE: For async Raft log appends, these messages come from calls to + // replicaSyncCallback.OnLogSync. For other local storage work (log + // application and snapshot application), these messages come from + // Replica.handleRaftReadyRaftMuLocked.
+ r.sendLocalRaftMsg(message) + default: + _, drop := blocked[roachpb.ReplicaID(message.To)] + if drop { + r.store.Metrics().RaftPausedFollowerDroppedMsgs.Inc(1) + } + switch message.Type { + case raftpb.MsgApp: + if util.RaceEnabled { + // Iterate over the entries to assert that all sideloaded commands + // are already inlined. replicaRaftStorage.Entries already performs + // the sideload inlining for stable entries and raft.unstable always + // contain fat entries. Since these are the only two sources that + // raft.sendAppend gathers entries from to populate MsgApps, we + // should never see thin entries here. + // + // Also assert that the log term only ever increases (most of the + // time it stays constant, as term changes are rare), and that + // the index increases by exactly one with each entry. + // + // This assertion came out of #61990. + prevTerm := message.LogTerm // term of entry preceding the append + prevIndex := message.Index // index of entry preceding the append + for j := range message.Entries { + ent := &message.Entries[j] + logstore.AssertSideloadedRaftCommandInlined(ctx, ent) + + if prevIndex+1 != ent.Index { + log.Fatalf(ctx, + "index gap in outgoing MsgApp: idx %d followed by %d", + prevIndex, ent.Index, + ) + } + prevIndex = ent.Index + if prevTerm > ent.Term { + log.Fatalf(ctx, + "term regression in outgoing MsgApp: idx %d at term=%d "+ + "appended with logterm=%d", + ent.Index, ent.Term, message.LogTerm, + ) + } + prevTerm = ent.Term + } + } + + case raftpb.MsgAppResp: + // A successful (non-reject) MsgAppResp contains one piece of + // information: the highest log index. Raft currently queues up + // one MsgAppResp per incoming MsgApp, and we may process + // multiple messages in one handleRaftReady call (because + // multiple messages may arrive while we're blocked syncing to + // disk). If we get redundant MsgAppResps, drop all but the + // last (we've seen that too many MsgAppResps can overflow + // message queues on the receiving side). // - // Also assert that the log term only ever increases (most of the - // time it stays constant, as term changes are rare), and that - // the index increases by exactly one with each entry. + // Note that this reorders the chosen MsgAppResp relative to + // other messages (including any MsgAppResps with the Reject flag), + // but raft is fine with this reordering. // - // This assertion came out of #61990. - prevTerm := message.LogTerm // term of entry preceding the append - prevIndex := message.Index // index of entry preceding the append - for j := range message.Entries { - ent := &message.Entries[j] - logstore.AssertSideloadedRaftCommandInlined(ctx, ent) - - if prevIndex+1 != ent.Index { - log.Fatalf(ctx, - "index gap in outgoing MsgApp: idx %d followed by %d", - prevIndex, ent.Index, - ) - } - prevIndex = ent.Index - if prevTerm > ent.Term { - log.Fatalf(ctx, - "term regression in outgoing MsgApp: idx %d at term=%d "+ - "appended with logterm=%d", - ent.Index, ent.Term, message.LogTerm, - ) - } - prevTerm = ent.Term + // TODO(bdarnell): Consider pushing this optimization into etcd/raft. + // Similar optimizations may be possible for other message types, + // although MsgAppResp is the only one that has been seen as a + // problem in practice. + if !message.Reject && message.Index > lastAppResp.Index { + lastAppResp = message + drop = true } } - case raftpb.MsgAppResp: - // A successful (non-reject) MsgAppResp contains one piece of - // information: the highest log index. 
Raft currently queues up - one MsgAppResp per incoming MsgApp, and we may process - multiple messages in one handleRaftReady call (because - multiple messages may arrive while we're blocked syncing to - disk). If we get redundant MsgAppResps, drop all but the - last (we've seen that too many MsgAppResps can overflow - message queues on the receiving side). - // - // Note that this reorders the chosen MsgAppResp relative to - // other messages (including any MsgAppResps with the Reject flag), - // but raft is fine with this reordering. - // - // TODO(bdarnell): Consider pushing this optimization into etcd/raft. - // Similar optimizations may be possible for other message types, - // although MsgAppResp is the only one that has been seen as a - // problem in practice. - if !message.Reject && message.Index > lastAppResp.Index { - lastAppResp = message - drop = true + if !drop { + r.sendRaftMessage(ctx, message) + } } - } - - if !drop { - r.sendRaftMessage(ctx, message) - } } if lastAppResp.Index > 0 { r.sendRaftMessage(ctx, lastAppResp) } } +// sendLocalRaftMsg sends a message to the local raft state machine. +func (r *Replica) sendLocalRaftMsg(msg raftpb.Message) { + if msg.To != uint64(r.ReplicaID()) { + panic("incorrect message target") + } + r.localMsgs.Lock() + wasEmpty := len(r.localMsgs.active) == 0 + r.localMsgs.active = append(r.localMsgs.active, msg) + r.localMsgs.Unlock() + if wasEmpty { + r.store.enqueueRaftUpdateCheck(r.RangeID) + } +} + +// deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked delivers local messages to +// the provided raw node. +func (r *Replica) deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked( + ctx context.Context, raftGroup *raft.RawNode, +) { + r.raftMu.AssertHeld() + r.mu.AssertHeld() + r.localMsgs.Lock() + localMsgs := r.localMsgs.active + r.localMsgs.active, r.localMsgs.recycled = r.localMsgs.recycled, r.localMsgs.active[:0] + // Don't recycle large slices. + if cap(r.localMsgs.recycled) > 16 { + r.localMsgs.recycled = nil + } + r.localMsgs.Unlock() + + for i, m := range localMsgs { + if err := raftGroup.Step(m); err != nil { + log.Fatalf(ctx, "unexpected error stepping local raft message [%s]: %v", + raftDescribeMessage(m, raftEntryFormatter), err) + } + // NB: we can reset messages in the localMsgs.recycled slice without holding + // the localMsgs mutex because no-one ever writes to localMsgs.recycled and + // we are holding raftMu, which must be held to switch localMsgs.active and + // localMsgs.recycled. + localMsgs[i].Reset() // for GC + } +} + // sendRaftMessage sends a Raft message. // // When calling this method, the raftMu may be held, but it does not need to be. diff --git a/pkg/kv/kvserver/replica_raftstorage.go b/pkg/kv/kvserver/replica_raftstorage.go index ae01168ee7a4..22cef1a6f55d 100644 --- a/pkg/kv/kvserver/replica_raftstorage.go +++ b/pkg/kv/kvserver/replica_raftstorage.go @@ -95,6 +95,8 @@ func (r *replicaRaftStorage) InitialState() (raftpb.HardState, raftpb.ConfState, // maxBytes. Sideloaded proposals count towards maxBytes with their payloads inlined. // Entries requires that r.mu is held for writing because it requires exclusive // access to r.mu.stateLoader. +// +// Entries can return log entries that are not yet stable in durable storage.
func (r *replicaRaftStorage) Entries(lo, hi, maxBytes uint64) ([]raftpb.Entry, error) { ctx := r.AnnotateCtx(context.TODO()) if r.raftMu.sideloaded == nil { diff --git a/pkg/kv/kvserver/replica_test.go b/pkg/kv/kvserver/replica_test.go index bdd2cf0d3557..3fae219f750b 100644 --- a/pkg/kv/kvserver/replica_test.go +++ b/pkg/kv/kvserver/replica_test.go @@ -9881,99 +9881,6 @@ func TestApplyPaginatedCommittedEntries(t *testing.T) { } } -func TestSplitMsgApps(t *testing.T) { - defer leaktest.AfterTest(t)() - defer log.Scope(t).Close(t) - - msgApp := func(idx uint64) raftpb.Message { - return raftpb.Message{Index: idx, Type: raftpb.MsgApp} - } - otherMsg := func(idx uint64) raftpb.Message { - return raftpb.Message{Index: idx, Type: raftpb.MsgVote} - } - formatMsgs := func(msgs []raftpb.Message) string { - strs := make([]string, len(msgs)) - for i, msg := range msgs { - strs[i] = fmt.Sprintf("{%s:%d}", msg.Type, msg.Index) - } - return fmt.Sprint(strs) - } - - testCases := []struct { - msgsIn, msgAppsOut, otherMsgsOut []raftpb.Message - }{ - // No msgs. - { - msgsIn: []raftpb.Message{}, - msgAppsOut: []raftpb.Message{}, - otherMsgsOut: []raftpb.Message{}, - }, - // Only msgApps. - { - msgsIn: []raftpb.Message{msgApp(1)}, - msgAppsOut: []raftpb.Message{msgApp(1)}, - otherMsgsOut: []raftpb.Message{}, - }, - { - msgsIn: []raftpb.Message{msgApp(1), msgApp(2)}, - msgAppsOut: []raftpb.Message{msgApp(1), msgApp(2)}, - otherMsgsOut: []raftpb.Message{}, - }, - { - msgsIn: []raftpb.Message{msgApp(2), msgApp(1)}, - msgAppsOut: []raftpb.Message{msgApp(2), msgApp(1)}, - otherMsgsOut: []raftpb.Message{}, - }, - // Only otherMsgs. - { - msgsIn: []raftpb.Message{otherMsg(1)}, - msgAppsOut: []raftpb.Message{}, - otherMsgsOut: []raftpb.Message{otherMsg(1)}, - }, - { - msgsIn: []raftpb.Message{otherMsg(1), otherMsg(2)}, - msgAppsOut: []raftpb.Message{}, - otherMsgsOut: []raftpb.Message{otherMsg(1), otherMsg(2)}, - }, - { - msgsIn: []raftpb.Message{otherMsg(2), otherMsg(1)}, - msgAppsOut: []raftpb.Message{}, - otherMsgsOut: []raftpb.Message{otherMsg(2), otherMsg(1)}, - }, - // Mixed msgApps and otherMsgs. 
- { - msgsIn: []raftpb.Message{msgApp(1), otherMsg(2)}, - msgAppsOut: []raftpb.Message{msgApp(1)}, - otherMsgsOut: []raftpb.Message{otherMsg(2)}, - }, - { - msgsIn: []raftpb.Message{otherMsg(1), msgApp(2)}, - msgAppsOut: []raftpb.Message{msgApp(2)}, - otherMsgsOut: []raftpb.Message{otherMsg(1)}, - }, - { - msgsIn: []raftpb.Message{msgApp(1), otherMsg(2), msgApp(3)}, - msgAppsOut: []raftpb.Message{msgApp(1), msgApp(3)}, - otherMsgsOut: []raftpb.Message{otherMsg(2)}, - }, - { - msgsIn: []raftpb.Message{otherMsg(1), msgApp(2), otherMsg(3)}, - msgAppsOut: []raftpb.Message{msgApp(2)}, - otherMsgsOut: []raftpb.Message{otherMsg(1), otherMsg(3)}, - }, - } - for _, c := range testCases { - inStr := formatMsgs(c.msgsIn) - t.Run(inStr, func(t *testing.T) { - msgAppsRes, otherMsgsRes := splitMsgApps(c.msgsIn) - if !reflect.DeepEqual(msgAppsRes, c.msgAppsOut) || !reflect.DeepEqual(otherMsgsRes, c.otherMsgsOut) { - t.Errorf("expected splitMsgApps(%s)=%s/%s, found %s/%s", inStr, formatMsgs(c.msgAppsOut), - formatMsgs(c.otherMsgsOut), formatMsgs(msgAppsRes), formatMsgs(otherMsgsRes)) - } - }) - } -} - type testQuiescer struct { desc roachpb.RangeDescriptor numProposals int diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index e9c90106b427..fc2546734a78 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -46,6 +46,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/logstore" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/raftentry" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/rangefeed" @@ -270,6 +271,7 @@ func newRaftConfig( return &raft.Config{ ID: id, Applied: appliedIndex, + AsyncStorageWrites: true, ElectionTick: storeCfg.RaftElectionTimeoutTicks, HeartbeatTick: storeCfg.RaftHeartbeatIntervalTicks, MaxUncommittedEntriesSize: storeCfg.RaftMaxUncommittedEntriesSize, @@ -745,6 +747,7 @@ type Store struct { metrics *StoreMetrics intentResolver *intentresolver.IntentResolver recoveryMgr txnrecovery.Manager + syncWaiter *logstore.SyncWaiterLoop raftEntryCache *raftentry.Cache limiters batcheval.Limiters txnWaitMetrics *txnwait.Metrics @@ -1248,6 +1251,8 @@ func NewStore( s.scheduler = newRaftScheduler(cfg.AmbientCtx, s.metrics, s, storeSchedulerConcurrency, cfg.RaftElectionTimeoutTicks) + s.syncWaiter = logstore.NewSyncWaiterLoop() + s.raftEntryCache = raftentry.NewCache(cfg.RaftEntryCacheSize) s.metrics.registry.AddMetricStruct(s.raftEntryCache.Metrics()) diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 60d543d71685..90c93262c610 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -724,6 +724,8 @@ func (s *Store) processRaft(ctx context.Context) { s.cfg.Transport.Stop(s.StoreID()) })) + s.syncWaiter.Start(ctx, s.stopper) + // We'll want to cancel all in-flight proposals. Proposals embed tracing // spans in them, and we don't want to be leaking any. s.stopper.AddCloser(stop.CloserFn(func() {
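Taken together, the store-level changes (AsyncStorageWrites: true on the raft config plus the started SyncWaiterLoop) imply a ready-loop shape like the sketch below, written against the go.etcd.io/raft/v3 API that this PR builds on. The handler parameters are placeholders, not CockroachDB functions:

```go
package raftglue // hypothetical package, for illustration only

import (
	"go.etcd.io/raft/v3"
	"go.etcd.io/raft/v3/raftpb"
)

// processReadyOnce sketches one iteration of a ready loop when
// AsyncStorageWrites is enabled: local storage messages are routed to storage
// handlers, everything else goes to the transport, and there is no
// rn.Advance(rd) call; progress is driven by stepping each message's
// Responses back into the RawNode once the corresponding work is durable or
// applied.
func processReadyOnce(
	rn *raft.RawNode,
	handleAppend func(raftpb.Message), // persist entries/HardState, then Step the Responses
	handleApply func(raftpb.Message), // apply committed entries, then Step the Responses
	send func(raftpb.Message), // deliver to the peer identified by m.To
) {
	if !rn.HasReady() {
		return
	}
	rd := rn.Ready()
	for _, m := range rd.Messages {
		switch m.Type {
		case raftpb.MsgStorageAppend:
			handleAppend(m)
		case raftpb.MsgStorageApply:
			handleApply(m)
		default:
			send(m)
		}
	}
}
```

In the PR itself both storage message types are still handled inline on the raft scheduler goroutine; only the wait for log durability is offloaded via SyncWaiterLoop, after which the MsgStorageAppend responses are stepped back into the RawNode through replicaSyncCallback and deliverLocalRaftMsgsRaftMuLockedReplicaMuLocked.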