diff --git a/instana/instrumentation/celery/hooks.py b/instana/instrumentation/celery/hooks.py
index e1dc82fd3..7045dfc81 100644
--- a/instana/instrumentation/celery/hooks.py
+++ b/instana/instrumentation/celery/hooks.py
@@ -19,29 +19,44 @@ def add_broker_tags(span, broker_url):
     try:
         url = parse.urlparse(broker_url)
         span.set_tag("scheme", url.scheme)
-        span.set_tag("host", url.hostname)
-        span.set_tag("port", url.port)
+
+        if url.hostname is None:
+            span.set_tag("host", 'localhost')
+        else:
+            span.set_tag("host", url.hostname)
+
+        if url.port is None:
+            # Set default port if not specified
+            if url.scheme == 'redis':
+                span.set_tag("port", "6379")
+            elif 'amqp' in url.scheme:
+                span.set_tag("port", "5672")
+            elif 'sqs' in url.scheme:
+                span.set_tag("port", "443")
+        else:
+            span.set_tag("port", str(url.port))
     except:
         logger.debug("Error parsing broker URL: %s" % broker_url, exc_info=True)
 
 
 @signals.task_prerun.connect
 def task_prerun(*args, **kwargs):
     try:
+        ctx = None
         task = kwargs.get('sender', None)
         task_id = kwargs.get('task_id', None)
         task = registry.tasks.get(task.name)
         headers = task.request.get('headers', {})
-        ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, headers)
+        if headers is not None:
+            ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, headers)
 
-        if ctx is not None:
-            scope = tracer.start_active_span("celery-worker", child_of=ctx)
-            scope.span.set_tag("task", task.name)
-            scope.span.set_tag("task_id", task_id)
-            add_broker_tags(scope.span, task.app.conf['broker_url'])
+        scope = tracer.start_active_span("celery-worker", child_of=ctx)
+        scope.span.set_tag("task", task.name)
+        scope.span.set_tag("task_id", task_id)
+        add_broker_tags(scope.span, task.app.conf['broker_url'])
 
-            # Store the scope on the task to eventually close it out on the "after" signal
-            task_catalog_push(task, task_id, scope, True)
+        # Store the scope on the task to eventually close it out on the "after" signal
+        task_catalog_push(task, task_id, scope, True)
     except:
         logger.debug("task_prerun: ", exc_info=True)
 
diff --git a/instana/instrumentation/logging.py b/instana/instrumentation/logging.py
index 8cfd2d6b4..a81a381a1 100644
--- a/instana/instrumentation/logging.py
+++ b/instana/instrumentation/logging.py
@@ -1,8 +1,9 @@
 from __future__ import absolute_import
 
+import sys
 import wrapt
 import logging
-import sys
+import collections
 
 from ..log import logger
 from ..singletons import tracer
@@ -18,10 +19,14 @@ def log_with_instana(wrapped, instance, argv, kwargs):
 
         # Only needed if we're tracing and serious log
         if parent_span and argv[0] >= logging.WARN:
+
+            msg = str(argv[1])
+            args = argv[2]
+            if args and len(args) == 1 and isinstance(args[0], collections.Mapping) and args[0]:
+                args = args[0]
+
             # get the formatted log message
-            # clients such as suds-jurko log things such as: Fault(Server: 'Server side fault example.')
-            # So make sure we're working with a string
-            msg = str(argv[1]) % argv[2]
+            msg = msg % args
 
             # get additional information if an exception is being handled
             parameters = None
@@ -37,8 +42,8 @@
                 # extra tags for an error
                 if argv[0] >= logging.ERROR:
                     scope.span.mark_as_errored()
-    except Exception as e:
-        logger.debug('Exception: %s', e, exc_info=True)
+    except Exception:
+        logger.debug('log_with_instana:', exc_info=True)
     finally:
         return wrapped(*argv, **kwargs)
 
diff --git a/tests/frameworks/test_celery.py b/tests/frameworks/test_celery.py
index 497a537cb..dc90d28a9 100644
--- a/tests/frameworks/test_celery.py
+++ b/tests/frameworks/test_celery.py
@@ -16,6 +16,15 @@ def will_raise_error():
     raise Exception('This is a simulated error')
 
 
+def filter_out_ping_tasks(spans):
+    filtered_spans = []
+    for span in spans:
+        is_ping_task = (span.n == 'celery-worker' and span.data['celery']['task'] == 'celery.ping')
+        if not is_ping_task:
+            filtered_spans.append(span)
+    return filtered_spans
+
+
 def setup_method():
     """ Clear all spans before a test run """
     tracer.recorder.clear_spans()
@@ -29,7 +38,7 @@ def test_apply_async(celery_app, celery_worker):
     # Wait for jobs to finish
     time.sleep(0.5)
 
-    spans = tracer.recorder.queued_spans()
+    spans = filter_out_ping_tasks(tracer.recorder.queued_spans())
     assert len(spans) == 3
 
     filter = lambda span: span.n == "sdk"
@@ -51,7 +60,7 @@
     assert("tests.frameworks.test_celery.add" == client_span.data["celery"]["task"])
     assert("redis" == client_span.data["celery"]["scheme"])
     assert("localhost" == client_span.data["celery"]["host"])
-    assert(6379 == client_span.data["celery"]["port"])
+    assert("6379" == client_span.data["celery"]["port"])
     assert(client_span.data["celery"]["task_id"])
     assert(client_span.data["celery"]["error"] == None)
     assert(client_span.ec == None)
