Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Various bug-fixes and improvements #20866

Merged
merged 3 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changelog/20866.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
```release-note:improvement
api: Randomize the returned server list for the WatchServers gRPC endpoint.
```

```release-note:bug
connect: Fix potential goroutine leak in xDS stream handling.
```
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package serverdiscovery
import (
"context"
"errors"
"math/rand"

"github.com/hashicorp/go-hclog"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -130,6 +131,10 @@ func eventToResponse(req *pbserverdiscovery.WatchServersRequest, event stream.Ev
})
}

// Shuffle servers so that consul-dataplane doesn't consistently choose the same connections on startup.
rand.Shuffle(len(servers), func(i, j int) {
servers[i], servers[j] = servers[j], servers[i]
})
return &pbserverdiscovery.WatchServersResponse{
Servers: servers,
}, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestWatchServers_StreamLifeCycle(t *testing.T) {
// 4. See the corresponding message sent back through the stream.
rsp = mustGetServers(t, rspCh)
require.NotNil(t, rsp)
prototest.AssertDeepEqual(t, twoServerResponse, rsp)
prototest.AssertElementsMatch(t, twoServerResponse.Servers, rsp.Servers)

// 5. Send a NewCloseSubscriptionEvent for the token secret.
publisher.Publish([]stream.Event{
Expand All @@ -176,7 +176,7 @@ func TestWatchServers_StreamLifeCycle(t *testing.T) {
// 6. Observe another snapshot message
rsp = mustGetServers(t, rspCh)
require.NotNil(t, rsp)
prototest.AssertDeepEqual(t, twoServerResponse, rsp)
prototest.AssertElementsMatch(t, twoServerResponse.Servers, rsp.Servers)

// 7. Publish another event to move to 3 servers.
publisher.Publish([]stream.Event{
Expand All @@ -192,7 +192,7 @@ func TestWatchServers_StreamLifeCycle(t *testing.T) {
// seen after stream reinitialization.
rsp = mustGetServers(t, rspCh)
require.NotNil(t, rsp)
prototest.AssertDeepEqual(t, threeServerResponse, rsp)
prototest.AssertElementsMatch(t, threeServerResponse.Servers, rsp.Servers)
}

func TestWatchServers_ACLToken_AnonymousToken(t *testing.T) {
Expand Down
5 changes: 4 additions & 1 deletion agent/xds/delta.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,10 @@ func (s *Server) DeltaAggregatedResources(stream ADSDeltaStream) error {
close(reqCh)
return
}
reqCh <- req
select {
case <-stream.Context().Done():
case reqCh <- req:
}
}
}()

Expand Down
Loading