diff --git a/.tav.yml b/.tav.yml index 1b03bebbd2..ee64c6e24c 100644 --- a/.tav.yml +++ b/.tav.yml @@ -360,3 +360,8 @@ body-parser: versions: '>=1.19.0' commands: - node test/sanitize-field-names/express.js + +aws-sdk: + versions: '>=2.858 <3' + commands: + - node test/instrumentation/modules/aws-sdk/sqs.js diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 1771ee01de..3e49a1b88d 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -34,6 +34,10 @@ Notes: [float] ===== Features +* Adds support for Amazon SQS queues via `aws-sdk` instrumentation that + partially implements the https://github.com/elastic/apm/blob/master/specs/agents/tracing-instrumentation-messaging.md[APM messaging spec], + and adds `queue.latency.min.ms`, `queue.latency.max.ms`, and `queue.latency.avg.ms` + metrics for SQS queues. * The APM agent's own internal logging now uses structured JSON logging using the https://getpino.io/#/docs/api?id=logger[pino API], and formatted in {ecs-logging-ref}/intro.html[ecs-logging] format. The log records on stdout diff --git a/docs/index.asciidoc b/docs/index.asciidoc index cf88a7de31..cf7f3e362a 100644 --- a/docs/index.asciidoc +++ b/docs/index.asciidoc @@ -39,6 +39,8 @@ include::./source-maps.asciidoc[] include::./distributed-tracing.asciidoc[] +include::./message-queues.asciidoc[] + include::./performance-tuning.asciidoc[] include::./troubleshooting.asciidoc[] diff --git a/docs/message-queues.asciidoc b/docs/message-queues.asciidoc new file mode 100644 index 0000000000..80310a2b0c --- /dev/null +++ b/docs/message-queues.asciidoc @@ -0,0 +1,104 @@ +[[message-queues]] +== Message queues + +The Node.js Agent will automatically create spans for activity to/from your Amazon SQS message queues. To record these spans, your message queue activity must occur during a transaction. If you're performing queue operations during an HTTP request from a <>, the agent will start a transaction automatically. However, if you're performing queue operations in a stand-alone program (such as a message processor), you'll need to use the Node.js Agent's <> method to manually create transactions for your messages. + +You can see an example of this in the following code sample. + +[source,js] +---- +const apm = require('elastic-apm-node').start({/*...*/}) +const AWS = require('aws-sdk'); +// Set the region +AWS.config.update({region: 'us-west'}); + +// Create an SQS service object +const sqs = new AWS.SQS({apiVersion: '2012-11-05'}); + +/* ... */ + +const transaction = apm.startTransaction("Process Messages", 'cli') <1> +sqs.receiveMessage(params, function(err, data) { + if(err) { + console.log("Receive Error", err); + } else { + console.log(`Length: ${data.Messages.length}`) + /* process messages */ + } + // end the transaction + transaction.end() <2> +}) +---- +<1> Prior to calling the `sqs.receiveMessage` method, start a new transaction. +<2> Only end the transaction _after_ the queue's processing callback finishes executing. The will ensure a transaction is active while processing your queue messages. + +[float] +[[message-queues-distributed-tracing]] +=== Distributed tracing and messaging queues + +To enable queue scheduling and queue processing with distributed tracing, use the Node.js Agent's API to _store_ a `traceparent` header with your queue message; then, provide that `traceparent` header when starting a new transaction. + +Here's a _new_ example that uses the Node.js Agent API to store the `traceparent` as a message attribute and then uses that attribute to link your new transaction with the original. + +**Storing the Traceparent** + +When sending the message, you'll want to add the trace as one of the `MessageAttributes`. +[source,js] +---- +// stores the traceparent when sending the queue message +const traceParent = apm.currentTransaction ? apm.currentTransaction.traceparent : '' + +// Use the Amazon SQS `MessageAttributes` to pass +// on the traceparent header +const params = { + /* ... other params ... */ + MessageAttributes: { + /* ... other attributes ... */ + "MyTraceparent":{ + DataType: "String", + StringValue: traceParent + } + } + +} +sqs.sendMessage(params, function(err, data) { + /* ... */ +}); +---- + +This will save the traceparent value so we can use it later on when receiving the messages. + +**Applying the Traceparent** + +When we receive our queue messages, we'll check the message for our Traceparent header, and use it to start a new transaction. By starting a transaction with this traceparent header we'll be linking the sending and receiving via distributed tracing. + +[source,js] +---- +// uses the traceparent to start a transaction + +sqs.receiveMessage(params, function(err, data) { + if(!data.Messages) { + return + } + + // loop over your returned messages + for(const message of data.Messages) { <1> + // start a transaction to process each message, using our previously + // saved distributed tracing traceparent header + let traceparent + if(message.MessageAttributes.MyTraceparent) { + traceparent = message.MessageAttributes.MyTraceparent.StringValue + } + const transactionMessage = apm.startTransaction('RECEIVE_TRANSACTION', 'cli', { + childOf:traceparent <2> + }) + /* ... process message ... */ + transactionMessage.end() <3> + } +}) + +---- +<1> Even though we only scheduled one queue message, Amazon's SQS API returns an array of _multiple_ messages. Therefore we'll need to loop over each one. +<2> We extract the traceparent header we'd previously save, and use it to start a transaction. +<3> Once we're done processing a single message, we end the transaction and move on to the next. + diff --git a/docs/supported-technologies.asciidoc b/docs/supported-technologies.asciidoc index aa4f0cdf45..4dfbd148e3 100644 --- a/docs/supported-technologies.asciidoc +++ b/docs/supported-technologies.asciidoc @@ -72,6 +72,7 @@ The Node.js agent will automatically instrument the following modules to give yo [options="header"] |======================================================================= |Module |Version |Note +|https://www.npmjs.com/package/aws-sdk[aws-sdk] |>1 <3 |Will instrument SQS send/receive/delete messages |https://www.npmjs.com/package/cassandra-driver[cassandra-driver] |>=3.0.0 |Will instrument all queries |https://www.npmjs.com/package/elasticsearch[elasticsearch] |>=8.0.0 |Will instrument all queries |https://www.npmjs.com/package/@elastic/elasticsearch[@elastic/elasticsearch] |>=7.0.0 <8.0.0 |Will instrument all queries diff --git a/lib/agent.js b/lib/agent.js index ce09378dde..4bf3197f93 100644 --- a/lib/agent.js +++ b/lib/agent.js @@ -456,7 +456,6 @@ Agent.prototype.handleUncaughtExceptions = function (cb) { // The `uncaughtException` listener inhibits this behavor, and it's // therefore necessary to manually do this to not break expectations. if (agent._conf.logUncaughtExceptions === true) console.error(err) - agent.logger.debug('Elastic APM caught unhandled exception: %s', err.message) agent.captureError(err, { handled: false }, function () { diff --git a/lib/instrumentation/index.js b/lib/instrumentation/index.js index 1443bb00f0..fa33f55c7b 100644 --- a/lib/instrumentation/index.js +++ b/lib/instrumentation/index.js @@ -13,6 +13,7 @@ var Transaction = require('./transaction') var MODULES = [ '@elastic/elasticsearch', 'apollo-server-core', + 'aws-sdk', 'bluebird', 'cassandra-driver', 'elasticsearch', @@ -304,6 +305,35 @@ Instrumentation.prototype.setSpanOutcome = function (outcome) { var wrapped = Symbol('elastic-apm-wrapped-function') +// Binds a callback function to the currently active span +// +// An instrumentation programmer can use this function to wrap a callback +// function of another function at the call-time of the original function. +// The pattern is +// +// 1. Instrumentation programmer uses shimmer.wrap to wrap a function that also +// has an asyncronous callback as an argument +// +// 2. In the code that executes before calling the original function, extract +// the callback argument and pass it to bindFunction, which will return a +// new function +// +// 3. Pass the function returned by bindFunction in place of the callback +// argument when calling the original function. +// +// bindFunction function will "save" the currently active span via closure, +// and when the callback is invoked, the span and transaction active when +// the program called original function will be set as active. This ensures +// the callback function gets instrument on "the right" transaction and span. +// +// The instrumentation programmer is still responsible for starting a span, +// and ending a span. Additionally, this function will set a span's sync +// property to `false` -- it's up to the instrumentation programmer to ensure +// that the callback they're binding is really async. If bindFunction is +// passed a callback that the wrapped function executes synchronously, it will +// still mark the span's `async` property as `false`. +// +// @param {function} original Instrumentation.prototype.bindFunction = function (original) { if (typeof original !== 'function' || original.name === 'elasticAPMCallbackWrapper') return original diff --git a/lib/instrumentation/modules/aws-sdk.js b/lib/instrumentation/modules/aws-sdk.js new file mode 100644 index 0000000000..67c133c1fe --- /dev/null +++ b/lib/instrumentation/modules/aws-sdk.js @@ -0,0 +1,40 @@ +'use strict' +const semver = require('semver') +const shimmer = require('../shimmer') +const { instrumentationSqs } = require('./aws-sdk/sqs') + +// Called in place of AWS.Request.send and AWS.Request.promise +// +// Determines which amazon service an API request is for +// and then passes call on to an appropriate instrumentation +// function. +function instrumentOperation (orig, origArguments, request, AWS, agent, { version, enabled }) { + if (request.service.serviceIdentifier === 'sqs') { + return instrumentationSqs(orig, origArguments, request, AWS, agent, { version, enabled }) + } + + // if we're still here, then we still need to call the original method + return orig.apply(request, origArguments) +} + +// main entry point for aws-sdk instrumentation +module.exports = function (AWS, agent, { version, enabled }) { + if (!enabled) return AWS + if (!semver.satisfies(version, '>1 <3')) { + agent.logger.debug('aws-sdk version %s not supported - aborting...', version) + return AWS + } + + shimmer.wrap(AWS.Request.prototype, 'send', function (orig) { + return function _wrappedAWSRequestSend () { + return instrumentOperation(orig, arguments, this, AWS, agent, { version, enabled }) + } + }) + + shimmer.wrap(AWS.Request.prototype, 'promise', function (orig) { + return function _wrappedAWSRequestPromise () { + return instrumentOperation(orig, arguments, this, AWS, agent, { version, enabled }) + } + }) + return AWS +} diff --git a/lib/instrumentation/modules/aws-sdk/sqs.js b/lib/instrumentation/modules/aws-sdk/sqs.js new file mode 100644 index 0000000000..92201b2a85 --- /dev/null +++ b/lib/instrumentation/modules/aws-sdk/sqs.js @@ -0,0 +1,192 @@ +'use strict' +const { URL } = require('url') +const constants = require('../../../constants') +const OPERATIONS_TO_ACTIONS = { + deleteMessage: 'delete', + deleteMessageBatch: 'delete_batch', + receiveMessage: 'poll', + sendMessageBatch: 'send_batch', + sendMessage: 'send', + unknown: 'unknown' +} +const OPERATIONS = Object.keys(OPERATIONS_TO_ACTIONS) +const TYPE = 'messaging' +const SUBTYPE = 'sqs' +const queueMetrics = new Map() + +// Returns Message Queue action from AWS SDK method name +function getActionFromRequest (request) { + request = request || {} + const operation = request.operation ? request.operation : 'unknown' + const action = OPERATIONS_TO_ACTIONS[operation] + + return action +} + +// Returns preposition to use in span name +// +// POLL from ... +// SEND to ... +function getToFromFromOperation (operation) { + let result = 'from' + if (operation === 'sendMessage' || operation === 'sendMessageBatch') { + result = 'to' + } + return result +} + +// Parses queue/topic name from AWS queue URL +function getQueueNameFromRequest (request) { + const unknown = 'unknown' + if (!request || !request.params || !request.params.QueueUrl) { + return unknown + } + try { + const url = new URL(request.params.QueueUrl) + return url.pathname.split('/').pop() + } catch (e) { + return unknown + } +} + +// Parses region name from AWS service configuration +function getRegionFromRequest (request) { + const region = request && request.service && + request.service.config && request.service.config.region + return region || '' +} + +// Creates message destination context suitable for setDestinationContext +function getMessageDestinationContextFromRequest (request) { + const destination = { + service: { + name: SUBTYPE, + resource: `${SUBTYPE}/${getQueueNameFromRequest(request)}`, + type: TYPE + }, + cloud: { + region: getRegionFromRequest(request) + } + } + return destination +} + +// create message context suitable for setMessageContext +function getMessageContextFromRequest (request) { + const message = { + queue: { + name: getQueueNameFromRequest(request) + } + } + return message +} + +// Record queue related metrics +// +// Creates metric collector objects on first run, and +// updates their data with data from received messages +function recordMetrics (queueName, data, agent) { + const messages = data && data.Messages + if (!messages || messages.length < 1) { + return + } + if (!queueMetrics.get(queueName)) { + const collector = agent._metrics.createQueueMetricsCollector(queueName) + queueMetrics.set(queueName, collector) + } + const metrics = queueMetrics.get(queueName) + + for (const message of messages) { + const sentTimestamp = message.Attributes && message.Attributes.SentTimestamp + const delay = (new Date()).getTime() - sentTimestamp + metrics.updateStats(delay) + } +} + +// Creates the span name from request information +function getSpanNameFromRequest (request) { + const action = getActionFromRequest(request) + const toFrom = getToFromFromOperation(request.operation) + const queueName = getQueueNameFromRequest(request) + + const name = `${SUBTYPE.toUpperCase()} ${action.toUpperCase()} ${toFrom} ${queueName}` + return name +} + +function shouldIgnoreRequest (request, agent) { + const operation = request && request.operation + // are we interested in this operation/method call? + if (OPERATIONS.indexOf(operation) === -1) { + return true + } + + // is the named queue on our ignore list? + if (agent._conf && agent._conf.ignoreMessageQueuesRegExp) { + const queueName = getQueueNameFromRequest(request) + for (const rule of agent._conf.ignoreMessageQueuesRegExp) { + if (rule.test(queueName)) { + return true + } + } + } + + if (!agent.currentTransaction) { + agent.logger.trace('no active transaction found, skipping sqs instrumentation') + return true + } + + return false +} + +// Main entrypoint for SQS instrumentation +// +// Must call (or one of its function calls must call) the +// `orig` function/method +function instrumentationSqs (orig, origArguments, request, AWS, agent, { version, enabled }) { + if (shouldIgnoreRequest(request, agent)) { + return orig.apply(request, origArguments) + } + + const type = TYPE + const subtype = SUBTYPE + const action = getActionFromRequest(request) + const name = getSpanNameFromRequest(request) + const span = agent.startSpan(name, type, subtype, action) + span.setDestinationContext(getMessageDestinationContextFromRequest(request)) + span.setMessageContext(getMessageContextFromRequest(request)) + + request.on('complete', function (response) { + if (response && response.error) { + const errOpts = { + skipOutcome: true + } + agent.captureError(response.error, errOpts) + span._setOutcomeFromErrorCapture(constants.OUTCOME_FAILURE) + } + + // we'll need to manually mark this span as async. The actual async hop + // is captured by the agent's async hooks instrumentation + span.sync = false + span.end() + + if (request.operation === 'receiveMessage' && response && response.data) { + recordMetrics(getQueueNameFromRequest(request), response.data, agent) + } + }) + + const origResult = orig.apply(request, origArguments) + + return origResult +} + +module.exports = { + instrumentationSqs, + + // exported for tests + getToFromFromOperation, + getActionFromRequest, + getQueueNameFromRequest, + getRegionFromRequest, + getMessageDestinationContextFromRequest, + shouldIgnoreRequest +} diff --git a/lib/instrumentation/span.js b/lib/instrumentation/span.js index 8692c20308..32322a1593 100644 --- a/lib/instrumentation/span.js +++ b/lib/instrumentation/span.js @@ -33,6 +33,7 @@ function Span (transaction, name, ...args) { this._db = null this._http = null this._destination = null + this._message = null this._stackObj = null this.transaction = transaction @@ -96,6 +97,10 @@ Span.prototype.setDestinationContext = function (context) { this._destination = Object.assign(this._destination || {}, context) } +Span.prototype.setMessageContext = function (context) { + this._message = Object.assign(this._message || {}, context) +} + Span.prototype.setOutcome = function (outcome) { if (!this._isValidOutcome(outcome)) { this._agent.logger.trace( @@ -214,12 +219,13 @@ Span.prototype._encode = function (cb) { outcome: self.outcome } - if (self._db || self._http || self._labels || self._destination) { + if (self._db || self._http || self._labels || self._destination || self._message) { payload.context = { db: self._db || undefined, http: self._http || undefined, tags: self._labels || undefined, - destination: self._destination || undefined + destination: self._destination || undefined, + message: self._message || undefined } } diff --git a/lib/metrics/index.js b/lib/metrics/index.js index 8b10267b55..bc759a2e4d 100644 --- a/lib/metrics/index.js +++ b/lib/metrics/index.js @@ -1,6 +1,7 @@ 'use strict' const MetricsRegistry = require('./registry') +const { createQueueMetrics } = require('./queue') const registrySymbol = Symbol('metrics-registry') const agentSymbol = Symbol('metrics-agent') @@ -54,6 +55,14 @@ class Metrics { getOrCreateGauge (...args) { return this[registrySymbol].getOrCreateGauge(...args) } + + // factory function for creating a queue metrics collector + // + // called from instrumentation, only when the agent receives a queue message + createQueueMetricsCollector (queueOrTopicName) { + const collector = createQueueMetrics(queueOrTopicName, this[registrySymbol]) + return collector + } } module.exports = Metrics diff --git a/lib/metrics/queue.js b/lib/metrics/queue.js new file mode 100644 index 0000000000..d3211d4059 --- /dev/null +++ b/lib/metrics/queue.js @@ -0,0 +1,84 @@ +'use strict' +class QueueMetricsCollector { + constructor () { + this.stats = { + 'queue.latency.min.ms': 0, + 'queue.latency.max.ms': 0, + 'queue.latency.avg.ms': 0 + } + + this.total = 0 + this.count = 0 + this.min = 0 + this.max = 0 + } + + // Updates the data used to generate our stats + // + // Unlike the Stats and RuntimeCollector, this function + // also returns the instantiated collector. + // + // @param {number} time A javascript ms timestamp + updateStats (time) { + if (this.min === 0 || this.min > time) { + this.min = time + } + if (this.max < time) { + this.max = time + } + + this.count++ + this.total += time + } + + // standard `collect` method for stats collectors + // + // called by the MetricReporter object prior to its sending data + // + // @param {function} cb callback function + collect (cb) { + // update average based on count and total + this.stats['queue.latency.avg.ms'] = 0 + if (this.count > 0) { + this.stats['queue.latency.avg.ms'] = this.total / this.count + } + + this.stats['queue.latency.max.ms'] = this.max + this.stats['queue.latency.min.ms'] = this.min + + // reset for next run + this.total = 0 + this.count = 0 + this.max = 0 + this.min = 0 + if (cb) process.nextTick(cb) + } +} + +// Creates and Registers a Metric Collector +// +// Unlike the Stats and RuntimeCollector, this function +// also returns the instantiated collector. +// +// @param {string} queueOrTopicName +// @param {MetricRegistry} registry +// @returns QueueMetricsCollector +function createQueueMetrics (queueOrTopicName, registry) { + const collector = new QueueMetricsCollector() + registry.registerCollector(collector) + for (const metric of Object.keys(collector.stats)) { + registry.getOrCreateGauge( + metric, + function returnCurrentValue () { + return collector.stats[metric] + }, + { queue_name: queueOrTopicName } + ) + } + return collector +} + +module.exports = { + createQueueMetrics, + QueueMetricsCollector +} diff --git a/test/agent.js b/test/agent.js index b3c5eb8938..43ebf1450c 100644 --- a/test/agent.js +++ b/test/agent.js @@ -649,7 +649,7 @@ test('filters', function (t) { setTimeout(function () { t.end() server.close() - }, 50) + }, 200) }) }) diff --git a/test/instrumentation/modules/aws-sdk/fixtures-sqs.js b/test/instrumentation/modules/aws-sdk/fixtures-sqs.js new file mode 100644 index 0000000000..5883c1b047 --- /dev/null +++ b/test/instrumentation/modules/aws-sdk/fixtures-sqs.js @@ -0,0 +1,155 @@ +'use strict' +module.exports = { + sendMessage: { + request: { + DelaySeconds: 10, + MessageAttributes: { + Title: { + DataType: 'String', + StringValue: 'The Whistler' + }, + Author: { + DataType: 'String', + StringValue: 'John Grisham' + }, + WeeksOn: { + DataType: 'Number', + StringValue: '6' + } + }, + MessageBody: 'Information about current NY Times fiction bestseller for week of 12/11/2016.' + + }, + response: ` + + + 4b8a5a94-e3a3-43b7-9a98-16179f3b894f + bbdc5fdb8be7251f5c910905db994bab + d25a6aea97eb8f585bfa92d314504a92 + + + d1ec32a3-1f87-5264-ad1b-6a6eaa0d3385 + + ` + }, + sendMessageBatch: { + request: { + // Remove DelaySeconds parameter and value for FIFO queues + Entries: [{ + Id: 'foo', + DelaySeconds: 10, + MessageAttributes: { + Title: { + DataType: 'String', + StringValue: 'The Whistler' + }, + Author: { + DataType: 'String', + StringValue: 'John Grisham' + }, + WeeksOn: { + DataType: 'Number', + StringValue: '6' + } + }, + MessageBody: 'Information about current NY Times fiction bestseller for week of 12/11/2016.' + }] + }, + respnse: ` + + + + foo + 44e78a10-85f7-49d7-88af-a6b8e760a89f + bbdc5fdb8be7251f5c910905db994bab + d25a6aea97eb8f585bfa92d314504a92 + + + + 10c1406c-b56a-544e-92e2-18a17b0b0988 + + ` + }, + deleteMessage: { + request: { + ReceiptHandle: 'AQEBylmNUj4N0S/U4rDCOgiJks1yfJVcInUpvhe5hmLbeHnEd9q5uynTpJvXOBwHSlMrWZhtus7xJzULz/fi90Ni0cImfu+G9dqp6kIqVXYIItf0iOOT0+w6Yu2RHtuRCGOfxo28EKCBZRbREh6EAmXRL7IAoYZgkR/BI4c9dZi6MHXXwyjW93yFbK+CkMTVh/MoW8ADr9D/4rzf5fb7ipKht73Fe1j1gLCxiBuQiNj7owaxVPb/jVY3NEtWYDKXkhCOscdPoLb6CueADxXPn7mC/l5Kp8DTi6GoI39E3Qbq4kIylA7wmPS5wo+rffLqi9gASN+YpmUG/03+poOzgtM2q0ZYIrFNPjNKSriuWE16V6iTl0ng7uG4pmeCj9zKwaAu8SOZQwRHmMq9qhiyDzBqfDP3GQcZXO8i5WRLdG6nmoRkyUzXq6Zo50eWzzsK2hZ5' + }, + response: ` + + + + foo + + + + 7c240380-2a3d-53c6-b785-5b4e6d63acf8 + + ` + }, + deleteMessageBatch: { + request: { + Entries: [{ + Id: 'foo', + ReceiptHandle: 'AQEBylmNUj4N0S/U4rDCOgiJks1yfJVcInUpvhe5hmLbeHnEd9q5uynTpJvXOBwHSlMrWZhtus7xJzULz/fi90Ni0cImfu+G9dqp6kIqVXYIItf0iOOT0+w6Yu2RHtuRCGOfxo28EKCBZRbREh6EAmXRL7IAoYZgkR/BI4c9dZi6MHXXwyjW93yFbK+CkMTVh/MoW8ADr9D/4rzf5fb7ipKht73Fe1j1gLCxiBuQiNj7owaxVPb/jVY3NEtWYDKXkhCOscdPoLb6CueADxXPn7mC/l5Kp8DTi6GoI39E3Qbq4kIylA7wmPS5wo+rffLqi9gASN+YpmUG/03+poOzgtM2q0ZYIrFNPjNKSriuWE16V6iTl0ng7uG4pmeCj9zKwaAu8SOZQwRHmMq9qhiyDzBqfDP3GQcZXO8i5WRLdG6nmoRkyUzXq6Zo50eWzzsK2hZ5' + }] + }, + response: ` + + + 2ce5ae8b-9308-5c11-8484-f64644126cd4 + + ` + }, + receiveMessage: { + request: { + AttributeNames: [ + 'SentTimestamp' + ], + MaxNumberOfMessages: 1, + MessageAttributeNames: [ + 'All' + ], + VisibilityTimeout: 20 + }, + response: ` + + + + 7e9faa90-1256-4ea8-aed1-f0d55d45f778 + AQEBXxpSdLbnmP0031G1uHDyfdIvRFcl6kYINW8Av1c5TVg+Awybw8zOVIOniGcPxYDo+XkaTE7Ms0Og906TjZA/KmB+ssF5Ycx0yb2SoMeIsSJOHkk8GfrDpJLr91s/QgY1qrmdojZkB8vADQr3JMGvrpjY2FvVf1h+mMRY8dvzPAI8YNNI3jErWd5s8jJs/8QNiH84mLdWkPWMUCgWVfG85kHUcd4lN6P4Va/rGOcMnCEsLOZKTzdnxbs4N2aT3qCzBjut71RHi7kZCGqVWMrEnWswhWcFdLvrmXyrVtQ3FESDMDy28e3UryLZVcuHui9qefGE8P82bYDaMO7JSAx6+cbsKx6On8uwzyX9ycuIdnTKv8YpvY8NIFYU/sC+bk7ZGpeGCJOUKXkthdr/DAmDTJWF2HGQThLDbWKsMtHarCPSK52MPdUv4kEi6x1OEtTX + bbdc5fdb8be7251f5c910905db994bab + d25a6aea97eb8f585bfa92d314504a92 + Information about current NY Times fiction bestseller for week of 12/11/2016. + + SentTimestamp + 1616098068631 + + + Author + + John Grisham + String + + + + Title + + The Whistler + String + + + + WeeksOn + + 6 + Number + + + + + + c1f742a0-56ba-59a4-95d5-1a6e8dc7f577 + + ` + } +} diff --git a/test/instrumentation/modules/aws-sdk/sqs.js b/test/instrumentation/modules/aws-sdk/sqs.js new file mode 100644 index 0000000000..872cd9b926 --- /dev/null +++ b/test/instrumentation/modules/aws-sdk/sqs.js @@ -0,0 +1,628 @@ +'use strict' +const agent = require('../../../..').start({ + serviceName: 'test', + secretToken: 'test', + captureExceptions: false, + metricsInterval: 0, + centralConfig: false +}) + +const tape = require('tape') +const AWS = require('aws-sdk') +const express = require('express') +const bodyParser = require('body-parser') +const fixtures = require('./fixtures-sqs') +const logging = require('../../../../lib/logging') +const mockClient = require('../../../_mock_http_client') + +const { + getToFromFromOperation, + getActionFromRequest, + getQueueNameFromRequest, + getRegionFromRequest, + getMessageDestinationContextFromRequest, + shouldIgnoreRequest +} = + require('../../../../lib/instrumentation/modules/aws-sdk/sqs') + +initializeAwsSdk() + +tape.test('AWS SQS: Unit Test Functions', function (test) { + test.test('function getToFromFromOperation', function (t) { + t.equals(getToFromFromOperation('deleteMessage'), 'from') + t.equals(getToFromFromOperation('deleteMessageBatch'), 'from') + t.equals(getToFromFromOperation('receiveMessage'), 'from') + t.equals(getToFromFromOperation('sendMessageBatch'), 'to') + t.equals(getToFromFromOperation('sendMessage'), 'to') + t.end() + }) + + test.test('function getActionFromOperation', function (t) { + const request = {} + + request.operation = 'deleteMessage' + t.equals(getActionFromRequest(request), 'delete') + + request.operation = 'deleteMessageBatch' + t.equals(getActionFromRequest(request), 'delete_batch') + + request.operation = 'receiveMessage' + t.equals(getActionFromRequest(request), 'poll') + + request.operation = 'sendMessage' + t.equals(getActionFromRequest(request), 'send') + + request.operation = 'sendMessageBatch' + t.equals(getActionFromRequest(request), 'send_batch') + + request.operation = 'sendMessageBatch' + request.params = null + t.equals(getActionFromRequest(request), 'send_batch') + + request.operation = 'sendMessageBatch' + request.params = {} + t.equals(getActionFromRequest(request), 'send_batch') + + request.operation = 'receiveMessage' + request.params = {} + t.equals(getActionFromRequest(request), 'poll') + + request.operation = 'receiveMessage' + request.params = { WaitTimeSeconds: 0 } + t.equals(getActionFromRequest(request), 'poll') + + request.operation = 'receiveMessage' + request.params = { WaitTimeSeconds: -1 } + t.equals(getActionFromRequest(request), 'poll') + + request.operation = 'receiveMessage' + request.params = { WaitTimeSeconds: 1 } + t.equals(getActionFromRequest(request), 'poll') + t.end() + }) + + test.test('function getQueueNameFromRequest', function (t) { + const request = {} + t.equals(getQueueNameFromRequest(null), 'unknown') + t.equals(getQueueNameFromRequest(request), 'unknown') + + request.params = null + t.equals(getQueueNameFromRequest(request), 'unknown') + request.params = {} + t.equals(getQueueNameFromRequest(request), 'unknown') + + request.params.QueueUrl = null + t.equals(getQueueNameFromRequest(request), 'unknown') + request.params.QueueUrl = 5 + t.equals(getQueueNameFromRequest(request), 'unknown') + request.params.QueueUrl = 'foo/baz/bar' + t.equals(getQueueNameFromRequest(request), 'unknown') + + request.params.QueueUrl = 'http://foo/baz/bar' + t.equals(getQueueNameFromRequest(request), 'bar') + + request.params.QueueUrl = 'http://foo/baz/bar/bing?some=params&ok=true' + t.equals(getQueueNameFromRequest(request), 'bing') + t.end() + }) + + test.test('function getRegionFromRequest', function (t) { + const request = {} + t.equals(getRegionFromRequest(null), '') + t.equals(getRegionFromRequest(request), '') + + request.service = null + t.equals(getRegionFromRequest(request), '') + request.service = {} + t.equals(getRegionFromRequest(request), '') + + request.service.config = null + t.equals(getRegionFromRequest(request), '') + request.service.config = {} + t.equals(getRegionFromRequest(request), '') + + request.service.config.region = null + t.equals(getRegionFromRequest(request), '') + request.service.config.region = 'region-name' + t.equals(getRegionFromRequest(request), 'region-name') + + t.end() + }) + + test.test('function shouldIgnoreRequest', function (t) { + t.equals(shouldIgnoreRequest(null, null), true) + + const request = { + operation: 'deleteMessage', + params: { + QueueUrl: 'http://foo/baz/bar/bing?some=params&ok=true' + } + } + const agent = { + _conf: { + ignoreMessageQueuesRegExp: [] + }, + logger: logging.createLogger('off') + } + t.equals(shouldIgnoreRequest(request, agent), true) + + agent.currentTransaction = { mocked: 'transaction' } + t.equals(shouldIgnoreRequest(request, agent), false) + + agent._conf.ignoreMessageQueuesRegExp.push(/b.*g/) + t.equals(shouldIgnoreRequest(request, agent), true) + + agent.operation = 'fakeMethod' + t.equals(shouldIgnoreRequest(request, agent), true) + + t.end() + }) + + test.test('function getMessageDestinationContext', function (t) { + const request = { + service: { + config: { + region: 'region-name' + } + }, + params: { + QueueUrl: 'http://foo/baz/bar/bing?some=params&ok=true' + } + } + + t.equals(getRegionFromRequest(request), 'region-name') + t.equals(getQueueNameFromRequest(request), 'bing') + + t.deepEquals(getMessageDestinationContextFromRequest(request), { + service: { + name: 'sqs', + resource: 'sqs/bing', + type: 'messaging' + }, + cloud: { + region: 'region-name' + } + }) + t.end() + }) + + test.end() +}) + +tape.test('AWS SQS: End to End Tests', function (test) { + test.test('API: sendMessage', function (t) { + const app = createMockServer( + getXmlResponse('sendMessage') + ) + const listener = app.listen(0, function () { + resetAgent(function (data) { + const [spanSqs, spanHttp] = getSqsAndOtherSpanFromData(data, t) + + t.equals(spanSqs.name, 'SQS SEND to our-queue', 'SQS span named correctly') + t.equals(spanSqs.type, 'messaging', 'span type set to messaging') + t.equals(spanSqs.subtype, 'sqs', 'span subtype set to sqs') + t.equals(spanSqs.action, 'send', 'span action matches API method called') + t.equals(spanSqs.context.destination.service.type, 'messaging', 'messaging context set') + t.equals(spanSqs.context.message.queue.name, 'our-queue', 'queue name context set') + + t.equals(spanHttp.type, 'external', 'other span is for HTTP request') + t.end() + }) + agent.startTransaction('myTransaction') + const sqs = new AWS.SQS({ apiVersion: '2012-11-05' }) + const params = getParams('sendMessage', listener.address().port) + sqs.sendMessage(params, function (err, data) { + t.error(err) + agent.endTransaction() + listener.close() + }) + }) + }) + test.test('API: sendMessageBatch', function (t) { + const app = createMockServer( + getXmlResponse('sendMessageBatch') + ) + const listener = app.listen(0, function () { + resetAgent(function (data) { + const [spanSqs, spanHttp] = getSqsAndOtherSpanFromData(data, t) + + t.equals(spanHttp.type, 'external', 'other span is for HTTP request') + + t.equals(spanSqs.name, 'SQS SEND_BATCH to our-queue', 'SQS span named correctly') + t.equals(spanSqs.type, 'messaging', 'span type set to messaging') + t.equals(spanSqs.subtype, 'sqs', 'span subtype set to sqs') + t.equals(spanSqs.action, 'send_batch', 'span action matches API method called') + t.equals(spanSqs.context.destination.service.type, 'messaging', 'messaging context set') + t.equals(spanSqs.context.message.queue.name, 'our-queue', 'queue name context set') + + t.end() + }) + agent.startTransaction('myTransaction') + const sqs = new AWS.SQS({ apiVersion: '2012-11-05' }) + const params = getParams('sendMessageBatch', listener.address().port) + sqs.sendMessageBatch(params, function (err, data) { + t.error(err) + agent.endTransaction() + listener.close() + }) + }) + }) + + test.test('API: deleteMessage', function (t) { + const app = createMockServer( + getXmlResponse('deleteMessage') + ) + const listener = app.listen(0, function () { + resetAgent(function (data) { + const [spanSqs, spanHttp] = getSqsAndOtherSpanFromData(data, t) + + t.equals(spanSqs.name, 'SQS DELETE from our-queue', 'SQS span named correctly') + t.equals(spanSqs.type, 'messaging', 'span type set to messaging') + t.equals(spanSqs.subtype, 'sqs', 'span subtype set to sqs') + t.equals(spanSqs.action, 'delete', 'span action matches API method called') + t.equals(spanSqs.context.destination.service.type, 'messaging', 'messaging context set') + t.equals(spanSqs.context.message.queue.name, 'our-queue', 'queue name context set') + + t.equals(spanHttp.type, 'external', 'other span is for HTTP request') + + t.end() + }) + agent.startTransaction('myTransaction') + const sqs = new AWS.SQS({ apiVersion: '2012-11-05' }) + const params = getParams('deleteMessage', listener.address().port) + sqs.deleteMessage(params, function (err, data) { + t.error(err) + agent.endTransaction() + listener.close() + }) + }) + }) + + test.test('API: deleteMessageBatch', function (t) { + const app = createMockServer( + getXmlResponse('deleteMessageBatch') + ) + const listener = app.listen(0, function () { + resetAgent(function (data) { + const [spanSqs, spanHttp] = getSqsAndOtherSpanFromData(data, t) + + t.equals(spanHttp.type, 'external', 'second span is for HTTP request') + + t.equals(spanSqs.name, 'SQS DELETE_BATCH from our-queue', 'SQS span named correctly') + t.equals(spanSqs.type, 'messaging', 'span type set to messaging') + t.equals(spanSqs.subtype, 'sqs', 'span subtype set to sqs') + t.equals(spanSqs.action, 'delete_batch', 'span action matches API method called') + t.equals(spanSqs.context.destination.service.type, 'messaging', 'messaging context set') + t.equals(spanSqs.context.message.queue.name, 'our-queue', 'queue name context set') + + t.end() + }) + agent.startTransaction('myTransaction') + const sqs = new AWS.SQS({ apiVersion: '2012-11-05' }) + const params = getParams('deleteMessageBatch', listener.address().port) + sqs.deleteMessageBatch(params, function (err, data) { + t.error(err) + agent.endTransaction() + listener.close() + }) + }) + }) + + test.test('API: receiveMessage', function (t) { + const app = createMockServer( + getXmlResponse('receiveMessage') + ) + const listener = app.listen(0, function () { + resetAgent(function (data) { + const [spanSqs, spanHttp] = getSqsAndOtherSpanFromData(data, t) + + t.equals(spanHttp.type, 'external', 'other span is for HTTP request') + + t.equals(spanSqs.name, 'SQS POLL from our-queue', 'SQS span named correctly') + t.equals(spanSqs.type, 'messaging', 'span type set to messaging') + t.equals(spanSqs.subtype, 'sqs', 'span subtype set to sqs') + t.equals(spanSqs.action, 'poll', 'span action matches API method called') + t.equals(spanSqs.context.destination.service.type, 'messaging', 'messaging context set') + t.equals(spanSqs.context.message.queue.name, 'our-queue', 'queue name context set') + + t.end() + }) + agent.startTransaction('myTransaction') + const sqs = new AWS.SQS({ apiVersion: '2012-11-05' }) + const params = getParams('receiveMessage', listener.address().port) + sqs.receiveMessage(params, function (err, data) { + t.error(err) + agent.endTransaction() + listener.close() + }) + }) + }) + + test.test('API: receiveMessage no transaction', function (t) { + const app = createMockServer( + getXmlResponse('receiveMessage') + ) + const listener = app.listen(0, function () { + resetAgent(function (data) { + t.equals(data.spans.length, 0, 'no spans without a transaction') + t.end() + }) + + const sqs = new AWS.SQS({ apiVersion: '2012-11-05' }) + const params = getParams('receiveMessage', listener.address().port) + sqs.receiveMessage(params, function (err, data) { + t.error(err) + listener.close() + }) + }) + }) + + test.test('API: sendMessage without a transaction', function (t) { + const app = createMockServer( + getXmlResponse('sendMessage') + ) + const listener = app.listen(0, function () { + resetAgent(function (data) { + t.equals(data.spans.length, 0, 'no spans without a transaction') + t.end() + }) + const sqs = new AWS.SQS({ apiVersion: '2012-11-05' }) + const params = getParams('sendMessage', listener.address().port) + sqs.sendMessage(params, function (err, data) { + t.error(err) + listener.close() + }) + }) + }) + + test.test('API: sendMessage without a transaction', function (t) { + const app = createMockServer( + getXmlResponse('sendMessage') + ) + const listener = app.listen(0, function () { + resetAgent(function (data) { + t.equals(data.spans.length, 0, 'no spans without a transaction') + t.end() + }) + const sqs = new AWS.SQS({ apiVersion: '2012-11-05' }) + const params = getParams('sendMessage', listener.address().port) + sqs.sendMessage(params, function (err, data) { + t.error(err) + listener.close() + }) + }) + }) + + test.test('API: sendMessage promise', function (t) { + const app = createMockServer( + getXmlResponse('sendMessage') + ) + const listener = app.listen(0, function () { + resetAgent(function (data) { + const [spanSqs, spanHttp] = getSqsAndOtherSpanFromData(data, t) + + t.equals(spanSqs.name, 'SQS SEND to our-queue', 'SQS span named correctly') + t.equals(spanSqs.type, 'messaging', 'span type set to messaging') + t.equals(spanSqs.subtype, 'sqs', 'span subtype set to sqs') + t.equals(spanSqs.action, 'send', 'span action matches API method called') + t.equals(spanSqs.context.destination.service.type, 'messaging', 'messaging context set') + t.equals(spanSqs.context.message.queue.name, 'our-queue', 'queue name context set') + + t.equals(spanHttp.type, 'external', 'other span is for HTTP request') + t.end() + }) + agent.startTransaction('myTransaction') + const sqs = new AWS.SQS({ apiVersion: '2012-11-05' }) + const params = getParams('sendMessage', listener.address().port) + const request = sqs.sendMessage(params).promise() + + request.then( + function (data) { + awsPromiseFinally(agent, listener) + }, + function (err) { + t.fail(err) + awsPromiseFinally(agent, listener) + } + ) + }) + }) + + test.test('API: sendMessageBatch promise', function (t) { + const app = createMockServer( + getXmlResponse('sendMessageBatch') + ) + const listener = app.listen(0, function () { + resetAgent(function (data) { + const [spanSqs, spanHttp] = getSqsAndOtherSpanFromData(data, t) + + t.equals(spanHttp.type, 'external', 'other span is for HTTP request') + + t.equals(spanSqs.name, 'SQS SEND_BATCH to our-queue', 'SQS span named correctly') + t.equals(spanSqs.type, 'messaging', 'span type set to messaging') + t.equals(spanSqs.subtype, 'sqs', 'span subtype set to sqs') + t.equals(spanSqs.action, 'send_batch', 'span action matches API method called') + t.equals(spanSqs.context.destination.service.type, 'messaging', 'messaging context set') + t.equals(spanSqs.context.message.queue.name, 'our-queue', 'queue name context set') + + t.end() + }) + agent.startTransaction('myTransaction') + const sqs = new AWS.SQS({ apiVersion: '2012-11-05' }) + const params = getParams('sendMessageBatch', listener.address().port) + const promise = sqs.sendMessageBatch(params).promise() + promise.then( + function (data) { + awsPromiseFinally(agent, listener) + }, + function (err) { + t.fail(err) + awsPromiseFinally(agent, listener) + } + ) + }) + }) + test.test('API: deleteMessage promise', function (t) { + const app = createMockServer( + getXmlResponse('deleteMessage') + ) + const listener = app.listen(0, function () { + resetAgent(function (data) { + const [spanSqs, spanHttp] = getSqsAndOtherSpanFromData(data, t) + + t.equals(spanSqs.name, 'SQS DELETE from our-queue', 'SQS span named correctly') + t.equals(spanSqs.type, 'messaging', 'span type set to messaging') + t.equals(spanSqs.subtype, 'sqs', 'span subtype set to sqs') + t.equals(spanSqs.action, 'delete', 'span action matches API method called') + t.equals(spanSqs.context.destination.service.type, 'messaging', 'messaging context set') + t.equals(spanSqs.context.message.queue.name, 'our-queue', 'queue name context set') + + t.equals(spanHttp.type, 'external', 'other span is for HTTP request') + + t.end() + }) + agent.startTransaction('myTransaction') + const sqs = new AWS.SQS({ apiVersion: '2012-11-05' }) + const params = getParams('deleteMessage', listener.address().port) + const promise = sqs.deleteMessage(params).promise() + promise.then( + function (data) { + awsPromiseFinally(agent, listener) + }, + function (err) { + t.fail(err) + awsPromiseFinally(agent, listener) + } + ) + }) + }) + + test.test('API: deleteMessageBatch promise', function (t) { + const app = createMockServer( + getXmlResponse('deleteMessageBatch') + ) + const listener = app.listen(0, function () { + resetAgent(function (data) { + const [spanSqs, spanHttp] = getSqsAndOtherSpanFromData(data, t) + + t.equals(spanHttp.type, 'external', 'second span is for HTTP request') + + t.equals(spanSqs.name, 'SQS DELETE_BATCH from our-queue', 'SQS span named correctly') + t.equals(spanSqs.type, 'messaging', 'span type set to messaging') + t.equals(spanSqs.subtype, 'sqs', 'span subtype set to sqs') + t.equals(spanSqs.action, 'delete_batch', 'span action matches API method called') + t.equals(spanSqs.context.destination.service.type, 'messaging', 'messaging context set') + t.equals(spanSqs.context.message.queue.name, 'our-queue', 'queue name context set') + + t.end() + }) + agent.startTransaction('myTransaction') + const sqs = new AWS.SQS({ apiVersion: '2012-11-05' }) + const params = getParams('deleteMessageBatch', listener.address().port) + const promise = sqs.deleteMessageBatch(params).promise() + promise.then( + function (data) { + awsPromiseFinally(agent, listener) + }, + function (err) { + t.fail(err) + awsPromiseFinally(agent, listener) + } + ) + }) + }) + + test.test('API: receiveMessage promise', function (t) { + const app = createMockServer( + getXmlResponse('receiveMessage') + ) + const listener = app.listen(0, function () { + resetAgent(function (data) { + const [spanSqs, spanHttp] = getSqsAndOtherSpanFromData(data, t) + + t.equals(spanHttp.type, 'external', 'other span is for HTTP request') + + t.equals(spanSqs.name, 'SQS POLL from our-queue', 'SQS span named correctly') + t.equals(spanSqs.type, 'messaging', 'span type set to messaging') + t.equals(spanSqs.subtype, 'sqs', 'span subtype set to sqs') + t.equals(spanSqs.action, 'poll', 'span action matches API method called') + t.equals(spanSqs.context.destination.service.type, 'messaging', 'messaging context set') + t.equals(spanSqs.context.message.queue.name, 'our-queue', 'queue name context set') + + t.end() + }) + agent.startTransaction('myTransaction') + const sqs = new AWS.SQS({ apiVersion: '2012-11-05' }) + const params = getParams('receiveMessage', listener.address().port) + const promise = sqs.receiveMessage(params).promise() + promise.then( + function (data) { + awsPromiseFinally(agent, listener) + }, + function (err) { + t.fail(err) + awsPromiseFinally(agent, listener) + } + ) + }) + }) + test.end() +}) + +function awsPromiseFinally (agent, listener) { + agent.endTransaction() + listener.close() +} + +function createMockServer (xmlResponse) { + const app = express() + app.use(bodyParser.urlencoded({ extended: false })) + app.post('/', (req, res) => { + res.setHeader('Content-Type', 'text/xml') + res.send(xmlResponse) + }) + return app +} + +function getXmlResponse (method) { + return fixtures[method].response +} + +function getParams (method, port) { + const params = fixtures[method].request + params.QueueUrl = `http://localhost:${port}/1/our-queue` + return params +} + +function initializeAwsSdk () { + // SDk requires a region to be set + AWS.config.update({ region: 'us-west' }) + + // without fake credentials the aws-sdk will attempt to fetch + // credentials as though it was on an EC2 instance + process.env.AWS_ACCESS_KEY_ID = 'fake-1' + process.env.AWS_SECRET_ACCESS_KEY = 'fake-2' +} + +// extracts the SQS and "other" (so far just HTTP) span +// from resetAgent's data. The order the agent will +// write spans is not deterministic and is subject to +// network latency in the mock server. +function getSqsAndOtherSpanFromData (data, t) { + t.equals(data.spans.length, 2, 'generated two spans') + const values = [] + if (data.spans[0].name.indexOf('SQS') === 0) { + values.push(data.spans[0]) + values.push(data.spans[1]) + } else { + values.push(data.spans[1]) + values.push(data.spans[0]) + } + + return values +} + +function resetAgent (cb) { + agent._instrumentation.currentTransaction = null + agent._transport = mockClient(cb) +} diff --git a/test/instrumentation/span.js b/test/instrumentation/span.js index d1712fda0b..58a2e72ed9 100644 --- a/test/instrumentation/span.js +++ b/test/instrumentation/span.js @@ -240,7 +240,7 @@ test('#_encode() - with meta data', function myTest2 (t) { t.strictEqual(payload.type, 'bar') t.strictEqual(payload.timestamp, span._timer.start) t.ok(payload.duration > 0) - t.deepEqual(payload.context, { db: { statement: 'foo', type: 'bar' }, http: undefined, tags: { baz: '1' }, destination: undefined }) + t.deepEqual(payload.context, { db: { statement: 'foo', type: 'bar' }, http: undefined, tags: { baz: '1' }, destination: undefined, message: undefined }) assert.stacktrace(t, 'myTest2', __filename, payload.stacktrace, agent) t.end() }) diff --git a/test/test.js b/test/test.js index c5dd5ba778..ff7951191d 100644 --- a/test/test.js +++ b/test/test.js @@ -89,6 +89,7 @@ var directories = [ 'test/instrumentation/modules/mysql2', 'test/instrumentation/modules/pg', 'test/instrumentation/modules/restify', + 'test/instrumentation/modules/aws-sdk', 'test/integration', 'test/integration/api-schema', 'test/lambda',