From afcff9417b4a5fb4fc8733c9313ef3fe0f57b913 Mon Sep 17 00:00:00 2001 From: Mitch Dempsey Date: Thu, 12 Sep 2024 03:26:25 -0700 Subject: [PATCH 1/2] contrib/aws/aws-sdk-go-v2/aws: Fix streamName nil pointer panic (#2846) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Dario Castañé --- contrib/aws/aws-sdk-go-v2/aws/aws.go | 31 +++++++++--- contrib/aws/aws-sdk-go-v2/aws/aws_test.go | 62 +++++++++++++++++++++++ 2 files changed, 85 insertions(+), 8 deletions(-) diff --git a/contrib/aws/aws-sdk-go-v2/aws/aws.go b/contrib/aws/aws-sdk-go-v2/aws/aws.go index 7a1d0b4e6a..f193914cbd 100644 --- a/contrib/aws/aws-sdk-go-v2/aws/aws.go +++ b/contrib/aws/aws-sdk-go-v2/aws/aws.go @@ -229,21 +229,23 @@ func tableName(requestInput middleware.InitializeInput) string { func streamName(requestInput middleware.InitializeInput) string { switch params := requestInput.Parameters.(type) { case *kinesis.PutRecordInput: - return *params.StreamName + return coalesceNameOrArnResource(params.StreamName, params.StreamARN) case *kinesis.PutRecordsInput: - return *params.StreamName + return coalesceNameOrArnResource(params.StreamName, params.StreamARN) case *kinesis.AddTagsToStreamInput: - return *params.StreamName + return coalesceNameOrArnResource(params.StreamName, params.StreamARN) case *kinesis.RemoveTagsFromStreamInput: - return *params.StreamName + return coalesceNameOrArnResource(params.StreamName, params.StreamARN) case *kinesis.CreateStreamInput: - return *params.StreamName + if params.StreamName != nil { + return *params.StreamName + } case *kinesis.DeleteStreamInput: - return *params.StreamName + return coalesceNameOrArnResource(params.StreamName, params.StreamARN) case *kinesis.DescribeStreamInput: - return *params.StreamName + return coalesceNameOrArnResource(params.StreamName, params.StreamARN) case *kinesis.DescribeStreamSummaryInput: - return *params.StreamName + return coalesceNameOrArnResource(params.StreamName, params.StreamARN) case *kinesis.GetRecordsInput: if params.StreamARN != nil { streamArnValue := *params.StreamARN @@ -353,3 +355,16 @@ func serviceName(cfg *config, awsService string) string { defaultName := fmt.Sprintf("aws.%s", awsService) return namingschema.ServiceNameOverrideV0(defaultName, defaultName) } + +func coalesceNameOrArnResource(name *string, arnVal *string) string { + if name != nil { + return *name + } + + if arnVal != nil { + parts := strings.Split(*arnVal, "/") + return parts[len(parts)-1] + } + + return "" +} diff --git a/contrib/aws/aws-sdk-go-v2/aws/aws_test.go b/contrib/aws/aws-sdk-go-v2/aws/aws_test.go index 3c45d1a1a7..88fba42c5d 100644 --- a/contrib/aws/aws-sdk-go-v2/aws/aws_test.go +++ b/contrib/aws/aws-sdk-go-v2/aws/aws_test.go @@ -30,6 +30,7 @@ import ( "github.com/aws/aws-sdk-go-v2/service/sns" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" + "github.com/aws/smithy-go/middleware" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -1077,3 +1078,64 @@ func TestWithErrorCheck(t *testing.T) { }) } } + +func TestStreamName(t *testing.T) { + dummyName := `my-stream` + dummyArn := `arn:aws:kinesis:us-east-1:111111111111:stream/` + dummyName + + tests := []struct { + name string + input any + expected string + }{ + { + name: "PutRecords with ARN", + input: &kinesis.PutRecordsInput{StreamARN: &dummyArn}, + expected: dummyName, + }, + { + name: "PutRecords with Name", + input: &kinesis.PutRecordsInput{StreamName: &dummyName}, + expected: dummyName, + }, + { + name: "PutRecords with both", + input: &kinesis.PutRecordsInput{StreamName: &dummyName, StreamARN: &dummyArn}, + expected: dummyName, + }, + { + name: "PutRecord with Name", + input: &kinesis.PutRecordInput{StreamName: &dummyName}, + expected: dummyName, + }, + { + name: "CreateStream", + input: &kinesis.CreateStreamInput{StreamName: &dummyName}, + expected: dummyName, + }, + { + name: "CreateStream with nothing", + input: &kinesis.CreateStreamInput{}, + expected: "", + }, + { + name: "GetRecords", + input: &kinesis.GetRecordsInput{StreamARN: &dummyArn}, + expected: dummyName, + }, + { + name: "GetRecords with nothing", + input: &kinesis.GetRecordsInput{}, + expected: "", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + req := middleware.InitializeInput{ + Parameters: tt.input, + } + val := streamName(req) + assert.Equal(t, tt.expected, val) + }) + } +} From e081e4a9bf1755e09210707972cfaf2e6b77d8ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rodrigo=20Arg=C3=BCello?= Date: Thu, 12 Sep 2024 17:02:32 +0200 Subject: [PATCH 2/2] contrib/cloud.google.com/go/pubsub.v1: split tracing code (#2852) --- .../go/pubsub.v1/internal/tracing/config.go | 43 +++++++ .../go/pubsub.v1/internal/tracing/tracing.go | 120 ++++++++++++++++++ .../cloud.google.com/go/pubsub.v1/option.go | 36 +----- .../cloud.google.com/go/pubsub.v1/pubsub.go | 108 ++++------------ .../internal/telemetrytest/telemetry_test.go | 34 +++-- 5 files changed, 221 insertions(+), 120 deletions(-) create mode 100644 contrib/cloud.google.com/go/pubsub.v1/internal/tracing/config.go create mode 100644 contrib/cloud.google.com/go/pubsub.v1/internal/tracing/tracing.go diff --git a/contrib/cloud.google.com/go/pubsub.v1/internal/tracing/config.go b/contrib/cloud.google.com/go/pubsub.v1/internal/tracing/config.go new file mode 100644 index 0000000000..b5b04b19a8 --- /dev/null +++ b/contrib/cloud.google.com/go/pubsub.v1/internal/tracing/config.go @@ -0,0 +1,43 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. + +package tracing + +import ( + "gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema" +) + +type config struct { + serviceName string + publishSpanName string + receiveSpanName string + measured bool +} + +func defaultConfig() *config { + return &config{ + serviceName: namingschema.ServiceNameOverrideV0("", ""), + publishSpanName: namingschema.OpName(namingschema.GCPPubSubOutbound), + receiveSpanName: namingschema.OpName(namingschema.GCPPubSubInbound), + measured: false, + } +} + +// Option is used to customize spans started by WrapReceiveHandler or Publish. +type Option func(cfg *config) + +// WithServiceName sets the service name tag for traces started by WrapReceiveHandler or Publish. +func WithServiceName(serviceName string) Option { + return func(cfg *config) { + cfg.serviceName = serviceName + } +} + +// WithMeasured sets the measured tag for traces started by WrapReceiveHandler or Publish. +func WithMeasured() Option { + return func(cfg *config) { + cfg.measured = true + } +} diff --git a/contrib/cloud.google.com/go/pubsub.v1/internal/tracing/tracing.go b/contrib/cloud.google.com/go/pubsub.v1/internal/tracing/tracing.go new file mode 100644 index 0000000000..e3331c90e1 --- /dev/null +++ b/contrib/cloud.google.com/go/pubsub.v1/internal/tracing/tracing.go @@ -0,0 +1,120 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2024 Datadog, Inc. + +package tracing + +import ( + "context" + "sync" + "time" + + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" + "gopkg.in/DataDog/dd-trace-go.v1/internal/log" + "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" +) + +const componentName = "cloud.google.com/go/pubsub.v1" + +func init() { + telemetry.LoadIntegration(componentName) + tracer.MarkIntegrationImported(componentName) +} + +type Message struct { + ID string + Data []byte + OrderingKey string + Attributes map[string]string + DeliveryAttempt *int + PublishTime time.Time +} + +type Topic interface { + String() string +} + +type Subscription interface { + String() string +} + +func TracePublish(ctx context.Context, topic Topic, msg *Message, opts ...Option) (context.Context, func(serverID string, err error)) { + cfg := defaultConfig() + for _, opt := range opts { + opt(cfg) + } + spanOpts := []ddtrace.StartSpanOption{ + tracer.ResourceName(topic.String()), + tracer.SpanType(ext.SpanTypeMessageProducer), + tracer.Tag("message_size", len(msg.Data)), + tracer.Tag("ordering_key", msg.OrderingKey), + tracer.Tag(ext.Component, componentName), + tracer.Tag(ext.SpanKind, ext.SpanKindProducer), + tracer.Tag(ext.MessagingSystem, ext.MessagingSystemGCPPubsub), + } + if cfg.serviceName != "" { + spanOpts = append(spanOpts, tracer.ServiceName(cfg.serviceName)) + } + if cfg.measured { + spanOpts = append(spanOpts, tracer.Measured()) + } + span, ctx := tracer.StartSpanFromContext( + ctx, + cfg.publishSpanName, + spanOpts..., + ) + if msg.Attributes == nil { + msg.Attributes = make(map[string]string) + } + if err := tracer.Inject(span.Context(), tracer.TextMapCarrier(msg.Attributes)); err != nil { + log.Debug("contrib/cloud.google.com/go/pubsub.v1/trace: failed injecting tracing attributes: %v", err) + } + span.SetTag("num_attributes", len(msg.Attributes)) + + var once sync.Once + closeSpan := func(serverID string, err error) { + once.Do(func() { + span.SetTag("server_id", serverID) + span.Finish(tracer.WithError(err)) + }) + } + return ctx, closeSpan +} + +func TraceReceiveFunc(s Subscription, opts ...Option) func(ctx context.Context, msg *Message) (context.Context, func()) { + cfg := defaultConfig() + for _, opt := range opts { + opt(cfg) + } + log.Debug("contrib/cloud.google.com/go/pubsub.v1/trace: Wrapping Receive Handler: %#v", cfg) + return func(ctx context.Context, msg *Message) (context.Context, func()) { + parentSpanCtx, _ := tracer.Extract(tracer.TextMapCarrier(msg.Attributes)) + opts := []ddtrace.StartSpanOption{ + tracer.ResourceName(s.String()), + tracer.SpanType(ext.SpanTypeMessageConsumer), + tracer.Tag("message_size", len(msg.Data)), + tracer.Tag("num_attributes", len(msg.Attributes)), + tracer.Tag("ordering_key", msg.OrderingKey), + tracer.Tag("message_id", msg.ID), + tracer.Tag("publish_time", msg.PublishTime.String()), + tracer.Tag(ext.Component, componentName), + tracer.Tag(ext.SpanKind, ext.SpanKindConsumer), + tracer.Tag(ext.MessagingSystem, ext.MessagingSystemGCPPubsub), + tracer.ChildOf(parentSpanCtx), + } + if cfg.serviceName != "" { + opts = append(opts, tracer.ServiceName(cfg.serviceName)) + } + if cfg.measured { + opts = append(opts, tracer.Measured()) + } + span, ctx := tracer.StartSpanFromContext(ctx, cfg.receiveSpanName, opts...) + if msg.DeliveryAttempt != nil { + span.SetTag("delivery_attempt", *msg.DeliveryAttempt) + } + return ctx, func() { span.Finish() } + } +} diff --git a/contrib/cloud.google.com/go/pubsub.v1/option.go b/contrib/cloud.google.com/go/pubsub.v1/option.go index 3e8d8b3c29..3820859a12 100644 --- a/contrib/cloud.google.com/go/pubsub.v1/option.go +++ b/contrib/cloud.google.com/go/pubsub.v1/option.go @@ -6,41 +6,17 @@ package pubsub import ( - "gopkg.in/DataDog/dd-trace-go.v1/internal/namingschema" + "gopkg.in/DataDog/dd-trace-go.v1/contrib/cloud.google.com/go/pubsub.v1/internal/tracing" ) -type config struct { - serviceName string - publishSpanName string - receiveSpanName string - measured bool -} +// Option is used to customize spans started by WrapReceiveHandler or Publish. +type Option = tracing.Option -func defaultConfig() *config { - return &config{ - serviceName: namingschema.ServiceNameOverrideV0("", ""), - publishSpanName: namingschema.OpName(namingschema.GCPPubSubOutbound), - receiveSpanName: namingschema.OpName(namingschema.GCPPubSubInbound), - measured: false, - } -} - -// A Option is used to customize spans started by WrapReceiveHandler or Publish. -type Option func(cfg *config) - -// A ReceiveOption has been deprecated in favor of Option. +// Deprecated: ReceiveOption has been deprecated in favor of Option. type ReceiveOption = Option // WithServiceName sets the service name tag for traces started by WrapReceiveHandler or Publish. -func WithServiceName(serviceName string) Option { - return func(cfg *config) { - cfg.serviceName = serviceName - } -} +var WithServiceName = tracing.WithServiceName // WithMeasured sets the measured tag for traces started by WrapReceiveHandler or Publish. -func WithMeasured() Option { - return func(cfg *config) { - cfg.measured = true - } -} +var WithMeasured = tracing.WithMeasured diff --git a/contrib/cloud.google.com/go/pubsub.v1/pubsub.go b/contrib/cloud.google.com/go/pubsub.v1/pubsub.go index feb10a860e..1f01965090 100644 --- a/contrib/cloud.google.com/go/pubsub.v1/pubsub.go +++ b/contrib/cloud.google.com/go/pubsub.v1/pubsub.go @@ -8,23 +8,11 @@ package pubsub import ( "context" - "sync" - - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" - "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" - "gopkg.in/DataDog/dd-trace-go.v1/internal/log" - "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" "cloud.google.com/go/pubsub" -) - -const componentName = "cloud.google.com/go/pubsub.v1" -func init() { - telemetry.LoadIntegration(componentName) - tracer.MarkIntegrationImported(componentName) -} + "gopkg.in/DataDog/dd-trace-go.v1/contrib/cloud.google.com/go/pubsub.v1/internal/tracing" +) // Publish publishes a message on the specified topic and returns a PublishResult. // This function is functionally equivalent to t.Publish(ctx, msg), but it also starts a publish @@ -33,58 +21,27 @@ func init() { // It is required to call (*PublishResult).Get(ctx) on the value returned by Publish to complete // the span. func Publish(ctx context.Context, t *pubsub.Topic, msg *pubsub.Message, opts ...Option) *PublishResult { - cfg := defaultConfig() - for _, opt := range opts { - opt(cfg) - } - spanOpts := []ddtrace.StartSpanOption{ - tracer.ResourceName(t.String()), - tracer.SpanType(ext.SpanTypeMessageProducer), - tracer.Tag("message_size", len(msg.Data)), - tracer.Tag("ordering_key", msg.OrderingKey), - tracer.Tag(ext.Component, componentName), - tracer.Tag(ext.SpanKind, ext.SpanKindProducer), - tracer.Tag(ext.MessagingSystem, ext.MessagingSystemGCPPubsub), - } - if cfg.serviceName != "" { - spanOpts = append(spanOpts, tracer.ServiceName(cfg.serviceName)) - } - if cfg.measured { - spanOpts = append(spanOpts, tracer.Measured()) - } - span, ctx := tracer.StartSpanFromContext( - ctx, - cfg.publishSpanName, - spanOpts..., - ) - if msg.Attributes == nil { - msg.Attributes = make(map[string]string) - } - if err := tracer.Inject(span.Context(), tracer.TextMapCarrier(msg.Attributes)); err != nil { - log.Debug("contrib/cloud.google.com/go/pubsub.v1/: failed injecting tracing attributes: %v", err) - } - span.SetTag("num_attributes", len(msg.Attributes)) + traceMsg := newTraceMessage(msg) + ctx, closeSpan := tracing.TracePublish(ctx, t, traceMsg, opts...) + msg.Attributes = traceMsg.Attributes + return &PublishResult{ PublishResult: t.Publish(ctx, msg), - span: span, + closeSpan: closeSpan, } } // PublishResult wraps *pubsub.PublishResult type PublishResult struct { *pubsub.PublishResult - once sync.Once - span tracer.Span + closeSpan func(serverID string, err error) } // Get wraps (pubsub.PublishResult).Get(ctx). When this function returns the publish // span created in Publish is completed. func (r *PublishResult) Get(ctx context.Context) (string, error) { serverID, err := r.PublishResult.Get(ctx) - r.once.Do(func() { - r.span.SetTag("server_id", serverID) - r.span.Finish(tracer.WithError(err)) - }) + r.closeSpan(serverID, err) return serverID, err } @@ -92,37 +49,24 @@ func (r *PublishResult) Get(ctx context.Context) (string, error) { // extracts any tracing metadata attached to the received message, and starts a // receive span. func WrapReceiveHandler(s *pubsub.Subscription, f func(context.Context, *pubsub.Message), opts ...Option) func(context.Context, *pubsub.Message) { - cfg := defaultConfig() - for _, opt := range opts { - opt(cfg) - } - log.Debug("contrib/cloud.google.com/go/pubsub.v1: Wrapping Receive Handler: %#v", cfg) + traceFn := tracing.TraceReceiveFunc(s, opts...) return func(ctx context.Context, msg *pubsub.Message) { - parentSpanCtx, _ := tracer.Extract(tracer.TextMapCarrier(msg.Attributes)) - opts := []ddtrace.StartSpanOption{ - tracer.ResourceName(s.String()), - tracer.SpanType(ext.SpanTypeMessageConsumer), - tracer.Tag("message_size", len(msg.Data)), - tracer.Tag("num_attributes", len(msg.Attributes)), - tracer.Tag("ordering_key", msg.OrderingKey), - tracer.Tag("message_id", msg.ID), - tracer.Tag("publish_time", msg.PublishTime.String()), - tracer.Tag(ext.Component, componentName), - tracer.Tag(ext.SpanKind, ext.SpanKindConsumer), - tracer.Tag(ext.MessagingSystem, ext.MessagingSystemGCPPubsub), - tracer.ChildOf(parentSpanCtx), - } - if cfg.serviceName != "" { - opts = append(opts, tracer.ServiceName(cfg.serviceName)) - } - if cfg.measured { - opts = append(opts, tracer.Measured()) - } - span, ctx := tracer.StartSpanFromContext(ctx, cfg.receiveSpanName, opts...) - if msg.DeliveryAttempt != nil { - span.SetTag("delivery_attempt", *msg.DeliveryAttempt) - } - defer span.Finish() + ctx, closeSpan := traceFn(ctx, newTraceMessage(msg)) + defer closeSpan() f(ctx, msg) } } + +func newTraceMessage(msg *pubsub.Message) *tracing.Message { + if msg == nil { + return nil + } + return &tracing.Message{ + ID: msg.ID, + Data: msg.Data, + OrderingKey: msg.OrderingKey, + Attributes: msg.Attributes, + DeliveryAttempt: msg.DeliveryAttempt, + PublishTime: msg.PublishTime, + } +} diff --git a/contrib/internal/telemetrytest/telemetry_test.go b/contrib/internal/telemetrytest/telemetry_test.go index 9dd22278f5..a203228fc2 100644 --- a/contrib/internal/telemetrytest/telemetry_test.go +++ b/contrib/internal/telemetrytest/telemetry_test.go @@ -6,6 +6,7 @@ package telemetrytest import ( "encoding/json" + "os" "os/exec" "strings" "testing" @@ -39,36 +40,53 @@ type contribPkg struct { var TelemetryImport = "gopkg.in/DataDog/dd-trace-go.v1/internal/telemetry" -func (p *contribPkg) hasTelemetryImport() bool { +func readPackage(t *testing.T, path string) contribPkg { + cmd := exec.Command("go", "list", "-json", path) + cmd.Stderr = os.Stderr + output, err := cmd.Output() + require.NoError(t, err) + p := contribPkg{} + err = json.Unmarshal(output, &p) + require.NoError(t, err) + return p +} + +func (p *contribPkg) hasTelemetryImport(t *testing.T) bool { for _, imp := range p.Imports { if imp == TelemetryImport { return true } } + // if we didn't find it imported directly, it might be imported in one of sub-package imports + for _, imp := range p.Imports { + if strings.HasPrefix(imp, p.ImportPath) { + p := readPackage(t, imp) + if p.hasTelemetryImport(t) { + return true + } + } + } return false } // TestTelemetryEnabled verifies that the expected contrib packages leverage instrumentation telemetry func TestTelemetryEnabled(t *testing.T) { body, err := exec.Command("go", "list", "-json", "../../...").Output() - if err != nil { - t.Fatalf(err.Error()) - } + require.NoError(t, err) + var packages []contribPkg stream := json.NewDecoder(strings.NewReader(string(body))) for stream.More() { var out contribPkg err := stream.Decode(&out) - if err != nil { - t.Fatalf(err.Error()) - } + require.NoError(t, err) packages = append(packages, out) } for _, pkg := range packages { if strings.Contains(pkg.ImportPath, "/test") || strings.Contains(pkg.ImportPath, "/internal") { continue } - if !pkg.hasTelemetryImport() { + if !pkg.hasTelemetryImport(t) { t.Fatalf(`package %q is expected use instrumentation telemetry. For more info see https://github.com/DataDog/dd-trace-go/blob/main/contrib/README.md#instrumentation-telemetry`, pkg.ImportPath) } }