Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instrument RPC calls to amazon SQS #933

Merged
merged 7 commits into from
Apr 15, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ https://github.com/elastic/apm-agent-go/compare/v1.11.0...master[View commits]
- module/apmawssdkgo: add support for instrumenting s3 RPC calls {pull}927[#(927)]
- module/apmawssdkgo: add support for instrumenting dynamodb RPC calls {pull}928[#(928)]
- SpanContext.SetDestinationService is a no-op if either Name or Resource is empty {pull}931[#(931)]
- module/apmawssdkgo: add support for instrumenting sqs RPC calls {pull}933[#(933)]

[[release-notes-1.x]]
=== Go Agent version 1.x
Expand Down
1 change: 1 addition & 0 deletions docs/instrumenting.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -841,6 +841,7 @@ The following services are supported:

- S3
- DynamoDB
- SQS

Passing a `session.Session` wrapped with `apmawssdkgo.WrapSession` to these
services from the AWS SDK will report spans within the current transaction.
Expand Down
9 changes: 9 additions & 0 deletions docs/supported-tech.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -295,3 +295,12 @@ https://github.com/aws/aws-sdk-go[AWS SDK Go].

See <<builtin-modules-apmawssdkgo, module/apmawssdkgo>> for more information
about AWS SDK Go instrumentation.

[float]
[[supported-tech-messaging-systems]]
=== Object Storage
We provide instrumentation for AWS SQS. This is usable with
https://github.com/aws/aws-sdk-go[AWS SDK Go].

See <<builtin-modules-apmawssdkgo, module/apmawssdkgo>> for more information
about AWS SDK Go instrumentation.
57 changes: 54 additions & 3 deletions module/apmawssdkgo/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ func init() {
// AWS SDK's request lifecycle. Supported services are listed in serviceTypeMap
// variable below.
func WrapSession(s *session.Session) *session.Session {
s.Handlers.Build.PushFrontNamed(request.NamedHandler{
Name: "go.elastic.co/apm/module/apmawssdkgo/build",
Fn: build,
})
s.Handlers.Send.PushFrontNamed(request.NamedHandler{
Name: "go.elastic.co/apm/module/apmawssdkgo/send",
Fn: send,
Expand All @@ -51,12 +55,14 @@ func WrapSession(s *session.Session) *session.Session {
const (
serviceS3 = "s3"
serviceDynamoDB = "dynamodb"
serviceSQS = "sqs"
)

var (
serviceTypeMap = map[string]string{
serviceS3: "storage",
serviceDynamoDB: "db",
serviceSQS: "messaging",
}
)

Expand All @@ -66,13 +72,49 @@ type service interface {
setAdditional(*apm.Span)
}

func build(req *request.Request) {
spanSubtype := req.ClientInfo.ServiceName
spanType, ok := serviceTypeMap[spanSubtype]
if !ok {
return
}

if spanSubtype == serviceSQS && !supportedSQSMethod(req) {
// Unsupported SQS method type.
return
}

ctx := req.Context()
tx := apm.TransactionFromContext(ctx)
if tx == nil {
return
}

// The span name is added in the `send()` function, after other
// handlers have generated the necessary information on the request.
span := tx.StartSpan("", spanType, apm.SpanFromContext(ctx))
if !span.Dropped() {
ctx = apm.ContextWithSpan(ctx, span)
defer req.SetContext(ctx)
} else {
span.End()
span = nil
return
}

if req.ClientInfo.ServiceName != serviceSQS {
return
}
addMessageAttributes(req, span)
}

func send(req *request.Request) {
axw marked this conversation as resolved.
Show resolved Hide resolved
if req.RetryCount > 0 {
return
}

spanSubtype := req.ClientInfo.ServiceName
spanType, ok := serviceTypeMap[spanSubtype]
_, ok := serviceTypeMap[spanSubtype]
if !ok {
return
}
Expand All @@ -83,18 +125,26 @@ func send(req *request.Request) {
return
}

var svc service
var (
svc service
err error
)
switch spanSubtype {
case serviceS3:
svc = newS3(req)
case serviceDynamoDB:
svc = newDynamoDB(req)
case serviceSQS:
if svc, err = newSQS(req); err != nil {
// Unsupported method type or queue name.
return
}
default:
// Unsupported type
return
}

span := tx.StartSpan(svc.spanName(), spanType, apm.SpanFromContext(ctx))
span := apm.SpanFromContext(ctx)
if !span.Dropped() {
ctx = apm.ContextWithSpan(ctx, span)
req.HTTPRequest = apmhttp.RequestWithContext(ctx, req.HTTPRequest)
Expand All @@ -105,6 +155,7 @@ func send(req *request.Request) {
return
}

span.Name = svc.spanName()
span.Subtype = spanSubtype
span.Action = req.Operation.Name

Expand Down
130 changes: 130 additions & 0 deletions module/apmawssdkgo/sqs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package apmawssdkgo // import "go.elastic.co/apm/module/apmawssdkgo"

import (
"errors"
"strings"

"go.elastic.co/apm"
"go.elastic.co/apm/module/apmhttp"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/request"
"github.com/aws/aws-sdk-go/service/sqs"
)

var (
errSQSMethodNotSupported = errors.New("method not supported")
operationName = map[string]string{
"SendMessage": "send",
"SendMessageBatch": "send_batch",
"DeleteMessage": "delete",
"DeleteMessageBatch": "delete_batch",
"ReceiveMessage": "poll",
}
)

type apmSQS struct {
name, opName, resourceName string
}

func newSQS(req *request.Request) (*apmSQS, error) {
opName, ok := operationName[req.Operation.Name]
if !ok {
return nil, errSQSMethodNotSupported
}
name := req.ClientInfo.ServiceID + " " + strings.ToUpper(opName)
resourceName := serviceSQS

queueName := getQueueName(req)
if queueName != "" {
name += " " + operationDirection(req.Operation.Name) + " " + queueName
resourceName += "/" + queueName
}

s := &apmSQS{
name: name,
opName: opName,
resourceName: resourceName,
}

return s, nil
}

func (s *apmSQS) spanName() string { return s.name }

func (s *apmSQS) resource() string { return s.resourceName }

func (s *apmSQS) setAdditional(span *apm.Span) {
span.Action = s.opName
axw marked this conversation as resolved.
Show resolved Hide resolved
// TODO(stn): record `context.message.queue.name`
}

// addMessageAttributes adds message attributes to `SendMessage` and
// `SendMessageBatch` RPC calls. Other SQS RPC calls are ignored.
func addMessageAttributes(req *request.Request, span *apm.Span) {
switch req.Operation.Name {
case "SendMessage", "SendMessageBatch":
break
default:
return

}

traceContext := span.TraceContext()
msgAttr := &sqs.MessageAttributeValue{
DataType: aws.String("String"),
StringValue: aws.String(apmhttp.FormatTraceparentHeader(traceContext)),
}

if req.Operation.Name == "SendMessage" {
input, ok := req.Params.(*sqs.SendMessageInput)
if !ok {
return
}
input.MessageAttributes["traceContext"] = msgAttr
axw marked this conversation as resolved.
Show resolved Hide resolved
} else if req.Operation.Name == "SendMessageBatch" {
input, ok := req.Params.(*sqs.SendMessageBatchInput)
if !ok {
return
}
for _, entry := range input.Entries {
entry.MessageAttributes["traceContext"] = msgAttr
axw marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

func supportedSQSMethod(req *request.Request) bool {
_, ok := operationName[req.Operation.Name]
return ok
}

func operationDirection(operationName string) string {
switch operationName {
case "SendMessage", "SendMessageBatch":
return "to"
default:
return "from"
}
}

func getQueueName(req *request.Request) string {
parts := strings.Split(req.HTTPRequest.FormValue("QueueUrl"), "/")
return parts[len(parts)-1]
}
Loading