Skip to content

Commit

Permalink
Fix amqp instrumentation (#4839)
Browse files Browse the repository at this point in the history
* Fix amqp instrumentation
  • Loading branch information
piochelepiotr authored Nov 1, 2024
1 parent 28eb958 commit c03d608
Show file tree
Hide file tree
Showing 3 changed files with 137 additions and 15 deletions.
70 changes: 65 additions & 5 deletions packages/datadog-instrumentations/src/amqplib.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,70 @@ addHook({ name: 'amqplib', file: 'lib/defs.js', versions: [MIN_VERSION] }, defs
return defs
})

addHook({ name: 'amqplib', file: 'lib/channel_model.js', versions: [MIN_VERSION] }, x => {
shimmer.wrap(x.Channel.prototype, 'get', getMessage => function (queue, options) {
return getMessage.apply(this, arguments).then(message => {
if (message === null) {
return message
}
startCh.publish({ method: 'basic.get', message, fields: message.fields, queue })
// finish right away
finishCh.publish()
return message
})
})
shimmer.wrap(x.Channel.prototype, 'consume', consume => function (queue, callback, options) {
if (!startCh.hasSubscribers) {
return consume.apply(this, arguments)
}
arguments[1] = (message, ...args) => {
if (message === null) {
return callback(message, ...args)
}
startCh.publish({ method: 'basic.deliver', message, fields: message.fields, queue })
const result = callback(message, ...args)
finishCh.publish()
return result
}
return consume.apply(this, arguments)
})
return x
})

addHook({ name: 'amqplib', file: 'lib/callback_model.js', versions: [MIN_VERSION] }, channel => {
shimmer.wrap(channel.Channel.prototype, 'get', getMessage => function (queue, options, callback) {
if (!startCh.hasSubscribers) {
return getMessage.apply(this, arguments)
}
arguments[2] = (error, message, ...args) => {
if (error !== null || message === null) {
return callback(error, message, ...args)
}
startCh.publish({ method: 'basic.get', message, fields: message.fields, queue })
const result = callback(error, message, ...args)
finishCh.publish()
return result
}
return getMessage.apply(this, arguments)
})
shimmer.wrap(channel.Channel.prototype, 'consume', consume => function (queue, callback) {
if (!startCh.hasSubscribers) {
return consume.apply(this, arguments)
}
arguments[1] = (message, ...args) => {
if (message === null) {
return callback(message, ...args)
}
startCh.publish({ method: 'basic.deliver', message, fields: message.fields, queue })
const result = callback(message, ...args)
finishCh.publish()
return result
}
return consume.apply(this, arguments)
})
return channel
})

