diff --git a/.changelog/20866.txt b/.changelog/20866.txt new file mode 100644 index 000000000000..20c41f02bc0f --- /dev/null +++ b/.changelog/20866.txt @@ -0,0 +1,7 @@ +```release-note:improvement +api: Randomize the returned server list for the WatchServers gRPC endpoint. +``` + +```release-note:bug +connect: Fix potential goroutine leak in xDS stream handling. +``` diff --git a/agent/grpc-external/services/serverdiscovery/watch_servers.go b/agent/grpc-external/services/serverdiscovery/watch_servers.go index 94ed7ac58aef..24960336c815 100644 --- a/agent/grpc-external/services/serverdiscovery/watch_servers.go +++ b/agent/grpc-external/services/serverdiscovery/watch_servers.go @@ -6,6 +6,7 @@ package serverdiscovery import ( "context" "errors" + "math/rand" "github.com/hashicorp/go-hclog" "google.golang.org/grpc/codes" @@ -130,6 +131,10 @@ func eventToResponse(req *pbserverdiscovery.WatchServersRequest, event stream.Ev }) } + // Shuffle servers so that consul-dataplane doesn't consistently choose the same connections on startup. + rand.Shuffle(len(servers), func(i, j int) { + servers[i], servers[j] = servers[j], servers[i] + }) return &pbserverdiscovery.WatchServersResponse{ Servers: servers, }, nil diff --git a/agent/grpc-external/services/serverdiscovery/watch_servers_test.go b/agent/grpc-external/services/serverdiscovery/watch_servers_test.go index 0df48f3bb35c..85daa9b60663 100644 --- a/agent/grpc-external/services/serverdiscovery/watch_servers_test.go +++ b/agent/grpc-external/services/serverdiscovery/watch_servers_test.go @@ -166,7 +166,7 @@ func TestWatchServers_StreamLifeCycle(t *testing.T) { // 4. See the corresponding message sent back through the stream. rsp = mustGetServers(t, rspCh) require.NotNil(t, rsp) - prototest.AssertDeepEqual(t, twoServerResponse, rsp) + prototest.AssertElementsMatch(t, twoServerResponse.Servers, rsp.Servers) // 5. Send a NewCloseSubscriptionEvent for the token secret. publisher.Publish([]stream.Event{ @@ -176,7 +176,7 @@ func TestWatchServers_StreamLifeCycle(t *testing.T) { // 6. Observe another snapshot message rsp = mustGetServers(t, rspCh) require.NotNil(t, rsp) - prototest.AssertDeepEqual(t, twoServerResponse, rsp) + prototest.AssertElementsMatch(t, twoServerResponse.Servers, rsp.Servers) // 7. Publish another event to move to 3 servers. publisher.Publish([]stream.Event{ @@ -192,7 +192,7 @@ func TestWatchServers_StreamLifeCycle(t *testing.T) { // seen after stream reinitialization. rsp = mustGetServers(t, rspCh) require.NotNil(t, rsp) - prototest.AssertDeepEqual(t, threeServerResponse, rsp) + prototest.AssertElementsMatch(t, threeServerResponse.Servers, rsp.Servers) } func TestWatchServers_ACLToken_AnonymousToken(t *testing.T) { diff --git a/agent/xds/delta.go b/agent/xds/delta.go index 70b65afe2719..cd4d4fb164b7 100644 --- a/agent/xds/delta.go +++ b/agent/xds/delta.go @@ -79,7 +79,10 @@ func (s *Server) DeltaAggregatedResources(stream ADSDeltaStream) error { close(reqCh) return } - reqCh <- req + select { + case <-stream.Context().Done(): + case reqCh <- req: + } } }()