Skip to content

Commit

Permalink
add support for amqp10 (#254)
Browse files Browse the repository at this point in the history
  • Loading branch information
rochdev authored Aug 22, 2018
1 parent 60a7ea4 commit d7e63c9
Show file tree
Hide file tree
Showing 8 changed files with 429 additions and 5 deletions.
10 changes: 10 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ docker-base: &docker-base
image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.2.4
- &rabbitmq
image: rabbitmq:3.6-alpine
- &qpid
image: scholzj/qpid-cpp:1.38.0
command: -p 5673
environment:
- QPIDD_ADMIN_USERNAME=admin
- QPIDD_ADMIN_PASSWORD=admin
build-node-base: &node-base
<<: *docker-base
working_directory: ~/dd-trace-js
Expand Down Expand Up @@ -74,6 +80,7 @@ jobs:
- *mongo
- *elasticsearch
- *rabbitmq
- *qpid
build-node-6:
<<: *node-base
docker:
Expand All @@ -84,6 +91,7 @@ jobs:
- *mongo
- *elasticsearch
- *rabbitmq
- *qpid
build-node-8:
<<: *node-base
docker:
Expand All @@ -94,6 +102,7 @@ jobs:
- *mongo
- *elasticsearch
- *rabbitmq
- *qpid
build-node-latest:
<<: *node-base
docker:
Expand All @@ -104,6 +113,7 @@ jobs:
- *mongo
- *elasticsearch
- *rabbitmq
- *qpid

workflows:
version: 2
Expand Down
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require,safe-buffer,MIT,Copyright Feross Aboukhadijeh
require,shimmer,BSD-2-Clause,Copyright Forrest L Norvell
require,url-parse,MIT,Copyright 2015 Unshift.io Arnout Kazemier the Contributors
dev,@airbnb/node-memwatch,WTFPL,Copyright Lloyd Hilaiel
dev,amqp10,MIT,Copyright 2014 Michael Lanzetta
dev,amqplib,MIT,Copyright 2013-2014 Michael Bridgen
dev,axios,MIT,Copyright 2014-present Matt Zabriskie
dev,benchmark,MIT,Copyright 2010-2016 Mathias Bynens Robert Kieffer John-David Dalton
Expand Down
8 changes: 8 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,11 @@ services:
image: rabbitmq:3.6-alpine
ports:
- "127.0.0.1:5672:5672"
qpid:
image: scholzj/qpid-cpp:1.38.0
command: -p 5673
environment:
- QPIDD_ADMIN_USERNAME=admin
- QPIDD_ADMIN_PASSWORD=admin
ports:
- "127.0.0.1:5673:5673"
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"url-parse": "^1.2.0"
},
"devDependencies": {
"amqp10": "^3.6.0",
"amqplib": "^0.5.2",
"axios": "^0.18.0",
"benchmark": "^2.1.4",
Expand Down
148 changes: 148 additions & 0 deletions src/plugins/amqp10.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
'use strict'

function createWrapSend (tracer, config) {
return function wrapSend (send) {
return function sendWithTrace (msg, options) {
const span = startSendSpan(tracer, config, this)

try {
const promise = send.apply(this, arguments)

return wrapPromise(promise, span)
} catch (e) {
finish(span, e)
throw e
}
}
}
}

function createWrapMessageReceived (tracer, config) {
return function wrapMessageReceived (messageReceived) {
return function messageReceivedWithTrace (transferFrame) {
if (transferFrame.aborted || transferFrame.more) {
return messageReceived.apply(this, arguments)
}

const span = startReceiveSpan(tracer, config, this)

process.nextTick(() => {
tracer.scopeManager().activate(span, true)
messageReceived.apply(this, arguments)
})
}
}
}

function startSendSpan (tracer, config, link) {
const address = link.session.connection.address
const target = getAddress(link)

const span = tracer.startSpan(`amqp.send`, {
tags: {
'resource.name': `send ${target}`,
'span.kind': 'producer',
'amqp.link.target.address': target,
'amqp.link.role': 'sender',
'out.host': address.host,
'out.port': address.port
}
})

addTags(tracer, config, span, link)

return span
}

function startReceiveSpan (tracer, config, link) {
const source = getAddress(link)
const span = tracer.startSpan(`amqp.receive`, {
tags: {
'resource.name': `receive ${source}`,
'span.kind': 'consumer',
'amqp.link.source.address': source,
'amqp.link.role': 'receiver'
}
})

addTags(tracer, config, span, link)

return span
}

function addTags (tracer, config, span, link) {
const address = link.session.connection.address

span.addTags({
'service.name': config.service || `${tracer._service}-amqp`,
'span.type': 'worker',
'amqp.link.name': link.name,
'amqp.link.handle': link.handle,
'amqp.connection.host': address.host,
'amqp.connection.port': address.port
})

if (address.user) {
span.setTag('amqp.connection.user', address.user)
}

return span
}

function finish (span, error) {
if (error) {
span.addTags({
'error.type': error.name,
'error.msg': error.message,
'error.stack': error.stack
})
}

span.finish()
}

function wrapPromise (promise, span) {
if (!promise) {
finish(span)
return promise
}

return promise
.then(() => {
finish(span)
return promise
})
.catch(err => {
finish(span, err)
return promise
})
}

function getAddress (link) {
return link.name.split('_').slice(0, -1).join('_')
}

module.exports = [
{
name: 'amqp10',
file: 'lib/sender_link.js',
versions: ['3.x'],
patch (SenderLink, tracer, config) {
this.wrap(SenderLink.prototype, 'send', createWrapSend(tracer, config))
},
unpatch (SenderLink) {
this.unwrap(SenderLink.prototype, 'send')
}
},
{
name: 'amqp10',
file: 'lib/receiver_link.js',
versions: ['3.x'],
patch (ReceiverLink, tracer, config) {
this.wrap(ReceiverLink.prototype, '_messageReceived', createWrapMessageReceived(tracer, config))
},
unpatch (ReceiverLink) {
this.unwrap(ReceiverLink.prototype, '_messageReceived')
}
}
]
35 changes: 35 additions & 0 deletions test/leak/plugins/amqp10.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
'use strict'

require('../../..')
.init({ plugins: false, sampleRate: 0 })
.use('amqp10')

const test = require('tape')
const profile = require('../../profile')

test('amqp10 plugin should not leak', t => {
const amqp = require('amqp10')
const client = new amqp.Client()

return client.connect('amqp://admin:admin@localhost:5673')
.then(() => {
return Promise.all([
client.createReceiver('amq.topic'),
client.createSender('amq.topic')
])
})
.then(handlers => {
const receiver = handlers[0]
const sender = handlers[1]

profile(t, operation)
.then(() => receiver.detach())
.then(() => sender.detach())
.then(() => client.disconnect())

function operation (done) {
sender.send({ key: 'value' })
receiver.once('message', done)
}
})
})
Loading

0 comments on commit d7e63c9

Please sign in to comment.