Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature: sqs queue instrumentation #2013

Merged
merged 62 commits into from
Apr 7, 2021
Merged
Show file tree
Hide file tree
Changes from 55 commits
Commits
Show all changes
62 commits
Select commit Hold shift + click to select a range
25f0a73
feat: instrument amazon SQS
Mar 15, 2021
e2bfc9b
feat: received message span measures work done to process it, start o…
Mar 15, 2021
77c2e92
feat: configuration
Mar 17, 2021
869e326
feat: tests
Mar 18, 2021
3bf9daa
feat: integration tests
Mar 19, 2021
6852fdb
test: integration test for five main method calls
Mar 19, 2021
fb4dcd7
fix: lint
Mar 19, 2021
20c85b0
fix: sqs is always polling
Mar 19, 2021
e5fa3a4
docs: first draft of documentation
Mar 19, 2021
48bcb77
feat: logging for no transaction, adding semver check
Mar 19, 2021
96251a6
fix: AWS not mysql2
Mar 19, 2021
8eeb926
test: add tav configuration
Mar 19, 2021
29b26d7
docs: post-tech review for docs
Mar 22, 2021
29987f9
Merge branch 'master' into astorm/aws-sdk-sqs
Mar 23, 2021
e963940
fix: merge debris
Mar 23, 2021
63ca069
feat: metrics
Mar 23, 2021
8dca1c7
chore: doc blocks
Mar 23, 2021
0b89dc2
fix: pre-commit fixes
Mar 23, 2021
79bc833
fix: more doc blocks
Mar 23, 2021
a05f7b0
fix: lint, aws sqs request failures
Mar 23, 2021
527df68
fix: node 8 syntax issue
Mar 23, 2021
7d95562
Merge branch 'master' into astorm/aws-sdk-sqs
Mar 24, 2021
90ac7ce
Merge branch 'master' into astorm/aws-sdk-sqs
astorm Mar 26, 2021
2a7ec7c
Merge branch 'master' into astorm/aws-sdk-sqs
Mar 29, 2021
621c1bb
Update docs/message-queues.asciidoc
astorm Mar 29, 2021
0417cf0
Update docs/message-queues.asciidoc
astorm Mar 29, 2021
5677e47
Update docs/message-queues.asciidoc
astorm Mar 29, 2021
df09585
Update docs/message-queues.asciidoc
astorm Mar 29, 2021
b2ab2d3
Update docs/message-queues.asciidoc
astorm Mar 29, 2021
423b65f
Update lib/instrumentation/modules/aws-sdk/sqs.js
astorm Mar 29, 2021
6abec2a
docs: expanded changelog
Mar 29, 2021
504e5e9
Update docs/message-queues.asciidoc
astorm Mar 29, 2021
e67e733
Update docs/message-queues.asciidoc
astorm Mar 29, 2021
de9ee64
chore: grammar
Mar 29, 2021
c35b6ea
chore: subtype
Mar 29, 2021
9851c66
chore: enabled guard
Mar 29, 2021
d38762d
Update docs/message-queues.asciidoc
astorm Mar 29, 2021
a88cc95
docs: remove confusing extra caveat
Mar 29, 2021
0bfefd7
docs: no such things as level 4 titles
Mar 29, 2021
2f8de38
docs: small rewrite of dt
Mar 29, 2021
cb56aa6
chore: flakey test
Mar 30, 2021
e14a6c9
feat: refactor to use bindFunction
Mar 30, 2021
49084cc
chore: test race condition
Mar 30, 2021
a3d3f2b
chore: any is a broad statement
Mar 30, 2021
7f2827f
Update lib/instrumentation/index.js
astorm Apr 1, 2021
058038e
Update lib/instrumentation/index.js
astorm Apr 1, 2021
337c3ea
Update lib/instrumentation/index.js
astorm Apr 1, 2021
1865b85
Update lib/instrumentation/index.js
astorm Apr 1, 2021
d7c4eee
Merge branch 'master' into astorm/aws-sdk-sqs
astorm Apr 1, 2021
02f6620
feat: shift span creation strat., adjust tests, ensure tests run in ci
Apr 1, 2021
7304c29
feat: docs, messaging context
Apr 1, 2021
3bc5a45
feat: adding support for .promise
Apr 1, 2021
9434579
feature: promise and tests for same
Apr 1, 2021
95a5d9b
feat: logger for tests, old age variable
Apr 1, 2021
0d111c8
fix: fixing test fixture to be message aware
Apr 1, 2021
c5603c1
chore: remove error logging
Apr 2, 2021
6c88f7f
chore: change to message instead of messaging
Apr 5, 2021
d99a68f
chore: update comment
Apr 5, 2021
2941bda
chore: unhardcode queue
Apr 5, 2021
e4eee59
chore: what's a Metics
Apr 6, 2021
4646ca0
dive bomb into alan's PR to see if I can temporarily fix tests
trentm Apr 6, 2021
b34cacf
Merge branch 'master' into astorm/aws-sdk-sqs
Apr 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .tav.yml
Original file line number Diff line number Diff line change
Expand Up @@ -360,3 +360,8 @@ body-parser:
versions: '>=1.19.0'
commands:
- node test/sanitize-field-names/express.js

