Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support for amqp10 #254

Merged
merged 2 commits into from
Aug 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,12 @@ docker-base: &docker-base
image: docker.elastic.co/elasticsearch/elasticsearch-oss:6.2.4
- &rabbitmq
image: rabbitmq:3.6-alpine
- &qpid
image: scholzj/qpid-cpp:1.38.0
command: -p 5673
environment:
- QPIDD_ADMIN_USERNAME=admin
- QPIDD_ADMIN_PASSWORD=admin
build-node-base: &node-base
<<: *docker-base
working_directory: ~/dd-trace-js
Expand Down Expand Up @@ -74,6 +80,7 @@ jobs:
- *mongo
- *elasticsearch
- *rabbitmq
- *qpid
build-node-6:
<<: *node-base
docker:
Expand All @@ -84,6 +91,7 @@ jobs:
- *mongo
- *elasticsearch
- *rabbitmq
- *qpid
build-node-8:
<<: *node-base
docker:
Expand All @@ -94,6 +102,7 @@ jobs:
- *mongo
- *elasticsearch
- *rabbitmq
- *qpid
build-node-latest:
<<: *node-base
docker:
Expand All @@ -104,6 +113,7 @@ jobs:
- *mongo
- *elasticsearch
- *rabbitmq
- *qpid

workflows:
version: 2
Expand Down
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require,safe-buffer,MIT,Copyright Feross Aboukhadijeh
require,shimmer,BSD-2-Clause,Copyright Forrest L Norvell
require,url-parse,MIT,Copyright 2015 Unshift.io Arnout Kazemier the Contributors
dev,@airbnb/node-memwatch,WTFPL,Copyright Lloyd Hilaiel
dev,amqp10,MIT,Copyright 2014 Michael Lanzetta
dev,amqplib,MIT,Copyright 2013-2014 Michael Bridgen
dev,axios,MIT,Copyright 2014-present Matt Zabriskie
dev,benchmark,MIT,Copyright 2010-2016 Mathias Bynens Robert Kieffer John-David Dalton
Expand Down
8 changes: 8 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,11 @@ services:
image: rabbitmq:3.6-alpine
ports:
- "127.0.0.1:5672:5672"
qpid:
image: scholzj/qpid-cpp:1.38.0
command: -p 5673
environment:
- QPIDD_ADMIN_USERNAME=admin
- QPIDD_ADMIN_PASSWORD=admin
ports:
- "127.0.0.1:5673:5673"
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"url-parse": "^1.2.0"
},
"devDependencies": {
"amqp10": "^3.6.0",
"amqplib": "^0.5.2",
"axios": "^0.18.0",
"benchmark": "^2.1.4",
Expand Down
148 changes: 148 additions & 0 deletions src/plugins/amqp10.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
'use strict'

function createWrapSend (tracer, config) {
return function wrapSend (send) {
return function sendWithTrace (msg, options) {
const span = startSendSpan(tracer, config, this)

try {
const promise = send.apply(this, arguments)

return wrapPromise(promise, span)
} catch (e) {
finish(span, e)
throw e
}
}
}
}

function createWrapMessageReceived (tracer, config) {
return function wrapMessageReceived (messageReceived) {
return function messageReceivedWithTrace (transferFrame) {
if (transferFrame.aborted || transferFrame.more) {
return messageReceived.apply(this, arguments)
}

const span = startReceiveSpan(tracer, config, this)

process.nextTick(() => {
tracer.scopeManager().activate(span, true)
messageReceived.apply(this, arguments)
})
}
}
}

function startSendSpan (tracer, config, link) {
const address = link.session.connection.address
const target = getAddress(link)

const span = tracer.startSpan(`amqp.send`, {
tags: {
'resource.name': `send ${target}`,
'span.kind': 'producer',
'amqp.link.target.address': target,
'amqp.link.role': 'sender',
'out.host': address.host,
'out.port': address.port
}
})

addTags(tracer, config, span, link)

return span
}

function startReceiveSpan (tracer, config, link) {
const source = getAddress(link)
const span = tracer.startSpan(`amqp.receive`, {
tags: {
'resource.name': `receive ${source}`,
'span.kind': 'consumer',
'amqp.link.source.address': source,
'amqp.link.role': 'receiver'
}
})

addTags(tracer, config, span, link)

return span
}

function addTags (tracer, config, span, link) {
const address = link.session.connection.address

span.addTags({
'service.name': config.service || `${tracer._service}-amqp`,
'span.type': 'worker',
'amqp.link.name': link.name,
'amqp.link.handle': link.handle,
'amqp.connection.host': address.host,
'amqp.connection.port': address.port
})

if (address.user) {
span.setTag('amqp.connection.user', address.user)
}

return span
}

function finish (span, error) {
if (error) {
span.addTags({
'error.type': error.name,
'error.msg': error.message,
'error.stack': error.stack
})
}

span.finish()
}

function wrapPromise (promise, span) {
if (!promise) {
finish(span)
return promise
}

return promise
.then(() => {
finish(span)
return promise
})
.catch(err => {
finish(span, err)
return promise
})
}

function getAddress (link) {
return link.name.split('_').slice(0, -1).join('_')
}

module.exports = [
{
name: 'amqp10',
file: 'lib/sender_link.js',
versions: ['3.x'],
patch (SenderLink, tracer, config) {
this.wrap(SenderLink.prototype, 'send', createWrapSend(tracer, config))
},
unpatch (SenderLink) {
this.unwrap(SenderLink.prototype, 'send')
}
},
{
name: 'amqp10',
file: 'lib/receiver_link.js',
versions: ['3.x'],
patch (ReceiverLink, tracer, config) {
this.wrap(ReceiverLink.prototype, '_messageReceived', createWrapMessageReceived(tracer, config))
},
unpatch (ReceiverLink) {
this.unwrap(ReceiverLink.prototype, '_messageReceived')
}
}
]
35 changes: 35 additions & 0 deletions test/leak/plugins/amqp10.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
'use strict'

require('../../..')
.init({ plugins: false, sampleRate: 0 })
.use('amqp10')

const test = require('tape')
const profile = require('../../profile')

test('amqp10 plugin should not leak', t => {
const amqp = require('amqp10')
const client = new amqp.Client()

return client.connect('amqp://admin:admin@localhost:5673')
.then(() => {
return Promise.all([
client.createReceiver('amq.topic'),
client.createSender('amq.topic')
])
})
.then(handlers => {
const receiver = handlers[0]
const sender = handlers[1]

profile(t, operation)
.then(() => receiver.detach())
.then(() => sender.detach())
.then(() => client.disconnect())

function operation (done) {
sender.send({ key: 'value' })
receiver.once('message', done)
}
})
})
Loading