diff --git a/pkg/util/tracing/grpc_interceptor.go b/pkg/util/tracing/grpc_interceptor.go index d5ad14fcab8d..87247f42bd04 100644 --- a/pkg/util/tracing/grpc_interceptor.go +++ b/pkg/util/tracing/grpc_interceptor.go @@ -356,12 +356,12 @@ func StreamClientInterceptor(tracer *Tracer, init func(*Span)) grpc.StreamClient clientSpan.Finish() return cs, err } - return newTracingClientStream(cs, method, desc, clientSpan), nil + return newTracingClientStream(ctx, cs, desc, clientSpan), nil } } func newTracingClientStream( - cs grpc.ClientStream, method string, desc *grpc.StreamDesc, clientSpan *Span, + ctx context.Context, cs grpc.ClientStream, desc *grpc.StreamDesc, clientSpan *Span, ) grpc.ClientStream { finishChan := make(chan struct{}) @@ -386,8 +386,14 @@ func newTracingClientStream( case <-finishChan: // The client span is being finished by another code path; hence, no // action is necessary. - case <-cs.Context().Done(): - finishFunc(nil) + case <-ctx.Done(): + // A streaming RPC can be finished by the caller cancelling the ctx. If + // the ctx is cancelled, the caller doesn't necessarily need to interact + // with the stream anymore (see [1]), so finishChan might never be + // signaled). Thus, we listen for ctx cancellation and finish the span. + // + // [1] https://pkg.go.dev/google.golang.org/grpc#ClientConn.NewStream + finishFunc(nil /* err */) } }() otcs := &tracingClientStream{ diff --git a/pkg/util/tracing/grpc_interceptor_test.go b/pkg/util/tracing/grpc_interceptor_test.go index fa3637a06457..c8f9ae0fb3cc 100644 --- a/pkg/util/tracing/grpc_interceptor_test.go +++ b/pkg/util/tracing/grpc_interceptor_test.go @@ -13,8 +13,8 @@ package tracing_test import ( "context" "fmt" + "io" "net" - "runtime" "testing" "github.com/cockroachdb/cockroach/pkg/testutils" @@ -141,7 +141,10 @@ func TestGRPCInterceptors(t *testing.T) { for _, tc := range []struct { name string - do func(context.Context) (*types.Any, error) + // expSpanName is the expected name of the RPC spans (client-side and + // server-side). If not specified, the test's name is used. + expSpanName string + do func(context.Context) (*types.Any, error) }{ { name: "UnaryUnary", @@ -156,11 +159,45 @@ func TestGRPCInterceptors(t *testing.T) { if err != nil { return nil, err } - any, err := sc.Recv() + if err := sc.CloseSend(); err != nil { + return nil, err + } + var firstResponse *types.Any + // Consume the stream fully, as mandated by the gRPC API. + for { + any, err := sc.Recv() + if err == io.EOF { + break + } + if err != nil { + return nil, err + } + if firstResponse == nil { + firstResponse = any + } + } + return firstResponse, nil + }, + }, + { + // Test that cancelling the client's ctx finishes the client span. The + // client span is usually finished either when Recv() receives an error + // (e.g. when receiving an io.EOF after exhausting the stream). But the + // client is allowed to not read from the stream any more if it cancels + // the ctx. + name: "UnaryStream_ContextCancel", + expSpanName: "UnaryStream", + do: func(ctx context.Context) (*types.Any, error) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + sc, err := c.UnaryStream(ctx, unusedAny) if err != nil { return nil, err } - return any, sc.CloseSend() + if err := sc.CloseSend(); err != nil { + return nil, err + } + return sc.Recv() }, }, { @@ -186,7 +223,24 @@ func TestGRPCInterceptors(t *testing.T) { if err := sc.Send(unusedAny); err != nil { return nil, err } - return sc.Recv() + if err := sc.CloseSend(); err != nil { + return nil, err + } + var firstResponse *types.Any + // Consume the stream fully, as mandated by the gRPC API. + for { + any, err := sc.Recv() + if err == io.EOF { + break + } + if err != nil { + return nil, err + } + if firstResponse == nil { + firstResponse = any + } + } + return firstResponse, nil }, }, } { @@ -214,20 +268,23 @@ func TestGRPCInterceptors(t *testing.T) { } require.Equal(t, 1, n) + expSpanName := tc.expSpanName + if expSpanName == "" { + expSpanName = tc.name + } exp := fmt.Sprintf(` span: root span: /cockroach.testutils.grpcutils.GRPCTest/%[1]s tags: span.kind=client span: /cockroach.testutils.grpcutils.GRPCTest/%[1]s tags: span.kind=server - event: structured=magic-value`, tc.name) + event: structured=magic-value`, expSpanName) require.NoError(t, tracing.CheckRecordedSpans(finalRecs, exp)) }) } - // Force a GC so that the finalizer for the stream client span runs and closes - // the span. Nothing else closes that span in this test. See - // newTracingClientStream(). - runtime.GC() + // Check that all the RPC spans (client-side and server-side) have been + // closed. SucceedsSoon because the closing of the span is async (although + // immediate) in the ctx cancellation subtest. testutils.SucceedsSoon(t, func() error { return tr.VisitSpans(func(sp tracing.RegistrySpan) error { rec := sp.GetFullRecording(tracing.RecordingVerbose)[0]