aws-sdk:
versions: '>=2.858 <3'
astorm marked this conversation as resolved.
Show resolved Hide resolved
commands:
- node test/instrumentation/modules/aws-sdk/sqs.js
4 changes: 4 additions & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ Notes:
[float]
===== Features

* Adds support for Amazon SQS queues via `aws-sdk` instrumentation that
partially implements the https://github.com/elastic/apm/blob/master/specs/agents/tracing-instrumentation-messaging.md[APM messaging spec],
and adds `queue.latency.min.ms`, `queue.latency.max.ms`, and `queue.latency.avg.ms`
metrics for SQS queues.
astorm marked this conversation as resolved.
Show resolved Hide resolved
* The APM agent's own internal logging now uses structured JSON logging using
the https://getpino.io/#/docs/api?id=logger[pino API], and formatted in
{ecs-logging-ref}/intro.html[ecs-logging] format. The log records on stdout
Expand Down
2 changes: 2 additions & 0 deletions docs/index.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ include::./source-maps.asciidoc[]

include::./distributed-tracing.asciidoc[]

include::./message-queues.asciidoc[]

include::./performance-tuning.asciidoc[]
astorm marked this conversation as resolved.
Show resolved Hide resolved

include::./troubleshooting.asciidoc[]
Expand Down
104 changes: 104 additions & 0 deletions docs/message-queues.asciidoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
[[message-queues]]
== Message queues

The Node.js Agent will automatically create spans for activity to/from your Amazon SQS message queues. To record these spans, your message queue activity must occur during a transaction. If you're performing queue operations during an HTTP request from a <<compatibility-frameworks,supported framework>>, the agent will start a transaction automatically. However, if you're performing queue operations in a stand-alone program (such as a message processor), you'll need to use the Node.js Agent's <<apm-start-transaction,`startTransaction()`>> method to manually create transactions for your messages.

You can see an example of this in the following code sample.

[source,js]
----
const apm = require('elastic-apm-node').start({/*...*/})
const AWS = require('aws-sdk');
// Set the region
AWS.config.update({region: 'us-west'});

// Create an SQS service object
const sqs = new AWS.SQS({apiVersion: '2012-11-05'});

/* ... */

const transaction = apm.startTransaction("Process Messages", 'cli') <1>
sqs.receiveMessage(params, function(err, data) {
if(err) {
console.log("Receive Error", err);
} else {
console.log(`Length: ${data.Messages.length}`)
/* process messages */
}
// end the transaction
transaction.end() <2>
})
----
<1> Prior to calling the `sqs.receiveMessage` method, start a new transaction.
<2> Only end the transaction _after_ the queue's processing callback finishes executing. The will ensure a transaction is active while processing your queue messages.

[float]
[[message-queues-distributed-tracing]]
=== Distributed tracing and messaging queues

To enable queue scheduling and queue processing with distributed tracing, use the Node.js Agent's API to _store_ a `traceparent` header with your queue message; then, provide that `traceparent` header when starting a new transaction.

Here's a _new_ example that uses the Node.js Agent API to store the `traceparent` as a message attribute and then uses that attribute to link your new transaction with the original.

**Storing the Traceparent**

When sending the message, you'll want to add the trace as one of the `MessageAttributes`.
[source,js]
----
// stores the traceparent when sending the queue message
const traceParent = apm.currentTransaction ? apm.currentTransaction.traceparent : ''

// Use the Amazon SQS `MessageAttributes` to pass
// on the traceparent header
const params = {
/* ... other params ... */
MessageAttributes: {
/* ... other attributes ... */
"MyTraceparent":{
DataType: "String",
StringValue: traceParent
}
}

}
sqs.sendMessage(params, function(err, data) {
/* ... */
});
----

This will save the traceparent value so we can use it later on when receiving the messages.

**Applying the Traceparent**

When we receive our queue messages, we'll check the message for our Traceparent header, and use it to start a new transaction. By starting a transaction with this traceparent header we'll be linking the sending and receiving via distributed tracing.

[source,js]
----
// uses the traceparent to start a transaction

sqs.receiveMessage(params, function(err, data) {
if(!data.Messages) {
return
}

// loop over your returned messages
for(const message of data.Messages) { <1>
// start a transaction to process each message, using our previously
// saved distributed tracing traceparent header
let traceparent
if(message.MessageAttributes.MyTraceparent) {
traceparent = message.MessageAttributes.MyTraceparent.StringValue
}
const transactionMessage = apm.startTransaction('RECEIVE_TRANSACTION', 'cli', {
childOf:traceparent <2>
})
/* ... process message ... */
transactionMessage.end() <3>
}
})

