Skip to content

Commit

Permalink
add dsm for google pub sub (#3855)
Browse files Browse the repository at this point in the history
* add dsm for google pub sub
  • Loading branch information
wconti27 authored Oct 30, 2024
1 parent 70ec90e commit 9e65a80
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 3 deletions.
9 changes: 8 additions & 1 deletion packages/datadog-plugin-google-cloud-pubsub/src/consumer.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict'

const { getMessageSize } = require('../../dd-trace/src/datastreams/processor')
const ConsumerPlugin = require('../../dd-trace/src/plugins/consumer')

class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin {
Expand All @@ -11,7 +12,7 @@ class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin {
const topic = subscription.metadata && subscription.metadata.topic
const childOf = this.tracer.extract('text_map', message.attributes) || null

this.startSpan({
const span = this.startSpan({
childOf,
resource: topic,
type: 'worker',
Expand All @@ -23,6 +24,12 @@ class GoogleCloudPubsubConsumerPlugin extends ConsumerPlugin {
'pubsub.ack': 0
}
})
if (this.config.dsmEnabled && message?.attributes) {
const payloadSize = getMessageSize(message)
this.tracer.decodeDataStreamsContext(message.attributes)
this.tracer
.setCheckpoint(['direction:in', `topic:${topic}`, 'type:google-pubsub'], span, payloadSize)
}
}

finish (message) {
Expand Down
8 changes: 8 additions & 0 deletions packages/datadog-plugin-google-cloud-pubsub/src/producer.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
'use strict'

const ProducerPlugin = require('../../dd-trace/src/plugins/producer')
const { DsmPathwayCodec } = require('../../dd-trace/src/datastreams/pathway')
const { getHeadersSize } = require('../../dd-trace/src/datastreams/processor')

class GoogleCloudPubsubProducerPlugin extends ProducerPlugin {
static get id () { return 'google-cloud-pubsub' }
Expand All @@ -25,6 +27,12 @@ class GoogleCloudPubsubProducerPlugin extends ProducerPlugin {
msg.attributes = {}
}
this.tracer.inject(span, 'text_map', msg.attributes)
if (this.config.dsmEnabled) {
const payloadSize = getHeadersSize(msg)
const dataStreamsContext = this.tracer
.setCheckpoint(['direction:out', `topic:${topic}`, 'type:google-pubsub'], span, payloadSize)
DsmPathwayCodec.encode(dataStreamsContext, msg.attributes)
}
}
}
}
Expand Down
118 changes: 116 additions & 2 deletions packages/datadog-plugin-google-cloud-pubsub/test/index.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ const id = require('../../dd-trace/src/id')
const { ERROR_MESSAGE, ERROR_TYPE, ERROR_STACK } = require('../../dd-trace/src/constants')

const { expectedSchema, rawExpectedSchema } = require('./naming')
const { computePathwayHash } = require('../../dd-trace/src/datastreams/pathway')
const { ENTRY_PARENT_HASH, DataStreamsProcessor } = require('../../dd-trace/src/datastreams/processor')

// The roundtrip to the pubsub emulator takes time. Sometimes a *long* time.
const TIMEOUT = 30000
const dsmTopicName = 'dsm-topic'

describe('Plugin', () => {
let tracer
Expand All @@ -18,6 +21,7 @@ describe('Plugin', () => {

before(() => {
process.env.PUBSUB_EMULATOR_HOST = 'localhost:8081'
process.env.DD_DATA_STREAMS_ENABLED = true
})

after(() => {
Expand All @@ -34,10 +38,12 @@ describe('Plugin', () => {
let resource
let v1
let gax
let expectedProducerHash
let expectedConsumerHash

describe('without configuration', () => {
beforeEach(() => {
return agent.load('google-cloud-pubsub')
return agent.load('google-cloud-pubsub', { dsmEnabled: false })
})

beforeEach(() => {
Expand Down Expand Up @@ -296,7 +302,8 @@ describe('Plugin', () => {
describe('with configuration', () => {
beforeEach(() => {
return agent.load('google-cloud-pubsub', {
service: 'a_test_service'
service: 'a_test_service',
dsmEnabled: false
})
})

Expand All @@ -322,6 +329,113 @@ describe('Plugin', () => {
})
})

describe('data stream monitoring', () => {
let dsmTopic
let sub
let consume

beforeEach(() => {
return agent.load('google-cloud-pubsub', {
dsmEnabled: true
})
})

before(async () => {
const { PubSub } = require(`../../../versions/@google-cloud/pubsub@${version}`).get()
project = getProjectId()
resource = `projects/${project}/topics/${dsmTopicName}`
pubsub = new PubSub({ projectId: project })
tracer.use('google-cloud-pubsub', { dsmEnabled: true })

dsmTopic = await pubsub.createTopic(dsmTopicName)
dsmTopic = dsmTopic[0]
sub = await dsmTopic.createSubscription('DSM')
sub = sub[0]
consume = function (cb) {
sub.on('message', cb)
}

const dsmFullTopic = `projects/${project}/topics/${dsmTopicName}`

expectedProducerHash = computePathwayHash(
'test',
'tester',
['direction:out', 'topic:' + dsmFullTopic, 'type:google-pubsub'],
ENTRY_PARENT_HASH
)
expectedConsumerHash = computePathwayHash(
'test',
'tester',
['direction:in', 'topic:' + dsmFullTopic, 'type:google-pubsub'],
expectedProducerHash
)
})

describe('should set a DSM checkpoint', () => {
it('on produce', async () => {
await publish(dsmTopic, { data: Buffer.from('DSM produce checkpoint') })

agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = 0
// we should have 1 dsm stats points
dsmStats.forEach((timeStatsBucket) => {
if (timeStatsBucket && timeStatsBucket.Stats) {
timeStatsBucket.Stats.forEach((statsBuckets) => {
statsPointsReceived += statsBuckets.Stats.length
})
}
})
expect(statsPointsReceived).to.be.at.least(1)
expect(agent.dsmStatsExist(agent, expectedProducerHash.readBigUInt64BE(0).toString())).to.equal(true)
}, { timeoutMs: TIMEOUT })
})

it('on consume', async () => {
await publish(dsmTopic, { data: Buffer.from('DSM consume checkpoint') })
await consume(async () => {
agent.expectPipelineStats(dsmStats => {
let statsPointsReceived = 0
// we should have 2 dsm stats points
dsmStats.forEach((timeStatsBucket) => {
if (timeStatsBucket && timeStatsBucket.Stats) {
timeStatsBucket.Stats.forEach((statsBuckets) => {
statsPointsReceived += statsBuckets.Stats.length
})
}
})
expect(statsPointsReceived).to.be.at.least(2)
expect(agent.dsmStatsExist(agent, expectedConsumerHash.readBigUInt64BE(0).toString())).to.equal(true)
}, { timeoutMs: TIMEOUT })
})
})
})

describe('it should set a message payload size', () => {
let recordCheckpointSpy

beforeEach(() => {
recordCheckpointSpy = sinon.spy(DataStreamsProcessor.prototype, 'recordCheckpoint')
})

afterEach(() => {
DataStreamsProcessor.prototype.recordCheckpoint.restore()
})

it('when producing a message', async () => {
await publish(dsmTopic, { data: Buffer.from('DSM produce payload size') })
expect(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize'))
})

it('when consuming a message', async () => {
await publish(dsmTopic, { data: Buffer.from('DSM consume payload size') })

await consume(async () => {
expect(recordCheckpointSpy.args[0][0].hasOwnProperty('payloadSize'))
})
})
})
})

function expectSpanWithDefaults (expected) {
const prefixedResource = [expected.meta['pubsub.method'], resource].filter(x => x).join(' ')
const service = expected.meta['pubsub.method'] ? 'test-pubsub' : 'test'
Expand Down

0 comments on commit 9e65a80

Please sign in to comment.