Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-2828 Use topology version from Server instead of Connection in ProcessError. #1252

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions x/mongo/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,12 +158,6 @@ const (
ConnectionPoolCleared
)

// ServerChanged reports whether a ProcessError() call altered the server's state from an SDAM
// perspective, i.e. whether the result is anything other than NoChange.
func (p ProcessErrorResult) ServerChanged() bool {
	switch p {
	case NoChange:
		return false
	default:
		return true
	}
}

// ErrorProcessor implementations can handle processing errors, which may modify their internal state.
// If this type is implemented by a Server, then Operation.Execute will call its ProcessError
// method after it decodes a wire message.
Expand Down
62 changes: 48 additions & 14 deletions x/mongo/driver/topology/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
)

const minHeartbeatInterval = 500 * time.Millisecond
const wireVersion42 = 8 // Wire version for MongoDB 4.2

// Server state constants.
const (
Expand Down Expand Up @@ -295,6 +296,8 @@ func (s *Server) ProcessHandshakeError(err error, startingGenerationNumber uint6
return
}

// Unwrap any connection errors. If there is no wrapped connection error, then the error should
// not result in any Server state change (e.g. a command error from the database).
wrappedConnErr := unwrapConnectionError(err)
if wrappedConnErr == nil {
return
Expand Down Expand Up @@ -385,27 +388,58 @@ func getWriteConcernErrorForProcessing(err error) (*driver.WriteConcernError, bo

// ProcessError handles SDAM error handling and implements driver.ErrorProcessor.
func (s *Server) ProcessError(err error, conn driver.Connection) driver.ProcessErrorResult {
// ignore nil error
// Ignore nil errors.
if err == nil {
return driver.NoChange
}

// Ignore errors from stale connections because the error came from a previous generation of the
// connection pool. The root cause of the error has already been handled, which is what caused
// the pool generation to increment. Processing errors for stale connections could result in
// handling the same error root cause multiple times (e.g. a temporary network interrupt causing
// all connections to the same server to return errors).
if conn.Stale() {
return driver.NoChange
}

// Must hold the processErrorLock while updating the server description and clearing the pool.
// Not holding the lock leads to possible out-of-order processing of pool.clear() and
// pool.ready() calls from concurrent server description updates.
s.processErrorLock.Lock()
defer s.processErrorLock.Unlock()

// ignore stale error
if conn.Stale() {
return driver.NoChange
// Get the wire version and service ID from the connection description because they will never
// change for the lifetime of a connection and can possibly be different between connections to
// the same server.
connDesc := conn.Description()
wireVersion := connDesc.WireVersion
serviceID := connDesc.ServiceID

// Get the topology version from the Server description because the Server description is
// updated by heartbeats and errors, so typically has a more up-to-date topology version.
serverDesc := s.desc.Load().(description.Server)
topologyVersion := serverDesc.TopologyVersion

// We don't currently update the Server topology version when we create new application
// connections, so it's possible for a connection's topology version to be newer than the
// Server's topology version. Pick the "newest" of the two topology versions.
// Technically a nil topology version on a new database response should be considered a new
// topology version and replace the Server's topology version. However, we don't know if the
// connection's topology version is based on a new or old database response, so we ignore a nil
// topology version on the connection for now.
//
// TODO(GODRIVER-2841): Remove this logic once we set the Server description when we create
// TODO application connections because then the Server's topology version will always be the
// TODO latest known.
if tv := connDesc.TopologyVersion; tv != nil && topologyVersion.CompareToIncoming(tv) < 0 {
topologyVersion = tv
}

// Invalidate server description if not primary or node recovering error occurs.
// These errors can be reported as a command error or a write concern error.
desc := conn.Description()
if cerr, ok := err.(driver.Error); ok && (cerr.NodeIsRecovering() || cerr.NotPrimary()) {
// ignore stale error
if desc.TopologyVersion.CompareToIncoming(cerr.TopologyVersion) >= 0 {
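// Ignore errors whose topology version is not strictly newer than the local topology version.
// Only the first error for a given topology change is processed; later errors carrying the
// same (or an older) topology version are ignored.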
if topologyVersion.CompareToIncoming(cerr.TopologyVersion) >= 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case where the max{ServerTopology, ConnectionTopology} = ErrorTopology , why do we consider this a "previous topology version"? If this is correct behavior, can we add a comment explaining it?

// In the case where the max{serverDesc.TopologyVersion, connDesc.TopologyVersion} == cerr.TopologyVersion,
// we consider cerr.TopologyVersion a "previous topology version" because ...

Copy link
Collaborator Author

@matthewdale matthewdale May 23, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, the current comment actually describes slightly different behavior because the logic also ignores errors that came from the current topology version.

I believe the reason we only act on errors where the error topology version is strictly greater than the local topology version is that we only want to process the first instance of that error for a given topology version, and ignore the rest.

For example consider the following sequence of events:

| Step | Action | Driver TopologyVersion | DB TopologyVersion |
|------|--------|------------------------|--------------------|
| 1 | Initial state | 1 | 1 |
| 2 | DB elects new primary | 1 | 2 |
| 3 | Driver starts 4 concurrent writes to secondary, secondary returns "not writable primary" | 1 | 2 |
| 4 | Driver processes first "not writable primary" error with TopologyVersion=2, marks server as Kind=Unknown, updates its TopologyVersion | 2 | 2 |
| 5 | Driver processes 3 more "not writable primary" errors with TopologyVersion=2, ignores them | 2 | 2 |

Notice that the driver only processes the first "not writable primary" error, marking the server role as "unknown" until it is updated by the monitoring routine. Subsequent errors are ignored based on topology version, which prevents the driver from re-processing errors for the same topology change event.

Keep in mind that we only check topology version for "node is recovering" or "not primary" error codes, which we expect to increment topology version on the database. The first time the driver sees one of those errors, it knows the database topology has changed and needs to re-discover the topology. The driver ignores topology version for all other kinds of errors.

For additional reference, the topology version comparison logic comes directly from the error handling pseudocode in the SDAM spec.

I'll update the comment with a summary of the above information to increase clarity.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I see. This is a great explanation! Thank you.

return driver.NoChange
}

Expand All @@ -415,16 +449,16 @@ func (s *Server) ProcessError(err error, conn driver.Connection) driver.ProcessE

res := driver.ServerMarkedUnknown
// If the node is shutting down or is older than 4.2, we synchronously clear the pool
if cerr.NodeIsShuttingDown() || desc.WireVersion == nil || desc.WireVersion.Max < 8 {
if cerr.NodeIsShuttingDown() || wireVersion == nil || wireVersion.Max < wireVersion42 {
res = driver.ConnectionPoolCleared
s.pool.clear(err, desc.ServiceID)
s.pool.clear(err, serviceID)
}

return res
}
if wcerr, ok := getWriteConcernErrorForProcessing(err); ok {
// ignore stale error
if desc.TopologyVersion.CompareToIncoming(wcerr.TopologyVersion) >= 0 {
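// Ignore errors whose topology version is not strictly newer than the local topology version.
// Only the first error for a given topology change is processed; later errors carrying the
// same (or an older) topology version are ignored.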
if topologyVersion.CompareToIncoming(wcerr.TopologyVersion) >= 0 {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case where the max{ServerTopology, ConnectionTopology} = ErrorTopology , why do we consider this a "previous topology version"? If this is correct behavior, can we add a comment explaining it?

// In the case where the max{serverDesc.TopologyVersion, connDesc.TopologyVersion} == wcerr.TopologyVersion,
// we consider wcerr.TopologyVersion a "previous topology version" because ...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See this comment for a detailed answer.

return driver.NoChange
}

Expand All @@ -434,9 +468,9 @@ func (s *Server) ProcessError(err error, conn driver.Connection) driver.ProcessE

res := driver.ServerMarkedUnknown
// If the node is shutting down or is older than 4.2, we synchronously clear the pool
if wcerr.NodeIsShuttingDown() || desc.WireVersion == nil || desc.WireVersion.Max < 8 {
if wcerr.NodeIsShuttingDown() || wireVersion == nil || wireVersion.Max < wireVersion42 {
res = driver.ConnectionPoolCleared
s.pool.clear(err, desc.ServiceID)
s.pool.clear(err, serviceID)
}
return res
}
Expand All @@ -458,7 +492,7 @@ func (s *Server) ProcessError(err error, conn driver.Connection) driver.ProcessE
// monitoring check. The check is cancelled last to avoid a post-cancellation reconnect racing with
// updateDescription.
s.updateDescription(description.NewServerFromError(s.address, err, nil))
s.pool.clear(err, desc.ServiceID)
s.pool.clear(err, serviceID)
s.cancelCheck()
return driver.ConnectionPoolCleared
}
Expand Down
Loading