Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FIXED] Race condition that could cause error on subscribe #1241

Merged
merged 1 commit into from
Feb 12, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3245,19 +3245,19 @@ func (s *StanServer) checkClientHealth(clientID string) {
client.fhb++
// If we have reached the max number of failures
if client.fhb > s.opts.ClientHBFailCount {
s.log.Errorf("[Client:%s] Timed out on heartbeats", clientID)
// close the client (connection). This locks the
// client object internally so unlock here.
client.Unlock()
// If clustered, thread operations through Raft.
if s.isClustered {
if err := s.replicateConnClose(&pb.CloseRequest{ClientID: clientID}, false); err != nil {
s.log.Errorf("[Client:%s] Failed to replicate disconnect on heartbeat expiration: %v",
clientID, err)
s.barrier(func() {
s.log.Errorf("[Client:%s] Timed out on heartbeats", clientID)
// If clustered, thread operations through Raft.
if s.isClustered {
if err := s.replicateConnClose(&pb.CloseRequest{ClientID: clientID}, false); err != nil {
s.log.Errorf("[Client:%s] Failed to replicate disconnect on heartbeat expiration: %v",
clientID, err)
}
} else {
s.closeClient(clientID)
}
} else {
s.closeClient(clientID)
}
})
return
}
} else {
Expand Down Expand Up @@ -3298,14 +3298,16 @@ func (s *StanServer) closeClient(clientID string) error {
return ErrUnknownClient
}

// Remove all non-durable subscribers.
s.removeAllNonDurableSubscribers(client)

// Remove from our clientStore.
// Remove from our clientStore before removing subs.
// This prevents a race when the same client ID is just
// reconnecting and registering a durable.
if _, err := s.clients.unregister(clientID); err != nil {
s.log.Errorf("Error unregistering client %q: %v", clientID, err)
}

// Remove all non-durable subscribers.
s.removeAllNonDurableSubscribers(client)

if s.debug {
client.RLock()
hbInbox := client.info.HbInbox
Expand Down