Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

release-22.1: util/tracing: fix crash in StreamClientInterceptor #80911

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions pkg/util/tracing/grpc_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,12 +356,12 @@ func StreamClientInterceptor(tracer *Tracer, init func(*Span)) grpc.StreamClient
clientSpan.Finish()
return cs, err
}
return newTracingClientStream(cs, method, desc, clientSpan), nil
return newTracingClientStream(ctx, cs, desc, clientSpan), nil
}
}

func newTracingClientStream(
cs grpc.ClientStream, method string, desc *grpc.StreamDesc, clientSpan *Span,
ctx context.Context, cs grpc.ClientStream, desc *grpc.StreamDesc, clientSpan *Span,
) grpc.ClientStream {
finishChan := make(chan struct{})

Expand All @@ -386,8 +386,14 @@ func newTracingClientStream(
case <-finishChan:
// The client span is being finished by another code path; hence, no
// action is necessary.
case <-cs.Context().Done():
finishFunc(nil)
case <-ctx.Done():
// A streaming RPC can be finished by the caller cancelling the ctx. If
// the ctx is cancelled, the caller doesn't necessarily need to interact
// with the stream anymore (see [1]), so finishChan might never be
// signaled). Thus, we listen for ctx cancellation and finish the span.
//
// [1] https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream
finishFunc(nil /* err */)
}
}()
otcs := &tracingClientStream{
Expand Down
77 changes: 67 additions & 10 deletions pkg/util/tracing/grpc_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ package tracing_test
import (
"context"
"fmt"
"io"
"net"
"runtime"
"testing"

"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -141,7 +141,10 @@ func TestGRPCInterceptors(t *testing.T) {

for _, tc := range []struct {
name string
do func(context.Context) (*types.Any, error)
// expSpanName is the expected name of the RPC spans (client-side and
// server-side). If not specified, the test's name is used.
expSpanName string
do func(context.Context) (*types.Any, error)
}{
{
name: "UnaryUnary",
Expand All @@ -156,11 +159,45 @@ func TestGRPCInterceptors(t *testing.T) {
if err != nil {
return nil, err
}
any, err := sc.Recv()
if err := sc.CloseSend(); err != nil {
return nil, err
}
var firstResponse *types.Any
// Consume the stream fully, as mandated by the gRPC API.
for {
any, err := sc.Recv()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
if firstResponse == nil {
firstResponse = any
}
}
return firstResponse, nil
},
},
{
// Test that cancelling the client's ctx finishes the client span. The
// client span is usually finished either when Recv() receives an error
// (e.g. when receiving an io.EOF after exhausting the stream). But the
// client is allowed to not read from the stream any more if it cancels
// the ctx.
name: "UnaryStream_ContextCancel",
expSpanName: "UnaryStream",
do: func(ctx context.Context) (*types.Any, error) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
sc, err := c.UnaryStream(ctx, unusedAny)
if err != nil {
return nil, err
}
return any, sc.CloseSend()
if err := sc.CloseSend(); err != nil {
return nil, err
}
return sc.Recv()
},
},
{
Expand All @@ -186,7 +223,24 @@ func TestGRPCInterceptors(t *testing.T) {
if err := sc.Send(unusedAny); err != nil {
return nil, err
}
return sc.Recv()
if err := sc.CloseSend(); err != nil {
return nil, err
}
var firstResponse *types.Any
// Consume the stream fully, as mandated by the gRPC API.
for {
any, err := sc.Recv()
if err == io.EOF {
break
}
if err != nil {
return nil, err
}
if firstResponse == nil {
firstResponse = any
}
}
return firstResponse, nil
},
},
} {
Expand Down Expand Up @@ -214,20 +268,23 @@ func TestGRPCInterceptors(t *testing.T) {
}
require.Equal(t, 1, n)

expSpanName := tc.expSpanName
if expSpanName == "" {
expSpanName = tc.name
}
exp := fmt.Sprintf(`
span: root
span: /cockroach.testutils.grpcutils.GRPCTest/%[1]s
tags: span.kind=client
span: /cockroach.testutils.grpcutils.GRPCTest/%[1]s
tags: span.kind=server
event: structured=magic-value`, tc.name)
event: structured=magic-value`, expSpanName)
require.NoError(t, tracing.CheckRecordedSpans(finalRecs, exp))
})
}
// Force a GC so that the finalizer for the stream client span runs and closes
// the span. Nothing else closes that span in this test. See
// newTracingClientStream().
runtime.GC()
// Check that all the RPC spans (client-side and server-side) have been
// closed. SucceedsSoon because the closing of the span is async (although
// immediate) in the ctx cancellation subtest.
testutils.SucceedsSoon(t, func() error {
return tr.VisitSpans(func(sp tracing.RegistrySpan) error {
rec := sp.GetFullRecording(tracing.RecordingVerbose)[0]
Expand Down