Skip to content

Commit

Permalink
fix: add locking around broker throttle timer to prevent race conditi…
Browse files Browse the repository at this point in the history
…on (#2826)

Fixes #2823

Signed-off-by: shacheng <shacheng@tencent.com>
Co-authored-by: shacheng <shacheng@tencent.com>
  • Loading branch information
chengsha and shacheng committed Mar 8, 2024
1 parent fd84c2b commit a8b3b3d
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ type Broker struct {
kerberosAuthenticator GSSAPIKerberosAuth
clientSessionReauthenticationTimeMs int64

throttleTimer *time.Timer
throttleTimer *time.Timer
throttleTimerLock sync.Mutex
}

// SASLMechanism specifies the SASL mechanism the client uses to authenticate with the broker
Expand Down Expand Up @@ -1697,6 +1698,8 @@ func (b *Broker) handleThrottledResponse(resp protocolBody) {
}

func (b *Broker) setThrottle(throttleTime time.Duration) {
b.throttleTimerLock.Lock()
defer b.throttleTimerLock.Unlock()
if b.throttleTimer != nil {
// if there is an existing timer stop/clear it
if !b.throttleTimer.Stop() {
Expand All @@ -1707,6 +1710,8 @@ func (b *Broker) setThrottle(throttleTime time.Duration) {
}

func (b *Broker) waitIfThrottled() {
b.throttleTimerLock.Lock()
defer b.throttleTimerLock.Unlock()
if b.throttleTimer != nil {
DebugLogger.Printf("broker/%d waiting for throttle timer\n", b.ID())
<-b.throttleTimer.C
Expand Down

0 comments on commit a8b3b3d

Please sign in to comment.