Skip to content

Commit

Permalink
[serverless] Add S3 Span Pointers (#4875)
Browse files Browse the repository at this point in the history
* Add span pointer info on S3 `putObject`, `copyObject`, and `completeMultipartUpload` requests.

* Unit tests

* small improvement

* Create `addSpanPointer()` so we don't have to export a context with 0s for trace+span id; add debug logs

* Add integration test for completeMultipartUpload; update unit test

* Rename to `addSpanPointers()`

* Update comments and make getting eTag more reliable

* Validate parameters before calling `generateS3PointerHash`

* add unit tests

* Rename var to `SPAN_LINK_POINTER_KIND`; standardize the hashing function.

* Set the span link kind in the `addSpanPointer()` functions so that downstream callers don't have to worry about passing it.

* Move constants to constants.js; move `generatePointerHash` to util.js
  • Loading branch information
nhulston authored Nov 18, 2024
1 parent 2072a1f commit 6392a2e
Show file tree
Hide file tree
Showing 11 changed files with 306 additions and 9 deletions.
5 changes: 5 additions & 0 deletions packages/datadog-plugin-aws-sdk/src/base.js
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class BaseAwsSdkPlugin extends ClientPlugin {
this.responseExtractDSMContext(operation, params, response.data ?? response, span)
}
this.addResponseTags(span, response)
this.addSpanPointers(span, response)
this.finish(span, response, response.error)
})
}
Expand All @@ -101,6 +102,10 @@ class BaseAwsSdkPlugin extends ClientPlugin {
// implemented by subclasses, or not
}

addSpanPointers (span, response) {
// Optionally implemented by subclasses, for services where we're unable to inject trace context
}

