diff --git a/ddtrace/_trace/trace_handlers.py b/ddtrace/_trace/trace_handlers.py index e41b69a23e..d61d400b96 100644 --- a/ddtrace/_trace/trace_handlers.py +++ b/ddtrace/_trace/trace_handlers.py @@ -691,6 +691,36 @@ def _on_botocore_patched_bedrock_api_call_success(ctx, reqid, latency, input_tok span.set_tag_str("bedrock.usage.completion_tokens", output_token_count) +def _propagate_context(ctx, headers): + distributed_tracing_enabled = ctx["integration_config"].distributed_tracing_enabled + call_key = ctx.get_item("call_key") + if call_key is None: + log.warning("call_key not found in ctx") + if distributed_tracing_enabled and call_key: + span = ctx[ctx["call_key"]] + HTTPPropagator.inject(span.context, headers) + + +def _after_job_execution(ctx, job_failed, span_tags): + """sets job.status and job.origin span tags after job is performed""" + # get_status() returns None when ttl=0 + call_key = ctx.get_item("call_key") + if call_key: + span = ctx[ctx["call_key"]] + if span: + if job_failed: + span.error = 1 + for k in span_tags.keys(): + span.set_tag_str(k, span_tags[k]) + + +def _on_end_of_traced_method_in_fork(ctx): + """Force flush to agent since the process `os.exit()`s + immediately after this method returns + """ + ctx["pin"].tracer.flush() + + def _on_botocore_bedrock_process_response( ctx: core.ExecutionContext, formatted_response: Dict[str, Any], @@ -830,6 +860,9 @@ def listen(): core.on("test_visibility.enable", _on_test_visibility_enable) core.on("test_visibility.disable", _on_test_visibility_disable) core.on("test_visibility.is_enabled", _on_test_visibility_is_enabled, "is_enabled") + core.on("rq.worker.perform_job", _after_job_execution) + core.on("rq.worker.after.perform.job", _on_end_of_traced_method_in_fork) + core.on("rq.queue.enqueue_job", _propagate_context) for context_name in ( "flask.call", @@ -848,6 +881,11 @@ def listen(): "botocore.patched_stepfunctions_api_call", "botocore.patched_bedrock_api_call", "redis.command", + "rq.queue.enqueue_job", + "rq.traced_queue_fetch_job", + "rq.worker.perform_job", + "rq.job.perform", + "rq.job.fetch_many", ): core.on(f"context.started.start_span.{context_name}", _start_span) diff --git a/ddtrace/contrib/rq/__init__.py b/ddtrace/contrib/rq/__init__.py index 62648ca4e6..9369b2010b 100644 --- a/ddtrace/contrib/rq/__init__.py +++ b/ddtrace/contrib/rq/__init__.py @@ -81,6 +81,7 @@ from ddtrace import Pin from ddtrace import config from ddtrace.constants import SPAN_KIND +from ddtrace.internal import core from ddtrace.internal.constants import COMPONENT from ddtrace.internal.schema import schematize_messaging_operation from ddtrace.internal.schema import schematize_service_name @@ -90,7 +91,6 @@ from ...ext import SpanKind from ...ext import SpanTypes -from ...propagation.http import HTTPPropagator from .. import trace_utils @@ -114,6 +114,11 @@ ) +JOB_ID = "job.id" +QUEUE_NAME = "queue.name" +JOB_FUNC_NAME = "job.func_name" + + def get_version(): # type: () -> str import rq @@ -134,37 +139,44 @@ def traced_queue_enqueue_job(rq, pin, func, instance, args, kwargs): else: resource = func_name - with pin.tracer.trace( - schematize_messaging_operation("rq.queue.enqueue_job", provider="rq", direction=SpanDirection.OUTBOUND), + with core.context_with_data( + "rq.queue.enqueue_job", + span_name=schematize_messaging_operation( + "rq.queue.enqueue_job", provider="rq", direction=SpanDirection.OUTBOUND + ), + pin=pin, service=trace_utils.int_service(pin, config.rq), resource=resource, span_type=SpanTypes.WORKER, - ) as span: - span.set_tag_str(COMPONENT, config.rq.integration_name) - - # set span.kind to the type of request being performed - span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER) - - span.set_tag_str("queue.name", instance.name) - span.set_tag_str("job.id", job.get_id()) - span.set_tag_str("job.func_name", job.func_name) - + call_key="queue.enqueue_job", + integration_config=config.rq_worker, + tags={ + COMPONENT: config.rq.integration_name, + SPAN_KIND: SpanKind.PRODUCER, + QUEUE_NAME: instance.name, + JOB_ID: job.get_id(), + JOB_FUNC_NAME: job.func_name, + }, + ) as ctx, ctx[ctx["call_key"]]: # If the queue is_async then add distributed tracing headers to the job - if instance.is_async and config.rq.distributed_tracing_enabled: - HTTPPropagator.inject(span.context, job.meta) + if instance.is_async: + core.dispatch("rq.queue.enqueue_job", [ctx, job.meta]) return func(*args, **kwargs) @trace_utils.with_traced_module def traced_queue_fetch_job(rq, pin, func, instance, args, kwargs): - with pin.tracer.trace( - schematize_messaging_operation("rq.queue.fetch_job", provider="rq", direction=SpanDirection.PROCESSING), + job_id = get_argument_value(args, kwargs, 0, "job_id") + with core.context_with_data( + "rq.traced_queue_fetch_job", + span_name=schematize_messaging_operation( + "rq.queue.fetch_job", provider="rq", direction=SpanDirection.PROCESSING + ), + pin=pin, service=trace_utils.int_service(pin, config.rq), - ) as span: - span.set_tag_str(COMPONENT, config.rq.integration_name) - - job_id = get_argument_value(args, kwargs, 0, "job_id") - span.set_tag_str("job.id", job_id) + call_key="traced_queue_fetch_job", + tags={COMPONENT: config.rq.integration_name, JOB_ID: job_id}, + ) as ctx, ctx[ctx["call_key"]]: return func(*args, **kwargs) @@ -174,35 +186,31 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): # `perform_job` is executed in a freshly forked, short-lived instance job = get_argument_value(args, kwargs, 0, "job") - if config.rq_worker.distributed_tracing_enabled: - ctx = HTTPPropagator.extract(job.meta) - if ctx.trace_id: - pin.tracer.context_provider.activate(ctx) - try: - with pin.tracer.trace( + with core.context_with_data( "rq.worker.perform_job", + span_name="rq.worker.perform_job", service=trace_utils.int_service(pin, config.rq_worker), + pin=pin, span_type=SpanTypes.WORKER, resource=job.func_name, - ) as span: - span.set_tag_str(COMPONENT, config.rq.integration_name) - - # set span.kind to the type of request being performed - span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER) - span.set_tag_str("job.id", job.get_id()) + call_key="worker.perform_job", + distributed_headers_config=config.rq_worker, + distributed_headers=job.meta, + tags={COMPONENT: config.rq.integration_name, SPAN_KIND: SpanKind.CONSUMER, JOB_ID: job.get_id()}, + ) as ctx, ctx[ctx["call_key"]]: try: return func(*args, **kwargs) finally: - # get_status() returns None when ttl=0 - span.set_tag_str("job.status", job.get_status() or "None") - span.set_tag_str("job.origin", job.origin) - if job.is_failed: - span.error = 1 + # call _after_perform_job handler for job status and origin + span_tags = {"job.status": job.get_status() or "None", "job.origin": job.origin} + job_failed = job.is_failed + core.dispatch("rq.worker.perform_job", [ctx, job_failed, span_tags]) + finally: # Force flush to agent since the process `os.exit()`s # immediately after this method returns - pin.tracer.flush() + core.dispatch("rq.worker.after.perform.job", [ctx]) @trace_utils.with_traced_module @@ -213,24 +221,31 @@ def traced_job_perform(rq, pin, func, instance, args, kwargs): # Inherit the service name from whatever parent exists. # eg. in a worker, a perform_job parent span will exist with the worker # service. - with pin.tracer.trace("rq.job.perform", resource=job.func_name) as span: - span.set_tag_str(COMPONENT, config.rq.integration_name) - - span.set_tag("job.id", job.get_id()) + with core.context_with_data( + "rq.job.perform", + span_name="rq.job.perform", + resource=job.func_name, + call_key="job.perform", + pin=pin, + tags={COMPONENT: config.rq.integration_name, JOB_ID: job.get_id()}, + ) as ctx, ctx[ctx["call_key"]]: return func(*args, **kwargs) @trace_utils.with_traced_module def traced_job_fetch_many(rq, pin, func, instance, args, kwargs): """Trace rq.Job.fetch_many(...)""" - with pin.tracer.trace( - schematize_messaging_operation("rq.job.fetch_many", provider="rq", direction=SpanDirection.PROCESSING), + job_ids = get_argument_value(args, kwargs, 0, "job_ids") + with core.context_with_data( + "rq.job.fetch_many", + span_name=schematize_messaging_operation( + "rq.job.fetch_many", provider="rq", direction=SpanDirection.PROCESSING + ), service=trace_utils.ext_service(pin, config.rq_worker), - ) as span: - span.set_tag_str(COMPONENT, config.rq.integration_name) - - job_ids = get_argument_value(args, kwargs, 0, "job_ids") - span.set_tag("job_ids", job_ids) + call_key="job.fetch_many", + pin=pin, + tags={COMPONENT: config.rq.integration_name, JOB_ID: job_ids}, + ) as ctx, ctx[ctx["call_key"]]: return func(*args, **kwargs)