Skip to content

Commit

Permalink
feat(dsm): implement avro schemas for avsc package (#4726)
Browse files Browse the repository at this point in the history
* add avro (avsc) schemas support for DSM
  • Loading branch information
wconti27 authored and bengl committed Oct 16, 2024
1 parent ff10093 commit 57c45ee
Show file tree
Hide file tree
Showing 19 changed files with 625 additions and 7 deletions.
9 changes: 9 additions & 0 deletions .github/workflows/plugins.yml
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,15 @@ jobs:
- uses: actions/checkout@v4
- uses: ./.github/actions/plugins/test-and-upstream

avsc:
runs-on: ubuntu-latest
env:
PLUGINS: avsc
DD_DATA_STREAMS_ENABLED: true
steps:
- uses: actions/checkout@v4
- uses: ./.github/actions/plugins/test-and-upstream

aws-sdk:
strategy:
matrix:
Expand Down
2 changes: 2 additions & 0 deletions docs/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ tracer.use('pg', {
<h5 id="amqplib"></h5>
<h5 id="amqplib-tags"></h5>
<h5 id="amqplib-config"></h5>
<h5 id="avsc"></h5>
<h5 id="aws-sdk"></h5>
<h5 id="aws-sdk-tags"></h5>
<h5 id="aws-sdk-config"></h5>
Expand Down Expand Up @@ -102,6 +103,7 @@ tracer.use('pg', {

* [amqp10](./interfaces/export_.plugins.amqp10.html)
* [amqplib](./interfaces/export_.plugins.amqplib.html)
* [avsc](./interfaces/export_.plugins.avsc.html)
* [aws-sdk](./interfaces/export_.plugins.aws_sdk.html)
* [bluebird](./interfaces/export_.plugins.bluebird.html)
* [couchbase](./interfaces/export_.plugins.couchbase.html)
Expand Down
1 change: 1 addition & 0 deletions docs/add-redirects.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ echo "writing redirects..."
declare -a plugins=(
"amqp10"
"amqplib"
"avsc"
"aws_sdk"
"bluebird"
"couchbase"
Expand Down
1 change: 1 addition & 0 deletions docs/test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ const openSearchOptions: plugins.opensearch = {

tracer.use('amqp10');
tracer.use('amqplib');
tracer.use('avsc');
tracer.use('aws-sdk');
tracer.use('aws-sdk', awsSdkOptions);
tracer.use('bunyan');
Expand Down
7 changes: 7 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ interface Plugins {
"amqp10": tracer.plugins.amqp10;
"amqplib": tracer.plugins.amqplib;
"apollo": tracer.plugins.apollo;
"avsc": tracer.plugins.avsc;
"aws-sdk": tracer.plugins.aws_sdk;
"bunyan": tracer.plugins.bunyan;
"cassandra-driver": tracer.plugins.cassandra_driver;
Expand Down Expand Up @@ -1192,6 +1193,12 @@ declare namespace tracer {
signature?: boolean;
}

/**
 * This plugin automatically patches the [avsc](https://github.com/mtth/avsc) module
 * to collect Avro message schemas when Data Streams Monitoring is enabled.
 */
interface avsc extends Integration {}

/**
* This plugin automatically instruments the
* [aws-sdk](https://github.com/aws/aws-sdk-js) module.
Expand Down
37 changes: 37 additions & 0 deletions packages/datadog-instrumentations/src/avsc.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
const shimmer = require('../../datadog-shimmer')
const { addHook } = require('./helpers/instrument')

const dc = require('dc-polyfill')
const serializeChannel = dc.channel('apm:avsc:serialize-start')
const deserializeChannel = dc.channel('apm:avsc:deserialize-end')

// Patches Type#toBuffer so that, while subscribers are attached, a DSM
// serialize event carrying the Avro Type instance is published before
// every serialization. The original return value is always preserved.
function wrapSerialization (Type) {
  shimmer.wrap(Type.prototype, 'toBuffer', toBuffer => function () {
    if (serializeChannel.hasSubscribers) {
      serializeChannel.publish({ messageClass: this })
    }
    return toBuffer.apply(this, arguments)
  })
}

// Patches Type#fromBuffer so that, while subscribers are attached, a DSM
// deserialize event carrying the decoded message is published after every
// deserialization. The decoded result is returned unchanged.
function wrapDeserialization (Type) {
  shimmer.wrap(Type.prototype, 'fromBuffer', fromBuffer => function () {
    const decoded = fromBuffer.apply(this, arguments)
    if (deserializeChannel.hasSubscribers) {
      deserializeChannel.publish({ messageClass: decoded })
    }
    return decoded
  })
}

// Hook the avsc package (v5+) and patch its Type prototype so that both
// directions — serialization (toBuffer) and deserialization (fromBuffer) —
// emit Data Streams Monitoring events.
addHook({
  name: 'avsc',
  versions: ['>=5.0.0']
}, avro => {
  wrapDeserialization(avro.Type)
  wrapSerialization(avro.Type)

  return avro
})
1 change: 1 addition & 0 deletions packages/datadog-instrumentations/src/helpers/hooks.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ module.exports = {
aerospike: () => require('../aerospike'),
amqp10: () => require('../amqp10'),
amqplib: () => require('../amqplib'),
avsc: () => require('../avsc'),
'aws-sdk': () => require('../aws-sdk'),
bluebird: () => require('../bluebird'),
'body-parser': () => require('../body-parser'),
Expand Down
9 changes: 9 additions & 0 deletions packages/datadog-plugin-avsc/src/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
const SchemaPlugin = require('../../dd-trace/src/plugins/schema')
const SchemaExtractor = require('./schema_iterator')

/**
 * Data Streams Monitoring schema plugin for the `avsc` package.
 * Wires the generic schema-collection machinery to the Avro extractor.
 */
class AvscPlugin extends SchemaPlugin {
  /** Integration name used to register and look up this plugin. */
  static get id () {
    return 'avsc'
  }

  /** Extractor that converts Avro types into DSM schema definitions. */
  static get schemaExtractor () {
    return SchemaExtractor
  }
}

module.exports = AvscPlugin
169 changes: 169 additions & 0 deletions packages/datadog-plugin-avsc/src/schema_iterator.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
const AVRO = 'avro'
const {
SCHEMA_DEFINITION,
SCHEMA_ID,
SCHEMA_NAME,
SCHEMA_OPERATION,
SCHEMA_WEIGHT,
SCHEMA_TYPE
} = require('../../dd-trace/src/constants')
const log = require('../../dd-trace/src/log')
const {
SchemaBuilder
} = require('../../dd-trace/src/datastreams/schemas/schema_builder')

/**
 * Walks avsc Type objects and converts them into OpenAPI-style schema
 * definitions for Data Streams Monitoring.
 */
class SchemaExtractor {
  /**
   * @param {object} schema - avsc Type (record) to extract a schema from.
   */
  constructor (schema) {
    this.schema = schema
  }

  /**
   * Maps an Avro type (avsc Type object or raw type-name string) to an
   * OpenAPI-style type string. Unknown types fall back to 'string';
   * 'null' passes through unchanged.
   */
  static getType (type) {
    const typeMapping = {
      string: 'string',
      int: 'integer',
      long: 'integer',
      float: 'number',
      double: 'number',
      boolean: 'boolean',
      bytes: 'string',
      record: 'object',
      enum: 'string',
      array: 'array',
      map: 'object',
      fixed: 'string'
    }
    // avsc exposes the Avro type name as `typeName`; fall back to `name`
    // (named types) or to the raw value (plain type-name strings).
    const typeName = type.typeName ?? type.name ?? type
    return typeName === 'null' ? typeName : typeMapping[typeName] || 'string'
  }

  /**
   * Extracts one record field into the builder as a schema property.
   * Handles unions, arrays, nested records, enums and primitives.
   * Returns the builder's addProperty result (falsy signals failure).
   */
  static extractProperty (field, schemaName, fieldName, builder, depth) {
    let array = false
    let type
    let format
    let enumValues
    let description
    let ref

    // Union types expose `types`; named/complex types expose `typeName`;
    // primitive string names come through as-is.
    const fieldType = field.type?.types ?? field.type?.typeName ?? field.type

    if (Array.isArray(fieldType)) {
      // Union Type
      type = 'union[' + fieldType.map(t => SchemaExtractor.getType(t.type || t)).join(',') + ']'
    } else if (fieldType === 'array') {
      // Array Type
      array = true
      const nestedType = field.type.itemsType.typeName
      type = SchemaExtractor.getType(nestedType)
    } else if (fieldType === 'record') {
      // Nested Record Type: reference it and recursively extract its schema.
      type = 'object'
      ref = `#/components/schemas/${field.type.name}`
      if (!SchemaExtractor.extractSchema(field.type, builder, depth + 1, this)) {
        return false
      }
    } else if (fieldType === 'enum') {
      // Enum Type: copy the declared symbols (Avro symbols are non-empty names).
      type = 'string'
      enumValues = [...field.type.symbols]
    } else {
      // Primitive type
      type = SchemaExtractor.getType(fieldType.type || fieldType)
      if (fieldType === 'bytes') {
        format = 'byte'
      } else if (fieldType === 'int') {
        format = 'int32'
      } else if (fieldType === 'long') {
        format = 'int64'
      } else if (fieldType === 'float') {
        format = 'float'
      } else if (fieldType === 'double') {
        format = 'double'
      }
    }

    return builder.addProperty(schemaName, fieldName, array, type, description, ref, format, enumValues)
  }

  /**
   * Extracts a record schema into the builder. When `extractor` is set this
   * is a nested schema: build it with its own extractor and merge its
   * component schemas into the parent builder. Returns false only when the
   * builder declines extraction (cache hit or depth limit).
   */
  static extractSchema (schema, builder, depth, extractor) {
    depth += 1
    const schemaName = schema.name
    if (extractor) {
      // if we already have a defined extractor, this is a nested schema. create a new extractor for the nested
      // schema, ensure it is added to our schema builder's cache, and replace the builders iterator with our
      // nested schema iterator / extractor. Once complete, add the new schema to our builder's schemas.
      const nestedSchemaExtractor = new SchemaExtractor(schema)
      builder.iterator = nestedSchemaExtractor
      const nestedSchema = SchemaBuilder.getSchema(schemaName, nestedSchemaExtractor, builder)
      // Object.entries visits only own enumerable keys, avoiding a direct
      // hasOwnProperty call on a possibly prototype-less object.
      for (const [nestedSubSchemaName, nestedSubSchema] of Object.entries(nestedSchema.components.schemas)) {
        builder.schema.components.schemas[nestedSubSchemaName] = nestedSubSchema
      }
      return true
    } else {
      if (!builder.shouldExtractSchema(schemaName, depth)) {
        return false
      }
      for (const field of schema.fields) {
        if (!this.extractProperty(field, schemaName, field.name, builder, depth)) {
          log.warn(`DSM: Unable to extract field with name: ${field.name} from Avro schema with name: ${schemaName}`)
        }
      }
    }
    return true
  }

  /**
   * Entry point used by the schema plugin: resolve (or build) the schema
   * for the given avsc Type via the data streams processor's cache.
   */
  static extractSchemas (descriptor, dataStreamsProcessor) {
    return dataStreamsProcessor.getSchema(descriptor.name, new SchemaExtractor(descriptor))
  }

  // SchemaBuilder callback: walk this extractor's schema into the builder.
  iterateOverSchema (builder) {
    this.constructor.extractSchema(this.schema, builder, 0)
  }

  /**
   * Tags the span with schema metadata for a serialize/deserialize event,
   * subject to DSM sampling (schema sampler, span priority, weight).
   */
  static attachSchemaOnSpan (args, span, operation, tracer) {
    const { messageClass } = args
    // Serialized instances carry their Type on constructor.type; otherwise
    // assume messageClass is already the Type descriptor.
    const descriptor = messageClass?.constructor?.type ?? messageClass

    if (!descriptor || !span) {
      return
    }

    if (span.context()._tags[SCHEMA_TYPE] && operation === 'serialization') {
      // we have already added a schema to this span, this call is an encode of nested schema types
      return
    }

    span.setTag(SCHEMA_TYPE, AVRO)
    span.setTag(SCHEMA_NAME, descriptor.name)
    span.setTag(SCHEMA_OPERATION, operation)

    if (!tracer._dataStreamsProcessor.canSampleSchema(operation)) {
      return
    }

    // if the span is unsampled, do not sample the schema
    if (!tracer._prioritySampler.isSampled(span)) {
      return
    }

    const weight = tracer._dataStreamsProcessor.trySampleSchema(operation)
    if (weight === 0) {
      return
    }

    const schemaData = SchemaBuilder.getSchemaDefinition(
      this.extractSchemas(descriptor, tracer._dataStreamsProcessor)
    )

    span.setTag(SCHEMA_DEFINITION, schemaData.definition)
    span.setTag(SCHEMA_WEIGHT, weight)
    span.setTag(SCHEMA_ID, schemaData.id)
  }
}

module.exports = SchemaExtractor
31 changes: 31 additions & 0 deletions packages/datadog-plugin-avsc/test/helpers.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
const fs = require('fs')

/**
 * Loads the Avro schema fixture for a given test message type.
 *
 * @param {object} avro - avsc module (unused; kept for call-site compatibility).
 * @param {string} messageTypeName - 'User' or 'AdvancedUser'.
 * @returns {Promise<{schema: object, path: string}|undefined>} the parsed
 *   schema and the path of the Avro data file, or undefined for unknown
 *   message type names (preserves the original fallthrough behavior).
 */
async function loadMessage (avro, messageTypeName) {
  // Fixture locations keyed by message type; paths are relative to repo root.
  const fixtures = {
    User: {
      schemaFile: 'packages/datadog-plugin-avsc/test/schemas/user.avsc',
      dataFile: 'packages/datadog-plugin-avsc/test/schemas/users.avro'
    },
    AdvancedUser: {
      schemaFile: 'packages/datadog-plugin-avsc/test/schemas/advanced_user.avsc',
      dataFile: 'packages/datadog-plugin-avsc/test/schemas/advanced_users.avro'
    }
  }

  const fixture = fixtures[messageTypeName]
  if (!fixture) return undefined

  // Read asynchronously — this function is async, so avoid blocking the
  // event loop with readFileSync.
  const schema = JSON.parse(await fs.promises.readFile(fixture.schemaFile, 'utf8'))

  return {
    schema,
    path: fixture.dataFile
  }
}

module.exports = {
loadMessage
}
Loading

0 comments on commit 57c45ee

Please sign in to comment.