Skip to content

Commit

Permalink
Merge pull request #80911 from andreimatei/backport.tracing.fix-strea…
Browse files Browse the repository at this point in the history
…m-interceptor

release-22.1: util/tracing: fix crash in StreamClientInterceptor
  • Loading branch information
andreimatei authored May 4, 2022
2 parents 2d4e909 + 0402ccb commit 0a7883a
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 14 deletions.
14 changes: 10 additions & 4 deletions pkg/util/tracing/grpc_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,12 +356,12 @@ func StreamClientInterceptor(tracer *Tracer, init func(*Span)) grpc.StreamClient
clientSpan.Finish()
return cs, err
}
return newTracingClientStream(cs, method, desc, clientSpan), nil
return newTracingClientStream(ctx, cs, desc, clientSpan), nil
}
}

func newTracingClientStream(
cs grpc.ClientStream, method string, desc *grpc.StreamDesc, clientSpan *Span,
ctx context.Context, cs grpc.ClientStream, desc *grpc.StreamDesc, clientSpan *Span,
) grpc.ClientStream {
finishChan := make(chan struct{})

Expand All @@ -386,8 +386,14 @@ func newTracingClientStream(
case <-finishChan:
// The client span is being finished by another code path; hence, no
// action is necessary.
case <-cs.Context().Done():
finishFunc(nil)
case <-ctx.Done():
// A streaming RPC can be finished by the caller cancelling the ctx. If
// the ctx is cancelled, the caller doesn't necessarily need to interact
// with the stream anymore (see [1]), so finishChan might never be
// signaled). Thus, we listen for ctx cancellation and finish the span.
//
// [1] https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream
finishFunc(nil /* err */)
}
}()
otcs := &tracingClientStream{
Expand Down
77 changes: 67 additions & 10 deletions pkg/util/tracing/grpc_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ package tracing_test
import (
"context"
"fmt"
"io"
"net"
"runtime"
"testing"

"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -141,7 +141,10 @@ func TestGRPCInterceptors(t *testing.T) {

for _, tc := range []struct {
name string
do func(context.Context) (*types.Any, error)
// expSpanName is the expected name of the RPC spans (client-side and
// server-side). If not specified, the test's name is used.
expSpanName string
do func(context.Context) (*types.Any, error)
}{
{
name: "UnaryUnary",
Expand All @@ -156,11 +159,45 @@ func TestGRPCInterceptors(t *testing.T) {
if err != nil {
return nil, err
}
any, err := sc.Recv()
if err := sc.CloseSend(); err != nil {
return nil, err
}
var firstResponse *types.Any
// Consume the stream fully, as mandated by the gRPC API.
for {
any, err := sc.Recv()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
if firstResponse == nil {
firstResponse = any
}
}
return firstResponse, nil
},
},
{
// Test that cancelling the client's ctx finishes the client span. The
// client span is usually finished either when Recv() receives an error
// (e.g. when receiving an io.EOF after exhausting the stream). But the
// client is allowed to not read from the stream any more if it cancels
// the ctx.
name: "UnaryStream_ContextCancel",
expSpanName: "UnaryStream",
do: func(ctx context.Context) (*types.Any, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sc, err := c.UnaryStream(ctx, unusedAny)
if err != nil {
return nil, err
}
return any, sc.CloseSend()
if err := sc.CloseSend(); err != nil {
return nil, err
}
return sc.Recv()
},
},
{
Expand All @@ -186,7 +223,24 @@ func TestGRPCInterceptors(t *testing.T) {
if err := sc.Send(unusedAny); err != nil {
return nil, err
}
return sc.Recv()
if err := sc.CloseSend(); err != nil {
return nil, err
}
var firstResponse *types.Any
// Consume the stream fully, as mandated by the gRPC API.
for {
any, err := sc.Recv()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
if firstResponse == nil {
firstResponse = any
}
}
return firstResponse, nil
},
},
} {
Expand Down Expand Up @@ -214,20 +268,23 @@ func TestGRPCInterceptors(t *testing.T) {
}
require.Equal(t, 1, n)

expSpanName := tc.expSpanName
if expSpanName == "" {
expSpanName = tc.name
}
exp := fmt.Sprintf(`
span: root
span: /cockroach.testutils.grpcutils.GRPCTest/%[1]s
tags: span.kind=client
span: /cockroach.testutils.grpcutils.GRPCTest/%[1]s
tags: span.kind=server
event: structured=magic-value`, tc.name)
event: structured=magic-value`, expSpanName)
require.NoError(t, tracing.CheckRecordedSpans(finalRecs, exp))
})
}
// Force a GC so that the finalizer for the stream client span runs and closes
// the span. Nothing else closes that span in this test. See
// newTracingClientStream().
runtime.GC()
// Check that all the RPC spans (client-side and server-side) have been
// closed. SucceedsSoon because the closing of the span is async (although
// immediate) in the ctx cancellation subtest.
testutils.SucceedsSoon(t, func() error {
return tr.VisitSpans(func(sp tracing.RegistrySpan) error {
rec := sp.GetFullRecording(tracing.RecordingVerbose)[0]
Expand Down

0 comments on commit 0a7883a

Please sign in to comment.