Skip to content

Commit

Permalink
[DSM] Set checkpoints for DSM with SQS & Kinesis for consumers even w…
Browse files Browse the repository at this point in the history
…hen the producer did not have DSM enabled
  • Loading branch information
ericfirth committed Nov 8, 2024
1 parent 367bd2d commit 996f669
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 13 deletions.
8 changes: 4 additions & 4 deletions packages/datadog-plugin-aws-sdk/src/services/kinesis.js
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,11 @@ class Kinesis extends BaseAwsSdkPlugin {
response.Records.forEach(record => {
const parsedAttributes = JSON.parse(Buffer.from(record.Data).toString())

if (
parsedAttributes?._datadog && streamName
) {
if (streamName) {
const payloadSize = getSizeOrZero(record.Data)
this.tracer.decodeDataStreamsContext(parsedAttributes._datadog)
if (parsedAttributes?._datadog) {
this.tracer.decodeDataStreamsContext(parsedAttributes._datadog)
}
this.tracer
.setCheckpoint(['direction:in', `topic:${streamName}`, 'type:kinesis'], span, payloadSize)
}
Expand Down
16 changes: 8 additions & 8 deletions packages/datadog-plugin-aws-sdk/src/services/sqs.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class Sqs extends BaseAwsSdkPlugin {
// extract DSM context after as we might not have a parent-child but may have a DSM context

this.responseExtractDSMContext(
request.operation, request.params, response, span || null, { parsedMessageAttributes }
request.operation, request.params, response, span || null, { parsedAttributes: parsedMessageAttributes }
)
})

Expand Down Expand Up @@ -195,16 +195,16 @@ class Sqs extends BaseAwsSdkPlugin {
parsedAttributes = this.parseDatadogAttributes(message.MessageAttributes._datadog)
}
}
const payloadSize = getHeadersSize({
Body: message.Body,
MessageAttributes: message.MessageAttributes
})
const queue = params.QueueUrl.split('/').pop()
if (parsedAttributes) {
const payloadSize = getHeadersSize({
Body: message.Body,
MessageAttributes: message.MessageAttributes
})
const queue = params.QueueUrl.split('/').pop()
this.tracer.decodeDataStreamsContext(parsedAttributes)
this.tracer
.setCheckpoint(['direction:in', `topic:${queue}`, 'type:sqs'], span, payloadSize)
}
this.tracer
.setCheckpoint(['direction:in', `topic:${queue}`, 'type:sqs'], span, payloadSize)
})
}

Expand Down
26 changes: 26 additions & 0 deletions packages/datadog-plugin-aws-sdk/test/kinesis.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,32 @@ describe('Kinesis', function () {
})
})

it('emits DSM stats to the agent during Kinesis getRecord when the putRecord was done without DSM enabled', done => {
agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = 0
// we should have only have 1 stats point since we only had 1 put operation
dsmStats.forEach((timeStatsBucket) => {
if (timeStatsBucket && timeStatsBucket.Stats) {
timeStatsBucket.Stats.forEach((statsBuckets) => {
statsPointsReceived += statsBuckets.Stats.length
})
}
}, { timeoutMs: 10000 })
expect(statsPointsReceived).to.equal(1)
expect(agent.dsmStatsExistWithParentHash(agent, '0')).to.equal(true)
}, { timeoutMs: 10000 }).then(done, done)

agent.reload('aws-sdk', { kinesis: { dsmEnabled: false } }, { dsmEnabled: false })
helpers.putTestRecord(kinesis, streamNameDSM, helpers.dataBuffer, (err, data) => {
if (err) return done(err)

agent.reload('aws-sdk', { kinesis: { dsmEnabled: true } }, { dsmEnabled: true })
helpers.getTestData(kinesis, streamNameDSM, data, (err) => {
if (err) return done(err)
})
})
})

it('emits DSM stats to the agent during Kinesis putRecords', done => {
// we need to stub Date.now() to ensure a new stats bucket is created for each call
// otherwise, all stats checkpoints will be combined into a single stats points
Expand Down
22 changes: 22 additions & 0 deletions packages/datadog-plugin-aws-sdk/test/sqs.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,28 @@ describe('Plugin', () => {
})
})

it('Should emit DSM stats when receiving a message when the producer was not instrumented', done => {
agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = 0
// we should have 2 dsm stats points
dsmStats.forEach((timeStatsBucket) => {
if (timeStatsBucket && timeStatsBucket.Stats) {
timeStatsBucket.Stats.forEach((statsBuckets) => {
statsPointsReceived += statsBuckets.Stats.length
})
}
})
expect(statsPointsReceived).to.equal(1)
expect(agent.dsmStatsExistWithParentHash(agent, '0')).to.equal(true)
}).then(done, done)

agent.reload('aws-sdk', { sqs: { dsmEnabled: false } }, { dsmEnabled: false })
sqs.sendMessage({ MessageBody: 'test DSM', QueueUrl: QueueUrlDsm }, () => {
agent.reload('aws-sdk', { sqs: { dsmEnabled: true } }, { dsmEnabled: true })
sqs.receiveMessage({ QueueUrl: QueueUrlDsm, MessageAttributeNames: ['.*'] }, () => {})
})
});

Check failure on line 569 in packages/datadog-plugin-aws-sdk/test/sqs.spec.js

View workflow job for this annotation

GitHub Actions / lint

Extra semicolon

it('Should emit DSM stats to the agent when sending batch messages', done => {
// we need to stub Date.now() to ensure a new stats bucket is created for each call
// otherwise, all stats checkpoints will be combined into a single stats points
Expand Down
21 changes: 20 additions & 1 deletion packages/dd-trace/test/plugins/agent.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,24 @@ function dsmStatsExist (agent, expectedHash, expectedEdgeTags) {
return hashFound
}

function dsmStatsExistWithParentHash (agent, expectedParentHash) {
const dsmStats = agent.getDsmStats()
let hashFound = false
if (dsmStats.length !== 0) {
for (const statsTimeBucket of dsmStats) {
for (const statsBucket of statsTimeBucket.Stats) {
for (const stats of statsBucket.Stats) {
if (stats.ParentHash.toString() === expectedParentHash) {
hashFound = true
return hashFound
}
}
}
}
}
return hashFound
}

function addEnvironmentVariablesToHeaders (headers) {
// get all environment variables that start with "DD_"
const ddEnvVars = new Map(
Expand Down Expand Up @@ -424,5 +442,6 @@ module.exports = {
tracer,
testedPlugins,
getDsmStats,
dsmStatsExist
dsmStatsExist,
dsmStatsExistWithParentHash
}

0 comments on commit 996f669

Please sign in to comment.