From d79782290b11d1feef4924f92113a284fde06d8a Mon Sep 17 00:00:00 2001 From: Matt Dale <9760375+matthewdale@users.noreply.github.com> Date: Mon, 5 Jun 2023 13:02:07 -0700 Subject: [PATCH] GODRIVER-2828 Use topology version from Server instead of Connection in ProcessError. (#1252) Co-authored-by: Preston Vasquez --- x/mongo/driver/driver.go | 6 - x/mongo/driver/topology/server.go | 62 ++- x/mongo/driver/topology/server_test.go | 552 +++++++++++++++++-------- 3 files changed, 439 insertions(+), 181 deletions(-) diff --git a/x/mongo/driver/driver.go b/x/mongo/driver/driver.go index 5bbf9a365e..38a0a2d130 100644 --- a/x/mongo/driver/driver.go +++ b/x/mongo/driver/driver.go @@ -158,12 +158,6 @@ const ( ConnectionPoolCleared ) -// ServerChanged returns true if the ProcessErrorResult indicates that the server changed from an SDAM perspective -// during a ProcessError() call. -func (p ProcessErrorResult) ServerChanged() bool { - return p != NoChange -} - // ErrorProcessor implementations can handle processing errors, which may modify their internal state. // If this type is implemented by a Server, then Operation.Execute will call it's ProcessError // method after it decodes a wire message. diff --git a/x/mongo/driver/topology/server.go b/x/mongo/driver/topology/server.go index 03dea1bcdd..95ca8e85ba 100644 --- a/x/mongo/driver/topology/server.go +++ b/x/mongo/driver/topology/server.go @@ -24,6 +24,7 @@ import ( ) const minHeartbeatInterval = 500 * time.Millisecond +const wireVersion42 = 8 // Wire version for MongoDB 4.2 // Server state constants. const ( @@ -294,6 +295,8 @@ func (s *Server) ProcessHandshakeError(err error, startingGenerationNumber uint6 return } + // Unwrap any connection errors. If there is no wrapped connection error, then the error should + // not result in any Server state change (e.g. a command error from the database). wrappedConnErr := unwrapConnectionError(err) if wrappedConnErr == nil { return @@ -384,27 +387,58 @@ func getWriteConcernErrorForProcessing(err error) (*driver.WriteConcernError, bo // ProcessError handles SDAM error handling and implements driver.ErrorProcessor. func (s *Server) ProcessError(err error, conn driver.Connection) driver.ProcessErrorResult { - // ignore nil error + // Ignore nil errors. if err == nil { return driver.NoChange } + // Ignore errors from stale connections because the error came from a previous generation of the + // connection pool. The root cause of the error has aleady been handled, which is what caused + // the pool generation to increment. Processing errors for stale connections could result in + // handling the same error root cause multiple times (e.g. a temporary network interrupt causing + // all connections to the same server to return errors). + if conn.Stale() { + return driver.NoChange + } + // Must hold the processErrorLock while updating the server description and clearing the pool. // Not holding the lock leads to possible out-of-order processing of pool.clear() and // pool.ready() calls from concurrent server description updates. s.processErrorLock.Lock() defer s.processErrorLock.Unlock() - // ignore stale error - if conn.Stale() { - return driver.NoChange + // Get the wire version and service ID from the connection description because they will never + // change for the lifetime of a connection and can possibly be different between connections to + // the same server. + connDesc := conn.Description() + wireVersion := connDesc.WireVersion + serviceID := connDesc.ServiceID + + // Get the topology version from the Server description because the Server description is + // updated by heartbeats and errors, so typically has a more up-to-date topology version. + serverDesc := s.desc.Load().(description.Server) + topologyVersion := serverDesc.TopologyVersion + + // We don't currently update the Server topology version when we create new application + // connections, so it's possible for a connection's topology version to be newer than the + // Server's topology version. Pick the "newest" of the two topology versions. + // Technically a nil topology version on a new database response should be considered a new + // topology version and replace the Server's topology version. However, we don't know if the + // connection's topology version is based on a new or old database response, so we ignore a nil + // topology version on the connection for now. + // + // TODO(GODRIVER-2841): Remove this logic once we set the Server description when we create + // TODO application connections because then the Server's topology version will always be the + // TODO latest known. + if tv := connDesc.TopologyVersion; tv != nil && topologyVersion.CompareToIncoming(tv) < 0 { + topologyVersion = tv } + // Invalidate server description if not primary or node recovering error occurs. // These errors can be reported as a command error or a write concern error. - desc := conn.Description() if cerr, ok := err.(driver.Error); ok && (cerr.NodeIsRecovering() || cerr.NotPrimary()) { - // ignore stale error - if desc.TopologyVersion.CompareToIncoming(cerr.TopologyVersion) >= 0 { + // Ignore errors that came from when the database was on a previous topology version. + if topologyVersion.CompareToIncoming(cerr.TopologyVersion) >= 0 { return driver.NoChange } @@ -414,16 +448,16 @@ func (s *Server) ProcessError(err error, conn driver.Connection) driver.ProcessE res := driver.ServerMarkedUnknown // If the node is shutting down or is older than 4.2, we synchronously clear the pool - if cerr.NodeIsShuttingDown() || desc.WireVersion == nil || desc.WireVersion.Max < 8 { + if cerr.NodeIsShuttingDown() || wireVersion == nil || wireVersion.Max < wireVersion42 { res = driver.ConnectionPoolCleared - s.pool.clear(err, desc.ServiceID) + s.pool.clear(err, serviceID) } return res } if wcerr, ok := getWriteConcernErrorForProcessing(err); ok { - // ignore stale error - if desc.TopologyVersion.CompareToIncoming(wcerr.TopologyVersion) >= 0 { + // Ignore errors that came from when the database was on a previous topology version. + if topologyVersion.CompareToIncoming(wcerr.TopologyVersion) >= 0 { return driver.NoChange } @@ -433,9 +467,9 @@ func (s *Server) ProcessError(err error, conn driver.Connection) driver.ProcessE res := driver.ServerMarkedUnknown // If the node is shutting down or is older than 4.2, we synchronously clear the pool - if wcerr.NodeIsShuttingDown() || desc.WireVersion == nil || desc.WireVersion.Max < 8 { + if wcerr.NodeIsShuttingDown() || wireVersion == nil || wireVersion.Max < wireVersion42 { res = driver.ConnectionPoolCleared - s.pool.clear(err, desc.ServiceID) + s.pool.clear(err, serviceID) } return res } @@ -457,7 +491,7 @@ func (s *Server) ProcessError(err error, conn driver.Connection) driver.ProcessE // monitoring check. The check is cancelled last to avoid a post-cancellation reconnect racing with // updateDescription. s.updateDescription(description.NewServerFromError(s.address, err, nil)) - s.pool.clear(err, desc.ServiceID) + s.pool.clear(err, serviceID) s.cancelCheck() return driver.ConnectionPoolCleared } diff --git a/x/mongo/driver/topology/server_test.go b/x/mongo/driver/topology/server_test.go index ecb001e311..31f4d0fae6 100644 --- a/x/mongo/driver/topology/server_test.go +++ b/x/mongo/driver/topology/server_test.go @@ -623,155 +623,7 @@ func TestServer(t *testing.T) { wg.Wait() close(cleanup) }) - t.Run("ProcessError", func(t *testing.T) { - processID := primitive.NewObjectID() - // Declare "old" and "new" topology versions and a connection that reports the new version from its - // Description() method. This connection can be used to test that errors containing a stale topology version - // do not affect the state of the server. - oldTV := &description.TopologyVersion{ - ProcessID: processID, - Counter: 0, - } - newTV := &description.TopologyVersion{ - ProcessID: processID, - Counter: 1, - } - oldTVConn := newProcessErrorTestConn(oldTV) - newTVConn := newProcessErrorTestConn(newTV) - - staleNotPrimaryError := driver.Error{ - Code: 10107, // NotPrimary - TopologyVersion: oldTV, - } - newNotPrimaryError := driver.Error{ - Code: 10107, - } - newShutdownError := driver.Error{ - Code: 11600, // InterruptedAtShutdown - } - staleNotPrimaryWCError := driver.WriteCommandError{ - WriteConcernError: &driver.WriteConcernError{ - Code: 10107, - TopologyVersion: oldTV, - }, - } - newNotPrimaryWCError := driver.WriteCommandError{ - WriteConcernError: &driver.WriteConcernError{ - Code: 10107, - }, - } - newShutdownWCError := driver.WriteCommandError{ - WriteConcernError: &driver.WriteConcernError{ - Code: 11600, - }, - } - nonStateChangeError := driver.Error{ - Code: 1, - } - networkTimeoutError := driver.Error{ - Labels: []string{driver.NetworkError}, - Wrapped: ConnectionError{ - // Use a net.Error implementation that can return true from its Timeout() function. - Wrapped: &net.DNSError{ - IsTimeout: true, - }, - }, - } - contextCanceledError := driver.Error{ - Labels: []string{driver.NetworkError}, - Wrapped: ConnectionError{ - Wrapped: context.Canceled, - }, - } - nonTimeoutNetworkError := driver.Error{ - Labels: []string{driver.NetworkError}, - Wrapped: ConnectionError{ - // Use a net.Error implementation that always returns false from its Timeout() function. - Wrapped: &net.AddrError{}, - }, - } - - testCases := []struct { - name string - err error - conn driver.Connection - result driver.ProcessErrorResult - }{ - // One-off tests for errors that should have no effect. - {"nil error", nil, oldTVConn, driver.NoChange}, - {"stale connection", errors.New("foo"), newStaleProcessErrorTestConn(), driver.NoChange}, - {"non state change error", nonStateChangeError, oldTVConn, driver.NoChange}, - - // Tests for top-level (ok: 0) errors. We test a NotPrimary error and a Shutdown error because the former - // only causes the server to be marked Unknown and the latter causes the pool to be cleared. - {"stale not primary error", staleNotPrimaryError, newTVConn, driver.NoChange}, - {"new not primary error", newNotPrimaryError, oldTVConn, driver.ServerMarkedUnknown}, - {"new shutdown error", newShutdownError, oldTVConn, driver.ConnectionPoolCleared}, - - // Repeat ok:0 tests for write concern errors. - {"stale not primary write concern error", staleNotPrimaryWCError, newTVConn, driver.NoChange}, - {"new not primary write concern error", newNotPrimaryWCError, oldTVConn, driver.ServerMarkedUnknown}, - {"new shutdown write concern error", newShutdownWCError, oldTVConn, driver.ConnectionPoolCleared}, - - // Network/timeout error tests. - {"network timeout error", networkTimeoutError, oldTVConn, driver.NoChange}, - {"context canceled error", contextCanceledError, oldTVConn, driver.NoChange}, - {"non-timeout network error", nonTimeoutNetworkError, oldTVConn, driver.ConnectionPoolCleared}, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - server := NewServer(address.Address("localhost"), primitive.NewObjectID()) - server.state = serverConnected - err := server.pool.ready() - assert.Nil(t, err, "pool.ready() error: %v", err) - - originalDesc := description.Server{ - // The actual Kind value does not matter as long as it's not Unknown so we can detect that it is - // properly changed to Unknown during the ProcessError call if needed. - Kind: description.RSPrimary, - } - server.desc.Store(originalDesc) - - result := server.ProcessError(tc.err, tc.conn) - assert.Equal(t, tc.result, result, - "expected ProcessError result %v, got %v", tc.result, result) - - // Test the ServerChanged() function. - expectedServerChanged := tc.result != driver.NoChange - serverChanged := result.ServerChanged() - assert.Equal(t, expectedServerChanged, serverChanged, "expected ServerChanged() to return %v, got %v", - expectedServerChanged, serverChanged) - - // Test that the server description fields have been updated to match the ProcessError result. - expectedKind := originalDesc.Kind - var expectedError error - var expectedPoolGeneration uint64 - switch tc.result { - case driver.ConnectionPoolCleared: - expectedPoolGeneration = 1 - // This case also implies ServerMarkedUnknown, so any logic in the following case applies as well. - fallthrough - case driver.ServerMarkedUnknown: - expectedKind = description.Unknown - expectedError = tc.err - case driver.NoChange: - default: - t.Fatalf("unrecognized ProcessErrorResult value %v", tc.result) - } - - desc := server.Description() - assert.Equal(t, expectedKind, desc.Kind, - "expected server kind %q, got %q", expectedKind, desc.Kind) - assert.Equal(t, expectedError, desc.LastError, - "expected last error %v, got %v", expectedError, desc.LastError) - generation := server.pool.generation.getGeneration(nil) - assert.Equal(t, expectedPoolGeneration, generation, - "expected pool generation %d, got %d", expectedPoolGeneration, generation) - }) - } - }) t.Run("update topology", func(t *testing.T) { var updated atomic.Value // bool updated.Store(false) @@ -980,6 +832,374 @@ func TestServer(t *testing.T) { }) } +func TestServer_ProcessError(t *testing.T) { + t.Parallel() + + processID := primitive.NewObjectID() + newProcessID := primitive.NewObjectID() + + testCases := []struct { + name string + + startDescription description.Server // Initial server description at the start of the test. + + inputErr error // ProcessError error input. + inputConn driver.Connection // ProcessError conn input. + + want driver.ProcessErrorResult // Expected ProcessError return value. + wantGeneration uint64 // Expected resulting connection pool generation. + wantDescription description.Server // Expected resulting server description. + }{ + // Test that a nil error does not change the Server state. + { + name: "nil error", + startDescription: description.Server{ + Kind: description.RSPrimary, + }, + inputErr: nil, + want: driver.NoChange, + wantGeneration: 0, + wantDescription: description.Server{ + Kind: description.RSPrimary, + }, + }, + // Test that errors that occur on stale connections are ignored. + { + name: "stale connection", + startDescription: description.Server{ + Kind: description.RSPrimary, + }, + inputErr: errors.New("foo"), + inputConn: newProcessErrorTestConn( + &description.VersionRange{ + Max: 17, + }, + true), + want: driver.NoChange, + wantGeneration: 0, + wantDescription: description.Server{ + Kind: description.RSPrimary, + }, + }, + // Test that errors that do not indicate a database state change or connection error are + // ignored. + { + name: "non state change error", + startDescription: description.Server{ + Kind: description.RSPrimary, + }, + inputErr: driver.Error{ + Code: 1, + }, + inputConn: newProcessErrorTestConn(&description.VersionRange{Max: 17}, false), + want: driver.NoChange, + wantGeneration: 0, + wantDescription: description.Server{ + Kind: description.RSPrimary, + }, + }, + // Test that a "not writable primary" error with an old topology version is ignored. + { + name: "stale not writable primary error", + startDescription: newServerDescription(description.RSPrimary, processID, 1, nil), + inputErr: driver.Error{ + Code: 10107, // NotWritablePrimary + TopologyVersion: &description.TopologyVersion{ + ProcessID: processID, + Counter: 0, + }, + }, + inputConn: newProcessErrorTestConn(&description.VersionRange{Max: 17}, false), + want: driver.NoChange, + wantGeneration: 0, + wantDescription: newServerDescription(description.RSPrimary, processID, 1, nil), + }, + // Test that a "not writable primary" error with an newer topology version marks the Server + // as "unknown" and updates its topology version. + { + name: "new not writable primary error", + startDescription: newServerDescription(description.RSPrimary, processID, 0, nil), + inputErr: driver.Error{ + Code: 10107, // NotWritablePrimary + TopologyVersion: &description.TopologyVersion{ + ProcessID: processID, + Counter: 1, + }, + }, + inputConn: newProcessErrorTestConn(&description.VersionRange{Max: 17}, false), + want: driver.ServerMarkedUnknown, + wantGeneration: 0, + wantDescription: newServerDescription(description.Unknown, processID, 1, driver.Error{ + Code: 10107, // NotWritablePrimary + TopologyVersion: &description.TopologyVersion{ + ProcessID: processID, + Counter: 1, + }, + }), + }, + // Test that a "not writable primary" error with an different topology process ID marks the Server as + // "unknown" and updates its topology version. + { + name: "new process ID not writable primary error", + startDescription: newServerDescription(description.RSPrimary, processID, 0, nil), + inputErr: driver.Error{ + Code: 10107, // NotWritablePrimary + TopologyVersion: &description.TopologyVersion{ + ProcessID: newProcessID, + Counter: 0, + }, + }, + inputConn: newProcessErrorTestConn(&description.VersionRange{Max: 17}, false), + want: driver.ServerMarkedUnknown, + wantGeneration: 0, + wantDescription: newServerDescription(description.Unknown, newProcessID, 0, driver.Error{ + Code: 10107, // NotWritablePrimary + TopologyVersion: &description.TopologyVersion{ + ProcessID: newProcessID, + Counter: 0, + }, + }), + }, + // Test that a connection with a newer topology version overrides the server topology + // version and causes an error with the same topology version to be ignored. + // TODO(GODRIVER-2841): Remove this test case. + { + name: "newer connection topology version", + startDescription: newServerDescription(description.RSPrimary, processID, 0, nil), + inputErr: driver.Error{ + Code: 10107, // NotWritablePrimary + TopologyVersion: &description.TopologyVersion{ + ProcessID: processID, + Counter: 1, + }, + }, + inputConn: &processErrorTestConn{ + description: description.Server{ + WireVersion: &description.VersionRange{Max: 17}, + TopologyVersion: &description.TopologyVersion{ + ProcessID: processID, + Counter: 1, + }, + }, + stale: false, + }, + want: driver.NoChange, + wantGeneration: 0, + wantDescription: newServerDescription(description.RSPrimary, processID, 0, nil), + }, + // Test that a "node is shutting down" error with a newer topology version clears the + // connection pool, marks the Server as "unknown", and updates its topology version. + { + name: "new shutdown error", + startDescription: newServerDescription(description.RSPrimary, processID, 0, nil), + inputErr: driver.Error{ + Code: 11600, // InterruptedAtShutdown + TopologyVersion: &description.TopologyVersion{ + ProcessID: processID, + Counter: 1, + }, + }, + inputConn: newProcessErrorTestConn(&description.VersionRange{Max: 17}, false), + want: driver.ConnectionPoolCleared, + wantGeneration: 1, + wantDescription: newServerDescription(description.Unknown, processID, 1, driver.Error{ + Code: 11600, // InterruptedAtShutdown + TopologyVersion: &description.TopologyVersion{ + ProcessID: processID, + Counter: 1, + }, + }), + }, + // Test that a "not writable primary" error with a stale topology version is ignored. + { + name: "stale not writable primary write concern error", + startDescription: newServerDescription(description.RSPrimary, processID, 1, nil), + inputErr: driver.WriteCommandError{ + WriteConcernError: &driver.WriteConcernError{ + Code: 10107, // NotWritablePrimary + TopologyVersion: &description.TopologyVersion{ + ProcessID: processID, + Counter: 0, + }, + }, + }, + inputConn: newProcessErrorTestConn(&description.VersionRange{Max: 17}, false), + want: driver.NoChange, + wantGeneration: 0, + wantDescription: newServerDescription(description.RSPrimary, processID, 1, nil), + }, + // Test that a "not writable primary" error with a newer topology version marks the Server + // as "unknown" and updates its topology version. + { + name: "new not writable primary write concern error", + startDescription: newServerDescription(description.RSPrimary, processID, 0, nil), + inputErr: driver.WriteCommandError{ + WriteConcernError: &driver.WriteConcernError{ + Code: 10107, // NotWritablePrimary + TopologyVersion: &description.TopologyVersion{ + ProcessID: processID, + Counter: 1, + }, + }, + }, + inputConn: newProcessErrorTestConn(&description.VersionRange{Max: 17}, false), + want: driver.ServerMarkedUnknown, + wantGeneration: 0, + wantDescription: newServerDescription(description.Unknown, processID, 1, driver.WriteCommandError{ + WriteConcernError: &driver.WriteConcernError{ + Code: 10107, // NotWritablePrimary + TopologyVersion: &description.TopologyVersion{ + ProcessID: processID, + Counter: 1, + }, + }, + }), + }, + // Test that "node is shutting down" errors that have a newer topology version than the + // local Server topology version mark the Server as "unknown" and clear the connection pool. + { + name: "new shutdown write concern error", + startDescription: newServerDescription(description.RSPrimary, processID, 0, nil), + inputErr: driver.WriteCommandError{ + WriteConcernError: &driver.WriteConcernError{ + Code: 11600, // InterruptedAtShutdown + TopologyVersion: &description.TopologyVersion{ + ProcessID: processID, + Counter: 1, + }, + }, + }, + inputConn: newProcessErrorTestConn(&description.VersionRange{Max: 17}, false), + want: driver.ConnectionPoolCleared, + wantGeneration: 1, + wantDescription: newServerDescription(description.Unknown, processID, 1, driver.WriteCommandError{ + WriteConcernError: &driver.WriteConcernError{ + Code: 11600, // InterruptedAtShutdown + TopologyVersion: &description.TopologyVersion{ + ProcessID: processID, + Counter: 1, + }, + }, + }), + }, + // Test that "node is recovering" or "not writable primary" errors that have a newer + // topology version than the local Server topology version and appear to be from MongoDB + // servers before 4.2 mark the Server as "unknown" and clear the connection pool. + { + name: "older than 4.2 write concern error", + startDescription: newServerDescription(description.RSPrimary, processID, 0, nil), + inputErr: driver.WriteCommandError{ + WriteConcernError: &driver.WriteConcernError{ + Code: 10107, // NotWritablePrimary + TopologyVersion: &description.TopologyVersion{ + ProcessID: processID, + Counter: 1, + }, + }, + }, + inputConn: newProcessErrorTestConn(&description.VersionRange{Max: 7}, false), + want: driver.ConnectionPoolCleared, + wantGeneration: 1, + wantDescription: newServerDescription(description.Unknown, processID, 1, driver.WriteCommandError{ + WriteConcernError: &driver.WriteConcernError{ + Code: 10107, // NotWritablePrimary + TopologyVersion: &description.TopologyVersion{ + ProcessID: processID, + Counter: 1, + }, + }, + }), + }, + // Test that a network timeout error, such as a DNS lookup timeout error, is ignored. + { + name: "network timeout error", + startDescription: newServerDescription(description.RSPrimary, processID, 0, nil), + inputErr: driver.Error{ + Labels: []string{driver.NetworkError}, + Wrapped: ConnectionError{ + // Use a net.Error implementation that can return true from its Timeout() function. + Wrapped: &net.DNSError{ + IsTimeout: true, + }, + }, + }, + inputConn: newProcessErrorTestConn(&description.VersionRange{Max: 17}, false), + want: driver.NoChange, + wantGeneration: 0, + wantDescription: newServerDescription(description.RSPrimary, processID, 0, nil), + }, + // Test that a context canceled error is ignored. + { + name: "context canceled error", + startDescription: newServerDescription(description.RSPrimary, processID, 0, nil), + inputErr: driver.Error{ + Labels: []string{driver.NetworkError}, + Wrapped: ConnectionError{ + Wrapped: context.Canceled, + }, + }, + inputConn: newProcessErrorTestConn(&description.VersionRange{Max: 17}, false), + want: driver.NoChange, + wantGeneration: 0, + wantDescription: newServerDescription(description.RSPrimary, processID, 0, nil), + }, + // Test that a non-timeout network error, such as an address lookup error, marks the server + // as "unknown" and sets its topology version to nil. + { + name: "non-timeout network error", + startDescription: newServerDescription(description.RSPrimary, processID, 0, nil), + inputErr: driver.Error{ + Labels: []string{driver.NetworkError}, + Wrapped: ConnectionError{ + // Use a net.Error implementation that always returns false from its Timeout() function. + Wrapped: &net.AddrError{}, + }, + }, + inputConn: newProcessErrorTestConn(&description.VersionRange{Max: 17}, false), + want: driver.ConnectionPoolCleared, + wantGeneration: 1, + wantDescription: description.Server{ + Kind: description.Unknown, + LastError: driver.Error{ + Labels: []string{driver.NetworkError}, + Wrapped: ConnectionError{ + Wrapped: &net.AddrError{}, + }, + }, + }, + }, + } + + for _, tc := range testCases { + tc := tc // Capture range variable. + + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + server := NewServer(address.Address(""), primitive.NewObjectID()) + server.state = serverConnected + err := server.pool.ready() + require.Nil(t, err, "pool.ready() error: %v", err) + + server.desc.Store(tc.startDescription) + + got := server.ProcessError(tc.inputErr, tc.inputConn) + assert.Equal(t, tc.want, got, "expected and actual ProcessError result are different") + + desc := server.Description() + assert.Equal(t, + tc.wantDescription, + desc, + "expected and actual server descriptions are different") + + assert.Equal(t, + tc.wantGeneration, + server.pool.generation.getGeneration(nil), + "expected and actual pool generation are different") + }) + } +} + func includesMetadata(t *testing.T, wm []byte) bool { var ok bool _, _, _, _, wm, ok = wiremessage.ReadHeader(wm) @@ -1024,19 +1244,16 @@ type processErrorTestConn struct { // Embed a driver.Connection to quickly implement the interface without // implementing all methods. driver.Connection - stale bool - tv *description.TopologyVersion -} - -func newProcessErrorTestConn(tv *description.TopologyVersion) *processErrorTestConn { - return &processErrorTestConn{ - tv: tv, - } + description description.Server + stale bool } -func newStaleProcessErrorTestConn() *processErrorTestConn { +func newProcessErrorTestConn(wireVersion *description.VersionRange, stale bool) *processErrorTestConn { return &processErrorTestConn{ - stale: true, + description: description.Server{ + WireVersion: wireVersion, + }, + stale: stale, } } @@ -1045,10 +1262,23 @@ func (p *processErrorTestConn) Stale() bool { } func (p *processErrorTestConn) Description() description.Server { + return p.description +} + +// newServerDescription is a convenience function for creating a server description with a specified +// kind, topology version process ID and counter, and last error. +func newServerDescription( + kind description.ServerKind, + processID primitive.ObjectID, + counter int64, + lastError error, +) description.Server { return description.Server{ - WireVersion: &description.VersionRange{ - Max: SupportedWireVersions.Max, + Kind: kind, + TopologyVersion: &description.TopologyVersion{ + ProcessID: processID, + Counter: counter, }, - TopologyVersion: p.tv, + LastError: lastError, } }