Skip to content

Commit

Permalink
protobuf: Stop reflection client for graceful stream shutdown (#247)
Browse files Browse the repository at this point in the history
Currently, the reflection stream is closed abruptly when yab ends,
which looks like an error for intermediate proxies.

Add a Close method to the reflection source API and ensure we reset
the grpc client which stops the underlying stream to avoid this error.

For testing, use the grpc GracefulStop API, which blocks until all RPCs
are complete. If the test times out, then the reflection stream isn't
being shutdown correctly.
  • Loading branch information
prashantv committed Jan 30, 2019
1 parent 568faec commit 118981d
Show file tree
Hide file tree
Showing 7 changed files with 26 additions and 5 deletions.
2 changes: 1 addition & 1 deletion integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func setupGRPCServer(t *testing.T) (net.Addr, *grpc.Server) {

func TestGRPCReflectionSource(t *testing.T) {
addr, server := setupGRPCServer(t)
defer server.Stop()
defer server.GracefulStop()

tests := []struct {
desc string
Expand Down
2 changes: 2 additions & 0 deletions protobuf/filedescriptorsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,5 @@ func (fs *fileSource) FindSymbol(fullyQualifiedName string) (desc.Descriptor, er
}
return nil, fmt.Errorf("symbol not found: %q", fullyQualifiedName)
}

func (fs *fileSource) Close() {}
8 changes: 6 additions & 2 deletions protobuf/filedescriptorsets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,12 @@ func TestNewDescriptorProviderFileDescriptorSetBins(t *testing.T) {
assert.Contains(t, err.Error(), tt.errMsg, "%v: invalid error", tt.name)
return
}
assert.NotNil(t, got)
assert.NoError(t, err)
require.NoError(t, err)
require.NotNil(t, got)

// Doesn't do anything, but is part of the API.
defer got.Close()

if tt.lookupSymbol != "" {
require.NotNil(t, got)
s, err := got.FindSymbol(tt.lookupSymbol)
Expand Down
2 changes: 2 additions & 0 deletions protobuf/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ import "github.com/jhump/protoreflect/desc"
type DescriptorProvider interface {
// FindSymbol returns a descriptor for the given fully-qualified symbol name.
FindSymbol(fullyQualifiedName string) (desc.Descriptor, error)

Close()
}
4 changes: 4 additions & 0 deletions protobuf/source_reflection.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,7 @@ func (s *grpcreflectSource) FindSymbol(fullyQualifiedName string) (desc.Descript
}
return file.FindSymbol(fullyQualifiedName), nil
}

func (s *grpcreflectSource) Close() {
s.client.Reset()
}
9 changes: 7 additions & 2 deletions protobuf/source_reflection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,20 @@ func TestReflection(t *testing.T) {
s := grpc.NewServer()
reflection.Register(s)
go s.Serve(ln)
defer s.Stop()

// Ensure that all streams are closed by the end of the test.
defer s.GracefulStop()

source, err := NewDescriptorProviderReflection(ReflectionArgs{
Timeout: time.Second,
Peers: []string{ln.Addr().String()},
})
assert.NoError(t, err)
require.NoError(t, err)
require.NotNil(t, source)

// Close the streaming reflect call to ensure GracefulStop doesn't block.
defer source.Close()

result, err := source.FindSymbol("grpc.reflection.v1alpha.ServerReflectionRequest")
assert.NoError(t, err)
assert.NotNil(t, result)
Expand Down
4 changes: 4 additions & 0 deletions request.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ func NewSerializer(opts Options) (encoding.Serializer, error) {
if err != nil {
return nil, err
}

// The descriptor is only used in the New function, so it's safe to defer Close.
defer descSource.Close()

return encoding.NewProtobuf(opts.ROpts.Procedure, descSource)
}

Expand Down

0 comments on commit 118981d

Please sign in to comment.