Skip to content

Commit

Permalink
Celery: Updated broker tags (#239)
Browse files Browse the repository at this point in the history
* Update broker tags

* Update recorded broker spans

* Update unrelated couchbase tests
  • Loading branch information
pglombardo authored Jun 30, 2020
1 parent c71823c commit a087af6
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 18 deletions.
19 changes: 17 additions & 2 deletions instana/instrumentation/celery/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,21 @@
from .catalog import task_catalog_get, task_catalog_pop, task_catalog_push, get_task_id
from celery.contrib import rdb

try:
from urllib import parse
except ImportError:
import urlparse as parse
import urllib

def add_broker_tags(span, broker_url):
try:
url = parse.urlparse(broker_url)
span.set_tag("scheme", url.scheme)
span.set_tag("host", url.hostname)
span.set_tag("port", url.port)
except:
logger.debug("Error parsing broker URL: %s" % broker_url, exc_info=True)

@signals.task_prerun.connect
def task_prerun(*args, **kwargs):
try:
Expand All @@ -24,7 +39,7 @@ def task_prerun(*args, **kwargs):
scope = tracer.start_active_span("celery-worker", child_of=ctx)
scope.span.set_tag("task", task.name)
scope.span.set_tag("task_id", task_id)
scope.span.set_tag("broker", task.app.conf['broker_url'])
add_broker_tags(scope.span, task.app.conf['broker_url'])

# Store the scope on the task to eventually close it out on the "after" signal
task_catalog_push(task, task_id, scope, True)
Expand Down Expand Up @@ -86,8 +101,8 @@ def before_task_publish(*args, **kwargs):

scope = tracer.start_active_span("celery-client", child_of=parent_span)
scope.span.set_tag("task", task_name)
scope.span.set_tag("broker", task.app.conf['broker_url'])
scope.span.set_tag("task_id", task_id)
add_broker_tags(scope.span, task.app.conf['broker_url'])

# Context propagation
context_headers = {}
Expand Down
8 changes: 6 additions & 2 deletions instana/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,9 @@ def _populate_entry_span_data(self, span):
elif span.operation_name == "celery-worker":
self.data["celery"]["task"] = span.tags.pop('task', None)
self.data["celery"]["task_id"] = span.tags.pop('task_id', None)
self.data["celery"]["broker"] = span.tags.pop('broker', None)
self.data["celery"]["scheme"] = span.tags.pop('scheme', None)
self.data["celery"]["host"] = span.tags.pop('host', None)
self.data["celery"]["port"] = span.tags.pop('port', None)
self.data["celery"]["retry-reason"] = span.tags.pop('retry-reason', None)
self.data["celery"]["error"] = span.tags.pop('error', None)

Expand Down Expand Up @@ -314,7 +316,9 @@ def _populate_exit_span_data(self, span):
elif span.operation_name == "celery-client":
self.data["celery"]["task"] = span.tags.pop('task', None)
self.data["celery"]["task_id"] = span.tags.pop('task_id', None)
self.data["celery"]["broker"] = span.tags.pop('broker', None)
self.data["celery"]["scheme"] = span.tags.pop('scheme', None)
self.data["celery"]["host"] = span.tags.pop('host', None)
self.data["celery"]["port"] = span.tags.pop('port', None)
self.data["celery"]["error"] = span.tags.pop('error', None)

elif span.operation_name == "couchbase":
Expand Down
9 changes: 3 additions & 6 deletions tests/clients/test_couchbase.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import absolute_import

import pytest
import time
import unittest

from instana.singletons import tracer
Expand Down Expand Up @@ -28,16 +28,15 @@ class TestStandardCouchDB(unittest.TestCase):
def setUp(self):
""" Clear all spans before a test run """
self.recorder = tracer.recorder
self.recorder.clear_spans()
self.cluster = Cluster('couchbase://%s' % testenv['couchdb_host'])
self.bucket = Bucket('couchbase://%s/travel-sample' % testenv['couchdb_host'],
username=testenv['couchdb_username'], password=testenv['couchdb_password'])
# self.bucket = self.cluster.open_bucket('travel-sample')
self.bucket.upsert('test-key', 1)
self.recorder.clear_spans()

def tearDown(self):
""" Do nothing for now """
return None
time.sleep(0.5)

def test_vanilla_get(self):
res = self.bucket.get("test-key")
Expand Down Expand Up @@ -474,7 +473,6 @@ def test_prepend_multi(self):
self.assertEqual(cb_span.data["couchbase"]["bucket"], 'travel-sample')
self.assertEqual(cb_span.data["couchbase"]["type"], 'prepend_multi')

@pytest.mark.skip(reason="Failing test for unchanged instrumentation; todo")
def test_get(self):
res = None

Expand Down Expand Up @@ -1080,7 +1078,6 @@ def test_ping(self):
self.assertEqual(cb_span.data["couchbase"]["bucket"], 'travel-sample')
self.assertEqual(cb_span.data["couchbase"]["type"], 'ping')

@pytest.mark.skip
def test_diagnostics(self):
res = None

Expand Down
32 changes: 24 additions & 8 deletions tests/frameworks/test_celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,17 @@ def test_apply_async(celery_app, celery_worker):
assert(client_span.p == test_span.s)

assert("tests.frameworks.test_celery.add" == client_span.data["celery"]["task"])
assert("redis://localhost:6379" == client_span.data["celery"]["broker"])
assert("redis" == client_span.data["celery"]["scheme"])
assert("localhost" == client_span.data["celery"]["host"])
assert(6379 == client_span.data["celery"]["port"])
assert(client_span.data["celery"]["task_id"])
assert(client_span.data["celery"]["error"] == None)
assert(client_span.ec == None)

assert("tests.frameworks.test_celery.add" == worker_span.data["celery"]["task"])
assert("redis://localhost:6379" == worker_span.data["celery"]["broker"])
assert("redis" == worker_span.data["celery"]["scheme"])
assert("localhost" == worker_span.data["celery"]["host"])
assert(6379 == worker_span.data["celery"]["port"])
assert(worker_span.data["celery"]["task_id"])
assert(worker_span.data["celery"]["error"] == None)
assert(worker_span.data["celery"]["retry-reason"] == None)
Expand Down Expand Up @@ -90,13 +94,17 @@ def test_delay(celery_app, celery_worker):
assert(client_span.p == test_span.s)

assert("tests.frameworks.test_celery.add" == client_span.data["celery"]["task"])
assert("redis://localhost:6379" == client_span.data["celery"]["broker"])
assert("redis" == client_span.data["celery"]["scheme"])
assert("localhost" == client_span.data["celery"]["host"])
assert(6379 == client_span.data["celery"]["port"])
assert(client_span.data["celery"]["task_id"])
assert(client_span.data["celery"]["error"] == None)
assert(client_span.ec == None)

assert("tests.frameworks.test_celery.add" == worker_span.data["celery"]["task"])
assert("redis://localhost:6379" == worker_span.data["celery"]["broker"])
assert("redis" == worker_span.data["celery"]["scheme"])
assert("localhost" == worker_span.data["celery"]["host"])
assert(6379 == worker_span.data["celery"]["port"])
assert(worker_span.data["celery"]["task_id"])
assert(worker_span.data["celery"]["error"] == None)
assert(worker_span.data["celery"]["retry-reason"] == None)
Expand Down Expand Up @@ -131,13 +139,17 @@ def test_send_task(celery_app, celery_worker):
assert(client_span.p == test_span.s)

assert("tests.frameworks.test_celery.add" == client_span.data["celery"]["task"])
assert("redis://localhost:6379" == client_span.data["celery"]["broker"])
assert("redis" == client_span.data["celery"]["scheme"])
assert("localhost" == client_span.data["celery"]["host"])
assert(6379 == client_span.data["celery"]["port"])
assert(client_span.data["celery"]["task_id"])
assert(client_span.data["celery"]["error"] == None)
assert(client_span.ec == None)

assert("tests.frameworks.test_celery.add" == worker_span.data["celery"]["task"])
assert("redis://localhost:6379" == worker_span.data["celery"]["broker"])
assert("redis" == worker_span.data["celery"]["scheme"])
assert("localhost" == worker_span.data["celery"]["host"])
assert(6379 == worker_span.data["celery"]["port"])
assert(worker_span.data["celery"]["task_id"])
assert(worker_span.data["celery"]["error"] == None)
assert(worker_span.data["celery"]["retry-reason"] == None)
Expand Down Expand Up @@ -172,13 +184,17 @@ def test_error_reporting(celery_app, celery_worker):
assert(client_span.p == test_span.s)

assert("tests.frameworks.test_celery.will_raise_error" == client_span.data["celery"]["task"])
assert("redis://localhost:6379" == client_span.data["celery"]["broker"])
assert("redis" == client_span.data["celery"]["scheme"])
assert("localhost" == client_span.data["celery"]["host"])
assert(6379 == client_span.data["celery"]["port"])
assert(client_span.data["celery"]["task_id"])
assert(client_span.data["celery"]["error"] == None)
assert(client_span.ec == None)

assert("tests.frameworks.test_celery.will_raise_error" == worker_span.data["celery"]["task"])
assert("redis://localhost:6379" == worker_span.data["celery"]["broker"])
assert("redis" == worker_span.data["celery"]["scheme"])
assert("localhost" == worker_span.data["celery"]["host"])
assert(6379 == worker_span.data["celery"]["port"])
assert(worker_span.data["celery"]["task_id"])
assert(worker_span.data["celery"]["error"] == 'This is a simulated error')
assert(worker_span.data["celery"]["retry-reason"] == None)
Expand Down

0 comments on commit a087af6

Please sign in to comment.