From 11cdc9a2daf92c5c6437357aa21f745b21a0224a Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Wed, 21 Sep 2022 12:10:56 +0100 Subject: [PATCH] kvserver: align Raft recv/send queue sizes Fixes #87465 Release justification: performance fix Release note: Made the sending and receiving Raft queue sizes match. Previously, the receiver could unnecessarily drop messages when the sending queue was bigger than the receiving one. --- pkg/base/config.go | 4 ++++ pkg/kv/kvserver/store.go | 8 +++++--- pkg/kv/kvserver/store_raft.go | 15 +++++++++------ pkg/kv/kvserver/store_raft_test.go | 6 +++--- 4 files changed, 21 insertions(+), 12 deletions(-) diff --git a/pkg/base/config.go b/pkg/base/config.go index abb4af017aed..e7a8a6b9a238 100644 --- a/pkg/base/config.go +++ b/pkg/base/config.go @@ -361,6 +361,10 @@ type RaftConfig struct { // without acknowledgement. With an average entry size of 1 KB that // translates to ~4096 commands that might be executed in the handling of a // single raft.Ready operation. + // + // This setting is used by both the sending and receiving ends of Raft + // messages. To minimize dropped messages on the receiver, its size should + // at least match the sender's (be it the default or taken from env variables). RaftMaxInflightMsgs int // Splitting a range which has a replica needing a snapshot results in two diff --git a/pkg/kv/kvserver/store.go b/pkg/kv/kvserver/store.go index bba930e7aa83..ca9905acf4ce 100644 --- a/pkg/kv/kvserver/store.go +++ b/pkg/kv/kvserver/store.go @@ -98,9 +98,11 @@ const ( // store's Raft log entry cache. defaultRaftEntryCacheSize = 1 << 24 // 16M - // replicaRequestQueueSize specifies the maximum number of requests to queue - // for a replica. - replicaRequestQueueSize = 100 + // replicaQueueExtraSize is the number of requests that a replica's incoming + // message queue can keep over RaftConfig.RaftMaxInflightMsgs.
When the leader + // maxes out RaftMaxInflightMsgs, we want the receiving replica to still have + // some buffer for other messages, primarily heartbeats. + replicaQueueExtraSize = 10 defaultGossipWhenCapacityDeltaExceedsFraction = 0.01 diff --git a/pkg/kv/kvserver/store_raft.go b/pkg/kv/kvserver/store_raft.go index 10796a5eca12..257ab1f8acce 100644 --- a/pkg/kv/kvserver/store_raft.go +++ b/pkg/kv/kvserver/store_raft.go @@ -49,7 +49,8 @@ type raftReceiveQueue struct { syncutil.Mutex infos []raftRequestInfo } - acc mon.BoundAccount + maxLen int + acc mon.BoundAccount } // Len returns the number of requests in the queue. @@ -109,7 +110,7 @@ func (q *raftReceiveQueue) Append( size = int64(req.Size()) q.mu.Lock() defer q.mu.Unlock() - if q.mu.destroyed || len(q.mu.infos) >= replicaRequestQueueSize { + if q.mu.destroyed || len(q.mu.infos) >= q.maxLen { return false, size, false } if q.acc.Grow(context.Background(), size) != nil { @@ -136,13 +137,12 @@ func (qs *raftReceiveQueues) Load(rangeID roachpb.RangeID) (*raftReceiveQueue, b } func (qs *raftReceiveQueues) LoadOrCreate( - rangeID roachpb.RangeID, + rangeID roachpb.RangeID, maxLen int, ) (_ *raftReceiveQueue, loaded bool) { - if q, ok := qs.Load(rangeID); ok { return q, ok // fast path } - q := &raftReceiveQueue{} + q := &raftReceiveQueue{maxLen: maxLen} q.acc.Init(context.Background(), qs.mon) value, loaded := qs.m.LoadOrStore(int64(rangeID), unsafe.Pointer(q)) return (*raftReceiveQueue)(value), loaded @@ -303,7 +303,10 @@ func (s *Store) HandleRaftUncoalescedRequest( // count them. s.metrics.RaftRcvdMessages[req.Message.Type].Inc(1) - q, _ := s.raftRecvQueues.LoadOrCreate(req.RangeID) + // NB: add a buffer for extra messages, to allow heartbeats to get through + // even if the MsgApp quota is maxed out by the sender.
+ q, _ := s.raftRecvQueues.LoadOrCreate(req.RangeID, + s.cfg.RaftMaxInflightMsgs+replicaQueueExtraSize) enqueue, size, appended := q.Append(req, respStream) if !appended { // TODO(peter): Return an error indicating the request was dropped. Note diff --git a/pkg/kv/kvserver/store_raft_test.go b/pkg/kv/kvserver/store_raft_test.go index 84a150d05301..9b84ac77d44a 100644 --- a/pkg/kv/kvserver/store_raft_test.go +++ b/pkg/kv/kvserver/store_raft_test.go @@ -44,11 +44,11 @@ func TestRaftReceiveQueue(t *testing.T) { qs.Load(r5) require.Zero(t, m.AllocBytes()) - q1, loaded := qs.LoadOrCreate(r1) + q1, loaded := qs.LoadOrCreate(r1, 10 /* maxLen */) require.Zero(t, m.AllocBytes()) require.False(t, loaded) { - q1x, loadedx := qs.LoadOrCreate(r1) + q1x, loadedx := qs.LoadOrCreate(r1, 10 /* maxLen */) require.True(t, loadedx) require.Equal(t, q1, q1x) } @@ -99,7 +99,7 @@ func TestRaftReceiveQueue(t *testing.T) { } // Now interleave creation of a second queue. - q5, loaded := qs.LoadOrCreate(r5) + q5, loaded := qs.LoadOrCreate(r5, 1 /* maxLen */) { require.False(t, loaded) require.Zero(t, q5.acc.Used())