operationFromRequest (request) {
// can be overriden by subclasses
return this.operationName({
Expand Down
34 changes: 34 additions & 0 deletions packages/datadog-plugin-aws-sdk/src/services/s3.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
'use strict'

const BaseAwsSdkPlugin = require('../base')
const log = require('../../../dd-trace/src/log')
const { generatePointerHash } = require('../../../dd-trace/src/util')
const { S3_PTR_KIND, SPAN_POINTER_DIRECTION } = require('../../../dd-trace/src/constants')

class S3 extends BaseAwsSdkPlugin {
static get id () { return 's3' }
Expand All @@ -18,6 +21,37 @@ class S3 extends BaseAwsSdkPlugin {
bucketname: params.Bucket
})
}

addSpanPointers (span, response) {
const request = response?.request
const operationName = request?.operation
if (!['putObject', 'copyObject', 'completeMultipartUpload'].includes(operationName)) {
// We don't create span links for other S3 operations.
return
}

// AWS v2: https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/S3.html
// AWS v3: https://docs.aws.amazon.com/AWSJavaScriptSDK/v3/latest/client/s3/
const bucketName = request?.params?.Bucket
const objectKey = request?.params?.Key
let eTag =
response?.ETag || // v3 PutObject & CompleteMultipartUpload
response?.CopyObjectResult?.ETag || // v3 CopyObject
response?.data?.ETag || // v2 PutObject & CompleteMultipartUpload
response?.data?.CopyObjectResult?.ETag // v2 CopyObject

if (!bucketName || !objectKey || !eTag) {
log.debug('Unable to calculate span pointer hash because of missing parameters.')
return
}

// https://github.com/DataDog/dd-span-pointer-rules/blob/main/AWS/S3/Object/README.md
if (eTag.startsWith('"') && eTag.endsWith('"')) {
eTag = eTag.slice(1, -1)
}
const pointerHash = generatePointerHash([bucketName, objectKey, eTag])
span.addSpanPointer(S3_PTR_KIND, SPAN_POINTER_DIRECTION.DOWNSTREAM, pointerHash)
}
}

module.exports = S3
146 changes: 139 additions & 7 deletions packages/datadog-plugin-aws-sdk/test/s3.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ const agent = require('../../dd-trace/test/plugins/agent')
const { setup } = require('./spec_helpers')
const axios = require('axios')
const { rawExpectedSchema } = require('./s3-naming')
const { S3_PTR_KIND, SPAN_POINTER_DIRECTION } = require('../../dd-trace/src/constants')

const bucketName = 's3-bucket-name-test'

Expand Down Expand Up @@ -36,20 +37,19 @@ describe('Plugin', () => {

before(done => {
AWS = require(`../../../versions/${s3ClientName}@${version}`).get()
s3 = new AWS.S3({ endpoint: 'http://127.0.0.1:4566', s3ForcePathStyle: true, region: 'us-east-1' })

// Fix for LocationConstraint issue - only for SDK v2
if (s3ClientName === 'aws-sdk') {
s3.api.globalEndpoint = '127.0.0.1'
}

s3 = new AWS.S3({ endpoint: 'http://127.0.0.1:4567', s3ForcePathStyle: true, region: 'us-east-1' })
s3.createBucket({ Bucket: bucketName }, (err) => {
if (err) return done(err)
done()
})
})

after(done => {
s3.deleteBucket({ Bucket: bucketName }, () => {
done()
})
})

after(async () => {
await resetLocalStackS3()
return agent.close({ ritmReset: false })
Expand All @@ -74,6 +74,138 @@ describe('Plugin', () => {
rawExpectedSchema.outbound
)

describe('span pointers', () => {
it('should add span pointer for putObject operation', (done) => {
agent.use(traces => {
try {
const span = traces[0][0]
const links = JSON.parse(span.meta?.['_dd.span_links'] || '[]')

expect(links).to.have.lengthOf(1)
expect(links[0].attributes).to.deep.equal({
'ptr.kind': S3_PTR_KIND,
'ptr.dir': SPAN_POINTER_DIRECTION.DOWNSTREAM,
'ptr.hash': '6d1a2fe194c6579187408f827f942be3',
'link.kind': 'span-pointer'
})
done()
} catch (error) {
done(error)
}
}).catch(done)

s3.putObject({
Bucket: bucketName,
Key: 'test-key',
Body: 'test body'
}, (err) => {
if (err) {
done(err)
}
})
})

it('should add span pointer for copyObject operation', (done) => {
agent.use(traces => {
try {
const span = traces[0][0]
const links = JSON.parse(span.meta?.['_dd.span_links'] || '[]')

expect(links).to.have.lengthOf(1)
expect(links[0].attributes).to.deep.equal({
'ptr.kind': S3_PTR_KIND,
'ptr.dir': SPAN_POINTER_DIRECTION.DOWNSTREAM,
'ptr.hash': '1542053ce6d393c424b1374bac1fc0c5',
'link.kind': 'span-pointer'
})
done()
} catch (error) {
done(error)
}
}).catch(done)

s3.copyObject({
Bucket: bucketName,
Key: 'new-key',
CopySource: `${bucketName}/test-key`
}, (err) => {
if (err) {
done(err)
}
})
})

it('should add span pointer for completeMultipartUpload operation', (done) => {
// Create 5MiB+ buffers for parts
const partSize = 5 * 1024 * 1024
const part1Data = Buffer.alloc(partSize, 'a')
const part2Data = Buffer.alloc(partSize, 'b')

// Start the multipart upload process
s3.createMultipartUpload({
Bucket: bucketName,
Key: 'multipart-test'
}, (err, multipartData) => {
if (err) return done(err)

// Upload both parts in parallel
Promise.all([
new Promise((resolve, reject) => {
s3.uploadPart({
Bucket: bucketName,
Key: 'multipart-test',
PartNumber: 1,
UploadId: multipartData.UploadId,
Body: part1Data
}, (err, data) => err ? reject(err) : resolve({ PartNumber: 1, ETag: data.ETag }))
}),
new Promise((resolve, reject) => {
s3.uploadPart({
Bucket: bucketName,
Key: 'multipart-test',
PartNumber: 2,
UploadId: multipartData.UploadId,
Body: part2Data
}, (err, data) => err ? reject(err) : resolve({ PartNumber: 2, ETag: data.ETag }))
})
]).then(parts => {
// Now complete the multipart upload
const completeParams = {
Bucket: bucketName,
Key: 'multipart-test',
UploadId: multipartData.UploadId,
MultipartUpload: {
Parts: parts
}
}

s3.completeMultipartUpload(completeParams, (err) => {
if (err) done(err)
agent.use(traces => {
const span = traces[0][0]
const operation = span.meta?.['aws.operation']
if (operation === 'completeMultipartUpload') {
try {
const links = JSON.parse(span.meta?.['_dd.span_links'] || '[]')
expect(links).to.have.lengthOf(1)
expect(links[0].attributes).to.deep.equal({
'ptr.kind': S3_PTR_KIND,
'ptr.dir': SPAN_POINTER_DIRECTION.DOWNSTREAM,
'ptr.hash': '422412aa6b472a7194f3e24f4b12b4a6',
'link.kind': 'span-pointer'
})
done()
} catch (error) {
done(error)
}
}
})
})
}).catch(done)
})
})
})

it('should allow disabling a specific span kind of a service', (done) => {
let total = 0

Expand Down
7 changes: 6 additions & 1 deletion packages/dd-trace/src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,10 @@ module.exports = {
SCHEMA_OPERATION: 'schema.operation',
SCHEMA_NAME: 'schema.name',
GRPC_CLIENT_ERROR_STATUSES: [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16],
GRPC_SERVER_ERROR_STATUSES: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16]
GRPC_SERVER_ERROR_STATUSES: [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16],
S3_PTR_KIND: 'aws.s3.object',
SPAN_POINTER_DIRECTION: Object.freeze({
UPSTREAM: 'u',
DOWNSTREAM: 'd'
})
}
1 change: 1 addition & 0 deletions packages/dd-trace/src/noop/span.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class NoopSpan {
setTag (key, value) { return this }
addTags (keyValueMap) { return this }
addLink (link) { return this }
addSpanPointer (ptrKind, ptrDir, ptrHash) { return this }
log () { return this }
logEvent () {}
finish (finishTime) {}
Expand Down
15 changes: 15 additions & 0 deletions packages/dd-trace/src/opentelemetry/span.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ const { SERVICE_NAME, RESOURCE_NAME } = require('../../../../ext/tags')
const kinds = require('../../../../ext/kinds')

const SpanContext = require('./span_context')
const id = require('../id')

// The one built into OTel rounds so we lose sub-millisecond precision.
function hrTimeToMilliseconds (time) {
Expand Down Expand Up @@ -217,6 +218,20 @@ class Span {
return this
}

addSpanPointer (ptrKind, ptrDir, ptrHash) {
const zeroContext = new SpanContext({
traceId: id('0'),
spanId: id('0')
})
const attributes = {
'ptr.kind': ptrKind,
'ptr.dir': ptrDir,
'ptr.hash': ptrHash,
'link.kind': 'span-pointer'
}
return this.addLink(zeroContext, attributes)
}

setStatus ({ code, message }) {
if (!this.ended && !this._hasStatus && code) {
this._hasStatus = true
Expand Down
14 changes: 14 additions & 0 deletions packages/dd-trace/src/opentracing/span.js
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,20 @@ class DatadogSpan {
})
}

addSpanPointer (ptrKind, ptrDir, ptrHash) {
const zeroContext = new SpanContext({
traceId: id('0'),
spanId: id('0')
})
const attributes = {
'ptr.kind': ptrKind,
'ptr.dir': ptrDir,
'ptr.hash': ptrHash,
'link.kind': 'span-pointer'
}
this.addLink(zeroContext, attributes)
}

addEvent (name, attributesOrStartTime, startTime) {
const event = { name }
if (attributesOrStartTime) {
Expand Down
17 changes: 16 additions & 1 deletion packages/dd-trace/src/util.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
'use strict'

const crypto = require('crypto')
const path = require('path')

function isTrue (str) {
Expand Down Expand Up @@ -73,11 +74,25 @@ function hasOwn (object, prop) {
return Object.prototype.hasOwnProperty.call(object, prop)
}

/**
* Generates a unique hash from an array of strings by joining them with | before hashing.
* Used to uniquely identify AWS requests for span pointers.
* @param {string[]} components - Array of strings to hash
* @returns {string} A 32-character hash uniquely identifying the components
*/
function generatePointerHash (components) {
// If passing S3's ETag as a component, make sure any quotes have already been removed!
const dataToHash = components.join('|')
const hash = crypto.createHash('sha256').update(dataToHash).digest('hex')
return hash.substring(0, 32)
}

module.exports = {
isTrue,
isFalse,
isError,
globMatch,
calculateDDBasePath,
hasOwn
hasOwn,
generatePointerHash
}
27 changes: 27 additions & 0 deletions packages/dd-trace/test/opentelemetry/span.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,33 @@ describe('OTel Span', () => {
expect(_links).to.have.lengthOf(2)
})

it('should add span pointers', () => {
const span = makeSpan('name')
const { _links } = span._ddSpan

span.addSpanPointer('pointer_kind', 'd', 'abc123')
expect(_links).to.have.lengthOf(1)
expect(_links[0].attributes).to.deep.equal({
'ptr.kind': 'pointer_kind',
'ptr.dir': 'd',
'ptr.hash': 'abc123',
'link.kind': 'span-pointer'
})
expect(_links[0].context.toTraceId()).to.equal('0')
expect(_links[0].context.toSpanId()).to.equal('0')

span.addSpanPointer('another_kind', 'd', '1234567')
expect(_links).to.have.lengthOf(2)
expect(_links[1].attributes).to.deep.equal({
'ptr.kind': 'another_kind',
'ptr.dir': 'd',
'ptr.hash': '1234567',
'link.kind': 'span-pointer'
})
expect(_links[1].context.toTraceId()).to.equal('0')
expect(_links[1].context.toSpanId()).to.equal('0')
})

it('should set status', () => {
const unset = makeSpan('name')
const unsetCtx = unset._ddSpan.context()
Expand Down
Loading

0 comments on commit 6392a2e

Please sign in to comment.