Skip to content

Commit

Permalink
kvserver: use type-safe atomics in raftSendQueue
Browse files Browse the repository at this point in the history
Go 1.19 introduced atomic types that enforce atomic access to variables, which
in many situation is less error-prone. This commit resolves a TODO to take
advantage of these types.

Release note: None
  • Loading branch information
pav-kv committed Sep 27, 2022
1 parent 0961abd commit 0b7fec8
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions pkg/kv/kvserver/raft_transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ type raftSendQueue struct {
reqs chan *kvserverpb.RaftMessageRequest
// The number of bytes in flight. Must be updated *atomically* on sending and
// receiving from the reqs channel.
// TODO(pavelkalinnikov): replace by atomic.Int64 when CRDB uses Go 1.19.
bytes int64
bytes atomic.Int64
}

// NewDummyRaftTransport returns a dummy raft transport for use in tests which
Expand Down Expand Up @@ -228,7 +227,7 @@ func (t *RaftTransport) queueMessageCount() int64 {
// queueByteSize returns the total bytes size of outgoing messages in the queue.
func (t *RaftTransport) queueByteSize() int64 {
var size int64
t.visitQueues(func(q *raftSendQueue) { size += atomic.LoadInt64(&q.bytes) })
t.visitQueues(func(q *raftSendQueue) { size += q.bytes.Load() })
return size
}

Expand Down Expand Up @@ -441,7 +440,7 @@ func (t *RaftTransport) processQueue(
return err
case req := <-q.reqs:
size := int64(req.Size())
atomic.AddInt64(&q.bytes, -size)
q.bytes.Add(-size)
budget := targetRaftOutgoingBatchSize.Get(&t.st.SV) - size
batch.Requests = append(batch.Requests, *req)
releaseRaftMessageRequest(req)
Expand All @@ -450,7 +449,7 @@ func (t *RaftTransport) processQueue(
select {
case req = <-q.reqs:
size := int64(req.Size())
atomic.AddInt64(&q.bytes, -size)
q.bytes.Add(-size)
budget -= size
batch.Requests = append(batch.Requests, *req)
releaseRaftMessageRequest(req)
Expand Down Expand Up @@ -532,7 +531,7 @@ func (t *RaftTransport) SendAsync(
size := int64(req.Size())
select {
case q.reqs <- req:
atomic.AddInt64(&q.bytes, size)
q.bytes.Add(size)
return true
default:
if logRaftSendQueueFullEvery.ShouldLog() {
Expand Down Expand Up @@ -564,7 +563,7 @@ func (t *RaftTransport) startProcessNewQueue(
for {
select {
case req := <-q.reqs:
atomic.AddInt64(&q.bytes, -int64(req.Size()))
q.bytes.Add(-int64(req.Size()))
t.metrics.MessagesDropped.Inc(1)
releaseRaftMessageRequest(req)
default:
Expand Down

0 comments on commit 0b7fec8

Please sign in to comment.