@@ -59,7 +68,7 @@
     assert("tests.frameworks.test_celery.add" == worker_span.data["celery"]["task"])
     assert("redis" == worker_span.data["celery"]["scheme"])
     assert("localhost" == worker_span.data["celery"]["host"])
-    assert(6379 == worker_span.data["celery"]["port"])
+    assert("6379" == worker_span.data["celery"]["port"])
     assert(worker_span.data["celery"]["task_id"])
     assert(worker_span.data["celery"]["error"] == None)
     assert(worker_span.data["celery"]["retry-reason"] == None)
@@ -74,7 +83,7 @@ def test_delay(celery_app, celery_worker):
     # Wait for jobs to finish
     time.sleep(0.5)
 
-    spans = tracer.recorder.queued_spans()
+    spans = filter_out_ping_tasks(tracer.recorder.queued_spans())
     assert len(spans) == 3
 
     filter = lambda span: span.n == "sdk"
@@ -96,7 +105,7 @@
     assert("tests.frameworks.test_celery.add" == client_span.data["celery"]["task"])
     assert("redis" == client_span.data["celery"]["scheme"])
     assert("localhost" == client_span.data["celery"]["host"])
-    assert(6379 == client_span.data["celery"]["port"])
+    assert("6379" == client_span.data["celery"]["port"])
     assert(client_span.data["celery"]["task_id"])
     assert(client_span.data["celery"]["error"] == None)
     assert(client_span.ec == None)
@@ -104,7 +113,7 @@
     assert("tests.frameworks.test_celery.add" == worker_span.data["celery"]["task"])
     assert("redis" == worker_span.data["celery"]["scheme"])
     assert("localhost" == worker_span.data["celery"]["host"])
-    assert(6379 == worker_span.data["celery"]["port"])
+    assert("6379" == worker_span.data["celery"]["port"])
     assert(worker_span.data["celery"]["task_id"])
     assert(worker_span.data["celery"]["error"] == None)
     assert(worker_span.data["celery"]["retry-reason"] == None)
@@ -119,7 +128,7 @@ def test_send_task(celery_app, celery_worker):
     # Wait for jobs to finish
     time.sleep(0.5)
 
-    spans = tracer.recorder.queued_spans()
+    spans = filter_out_ping_tasks(tracer.recorder.queued_spans())
     assert len(spans) == 3
 
     filter = lambda span: span.n == "sdk"
@@ -141,7 +150,7 @@
     assert("tests.frameworks.test_celery.add" == client_span.data["celery"]["task"])
     assert("redis" == client_span.data["celery"]["scheme"])
     assert("localhost" == client_span.data["celery"]["host"])
-    assert(6379 == client_span.data["celery"]["port"])
+    assert("6379" == client_span.data["celery"]["port"])
     assert(client_span.data["celery"]["task_id"])
     assert(client_span.data["celery"]["error"] == None)
     assert(client_span.ec == None)
@@ -149,7 +158,7 @@
     assert("tests.frameworks.test_celery.add" == worker_span.data["celery"]["task"])
     assert("redis" == worker_span.data["celery"]["scheme"])
     assert("localhost" == worker_span.data["celery"]["host"])
-    assert(6379 == worker_span.data["celery"]["port"])
+    assert("6379" == worker_span.data["celery"]["port"])
     assert(worker_span.data["celery"]["task_id"])
     assert(worker_span.data["celery"]["error"] == None)
     assert(worker_span.data["celery"]["retry-reason"] == None)
@@ -164,8 +173,8 @@ def test_error_reporting(celery_app, celery_worker):
     # Wait for jobs to finish
     time.sleep(0.5)
 
-    spans = tracer.recorder.queued_spans()
-    assert len(spans) == 3
+    spans = filter_out_ping_tasks(tracer.recorder.queued_spans())
+    assert len(spans) == 4
 
     filter = lambda span: span.n == "sdk"
     test_span = get_first_span_by_filter(spans, filter)
@@ -175,18 +184,26 @@
     client_span = get_first_span_by_filter(spans, filter)
     assert(client_span)
 
+    filter = lambda span: span.n == "log"
+    log_span = get_first_span_by_filter(spans, filter)
+    assert(log_span)
+
     filter = lambda span: span.n == "celery-worker"
     worker_span = get_first_span_by_filter(spans, filter)
     assert(worker_span)
 
     assert(client_span.t == test_span.t)
     assert(client_span.t == worker_span.t)
+    assert(client_span.t == log_span.t)
+    assert(client_span.p == test_span.s)
+    assert(worker_span.p == client_span.s)
+    assert(log_span.p == worker_span.s)
 
     assert("tests.frameworks.test_celery.will_raise_error" == client_span.data["celery"]["task"])
     assert("redis" == client_span.data["celery"]["scheme"])
     assert("localhost" == client_span.data["celery"]["host"])
-    assert(6379 == client_span.data["celery"]["port"])
+    assert("6379" == client_span.data["celery"]["port"])
     assert(client_span.data["celery"]["task_id"])
     assert(client_span.data["celery"]["error"] == None)
     assert(client_span.ec == None)
 
@@ -194,7 +211,7 @@
     assert("tests.frameworks.test_celery.will_raise_error" == worker_span.data["celery"]["task"])
     assert("redis" == worker_span.data["celery"]["scheme"])
     assert("localhost" == worker_span.data["celery"]["host"])
-    assert(6379 == worker_span.data["celery"]["port"])
+    assert("6379" == worker_span.data["celery"]["port"])
     assert(worker_span.data["celery"]["task_id"])
     assert(worker_span.data["celery"]["error"] == 'This is a simulated error')
     assert(worker_span.data["celery"]["retry-reason"] == None)