Skip to content

Commit

Permalink
Add support for instrumenting AWS SNS (#938)
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartnelson3 authored Apr 19, 2021
1 parent 3c5fa4e commit 36ee3f6
Show file tree
Hide file tree
Showing 8 changed files with 325 additions and 12 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ https://github.com/elastic/apm-agent-go/compare/v1.11.0...master[View commits]
- module/apmawssdkgo: add support for instrumenting dynamodb RPC calls {pull}928[#(928)]
- SpanContext.SetDestinationService is a no-op if either Name or Resource is empty {pull}931[#(931)]
- module/apmawssdkgo: add support for instrumenting sqs RPC calls {pull}933[#(933)]
- module/apmgrpc: record underlying HTTP/2 context {pull}904[#(904)]
- module/apmawssdkgo: add support for instrumenting sns RPC calls {pull}938[#(938)]
- Parse "//" comments in SQL/CQL {pull}937[#(937)]
- module/apmgrpc: record underlying HTTP/2 context {pull}904[#(904)]
[[release-notes-1.x]]
=== Go Agent version 1.x
Expand Down
1 change: 1 addition & 0 deletions docs/instrumenting.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -842,6 +842,7 @@ The following services are supported:
- S3
- DynamoDB
- SQS
- SNS

Passing a `session.Session` wrapped with `apmawssdkgo.WrapSession` to these
services from the AWS SDK will report spans within the current transaction.
Expand Down
16 changes: 15 additions & 1 deletion docs/supported-tech.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ about Zerolog integration.
[float]
[[supported-tech-object-storage]]
=== Object Storage

[float]
==== Amazon S3
We provide instrumentation for AWS S3. This is usable with
https://github.com/aws/aws-sdk-go[AWS SDK Go].

Expand All @@ -298,9 +301,20 @@ about AWS SDK Go instrumentation.

[float]
[[supported-tech-messaging-systems]]
=== Object Storage
=== Messaging Systems

[float]
==== Amazon SQS
We provide instrumentation for AWS SQS. This is usable with
https://github.com/aws/aws-sdk-go[AWS SDK Go].

See <<builtin-modules-apmawssdkgo, module/apmawssdkgo>> for more information
about AWS SDK Go instrumentation.

[float]
==== Amazon SNS
We provide instrumentation for AWS SNS. This is usable with
https://github.com/aws/aws-sdk-go[AWS SDK Go].

See <<builtin-modules-apmawssdkgo, module/apmawssdkgo>> for more information
about AWS SDK Go instrumentation.
19 changes: 16 additions & 3 deletions module/apmawssdkgo/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,15 @@ const (
serviceS3 = "s3"
serviceDynamoDB = "dynamodb"
serviceSQS = "sqs"
serviceSNS = "sns"
)

var (
serviceTypeMap = map[string]string{
serviceS3: "storage",
serviceDynamoDB: "db",
serviceSQS: "messaging",
serviceSNS: "messaging",
}
)

Expand All @@ -79,8 +81,10 @@ func build(req *request.Request) {
return
}

if spanSubtype == serviceSNS && !supportedSNSMethod(req) {
return
}
if spanSubtype == serviceSQS && !supportedSQSMethod(req) {
// Unsupported SQS method type.
return
}

Expand All @@ -102,10 +106,14 @@ func build(req *request.Request) {
return
}

if req.ClientInfo.ServiceName != serviceSQS {
switch spanSubtype {
case serviceSQS:
addMessageAttributesSQS(req, span, tx.ShouldPropagateLegacyHeader())
case serviceSNS:
addMessageAttributesSNS(req, span, tx.ShouldPropagateLegacyHeader())
default:
return
}
addMessageAttributes(req, span, tx.ShouldPropagateLegacyHeader())
}

func send(req *request.Request) {
Expand Down Expand Up @@ -139,6 +147,11 @@ func send(req *request.Request) {
// Unsupported method type or queue name.
return
}
case serviceSNS:
if svc, err = newSNS(req); err != nil {
// Unsupported method type or queue name.
return
}
default:
// Unsupported type
return
Expand Down
119 changes: 119 additions & 0 deletions module/apmawssdkgo/sns.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package apmawssdkgo // import "go.elastic.co/apm/module/apmawssdkgo"

import (
"strings"

"go.elastic.co/apm"
"go.elastic.co/apm/module/apmhttp"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/sns"
)

type apmSNS struct {
name, opName, resourceName string
}

func newSNS(req *request.Request) (*apmSNS, error) {
if req.Operation.Name != "Publish" {
return nil, errMethodNotSupported
}
name := req.ClientInfo.ServiceID + " PUBLISH"
resourceName := serviceSNS

topicName := getTopicName(req)
if topicName != "" {
name += " " + topicName
resourceName += "/" + topicName
}

s := &apmSNS{
name: name,
opName: "publish",
resourceName: resourceName,
}

return s, nil
}

func (s *apmSNS) spanName() string { return s.name }

func (s *apmSNS) resource() string { return s.resourceName }

func (s *apmSNS) setAdditional(span *apm.Span) {
span.Action = s.opName
}

func getTopicName(req *request.Request) string {
// TODO: PhoneNumber is the third possibility, but I'm guessing we
// don't want to store that for customers?
arn := req.HTTPRequest.FormValue("TopicArn")
if arn == "" {
arn = req.HTTPRequest.FormValue("TargetArn")
}

// SNS ARN can be in the following formats:
// - arn:aws:sns:us-east-2:123456789012:MyTopic
// - arn:aws:sns:us-east-2:123456789012/MyTopic
parts := strings.Split(arn, "/")
if len(parts) == 1 {
parts = strings.Split(arn, ":")
}
return parts[len(parts)-1]
}

// addMessageAttributesSNS adds message attributes to `Publish` RPC calls.
// Other SNS RPC calls are ignored.
func addMessageAttributesSNS(req *request.Request, span *apm.Span, propagateLegacyHeader bool) {
if req.Operation.Name != "Publish" {
return
}

traceContext := span.TraceContext()
msgAttr := &sns.MessageAttributeValue{
DataType: aws.String("String"),
StringValue: aws.String(apmhttp.FormatTraceparentHeader(traceContext)),
}
tracestate := traceContext.State.String()

input, ok := req.Params.(*sns.PublishInput)
if !ok {
return
}

if input.MessageAttributes == nil {
input.MessageAttributes = make(map[string]*sns.MessageAttributeValue)
}
input.MessageAttributes[apmhttp.W3CTraceparentHeader] = msgAttr
if propagateLegacyHeader {
input.MessageAttributes[apmhttp.ElasticTraceparentHeader] = msgAttr
}
if tracestate != "" {
input.MessageAttributes[apmhttp.TracestateHeader] = &sns.MessageAttributeValue{
DataType: aws.String("String"),
StringValue: aws.String(tracestate),
}
}
}

func supportedSNSMethod(req *request.Request) bool {
return req.Operation.Name == "Publish"
}
165 changes: 165 additions & 0 deletions module/apmawssdkgo/sns_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// +build go1.13

package apmawssdkgo // import "go.elastic.co/apm/module/apmawssdkgo"

import (
"context"
"net"
"net/http"
"net/http/httptest"
"strconv"
"testing"

"go.elastic.co/apm/apmtest"
"go.elastic.co/apm/module/apmhttp"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestSNS(t *testing.T) {
for _, tc := range []struct {
fn func(context.Context, *sns.SNS)
name, action, resource string
ignored, hasTraceContext, hasError bool
}{
{
name: "SNS PUBLISH myTopic",
action: "publish",
resource: "sns/myTopic",
hasError: true,
hasTraceContext: true,
fn: func(ctx context.Context, svc *sns.SNS) {
svc.PublishWithContext(ctx, &sns.PublishInput{
Message: aws.String("my message"),
TopicArn: aws.String("arn:aws:sns:us-east-2:123456789012/myTopic"),
})
},
},
{
name: "SNS PUBLISH myTopic",
action: "publish",
resource: "sns/myTopic",
hasTraceContext: true,
fn: func(ctx context.Context, svc *sns.SNS) {
svc.PublishWithContext(ctx, &sns.PublishInput{
Message: aws.String("my message"),
TopicArn: aws.String("arn:aws:sns:us-east-2:123456789012:myTopic"),
})
},
},
{
ignored: true,
fn: func(ctx context.Context, svc *sns.SNS) {
svc.SubscribeWithContext(ctx, &sns.SubscribeInput{
Endpoint: aws.String("endpoint"),
Protocol: aws.String("email"),
ReturnSubscriptionArn: aws.Bool(true),
TopicArn: aws.String("arn:aws:sns:us-east-2:123456789012:myTopic"),
})
},
},
} {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if tc.hasError {
w.WriteHeader(http.StatusInternalServerError)
return
}
}))
defer ts.Close()

region := "us-west-2"
cfg := aws.NewConfig().
WithEndpoint(ts.URL).
WithRegion(region).
WithDisableSSL(true).
WithCredentials(credentials.AnonymousCredentials)

session := session.Must(session.NewSession(cfg))
wrapped := WrapSession(session)
if tc.hasTraceContext {
wrapped.Handlers.Build.PushBackNamed(request.NamedHandler{
Name: "spy_message_attrs_added",
Fn: testTracingAttributesSNS(t),
})
}

svc := sns.New(wrapped)

tx, spans, errors := apmtest.WithTransaction(func(ctx context.Context) {
tc.fn(ctx, svc)
})

if tc.ignored {
require.Len(t, spans, 0)
require.Len(t, errors, 0)
return
}

require.Len(t, spans, 1)
span := spans[0]

if tc.hasError {
require.Len(t, errors, 1)
err := errors[0]
assert.Equal(t, tx.ID, err.TransactionID)
assert.Equal(t, span.ID, err.ParentID)
}

assert.Equal(t, tc.name, span.Name)
assert.Equal(t, "messaging", span.Type)
assert.Equal(t, "sns", span.Subtype)
assert.Equal(t, tc.action, span.Action)

service := span.Context.Destination.Service
assert.Equal(t, "sns", service.Name)
assert.Equal(t, "messaging", service.Type)
assert.Equal(t, tc.resource, service.Resource)

host, port, err := net.SplitHostPort(ts.URL[7:])
require.NoError(t, err)
assert.Equal(t, host, span.Context.Destination.Address)
assert.Equal(t, port, strconv.Itoa(span.Context.Destination.Port))

assert.Equal(t, region, span.Context.Destination.Cloud.Region)

assert.Equal(t, tx.ID, span.ParentID)
}
}

func testTracingAttributesSNS(t *testing.T) func(*request.Request) {
return func(req *request.Request) {
if req.Operation.Name != "Publish" {
t.Fail()
}

input, ok := req.Params.(*sns.PublishInput)
require.True(t, ok)
attrs := input.MessageAttributes
assert.Contains(t, attrs, apmhttp.W3CTraceparentHeader)
assert.Contains(t, attrs, apmhttp.ElasticTraceparentHeader)
assert.Contains(t, attrs, apmhttp.TracestateHeader)
}
}
Loading

0 comments on commit 36ee3f6

Please sign in to comment.