Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the handling of removing disconnected streams to avoid a panic when multiple streams disconnect from the sdkserver #3668

Merged
merged 1 commit into from
Feb 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions pkg/sdkserver/sdkserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -1243,27 +1243,35 @@ func (s *SDKServer) sendGameServerUpdate(gs *agonesv1.GameServer) {
s.streamMutex.Lock()
defer s.streamMutex.Unlock()

for i, stream := range s.connectedStreams {
// Filter the slice of streams sharing the same backing array and capacity as the original
// so that storage is reused and no memory allocations are made. This modifies the original
// slice.
//
// See https://go.dev/wiki/SliceTricks#filtering-without-allocating
remainingStreams := s.connectedStreams[:0]
zmerlynn marked this conversation as resolved.
Show resolved Hide resolved
for _, stream := range s.connectedStreams {
select {
case <-stream.Context().Done():
s.connectedStreams = append(s.connectedStreams[:i], s.connectedStreams[i+1:]...)
s.logger.Debug("Dropping stream")

err := stream.Context().Err()
switch {
case err != nil:
s.logger.WithError(errors.WithStack(err)).Error("stream closed with error")
default:
s.logger.Debug("stream closed")
s.logger.Debug("Stream closed")
}
continue
default:
}
s.logger.Debug("Keeping stream")
remainingStreams = append(remainingStreams, stream)

if err := stream.Send(convert(gs)); err != nil {
s.logger.WithError(errors.WithStack(err)).
Error("error sending game server update event")
if err := stream.Send(convert(gs)); err != nil {
s.logger.WithError(errors.WithStack(err)).
Error("error sending game server update event")
}
}
}
s.connectedStreams = remainingStreams

if gs.Status.State == agonesv1.GameServerStateShutdown {
// Wrap this in a go func(), just in case pushing to this channel deadlocks since there is only one instance of
Expand Down
42 changes: 21 additions & 21 deletions pkg/sdkserver/sdkserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -833,28 +833,28 @@ func TestSDKServer_SendGameServerUpdateRemovesDisconnectedStream(t *testing.T) {
return err == nil
}, time.Minute, time.Second, "Could not find the GameServer")

streamCtx, streamCancel := context.WithCancel(context.Background())
t.Cleanup(streamCancel)

// Trigger stream removal by sending an update on a cancelled stream.

stream := newGameServerMockStream()
stream.ctx = streamCtx

asyncWatchGameServer(t, sc, stream)
assert.Nil(t, waitConnectedStreamCount(sc, 1))

<-stream.msgs // Initial msg when WatchGameServer() is called.

streamCancel()

// Create and initialize two streams.
streamOne := newGameServerMockStream()
streamOneCtx, streamOneCancel := context.WithCancel(context.Background())
t.Cleanup(streamOneCancel)
streamOne.ctx = streamOneCtx
asyncWatchGameServer(t, sc, streamOne)

streamTwo := newGameServerMockStream()
streamTwoCtx, streamTwoCancel := context.WithCancel(context.Background())
t.Cleanup(streamTwoCancel)
streamTwo.ctx = streamTwoCtx
asyncWatchGameServer(t, sc, streamTwo)

// Verify that two streams are connected.
assert.Nil(t, waitConnectedStreamCount(sc, 2))
streamOneCancel()
streamTwoCancel()

// Trigger stream removal by sending a game server update.
sc.sendGameServerUpdate(fixture)

select {
case <-stream.msgs:
assert.Fail(t, "Event stream should have been removed.")
case <-time.After(1 * time.Second):
}
// Verify that zero streams are connected.
assert.Nil(t, waitConnectedStreamCount(sc, 0))
}

func TestSDKServerUpdateEventHandler(t *testing.T) {
Expand Down
Loading