util/tracing: fix crash in StreamClientInterceptor
Before this patch, our client-side tracing interceptor for streaming RPC
calls was exposed to gRPC bugs currently being fixed in
github.com/grpc/grpc-go/pull/5323. This had to do with calls to
clientStream.Context() panicking with an NPE under certain races with
RPCs failing. We've recently gotten two crashes seemingly because of
this. It's unclear why this hasn't shown up before, as nothing seems new
(either on our side or on the gRPC side). In 22.2 we do use more
streaming RPCs than before (for example for span configs), so maybe that
explains it.

This patch fixes the problem by removing the problematic call into
ClientStream.Context(). The background is that our interceptor needs to
watch for ctx cancellation and consider the RPC done at that point. But
there was no reason for that call; we can more simply use the RPC
caller's ctx to figure out whether the caller cancels the RPC (a rough
sketch of this pattern follows the list below). In fact, calling
ClientStream.Context() is bad for other reasons, beyond exposing us to
the bug:
1) ClientStream.Context() pins the RPC attempt to a lower-level
connection, and inhibits gRPC's ability to sometimes transparently
retry failed calls. In fact, there's a comment on ClientStream.Context()
that tells you not to call it before using the stream through other
methods like Recv(), since those imply that the RPC is already "pinned"
and transparent retries are no longer possible anyway. We were breaking
this.
2) One of the grpc-go maintainers suggested that, due to the bugs
referenced above, this call could actually sometimes give us "the
wrong context", although how wrong exactly is not clear to me (i.e.
could we have gotten a ctx that doesn't inherit from the caller's ctx,
or a ctx that's canceled independently from the caller?).
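
For illustration, here is a minimal, self-contained sketch of that
pattern. It is not the actual CockroachDB interceptor; the names
(streamClientInterceptor, trackedClientStream, onFinish) are made up for
the example, and the span machinery is abstracted into a single callback:

package tracingexample

import (
    "context"
    "io"
    "sync"

    "google.golang.org/grpc"
)

// streamClientInterceptor invokes onFinish exactly once per RPC: either when
// the stream terminates (RecvMsg returns an error, including io.EOF), or
// when the caller's ctx is canceled. It never calls cs.Context().
func streamClientInterceptor(onFinish func(err error)) grpc.StreamClientInterceptor {
    return func(
        ctx context.Context,
        desc *grpc.StreamDesc,
        cc *grpc.ClientConn,
        method string,
        streamer grpc.Streamer,
        opts ...grpc.CallOption,
    ) (grpc.ClientStream, error) {
        cs, err := streamer(ctx, desc, cc, method, opts...)
        if err != nil {
            onFinish(err)
            return nil, err
        }
        ts := &trackedClientStream{
            ClientStream: cs,
            onFinish:     onFinish,
            done:         make(chan struct{}),
        }
        go func() {
            select {
            case <-ts.done:
                // The stream ended through RecvMsg; onFinish already ran.
            case <-ctx.Done():
                // The caller canceled the RPC; it might never touch the
                // stream again, so finish here.
                ts.finish(ctx.Err())
            }
        }()
        return ts, nil
    }
}

type trackedClientStream struct {
    grpc.ClientStream
    onFinish func(error)
    once     sync.Once
    done     chan struct{}
}

func (s *trackedClientStream) finish(err error) {
    s.once.Do(func() {
        if err == io.EOF {
            err = nil // clean termination
        }
        s.onFinish(err)
        close(s.done)
    })
}

func (s *trackedClientStream) RecvMsg(m interface{}) error {
    err := s.ClientStream.RecvMsg(m)
    if err != nil {
        s.finish(err)
    }
    return err
}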

This patch also removes a paranoid catch-all finalizer in the
interceptor that ensured that the RPC client span is always eventually
closed (at a random GC time), regardless of what the caller does - i.e.
even if the caller forgets about interacting with the response stream or
canceling the ctx. This kind of paranoia is not needed. The code in
question was copied from grpc-opentracing[1], which quoted a
StackOverflow answer[2] about whether or not a client is allowed to
simply walk away from a streaming call. As a result of conversations
triggered by this patch [3], that SO answer was updated to reflect that
it is not, in fact, OK for a client to do so, as it will leak gRPC
resources. The client's contract is specified in [4] (although arguably
that place is not the easiest to find for a casual gRPC user). In any
case, this patch gets rid of the finalizer. This could in theory result
in leaked spans if our own code is buggy in the way that the paranoia
protected against, but all our TestServers check that spans don't leak
like that, so we are pretty well protected. FWIW, a newer
re-implementation of the OpenTracing interceptor[5] doesn't have the
finalizer (although it also doesn't listen for ctx cancellation, so I
think it's buggy), and neither does the equivalent OpenTelemetry
interceptor[6].
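
To make the contract from [4] concrete, here is a rough, hypothetical
sketch of the two legal ways for a client to end a streaming RPC. The
ItemsClient/Item names stand in for a protoc-generated server-streaming
stub; they are not real API from this repo:

package streamcontract

import (
    "context"
    "io"

    "google.golang.org/grpc"
)

// ItemsClient stands in for a protoc-generated server-streaming stub.
type ItemsClient interface {
    Recv() (*Item, error)
    grpc.ClientStream
}

type Item struct{}

// drainAll shows option 1: keep calling Recv() until it returns a non-nil
// error. io.EOF means the stream completed cleanly and gRPC has released the
// resources associated with the call.
func drainAll(sc ItemsClient) error {
    for {
        if _, err := sc.Recv(); err != nil {
            if err == io.EOF {
                return nil
            }
            return err
        }
    }
}

// abandonEarly shows option 2: a client that wants to walk away before the
// stream is exhausted must cancel the RPC's ctx; simply dropping the stream
// on the floor leaks gRPC resources for the lifetime of the connection.
func abandonEarly(ctx context.Context, open func(context.Context) (ItemsClient, error)) (*Item, error) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel() // ends the RPC even though we never drain the stream
    sc, err := open(ctx)
    if err != nil {
        return nil, err
    }
    return sc.Recv() // read only the first message, then walk away
}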

Fixes cockroachdb#80689

[1] https://github.com/grpc-ecosystem/grpc-opentracing/blob/8e809c8a86450a29b90dcc9efbf062d0fe6d9746/go/otgrpc/client.go#L174
[2] https://stackoverflow.com/questions/42915337/are-you-required-to-call-recv-until-you-get-io-eof-when-interacting-with-grpc-cl
[3] grpc/grpc-go#5324
[4] https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream
[5] https://github.com/grpc-ecosystem/go-grpc-middleware/blob/master/tracing/opentracing/client_interceptors.go#L37-L52
[6] https://github.com/open-telemetry/opentelemetry-go-contrib/blame/main/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go#L193

Release note: A rare crash indicating a nil-pointer dereference in
google.golang.org/grpc/internal/transport.(*Stream).Context(...)
was fixed.
andreimatei committed May 3, 2022
1 parent 3672d03 commit 13a65d3
Showing 2 changed files with 78 additions and 27 deletions.
28 changes: 11 additions & 17 deletions pkg/util/tracing/grpc_interceptor.go
@@ -13,7 +13,6 @@ package tracing
import (
"context"
"io"
"runtime"
"strings"
"sync/atomic"

@@ -356,12 +355,12 @@ func StreamClientInterceptor(tracer *Tracer, init func(*Span)) grpc.StreamClient
clientSpan.Finish()
return cs, err
}
return newTracingClientStream(cs, method, desc, clientSpan), nil
return newTracingClientStream(ctx, cs, desc, clientSpan), nil
}
}

func newTracingClientStream(
cs grpc.ClientStream, method string, desc *grpc.StreamDesc, clientSpan *Span,
ctx context.Context, cs grpc.ClientStream, desc *grpc.StreamDesc, clientSpan *Span,
) grpc.ClientStream {
finishChan := make(chan struct{})

@@ -386,26 +385,21 @@ func newTracingClientStream(
case <-finishChan:
// The client span is being finished by another code path; hence, no
// action is necessary.
case <-cs.Context().Done():
finishFunc(nil)
case <-ctx.Done():
// A streaming RPC can be finished by the caller cancelling the ctx. If
// the ctx is cancelled, the caller doesn't necessarily need to interact
// with the stream anymore (see [1]), so finishChan might never be
signaled. Thus, we listen for ctx cancellation and finish the span.
//
// [1] https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream
finishFunc(nil /* err */)
}
}()
otcs := &tracingClientStream{
return &tracingClientStream{
ClientStream: cs,
desc: desc,
finishFunc: finishFunc,
}

// The `ClientStream` interface allows one to omit calling `Recv` if it's
// known that the result will be `io.EOF`. See
// http://stackoverflow.com/q/42915337
// In such cases, there's nothing that triggers the span to finish. We,
// therefore, set a finalizer so that the span and the context goroutine will
// at least be cleaned up when the garbage collector is run.
runtime.SetFinalizer(otcs, func(otcs *tracingClientStream) {
otcs.finishFunc(nil)
})
return otcs
}

type tracingClientStream struct {
77 changes: 67 additions & 10 deletions pkg/util/tracing/grpc_interceptor_test.go
@@ -13,8 +13,8 @@ package tracing_test
import (
"context"
"fmt"
"io"
"net"
"runtime"
"testing"

"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -141,7 +141,10 @@ func TestGRPCInterceptors(t *testing.T) {

for _, tc := range []struct {
name string
do func(context.Context) (*types.Any, error)
// expSpanName is the expected name of the RPC spans (client-side and
// server-side). If not specified, the test's name is used.
expSpanName string
do func(context.Context) (*types.Any, error)
}{
{
name: "UnaryUnary",
@@ -156,11 +159,45 @@
if err != nil {
return nil, err
}
any, err := sc.Recv()
if err := sc.CloseSend(); err != nil {
return nil, err
}
var firstResponse *types.Any
// Consume the stream fully, as mandated by the gRPC API.
for {
any, err := sc.Recv()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
if firstResponse == nil {
firstResponse = any
}
}
return firstResponse, nil
},
},
{
// Test that cancelling the client's ctx finishes the client span. The
// client span is usually finished either when Recv() receives an error
// (e.g. when receiving an io.EOF after exhausting the stream). But the
// client is allowed to not read from the stream any more if it cancels
// the ctx.
name: "UnaryStream_ContextCancel",
expSpanName: "UnaryStream",
do: func(ctx context.Context) (*types.Any, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sc, err := c.UnaryStream(ctx, unusedAny)
if err != nil {
return nil, err
}
return any, sc.CloseSend()
if err := sc.CloseSend(); err != nil {
return nil, err
}
return sc.Recv()
},
},
{
@@ -186,7 +223,24 @@
if err := sc.Send(unusedAny); err != nil {
return nil, err
}
return sc.Recv()
if err := sc.CloseSend(); err != nil {
return nil, err
}
var firstResponse *types.Any
// Consume the stream fully, as mandated by the gRPC API.
for {
any, err := sc.Recv()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
if firstResponse == nil {
firstResponse = any
}
}
return firstResponse, nil
},
},
} {
@@ -214,20 +268,23 @@
}
require.Equal(t, 1, n)

expSpanName := tc.expSpanName
if expSpanName == "" {
expSpanName = tc.name
}
exp := fmt.Sprintf(`
span: root
span: /cockroach.testutils.grpcutils.GRPCTest/%[1]s
tags: span.kind=client
span: /cockroach.testutils.grpcutils.GRPCTest/%[1]s
tags: span.kind=server
event: structured=magic-value`, tc.name)
event: structured=magic-value`, expSpanName)
require.NoError(t, tracing.CheckRecordedSpans(finalRecs, exp))
})
}
// Force a GC so that the finalizer for the stream client span runs and closes
// the span. Nothing else closes that span in this test. See
// newTracingClientStream().
runtime.GC()
// Check that all the RPC spans (client-side and server-side) have been
// closed. SucceedsSoon because the closing of the span is async (although
// immediate) in the ctx cancellation subtest.
testutils.SucceedsSoon(t, func() error {
return tr.VisitSpans(func(sp tracing.RegistrySpan) error {
rec := sp.GetFullRecording(tracing.RecordingVerbose)[0]