From d5b03be25089b9689f97585ee1c1f86086b2f3d0 Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Wed, 14 Apr 2021 11:04:32 +0200 Subject: [PATCH 1/7] Add instrumentation for amazon SQS --- module/apmawssdkgo/session.go | 12 ++- module/apmawssdkgo/sqs.go | 87 ++++++++++++++++ module/apmawssdkgo/sqs_test.go | 176 +++++++++++++++++++++++++++++++++ 3 files changed, 274 insertions(+), 1 deletion(-) create mode 100644 module/apmawssdkgo/sqs.go create mode 100644 module/apmawssdkgo/sqs_test.go diff --git a/module/apmawssdkgo/session.go b/module/apmawssdkgo/session.go index ee9f943a9..504ae9f9a 100644 --- a/module/apmawssdkgo/session.go +++ b/module/apmawssdkgo/session.go @@ -51,12 +51,14 @@ func WrapSession(s *session.Session) *session.Session { const ( serviceS3 = "s3" serviceDynamoDB = "dynamodb" + serviceSQS = "sqs" ) var ( serviceTypeMap = map[string]string{ serviceS3: "storage", serviceDynamoDB: "db", + serviceSQS: "messaging", } ) @@ -83,12 +85,20 @@ func send(req *request.Request) { return } - var svc service + var ( + svc service + err error + ) switch spanSubtype { case serviceS3: svc = newS3(req) case serviceDynamoDB: svc = newDynamoDB(req) + case serviceSQS: + if svc, err = newSQS(req); err != nil { + // Unsupported method type or queue name. + return + } default: // Unsupported type return diff --git a/module/apmawssdkgo/sqs.go b/module/apmawssdkgo/sqs.go new file mode 100644 index 000000000..009ed186e --- /dev/null +++ b/module/apmawssdkgo/sqs.go @@ -0,0 +1,87 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package apmawssdkgo // import "go.elastic.co/apm/module/apmawssdkgo" + +import ( + "errors" + "strings" + + "go.elastic.co/apm" + + "github.com/aws/aws-sdk-go/aws/request" +) + +var ( + sqsErrMethodNotSupported = errors.New("method not supported") + operationName = map[string]string{ + "SendMessage": "send", + "SendMessageBatch": "send_batch", + "DeleteMessage": "delete", + "DeleteMessageBatch": "delete_batch", + "ReceiveMessage": "poll", + } +) + +type apmSQS struct { + name, opName, resourceName string +} + +func newSQS(req *request.Request) (*apmSQS, error) { + opName, ok := operationName[req.Operation.Name] + if !ok { + return nil, sqsErrMethodNotSupported + } + name := req.ClientInfo.ServiceID + " " + strings.ToUpper(opName) + resourceName := serviceSQS + + queueName := getQueueName(req) + if queueName != "" { + name += " " + operationDirection(req.Operation.Name) + " " + queueName + resourceName += "/" + queueName + } + + s := &apmSQS{ + name: name, + opName: opName, + resourceName: resourceName, + } + + return s, nil +} + +func (s *apmSQS) spanName() string { return s.name } + +func (s *apmSQS) resource() string { return s.resourceName } + +func (s *apmSQS) setAdditional(span *apm.Span) { + span.Action = s.opName +} + +func operationDirection(operationName string) string { + switch operationName { + case "SendMessage", "SendMessageBatch": + return "to" + default: + return "from" + } +} + +func getQueueName(req *request.Request) string { + parts := strings.Split(req.HTTPRequest.FormValue("QueueUrl"), "/") + return parts[len(parts)-1] +} diff --git a/module/apmawssdkgo/sqs_test.go b/module/apmawssdkgo/sqs_test.go new file mode 100644 index 000000000..9a752a2a4 --- /dev/null +++ b/module/apmawssdkgo/sqs_test.go @@ -0,0 +1,176 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// +build go1.13 + +package apmawssdkgo // import "go.elastic.co/apm/module/apmawssdkgo" + +import ( + "context" + "testing" + + "go.elastic.co/apm/apmtest" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/sqs" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestSQS(t *testing.T) { + region := "us-west-2" + addr := "sqs.testing.invalid" + cfg := aws.NewConfig(). + WithEndpoint(addr). + WithRegion(region). + WithDisableSSL(true). + WithCredentials(credentials.AnonymousCredentials) + + session := session.Must(session.NewSession(cfg)) + wrapped := WrapSession(session) + svc := sqs.New(wrapped) + for _, tc := range []struct { + fn func(context.Context, string) + name, action, resource string + queueURL string + ignored bool + }{ + { + name: "SQS POLL from MyQueue", + action: "poll", + resource: "sqs/MyQueue", + queueURL: "https://sqs.testing.invalid/123456789012/MyQueue", + fn: func(ctx context.Context, queueURL string) { + svc.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{ + QueueUrl: &queueURL, + AttributeNames: aws.StringSlice([]string{ + "SentTimestamp", + }), + MaxNumberOfMessages: aws.Int64(1), + MessageAttributeNames: aws.StringSlice([]string{ + "All", + }), + WaitTimeSeconds: aws.Int64(1), + }) + }, + }, + { + name: "SQS SEND to OtherQueue", + action: "send", + resource: "sqs/OtherQueue", + queueURL: "https://sqs.testing.invalid/123456789012/OtherQueue", + fn: func(ctx context.Context, queueURL string) { + svc.SendMessageWithContext(ctx, &sqs.SendMessageInput{ + QueueUrl: &queueURL, + MessageAttributes: map[string]*sqs.MessageAttributeValue{ + "attr": &sqs.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String("string attr"), + }, + }, + MessageBody: aws.String("msg body"), + }) + }, + }, + { + name: "SQS SEND_BATCH to OtherQueue", + action: "send_batch", + resource: "sqs/OtherQueue", + queueURL: "https://sqs.testing.invalid/123456789012/OtherQueue", + fn: func(ctx context.Context, queueURL string) { + svc.SendMessageBatchWithContext(ctx, &sqs.SendMessageBatchInput{ + QueueUrl: &queueURL, + Entries: []*sqs.SendMessageBatchRequestEntry{ + { + Id: aws.String("1"), + MessageAttributes: map[string]*sqs.MessageAttributeValue{ + "attr": &sqs.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String("string attr"), + }, + }, + MessageBody: aws.String("msg body"), + }, + }, + }) + }, + }, + { + name: "SQS DELETE from ThatQueue", + action: "delete", + resource: "sqs/ThatQueue", + queueURL: "https://sqs.testing.invalid/123456789012/ThatQueue", + fn: func(ctx context.Context, queueURL string) { + svc.DeleteMessageWithContext(ctx, &sqs.DeleteMessageInput{ + QueueUrl: &queueURL, + ReceiptHandle: aws.String("receiptHandle"), + }) + }, + }, + { + name: "SQS DELETE from ThatQueue", + action: "delete", + resource: "sqs/ThatQueue", + ignored: true, + fn: func(ctx context.Context, _ string) { + svc.CreateQueueWithContext(ctx, &sqs.CreateQueueInput{ + QueueName: aws.String("SQS_QUEUE_NAME"), + Attributes: map[string]*string{ + "DelaySeconds": aws.String("60"), + "MessageRetentionPeriod": aws.String("86400"), + }, + }) + }, + }, + } { + tx, spans, errors := apmtest.WithTransaction(func(ctx context.Context) { + tc.fn(ctx, tc.queueURL) + }) + + if tc.ignored { + require.Len(t, spans, 0) + require.Len(t, errors, 0) + return + } + + require.Len(t, spans, 1) + require.Len(t, errors, 1) + + span := spans[0] + err := errors[0] + + assert.Equal(t, tx.ID, err.TransactionID) + assert.Equal(t, span.ID, err.ParentID) + + assert.Equal(t, tc.name, span.Name) + assert.Equal(t, "messaging", span.Type) + assert.Equal(t, "sqs", span.Subtype) + assert.Equal(t, tc.action, span.Action) + + service := span.Context.Destination.Service + assert.Equal(t, "sqs", service.Name) + assert.Equal(t, "messaging", service.Type) + assert.Equal(t, tc.resource, service.Resource) + assert.Equal(t, "sqs.testing.invalid", span.Context.Destination.Address) + + assert.Equal(t, region, span.Context.Destination.Cloud.Region) + + assert.Equal(t, tx.ID, span.ParentID) + } +} From e13ea249adf303efa8d99b46c2d6f6ec686a8dc0 Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Wed, 14 Apr 2021 11:13:07 +0200 Subject: [PATCH 2/7] update documentation --- CHANGELOG.asciidoc | 1 + docs/instrumenting.asciidoc | 1 + docs/supported-tech.asciidoc | 9 +++++++++ 3 files changed, 11 insertions(+) diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 41317c570..76720c543 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -27,6 +27,7 @@ https://github.com/elastic/apm-agent-go/compare/v1.11.0...master[View commits] - module/apmawssdkgo: add support for instrumenting s3 RPC calls {pull}927[#(927)] - module/apmawssdkgo: add support for instrumenting dynamodb RPC calls {pull}928[#(928)] - SpanContext.SetDestinationService is a no-op if either Name or Resource is empty {pull}931[#(931)] +- module/apmawssdkgo: add support for instrumenting sqs RPC calls {pull}933[#(933)] [[release-notes-1.x]] === Go Agent version 1.x diff --git a/docs/instrumenting.asciidoc b/docs/instrumenting.asciidoc index e1a8fbbef..762731366 100644 --- a/docs/instrumenting.asciidoc +++ b/docs/instrumenting.asciidoc @@ -841,6 +841,7 @@ The following services are supported: - S3 - DynamoDB +- SQS Passing a `session.Session` wrapped with `apmawssdkgo.WrapSession` to these services from the AWS SDK will report spans within the current transaction. diff --git a/docs/supported-tech.asciidoc b/docs/supported-tech.asciidoc index a1ef40a07..e1fc54467 100644 --- a/docs/supported-tech.asciidoc +++ b/docs/supported-tech.asciidoc @@ -295,3 +295,12 @@ https://github.com/aws/aws-sdk-go[AWS SDK Go]. See <> for more information about AWS SDK Go instrumentation. + +[float] +[[supported-tech-messaging-systems]] +=== Object Storage +We provide instrumentation for AWS SQS. This is usable with +https://github.com/aws/aws-sdk-go[AWS SDK Go]. + +See <> for more information +about AWS SDK Go instrumentation. From 5b1003e49bc3d39123409072b72832045ffbf042 Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Wed, 14 Apr 2021 11:18:51 +0200 Subject: [PATCH 3/7] go-lint --- module/apmawssdkgo/sqs.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/module/apmawssdkgo/sqs.go b/module/apmawssdkgo/sqs.go index 009ed186e..44c369cf6 100644 --- a/module/apmawssdkgo/sqs.go +++ b/module/apmawssdkgo/sqs.go @@ -27,7 +27,7 @@ import ( ) var ( - sqsErrMethodNotSupported = errors.New("method not supported") + errSQSMethodNotSupported = errors.New("method not supported") operationName = map[string]string{ "SendMessage": "send", "SendMessageBatch": "send_batch", @@ -44,7 +44,7 @@ type apmSQS struct { func newSQS(req *request.Request) (*apmSQS, error) { opName, ok := operationName[req.Operation.Name] if !ok { - return nil, sqsErrMethodNotSupported + return nil, errSQSMethodNotSupported } name := req.ClientInfo.ServiceID + " " + strings.ToUpper(opName) resourceName := serviceSQS From 2e82f930af1be19ce1185bd5b9955606695ad794 Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Wed, 14 Apr 2021 11:25:46 +0200 Subject: [PATCH 4/7] simplify test case for ignored method --- module/apmawssdkgo/sqs_test.go | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/module/apmawssdkgo/sqs_test.go b/module/apmawssdkgo/sqs_test.go index 9a752a2a4..56da72dcd 100644 --- a/module/apmawssdkgo/sqs_test.go +++ b/module/apmawssdkgo/sqs_test.go @@ -124,17 +124,10 @@ func TestSQS(t *testing.T) { }, }, { - name: "SQS DELETE from ThatQueue", - action: "delete", - resource: "sqs/ThatQueue", - ignored: true, + ignored: true, fn: func(ctx context.Context, _ string) { svc.CreateQueueWithContext(ctx, &sqs.CreateQueueInput{ QueueName: aws.String("SQS_QUEUE_NAME"), - Attributes: map[string]*string{ - "DelaySeconds": aws.String("60"), - "MessageRetentionPeriod": aws.String("86400"), - }, }) }, }, From c59aa3cd446c1d288db4b20a398675239d7a9c04 Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Wed, 14 Apr 2021 11:51:01 +0200 Subject: [PATCH 5/7] add TODO --- module/apmawssdkgo/sqs.go | 1 + 1 file changed, 1 insertion(+) diff --git a/module/apmawssdkgo/sqs.go b/module/apmawssdkgo/sqs.go index 44c369cf6..40d8feda0 100644 --- a/module/apmawssdkgo/sqs.go +++ b/module/apmawssdkgo/sqs.go @@ -70,6 +70,7 @@ func (s *apmSQS) resource() string { return s.resourceName } func (s *apmSQS) setAdditional(span *apm.Span) { span.Action = s.opName + // TODO(stn): record `context.message.queue.name` } func operationDirection(operationName string) string { From 24984a7d909ad9a924da5704fa4311610b7b6960 Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Wed, 14 Apr 2021 18:54:16 +0200 Subject: [PATCH 6/7] add trace context to message attributes Add the trace context header to the message attribute metadata when creating messages. --- module/apmawssdkgo/session.go | 45 ++++++++++++- module/apmawssdkgo/sqs.go | 42 ++++++++++++ module/apmawssdkgo/sqs_test.go | 116 +++++++++++++++++++++++---------- 3 files changed, 166 insertions(+), 37 deletions(-) diff --git a/module/apmawssdkgo/session.go b/module/apmawssdkgo/session.go index 504ae9f9a..c5785dfd8 100644 --- a/module/apmawssdkgo/session.go +++ b/module/apmawssdkgo/session.go @@ -36,6 +36,10 @@ func init() { // AWS SDK's request lifecycle. Supported services are listed in serviceTypeMap // variable below. func WrapSession(s *session.Session) *session.Session { + s.Handlers.Build.PushFrontNamed(request.NamedHandler{ + Name: "go.elastic.co/apm/module/apmawssdkgo/build", + Fn: build, + }) s.Handlers.Send.PushFrontNamed(request.NamedHandler{ Name: "go.elastic.co/apm/module/apmawssdkgo/send", Fn: send, @@ -68,13 +72,49 @@ type service interface { setAdditional(*apm.Span) } +func build(req *request.Request) { + spanSubtype := req.ClientInfo.ServiceName + spanType, ok := serviceTypeMap[spanSubtype] + if !ok { + return + } + + if spanSubtype == serviceSQS && !supportedSQSMethod(req) { + // Unsupported SQS method type. + return + } + + ctx := req.Context() + tx := apm.TransactionFromContext(ctx) + if tx == nil { + return + } + + // The span name is added in the `send()` function, after other + // handlers have generated the necessary information on the request. + span := tx.StartSpan("", spanType, apm.SpanFromContext(ctx)) + if !span.Dropped() { + ctx = apm.ContextWithSpan(ctx, span) + defer req.SetContext(ctx) + } else { + span.End() + span = nil + return + } + + if req.ClientInfo.ServiceName != serviceSQS { + return + } + addMessageAttributes(req, span) +} + func send(req *request.Request) { if req.RetryCount > 0 { return } spanSubtype := req.ClientInfo.ServiceName - spanType, ok := serviceTypeMap[spanSubtype] + _, ok := serviceTypeMap[spanSubtype] if !ok { return } @@ -104,7 +144,7 @@ func send(req *request.Request) { return } - span := tx.StartSpan(svc.spanName(), spanType, apm.SpanFromContext(ctx)) + span := apm.SpanFromContext(ctx) if !span.Dropped() { ctx = apm.ContextWithSpan(ctx, span) req.HTTPRequest = apmhttp.RequestWithContext(ctx, req.HTTPRequest) @@ -115,6 +155,7 @@ func send(req *request.Request) { return } + span.Name = svc.spanName() span.Subtype = spanSubtype span.Action = req.Operation.Name diff --git a/module/apmawssdkgo/sqs.go b/module/apmawssdkgo/sqs.go index 40d8feda0..8d0a0c207 100644 --- a/module/apmawssdkgo/sqs.go +++ b/module/apmawssdkgo/sqs.go @@ -22,8 +22,11 @@ import ( "strings" "go.elastic.co/apm" + "go.elastic.co/apm/module/apmhttp" + "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/request" + "github.com/aws/aws-sdk-go/service/sqs" ) var ( @@ -73,6 +76,45 @@ func (s *apmSQS) setAdditional(span *apm.Span) { // TODO(stn): record `context.message.queue.name` } +// addMessageAttributes adds message attributes to `SendMessage` and +// `SendMessageBatch` RPC calls. Other SQS RPC calls are ignored. +func addMessageAttributes(req *request.Request, span *apm.Span) { + switch req.Operation.Name { + case "SendMessage", "SendMessageBatch": + break + default: + return + + } + + traceContext := span.TraceContext() + msgAttr := &sqs.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String(apmhttp.FormatTraceparentHeader(traceContext)), + } + + if req.Operation.Name == "SendMessage" { + input, ok := req.Params.(*sqs.SendMessageInput) + if !ok { + return + } + input.MessageAttributes["traceContext"] = msgAttr + } else if req.Operation.Name == "SendMessageBatch" { + input, ok := req.Params.(*sqs.SendMessageBatchInput) + if !ok { + return + } + for _, entry := range input.Entries { + entry.MessageAttributes["traceContext"] = msgAttr + } + } +} + +func supportedSQSMethod(req *request.Request) bool { + _, ok := operationName[req.Operation.Name] + return ok +} + func operationDirection(operationName string) string { switch operationName { case "SendMessage", "SendMessageBatch": diff --git a/module/apmawssdkgo/sqs_test.go b/module/apmawssdkgo/sqs_test.go index 56da72dcd..5575abedd 100644 --- a/module/apmawssdkgo/sqs_test.go +++ b/module/apmawssdkgo/sqs_test.go @@ -20,7 +20,14 @@ package apmawssdkgo // import "go.elastic.co/apm/module/apmawssdkgo" import ( + "bytes" "context" + "io" + "net" + "net/http" + "net/http/httptest" + "strconv" + "strings" "testing" "go.elastic.co/apm/apmtest" @@ -34,29 +41,19 @@ import ( ) func TestSQS(t *testing.T) { - region := "us-west-2" - addr := "sqs.testing.invalid" - cfg := aws.NewConfig(). - WithEndpoint(addr). - WithRegion(region). - WithDisableSSL(true). - WithCredentials(credentials.AnonymousCredentials) - - session := session.Must(session.NewSession(cfg)) - wrapped := WrapSession(session) - svc := sqs.New(wrapped) for _, tc := range []struct { - fn func(context.Context, string) - name, action, resource string - queueURL string - ignored bool + fn func(context.Context, *sqs.SQS, string) + name, action, resource string + queueURL string + ignored, hasTraceContext, hasError bool }{ { name: "SQS POLL from MyQueue", action: "poll", resource: "sqs/MyQueue", queueURL: "https://sqs.testing.invalid/123456789012/MyQueue", - fn: func(ctx context.Context, queueURL string) { + hasError: true, + fn: func(ctx context.Context, svc *sqs.SQS, queueURL string) { svc.ReceiveMessageWithContext(ctx, &sqs.ReceiveMessageInput{ QueueUrl: &queueURL, AttributeNames: aws.StringSlice([]string{ @@ -71,11 +68,12 @@ func TestSQS(t *testing.T) { }, }, { - name: "SQS SEND to OtherQueue", - action: "send", - resource: "sqs/OtherQueue", - queueURL: "https://sqs.testing.invalid/123456789012/OtherQueue", - fn: func(ctx context.Context, queueURL string) { + name: "SQS SEND to OtherQueue", + action: "send", + resource: "sqs/OtherQueue", + queueURL: "https://sqs.testing.invalid/123456789012/OtherQueue", + hasTraceContext: true, + fn: func(ctx context.Context, svc *sqs.SQS, queueURL string) { svc.SendMessageWithContext(ctx, &sqs.SendMessageInput{ QueueUrl: &queueURL, MessageAttributes: map[string]*sqs.MessageAttributeValue{ @@ -89,11 +87,12 @@ func TestSQS(t *testing.T) { }, }, { - name: "SQS SEND_BATCH to OtherQueue", - action: "send_batch", - resource: "sqs/OtherQueue", - queueURL: "https://sqs.testing.invalid/123456789012/OtherQueue", - fn: func(ctx context.Context, queueURL string) { + name: "SQS SEND_BATCH to OtherQueue", + action: "send_batch", + resource: "sqs/OtherQueue", + queueURL: "https://sqs.testing.invalid/123456789012/OtherQueue", + hasTraceContext: true, + fn: func(ctx context.Context, svc *sqs.SQS, queueURL string) { svc.SendMessageBatchWithContext(ctx, &sqs.SendMessageBatchInput{ QueueUrl: &queueURL, Entries: []*sqs.SendMessageBatchRequestEntry{ @@ -116,7 +115,7 @@ func TestSQS(t *testing.T) { action: "delete", resource: "sqs/ThatQueue", queueURL: "https://sqs.testing.invalid/123456789012/ThatQueue", - fn: func(ctx context.Context, queueURL string) { + fn: func(ctx context.Context, svc *sqs.SQS, queueURL string) { svc.DeleteMessageWithContext(ctx, &sqs.DeleteMessageInput{ QueueUrl: &queueURL, ReceiptHandle: aws.String("receiptHandle"), @@ -125,15 +124,36 @@ func TestSQS(t *testing.T) { }, { ignored: true, - fn: func(ctx context.Context, _ string) { + fn: func(ctx context.Context, svc *sqs.SQS, _ string) { svc.CreateQueueWithContext(ctx, &sqs.CreateQueueInput{ QueueName: aws.String("SQS_QUEUE_NAME"), }) }, }, } { + buf := new(bytes.Buffer) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if tc.hasError { + w.WriteHeader(http.StatusInternalServerError) + return + } + io.Copy(buf, r.Body) + })) + defer ts.Close() + + region := "us-west-2" + cfg := aws.NewConfig(). + WithEndpoint(ts.URL). + WithRegion(region). + WithDisableSSL(true). + WithCredentials(credentials.AnonymousCredentials) + + session := session.Must(session.NewSession(cfg)) + wrapped := WrapSession(session) + svc := sqs.New(wrapped) + tx, spans, errors := apmtest.WithTransaction(func(ctx context.Context) { - tc.fn(ctx, tc.queueURL) + tc.fn(ctx, svc, tc.queueURL) }) if tc.ignored { @@ -142,14 +162,36 @@ func TestSQS(t *testing.T) { return } - require.Len(t, spans, 1) - require.Len(t, errors, 1) + if tc.hasTraceContext { + kvs := make(map[string]string) + var traceContextPresent bool + for _, kvPair := range strings.Split(buf.String(), "&") { + kv := strings.Split(kvPair, "=") + kvs[kv[0]] = kv[1] + } + if v, ok := kvs["MessageAttribute.2.Name"]; ok { + traceContextPresent = true + assert.Equal(t, "traceContext", v) + assert.NotEmpty(t, kvs["MessageAttribute.2.Value.StringValue"]) + } + if v, ok := kvs["SendMessageBatchRequestEntry.1.MessageAttribute.2.Name"]; ok { + traceContextPresent = true + assert.Equal(t, "traceContext", v) + assert.NotEmpty(t, kvs["SendMessageBatchRequestEntry.1.MessageAttribute.2.Value.StringValue"]) + } + require.True(t, traceContextPresent) + } + buf.Reset() + require.Len(t, spans, 1) span := spans[0] - err := errors[0] - assert.Equal(t, tx.ID, err.TransactionID) - assert.Equal(t, span.ID, err.ParentID) + if tc.hasError { + require.Len(t, errors, 1) + err := errors[0] + assert.Equal(t, tx.ID, err.TransactionID) + assert.Equal(t, span.ID, err.ParentID) + } assert.Equal(t, tc.name, span.Name) assert.Equal(t, "messaging", span.Type) @@ -160,7 +202,11 @@ func TestSQS(t *testing.T) { assert.Equal(t, "sqs", service.Name) assert.Equal(t, "messaging", service.Type) assert.Equal(t, tc.resource, service.Resource) - assert.Equal(t, "sqs.testing.invalid", span.Context.Destination.Address) + + host, port, err := net.SplitHostPort(ts.URL[7:]) + require.NoError(t, err) + assert.Equal(t, host, span.Context.Destination.Address) + assert.Equal(t, port, strconv.Itoa(span.Context.Destination.Port)) assert.Equal(t, region, span.Context.Destination.Cloud.Region) From 0de1b6413460de44e7b9674c1631d7f987c04d9c Mon Sep 17 00:00:00 2001 From: stuart nelson Date: Thu, 15 Apr 2021 11:35:06 +0200 Subject: [PATCH 7/7] add additional tracing info to message attributes --- module/apmawssdkgo/session.go | 2 +- module/apmawssdkgo/sqs.go | 26 ++++++++++++--- module/apmawssdkgo/sqs_test.go | 58 +++++++++++++++++++--------------- 3 files changed, 55 insertions(+), 31 deletions(-) diff --git a/module/apmawssdkgo/session.go b/module/apmawssdkgo/session.go index c5785dfd8..60abff3fb 100644 --- a/module/apmawssdkgo/session.go +++ b/module/apmawssdkgo/session.go @@ -105,7 +105,7 @@ func build(req *request.Request) { if req.ClientInfo.ServiceName != serviceSQS { return } - addMessageAttributes(req, span) + addMessageAttributes(req, span, tx.ShouldPropagateLegacyHeader()) } func send(req *request.Request) { diff --git a/module/apmawssdkgo/sqs.go b/module/apmawssdkgo/sqs.go index 8d0a0c207..78646f2e0 100644 --- a/module/apmawssdkgo/sqs.go +++ b/module/apmawssdkgo/sqs.go @@ -78,7 +78,7 @@ func (s *apmSQS) setAdditional(span *apm.Span) { // addMessageAttributes adds message attributes to `SendMessage` and // `SendMessageBatch` RPC calls. Other SQS RPC calls are ignored. -func addMessageAttributes(req *request.Request, span *apm.Span) { +func addMessageAttributes(req *request.Request, span *apm.Span, propagateLegacyHeader bool) { switch req.Operation.Name { case "SendMessage", "SendMessageBatch": break @@ -92,20 +92,38 @@ func addMessageAttributes(req *request.Request, span *apm.Span) { DataType: aws.String("String"), StringValue: aws.String(apmhttp.FormatTraceparentHeader(traceContext)), } - + tracestate := traceContext.State.String() if req.Operation.Name == "SendMessage" { input, ok := req.Params.(*sqs.SendMessageInput) if !ok { return } - input.MessageAttributes["traceContext"] = msgAttr + setTracingAttributes(input.MessageAttributes, msgAttr, tracestate, propagateLegacyHeader) } else if req.Operation.Name == "SendMessageBatch" { input, ok := req.Params.(*sqs.SendMessageBatchInput) if !ok { return } for _, entry := range input.Entries { - entry.MessageAttributes["traceContext"] = msgAttr + setTracingAttributes(entry.MessageAttributes, msgAttr, tracestate, propagateLegacyHeader) + } + } +} + +func setTracingAttributes( + attrs map[string]*sqs.MessageAttributeValue, + value *sqs.MessageAttributeValue, + tracestate string, + propagateLegacyHeader bool, +) { + attrs[apmhttp.W3CTraceparentHeader] = value + if propagateLegacyHeader { + attrs[apmhttp.ElasticTraceparentHeader] = value + } + if tracestate != "" { + attrs[apmhttp.TracestateHeader] = &sqs.MessageAttributeValue{ + DataType: aws.String("String"), + StringValue: aws.String(tracestate), } } } diff --git a/module/apmawssdkgo/sqs_test.go b/module/apmawssdkgo/sqs_test.go index 5575abedd..a8a312b00 100644 --- a/module/apmawssdkgo/sqs_test.go +++ b/module/apmawssdkgo/sqs_test.go @@ -20,20 +20,19 @@ package apmawssdkgo // import "go.elastic.co/apm/module/apmawssdkgo" import ( - "bytes" "context" - "io" "net" "net/http" "net/http/httptest" "strconv" - "strings" "testing" "go.elastic.co/apm/apmtest" + "go.elastic.co/apm/module/apmhttp" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/request" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/sqs" "github.com/stretchr/testify/assert" @@ -131,13 +130,11 @@ func TestSQS(t *testing.T) { }, }, } { - buf := new(bytes.Buffer) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if tc.hasError { w.WriteHeader(http.StatusInternalServerError) return } - io.Copy(buf, r.Body) })) defer ts.Close() @@ -150,6 +147,13 @@ func TestSQS(t *testing.T) { session := session.Must(session.NewSession(cfg)) wrapped := WrapSession(session) + if tc.hasTraceContext { + wrapped.Handlers.Build.PushBackNamed(request.NamedHandler{ + Name: "spy_message_attrs_added", + Fn: testTracingAttributes(t), + }) + } + svc := sqs.New(wrapped) tx, spans, errors := apmtest.WithTransaction(func(ctx context.Context) { @@ -162,27 +166,6 @@ func TestSQS(t *testing.T) { return } - if tc.hasTraceContext { - kvs := make(map[string]string) - var traceContextPresent bool - for _, kvPair := range strings.Split(buf.String(), "&") { - kv := strings.Split(kvPair, "=") - kvs[kv[0]] = kv[1] - } - if v, ok := kvs["MessageAttribute.2.Name"]; ok { - traceContextPresent = true - assert.Equal(t, "traceContext", v) - assert.NotEmpty(t, kvs["MessageAttribute.2.Value.StringValue"]) - } - if v, ok := kvs["SendMessageBatchRequestEntry.1.MessageAttribute.2.Name"]; ok { - traceContextPresent = true - assert.Equal(t, "traceContext", v) - assert.NotEmpty(t, kvs["SendMessageBatchRequestEntry.1.MessageAttribute.2.Value.StringValue"]) - } - require.True(t, traceContextPresent) - } - buf.Reset() - require.Len(t, spans, 1) span := spans[0] @@ -213,3 +196,26 @@ func TestSQS(t *testing.T) { assert.Equal(t, tx.ID, span.ParentID) } } + +func testTracingAttributes(t *testing.T) func(*request.Request) { + return func(req *request.Request) { + testAttrs := func(t *testing.T, attrs map[string]*sqs.MessageAttributeValue) { + assert.Contains(t, attrs, apmhttp.W3CTraceparentHeader) + assert.Contains(t, attrs, apmhttp.ElasticTraceparentHeader) + assert.Contains(t, attrs, apmhttp.TracestateHeader) + } + if req.Operation.Name == "SendMessage" { + input, ok := req.Params.(*sqs.SendMessageInput) + require.True(t, ok) + testAttrs(t, input.MessageAttributes) + } else if req.Operation.Name == "SendMessageBatch" { + input, ok := req.Params.(*sqs.SendMessageBatchInput) + require.True(t, ok) + for _, entry := range input.Entries { + testAttrs(t, entry.MessageAttributes) + } + } else { + t.Fail() + } + } +}