----
<1> Even though we only scheduled one queue message, Amazon's SQS API returns an array of _multiple_ messages. Therefore we'll need to loop over each one.
<2> We extract the traceparent header we'd previously save, and use it to start a transaction.
<3> Once we're done processing a single message, we end the transaction and move on to the next.

1 change: 1 addition & 0 deletions docs/supported-technologies.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ The Node.js agent will automatically instrument the following modules to give yo
[options="header"]
|=======================================================================
|Module |Version |Note
|https://www.npmjs.com/package/aws-sdk[aws-sdk] |>1 <3 |Will instrument SQS send/receive/delete messages
|https://www.npmjs.com/package/cassandra-driver[cassandra-driver] |>=3.0.0 |Will instrument all queries
|https://www.npmjs.com/package/elasticsearch[elasticsearch] |>=8.0.0 |Will instrument all queries
|https://www.npmjs.com/package/@elastic/elasticsearch[@elastic/elasticsearch] |>=7.0.0 <8.0.0 |Will instrument all queries
Expand Down
1 change: 1 addition & 0 deletions lib/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,7 @@ Agent.prototype.captureError = function (err, opts, cb) {

if (agent._transport) {
agent.logger.info('Sending error to Elastic APM: %o', { id })
console.log(err)
agent._transport.sendError(error, function () {
agent.flush(function (err) {
if (cb) cb(err, id)
Expand Down
30 changes: 30 additions & 0 deletions lib/instrumentation/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var Transaction = require('./transaction')
var MODULES = [
'@elastic/elasticsearch',
'apollo-server-core',
'aws-sdk',
'bluebird',
'cassandra-driver',
'elasticsearch',
Expand Down Expand Up @@ -304,6 +305,35 @@ Instrumentation.prototype.setSpanOutcome = function (outcome) {

var wrapped = Symbol('elastic-apm-wrapped-function')

// Binds a callback function to the currently active span
//
// An instrumentation programmer can use this function to wrap a callback
// function of another function at the call-time of the original function.
// The pattern is
//
// 1. Instrumentation programmer uses shimmer.wrap to wrap a function that also
// has an asyncronous callback as an argument
//
// 2. In the code that executes before calling the original function, extract
// the callback argument and pass it to bindFunction, which will return a
// new function
//
// 3. Pass the function returned by bindFunction in place of the callback
// argument when calling the original function.
//
// bindFunction function will "save" the currently active span via closure,
// and when the callback is invoked, the span and transaction active when
// the program called original function will be set as active. This ensures
// the callback function gets instrument on "the right" transaction and span.
//
// The instrumentation programmer is still responsible for starting a span,
// and ending a span. Additionally, this function will set a span's sync
// property to `false` -- it's up to the instrumentation programmer to ensure
// that the callback they're binding is really async. If bindFunction is
// passed a callback that the wrapped function executes synchronously, it will
// still mark the span's `async` property as `false`.
//
// @param {function} original
Instrumentation.prototype.bindFunction = function (original) {
if (typeof original !== 'function' || original.name === 'elasticAPMCallbackWrapper') return original

Expand Down
40 changes: 40 additions & 0 deletions lib/instrumentation/modules/aws-sdk.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
'use strict'
const semver = require('semver')
const shimmer = require('../shimmer')
const { instrumentationSqs } = require('./aws-sdk/sqs')

// Called in place of AWS.Request.send
astorm marked this conversation as resolved.
Show resolved Hide resolved
//
// Determines which amazon service an API request is for
// and then passes call on to an appropriate instrumentation
// function.
function instrumentOperation (orig, origArguments, request, AWS, agent, { version, enabled }) {
if (request.service.serviceIdentifier === 'sqs') {
return instrumentationSqs(orig, origArguments, request, AWS, agent, { version, enabled })
}

// if we're still here, then we still need to call the original method
return orig.apply(request, origArguments)
}

// main entry point for aws-sdk instrumentation
module.exports = function (AWS, agent, { version, enabled }) {
astorm marked this conversation as resolved.
Show resolved Hide resolved
if (!enabled) return AWS
if (!semver.satisfies(version, '>1 <3')) {
agent.logger.debug('aws-sdk version %s not supported - aborting...', version)
return AWS
}

shimmer.wrap(AWS.Request.prototype, 'send', function (orig) {
return function _wrappedAWSRequestSend () {
return instrumentOperation(orig, arguments, this, AWS, agent, { version, enabled })
astorm marked this conversation as resolved.
Show resolved Hide resolved
}
})

shimmer.wrap(AWS.Request.prototype, 'promise', function (orig) {
return function _wrappedAWSRequestPromise () {
return instrumentOperation(orig, arguments, this, AWS, agent, { version, enabled })
}
})
return AWS
}
Loading