addHook({ name: 'amqplib', file: 'lib/channel.js', versions: [MIN_VERSION] }, channel => {
shimmer.wrap(channel.Channel.prototype, 'sendImmediately', sendImmediately => function (method, fields) {
return instrument(sendImmediately, this, arguments, methods[method], fields)
Expand All @@ -33,15 +97,11 @@ addHook({ name: 'amqplib', file: 'lib/channel.js', versions: [MIN_VERSION] }, ch
shimmer.wrap(channel.Channel.prototype, 'sendMessage', sendMessage => function (fields) {
return instrument(sendMessage, this, arguments, 'basic.publish', fields, arguments[2])
})

shimmer.wrap(channel.BaseChannel.prototype, 'dispatchMessage', dispatchMessage => function (fields, message) {
return instrument(dispatchMessage, this, arguments, 'basic.deliver', fields, message)
})
return channel
})

function instrument (send, channel, args, method, fields, message) {
if (!startCh.hasSubscribers) {
if (!startCh.hasSubscribers || method === 'basic.get') {
return send.apply(channel, args)
}

Expand Down
8 changes: 4 additions & 4 deletions packages/datadog-plugin-amqplib/src/consumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,18 @@ class AmqplibConsumerPlugin extends ConsumerPlugin {
static get id () { return 'amqplib' }
static get operation () { return 'command' }

start ({ method, fields, message }) {
start ({ method, fields, message, queue }) {
if (method !== 'basic.deliver' && method !== 'basic.get') return

const childOf = extract(this.tracer, message)

const queueName = queue || fields.queue || fields.routingKey
const span = this.startSpan({
childOf,
resource: getResourceName(method, fields),
type: 'worker',
meta: {
'amqp.queue': fields.queue,
'amqp.queue': queueName,
'amqp.exchange': fields.exchange,
'amqp.routingKey': fields.routingKey,
'amqp.consumerTag': fields.consumerTag,
Expand All @@ -32,10 +33,9 @@ class AmqplibConsumerPlugin extends ConsumerPlugin {
this.config.dsmEnabled && message?.properties?.headers
) {
const payloadSize = getAmqpMessageSize({ headers: message.properties.headers, content: message.content })
const queue = fields.queue ? fields.queue : fields.routingKey
this.tracer.decodeDataStreamsContext(message.properties.headers)
this.tracer
.setCheckpoint(['direction:in', `topic:${queue}`, 'type:rabbitmq'], span, payloadSize)
.setCheckpoint(['direction:in', `topic:${queueName}`, 'type:rabbitmq'], span, payloadSize)
}
}
}
Expand Down
74 changes: 68 additions & 6 deletions packages/datadog-plugin-amqplib/test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -324,16 +324,22 @@ describe('Plugin', () => {

it('Should emit DSM stats to the agent when sending a message', done => {
agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = 0
let statsPointsReceived = []
// we should have 1 dsm stats points
dsmStats.forEach((timeStatsBucket) => {
if (timeStatsBucket && timeStatsBucket.Stats) {
timeStatsBucket.Stats.forEach((statsBuckets) => {
statsPointsReceived += statsBuckets.Stats.length
statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats)
})
}
})
expect(statsPointsReceived).to.be.at.least(1)
expect(statsPointsReceived.length).to.be.at.least(1)
expect(statsPointsReceived[0].EdgeTags).to.deep.equal([
'direction:out',
'exchange:',
'has_routing_key:true',
'type:rabbitmq'
])
expect(agent.dsmStatsExist(agent, expectedProducerHash)).to.equal(true)
}, { timeoutMs: 10000 }).then(done, done)

Expand All @@ -346,16 +352,18 @@ describe('Plugin', () => {

it('Should emit DSM stats to the agent when receiving a message', done => {
agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = 0
let statsPointsReceived = []
// we should have 2 dsm stats points
dsmStats.forEach((timeStatsBucket) => {
if (timeStatsBucket && timeStatsBucket.Stats) {
timeStatsBucket.Stats.forEach((statsBuckets) => {
statsPointsReceived += statsBuckets.Stats.length
statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats)
})
}
})
expect(statsPointsReceived).to.be.at.least(1)
expect(statsPointsReceived.length).to.be.at.least(1)
expect(statsPointsReceived[0].EdgeTags).to.deep.equal(
['direction:in', 'topic:testDSM', 'type:rabbitmq'])
expect(agent.dsmStatsExist(agent, expectedConsumerHash)).to.equal(true)
}, { timeoutMs: 10000 }).then(done, done)

Expand All @@ -368,6 +376,60 @@ describe('Plugin', () => {
})
})

it('Should emit DSM stats to the agent when sending another message', done => {
agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = []
// we should have 1 dsm stats points
dsmStats.forEach((timeStatsBucket) => {
if (timeStatsBucket && timeStatsBucket.Stats) {
timeStatsBucket.Stats.forEach((statsBuckets) => {
statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats)
})
}
})
expect(statsPointsReceived.length).to.be.at.least(1)
expect(statsPointsReceived[0].EdgeTags).to.deep.equal([
'direction:out',
'exchange:',
'has_routing_key:true',
'type:rabbitmq'
])
expect(agent.dsmStatsExist(agent, expectedProducerHash)).to.equal(true)
}, { timeoutMs: 10000 }).then(done, done)

channel.assertQueue('testDSM', {}, (err, ok) => {
if (err) return done(err)

channel.sendToQueue(ok.queue, Buffer.from('DSM pathway test'))
})
})

it('Should emit DSM stats to the agent when receiving a message with get', done => {
agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = []
// we should have 2 dsm stats points
dsmStats.forEach((timeStatsBucket) => {
if (timeStatsBucket && timeStatsBucket.Stats) {
timeStatsBucket.Stats.forEach((statsBuckets) => {
statsPointsReceived = statsPointsReceived.concat(statsBuckets.Stats)
})
}
})
expect(statsPointsReceived.length).to.be.at.least(1)
expect(statsPointsReceived[0].EdgeTags).to.deep.equal(
['direction:in', 'topic:testDSM', 'type:rabbitmq'])
expect(agent.dsmStatsExist(agent, expectedConsumerHash)).to.equal(true)
}, { timeoutMs: 10000 }).then(done, done)

channel.assertQueue('testDSM', {}, (err, ok) => {
if (err) return done(err)

channel.get(ok.queue, {}, (err, ok) => {
if (err) done(err)
})
})
})

it('Should set pathway hash tag on a span when producing', (done) => {
channel.assertQueue('testDSM', {}, (err, ok) => {
if (err) return done(err)
Expand Down

0 comments on commit c03d608

Please sign in to comment.