Skip to content

Commit

Permalink
kvserver: align Raft recv/send queue sizes
Browse files Browse the repository at this point in the history
Fixes #87465

Release justification: performance fix
Release note: Made sending and receiving Raft queue sizes match. Previously the
receiver could unnecessarily drop messages in situations when the sending queue
is bigger than the receiving one.
  • Loading branch information
pav-kv committed Sep 21, 2022
1 parent 2c30691 commit 11cdc9a
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 12 deletions.
4 changes: 4 additions & 0 deletions pkg/base/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -361,6 +361,10 @@ type RaftConfig struct {
// without acknowledgement. With an average entry size of 1 KB that
// translates to ~4096 commands that might be executed in the handling of a
// single raft.Ready operation.
//
// This setting is used both by sending and receiving end of Raft messages. To
// minimize dropped messages on the receiver, its size should at least match
// the sender's (being it the default size, or taken from the env variables).
RaftMaxInflightMsgs int

// Splitting a range which has a replica needing a snapshot results in two
Expand Down
8 changes: 5 additions & 3 deletions pkg/kv/kvserver/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,9 +98,11 @@ const (
// store's Raft log entry cache.
defaultRaftEntryCacheSize = 1 << 24 // 16M

// replicaRequestQueueSize specifies the maximum number of requests to queue
// for a replica.
replicaRequestQueueSize = 100
// replicaQueueExtraSize is the number of requests that a replica's incoming
// message queue can keep over RaftConfig.RaftMaxInflightMsgs. When the leader
// maxes out RaftMaxInflightMsgs, we want the receiving replica to still have
// some buffer for other messages, primarily heartbeats.
replicaQueueExtraSize = 10

defaultGossipWhenCapacityDeltaExceedsFraction = 0.01

Expand Down
15 changes: 9 additions & 6 deletions pkg/kv/kvserver/store_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ type raftReceiveQueue struct {
syncutil.Mutex
infos []raftRequestInfo
}
acc mon.BoundAccount
maxLen int
acc mon.BoundAccount
}

// Len returns the number of requests in the queue.
Expand Down Expand Up @@ -109,7 +110,7 @@ func (q *raftReceiveQueue) Append(
size = int64(req.Size())
q.mu.Lock()
defer q.mu.Unlock()
if q.mu.destroyed || len(q.mu.infos) >= replicaRequestQueueSize {
if q.mu.destroyed || len(q.mu.infos) >= q.maxLen {
return false, size, false
}
if q.acc.Grow(context.Background(), size) != nil {
Expand All @@ -136,13 +137,12 @@ func (qs *raftReceiveQueues) Load(rangeID roachpb.RangeID) (*raftReceiveQueue, b
}

func (qs *raftReceiveQueues) LoadOrCreate(
rangeID roachpb.RangeID,
rangeID roachpb.RangeID, maxLen int,
) (_ *raftReceiveQueue, loaded bool) {

if q, ok := qs.Load(rangeID); ok {
return q, ok // fast path
}
q := &raftReceiveQueue{}
q := &raftReceiveQueue{maxLen: maxLen}
q.acc.Init(context.Background(), qs.mon)
value, loaded := qs.m.LoadOrStore(int64(rangeID), unsafe.Pointer(q))
return (*raftReceiveQueue)(value), loaded
Expand Down Expand Up @@ -303,7 +303,10 @@ func (s *Store) HandleRaftUncoalescedRequest(
// count them.
s.metrics.RaftRcvdMessages[req.Message.Type].Inc(1)

q, _ := s.raftRecvQueues.LoadOrCreate(req.RangeID)
// NB: add a buffer for extra messages, to allow heartbeats getting through
// even if MsgApp quota is maxed out by the sender.
q, _ := s.raftRecvQueues.LoadOrCreate(req.RangeID,
s.cfg.RaftMaxInflightMsgs+replicaQueueExtraSize)
enqueue, size, appended := q.Append(req, respStream)
if !appended {
// TODO(peter): Return an error indicating the request was dropped. Note
Expand Down
6 changes: 3 additions & 3 deletions pkg/kv/kvserver/store_raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ func TestRaftReceiveQueue(t *testing.T) {
qs.Load(r5)
require.Zero(t, m.AllocBytes())

q1, loaded := qs.LoadOrCreate(r1)
q1, loaded := qs.LoadOrCreate(r1, 10 /* maxLen */)
require.Zero(t, m.AllocBytes())
require.False(t, loaded)
{
q1x, loadedx := qs.LoadOrCreate(r1)
q1x, loadedx := qs.LoadOrCreate(r1, 10 /* maxLen */)
require.True(t, loadedx)
require.Equal(t, q1, q1x)
}
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestRaftReceiveQueue(t *testing.T) {
}

// Now interleave creation of a second queue.
q5, loaded := qs.LoadOrCreate(r5)
q5, loaded := qs.LoadOrCreate(r5, 1 /* maxLen */)
{
require.False(t, loaded)
require.Zero(t, q5.acc.Used())
Expand Down

0 comments on commit 11cdc9a

Please sign in to comment.