Skip to content

Commit

Permalink
util/tracing: fix crash in StreamClientInterceptor
Browse files Browse the repository at this point in the history
Before this patch, our client-side tracing interceptor for streaming rpc
calls was exposed to gRPC bugs being currently fixed in
github.com/grpc/grpc-go/pull/5323. This had to do with calls to
clientStream.Context() panicking with an NPE under certain races with
RPCs failing. We've recently gotten two crashes seemingly because of
this. It's unclear why this hasn't shown up before, as nothing seems new
(either on our side or on the grpc side). In 22.2 we do use more
streaming RPCs than before (for example for span configs), so maybe that
does it.

This patch eliminates the problem by eliminating the
problematic call into ClientStream.Context(). The background is that
our interceptor needs to watch for ctx cancelation and consider the RPC
done at that point. But, there was no reason for that call; we can more
simply use the RPC caller's ctx for the purposes of figuring out if the
caller cancels the RPC. In fact, calling ClientStream.Context() is bad
for other reasons, beyond exposing us to the bug:
1) ClientStream.Context() pins the RPC attempt to a lower-level
connection, and inhibits gRPC's ability to sometimes transparently
retry failed calls. In fact, there's a comment on ClientStream.Context()
that tells you not to call it before using the stream through other
methods like Recv(), which imply that the RPC is already "pinned" and
transparent retries are no longer possible anyway. We were breaking
this.
2) One of the grpc-go maintainers suggested that, due to the bugs
referenced above, this call could actually sometimes give us "the
wrong context", although how wrong exactly is not clear to me (i.e.
could we have gotten a ctx that doesn't inherit from the caller's ctx?
Or a ctx that's canceled independently from the caller?)

Release note: A rare crash indicating a nil-pointer dereference in
google.golang.org/grpc/internal/transport.(*Stream).Context(...)
was fixed.
  • Loading branch information
andreimatei committed May 3, 2022
1 parent a040247 commit 0402ccb
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 14 deletions.
14 changes: 10 additions & 4 deletions pkg/util/tracing/grpc_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,12 +356,12 @@ func StreamClientInterceptor(tracer *Tracer, init func(*Span)) grpc.StreamClient
clientSpan.Finish()
return cs, err
}
return newTracingClientStream(cs, method, desc, clientSpan), nil
return newTracingClientStream(ctx, cs, desc, clientSpan), nil
}
}

func newTracingClientStream(
cs grpc.ClientStream, method string, desc *grpc.StreamDesc, clientSpan *Span,
ctx context.Context, cs grpc.ClientStream, desc *grpc.StreamDesc, clientSpan *Span,
) grpc.ClientStream {
finishChan := make(chan struct{})

Expand All @@ -386,8 +386,14 @@ func newTracingClientStream(
case <-finishChan:
// The client span is being finished by another code path; hence, no
// action is necessary.
case <-cs.Context().Done():
finishFunc(nil)
case <-ctx.Done():
// A streaming RPC can be finished by the caller cancelling the ctx. If
// the ctx is cancelled, the caller doesn't necessarily need to interact
// with the stream anymore (see [1]), so finishChan might never be
// signaled). Thus, we listen for ctx cancellation and finish the span.
//
// [1] https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream
finishFunc(nil /* err */)
}
}()
otcs := &tracingClientStream{
Expand Down
77 changes: 67 additions & 10 deletions pkg/util/tracing/grpc_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ package tracing_test
import (
"context"
"fmt"
"io"
"net"
"runtime"
"testing"

"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -141,7 +141,10 @@ func TestGRPCInterceptors(t *testing.T) {

for _, tc := range []struct {
name string
do func(context.Context) (*types.Any, error)
// expSpanName is the expected name of the RPC spans (client-side and
// server-side). If not specified, the test's name is used.
expSpanName string
do func(context.Context) (*types.Any, error)
}{
{
name: "UnaryUnary",
Expand All @@ -156,11 +159,45 @@ func TestGRPCInterceptors(t *testing.T) {
if err != nil {
return nil, err
}
any, err := sc.Recv()
if err := sc.CloseSend(); err != nil {
return nil, err
}
var firstResponse *types.Any
// Consume the stream fully, as mandated by the gRPC API.
for {
any, err := sc.Recv()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
if firstResponse == nil {
firstResponse = any
}
}
return firstResponse, nil
},
},
{
// Test that cancelling the client's ctx finishes the client span. The
// client span is usually finished either when Recv() receives an error
// (e.g. when receiving an io.EOF after exhausting the stream). But the
// client is allowed to not read from the stream any more if it cancels
// the ctx.
name: "UnaryStream_ContextCancel",
expSpanName: "UnaryStream",
do: func(ctx context.Context) (*types.Any, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sc, err := c.UnaryStream(ctx, unusedAny)
if err != nil {
return nil, err
}
return any, sc.CloseSend()
if err := sc.CloseSend(); err != nil {
return nil, err
}
return sc.Recv()
},
},
{
Expand All @@ -186,7 +223,24 @@ func TestGRPCInterceptors(t *testing.T) {
if err := sc.Send(unusedAny); err != nil {
return nil, err
}
return sc.Recv()
if err := sc.CloseSend(); err != nil {
return nil, err
}
var firstResponse *types.Any
// Consume the stream fully, as mandated by the gRPC API.
for {
any, err := sc.Recv()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
if firstResponse == nil {
firstResponse = any
}
}
return firstResponse, nil
},
},
} {
Expand Down Expand Up @@ -214,20 +268,23 @@ func TestGRPCInterceptors(t *testing.T) {
}
require.Equal(t, 1, n)

expSpanName := tc.expSpanName
if expSpanName == "" {
expSpanName = tc.name
}
exp := fmt.Sprintf(`
span: root
span: /cockroach.testutils.grpcutils.GRPCTest/%[1]s
tags: span.kind=client
span: /cockroach.testutils.grpcutils.GRPCTest/%[1]s
tags: span.kind=server
event: structured=magic-value`, tc.name)
event: structured=magic-value`, expSpanName)
require.NoError(t, tracing.CheckRecordedSpans(finalRecs, exp))
})
}
// Force a GC so that the finalizer for the stream client span runs and closes
// the span. Nothing else closes that span in this test. See
// newTracingClientStream().
runtime.GC()
// Check that all the RPC spans (client-side and server-side) have been
// closed. SucceedsSoon because the closing of the span is async (although
// immediate) in the ctx cancellation subtest.
testutils.SucceedsSoon(t, func() error {
return tr.VisitSpans(func(sp tracing.RegistrySpan) error {
rec := sp.GetFullRecording(tracing.RecordingVerbose)[0]
Expand Down

0 comments on commit 0402ccb

Please sign in to comment.