From 14bed80fc0911ae5d060d9cc206a63310d80b33f Mon Sep 17 00:00:00 2001 From: Jessica Gamio Date: Tue, 10 Sep 2024 09:59:30 -0700 Subject: [PATCH 01/16] integrated core api to rq integration --- ddtrace/_trace/trace_handlers.py | 5 ++++ ddtrace/contrib/rq/__init__.py | 41 ++++++++++++++++++++++---------- 2 files changed, 34 insertions(+), 12 deletions(-) diff --git a/ddtrace/_trace/trace_handlers.py b/ddtrace/_trace/trace_handlers.py index 7afe9b8e6af..813f38e3575 100644 --- a/ddtrace/_trace/trace_handlers.py +++ b/ddtrace/_trace/trace_handlers.py @@ -848,6 +848,11 @@ def listen(): "botocore.patched_stepfunctions_api_call", "botocore.patched_bedrock_api_call", "redis.command", + "rq.queue.enqueue_job", + "rq.traced_queue_fetch_job", + "rq.worker.perform_job", + "rq.job.perform", + "rq.job.fetch_many", ): core.on(f"context.started.start_span.{context_name}", _start_span) diff --git a/ddtrace/contrib/rq/__init__.py b/ddtrace/contrib/rq/__init__.py index 62648ca4e6b..83ce7391930 100644 --- a/ddtrace/contrib/rq/__init__.py +++ b/ddtrace/contrib/rq/__init__.py @@ -81,6 +81,7 @@ from ddtrace import Pin from ddtrace import config from ddtrace.constants import SPAN_KIND +from ddtrace.internal import core from ddtrace.internal.constants import COMPONENT from ddtrace.internal.schema import schematize_messaging_operation from ddtrace.internal.schema import schematize_service_name @@ -134,12 +135,17 @@ def traced_queue_enqueue_job(rq, pin, func, instance, args, kwargs): else: resource = func_name - with pin.tracer.trace( - schematize_messaging_operation("rq.queue.enqueue_job", provider="rq", direction=SpanDirection.OUTBOUND), + with core.context_with_data( + "rq.queue.enqueue_job", + span_name=schematize_messaging_operation( + "rq.queue.enqueue_job", provider="rq", direction=SpanDirection.OUTBOUND + ), + pin=pin, service=trace_utils.int_service(pin, config.rq), resource=resource, span_type=SpanTypes.WORKER, - ) as span: + call_key="queue.enqueue_job", + ) as ctx, 
ctx[ctx["call_key"]] as span: span.set_tag_str(COMPONENT, config.rq.integration_name) # set span.kind to the type of request being performed @@ -157,10 +163,14 @@ def traced_queue_enqueue_job(rq, pin, func, instance, args, kwargs): @trace_utils.with_traced_module def traced_queue_fetch_job(rq, pin, func, instance, args, kwargs): - with pin.tracer.trace( - schematize_messaging_operation("rq.queue.fetch_job", provider="rq", direction=SpanDirection.PROCESSING), + with core.context_with_data( + "rq.traced_queue_fetch_job", + span_name=schematize_messaging_operation( + "rq.queue.fetch_job", provider="rq", direction=SpanDirection.PROCESSING + ), service=trace_utils.int_service(pin, config.rq), - ) as span: + call_key="traced_queue_fetch_job", + ) as ctx, ctx[ctx["call_key"]] as span: span.set_tag_str(COMPONENT, config.rq.integration_name) job_id = get_argument_value(args, kwargs, 0, "job_id") @@ -180,12 +190,13 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): pin.tracer.context_provider.activate(ctx) try: - with pin.tracer.trace( + with core.context_with_data( "rq.worker.perform_job", service=trace_utils.int_service(pin, config.rq_worker), span_type=SpanTypes.WORKER, resource=job.func_name, - ) as span: + call_key="worker.perform_job", + ) as ctx, ctx[ctx["call_key"]] as span: span.set_tag_str(COMPONENT, config.rq.integration_name) # set span.kind to the type of request being performed @@ -213,7 +224,9 @@ def traced_job_perform(rq, pin, func, instance, args, kwargs): # Inherit the service name from whatever parent exists. # eg. in a worker, a perform_job parent span will exist with the worker # service. 
- with pin.tracer.trace("rq.job.perform", resource=job.func_name) as span: + with core.context_with_data("rq.job.perform", resource=job.func_name, call_key="job.perform") as ctx, ctx[ + ctx["call_key"] + ] as span: span.set_tag_str(COMPONENT, config.rq.integration_name) span.set_tag("job.id", job.get_id()) @@ -223,10 +236,14 @@ def traced_job_perform(rq, pin, func, instance, args, kwargs): @trace_utils.with_traced_module def traced_job_fetch_many(rq, pin, func, instance, args, kwargs): """Trace rq.Job.fetch_many(...)""" - with pin.tracer.trace( - schematize_messaging_operation("rq.job.fetch_many", provider="rq", direction=SpanDirection.PROCESSING), + with core.context_with_data( + "rq.job.fetch_many", + span_name=schematize_messaging_operation( + "rq.job.fetch_many", provider="rq", direction=SpanDirection.PROCESSING + ), service=trace_utils.ext_service(pin, config.rq_worker), - ) as span: + call_key="job.fetch_many", + ) as ctx, ctx[ctx["call_key"]] as span: span.set_tag_str(COMPONENT, config.rq.integration_name) job_ids = get_argument_value(args, kwargs, 0, "job_ids") From 5e5582eb5deaca8c40ed29dfcd38d3568e24876f Mon Sep 17 00:00:00 2001 From: Jessica Gamio Date: Tue, 10 Sep 2024 14:19:47 -0700 Subject: [PATCH 02/16] added pin to context --- ddtrace/contrib/rq/__init__.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/ddtrace/contrib/rq/__init__.py b/ddtrace/contrib/rq/__init__.py index 83ce7391930..fdb22bdbadb 100644 --- a/ddtrace/contrib/rq/__init__.py +++ b/ddtrace/contrib/rq/__init__.py @@ -145,6 +145,7 @@ def traced_queue_enqueue_job(rq, pin, func, instance, args, kwargs): resource=resource, span_type=SpanTypes.WORKER, call_key="queue.enqueue_job", + ) as ctx, ctx[ctx["call_key"]] as span: span.set_tag_str(COMPONENT, config.rq.integration_name) @@ -168,6 +169,7 @@ def traced_queue_fetch_job(rq, pin, func, instance, args, kwargs): span_name=schematize_messaging_operation( "rq.queue.fetch_job", provider="rq", 
direction=SpanDirection.PROCESSING ), + pin=pin, service=trace_utils.int_service(pin, config.rq), call_key="traced_queue_fetch_job", ) as ctx, ctx[ctx["call_key"]] as span: @@ -192,7 +194,9 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): try: with core.context_with_data( "rq.worker.perform_job", + span_name="rq.worker.perform_job", service=trace_utils.int_service(pin, config.rq_worker), + pin=pin, span_type=SpanTypes.WORKER, resource=job.func_name, call_key="worker.perform_job", @@ -202,6 +206,7 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): # set span.kind to the type of request being performed span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER) span.set_tag_str("job.id", job.get_id()) + try: return func(*args, **kwargs) finally: @@ -224,7 +229,7 @@ def traced_job_perform(rq, pin, func, instance, args, kwargs): # Inherit the service name from whatever parent exists. # eg. in a worker, a perform_job parent span will exist with the worker # service. - with core.context_with_data("rq.job.perform", resource=job.func_name, call_key="job.perform") as ctx, ctx[ + with core.context_with_data("rq.job.perform", span_name="rq.job.perform", resource=job.func_name, call_key="job.perform", pin=pin) as ctx, ctx[ ctx["call_key"] ] as span: span.set_tag_str(COMPONENT, config.rq.integration_name) @@ -243,6 +248,7 @@ def traced_job_fetch_many(rq, pin, func, instance, args, kwargs): ), service=trace_utils.ext_service(pin, config.rq_worker), call_key="job.fetch_many", + pin=pin, ) as ctx, ctx[ctx["call_key"]] as span: span.set_tag_str(COMPONENT, config.rq.integration_name) From 214bc2ab1fbd277dcf397631ce4b668ff2bff973 Mon Sep 17 00:00:00 2001 From: Jessica Gamio Date: Tue, 10 Sep 2024 14:54:24 -0700 Subject: [PATCH 03/16] corrected format --- ddtrace/contrib/rq/__init__.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/ddtrace/contrib/rq/__init__.py b/ddtrace/contrib/rq/__init__.py index fdb22bdbadb..bf566e38c14 100644 --- 
a/ddtrace/contrib/rq/__init__.py +++ b/ddtrace/contrib/rq/__init__.py @@ -145,7 +145,6 @@ def traced_queue_enqueue_job(rq, pin, func, instance, args, kwargs): resource=resource, span_type=SpanTypes.WORKER, call_key="queue.enqueue_job", - ) as ctx, ctx[ctx["call_key"]] as span: span.set_tag_str(COMPONENT, config.rq.integration_name) @@ -229,9 +228,9 @@ def traced_job_perform(rq, pin, func, instance, args, kwargs): # Inherit the service name from whatever parent exists. # eg. in a worker, a perform_job parent span will exist with the worker # service. - with core.context_with_data("rq.job.perform", span_name="rq.job.perform", resource=job.func_name, call_key="job.perform", pin=pin) as ctx, ctx[ - ctx["call_key"] - ] as span: + with core.context_with_data( + "rq.job.perform", span_name="rq.job.perform", resource=job.func_name, call_key="job.perform", pin=pin + ) as ctx, ctx[ctx["call_key"]] as span: span.set_tag_str(COMPONENT, config.rq.integration_name) span.set_tag("job.id", job.get_id()) From 63646465d5095bdcf66edeacb69790be3c9e87b2 Mon Sep 17 00:00:00 2001 From: Jessica Gamio Date: Tue, 10 Sep 2024 16:55:04 -0700 Subject: [PATCH 04/16] refactoring code --- ddtrace/contrib/rq/__init__.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ddtrace/contrib/rq/__init__.py b/ddtrace/contrib/rq/__init__.py index bf566e38c14..74197b97158 100644 --- a/ddtrace/contrib/rq/__init__.py +++ b/ddtrace/contrib/rq/__init__.py @@ -217,7 +217,8 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): finally: # Force flush to agent since the process `os.exit()`s # immediately after this method returns - pin.tracer.flush() + # pin.tracer.flush() + ctx["pin"].tracer.flush() @trace_utils.with_traced_module From 3903fbe8f790a57ea8987f64fb6f73f6a94a6527 Mon Sep 17 00:00:00 2001 From: Jessica Gamio Date: Tue, 10 Sep 2024 16:56:14 -0700 Subject: [PATCH 05/16] refactoring code --- ddtrace/contrib/rq/__init__.py | 1 - 1 file changed, 1 deletion(-) diff --git 
a/ddtrace/contrib/rq/__init__.py b/ddtrace/contrib/rq/__init__.py index 74197b97158..4b4d36385de 100644 --- a/ddtrace/contrib/rq/__init__.py +++ b/ddtrace/contrib/rq/__init__.py @@ -217,7 +217,6 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): finally: # Force flush to agent since the process `os.exit()`s # immediately after this method returns - # pin.tracer.flush() ctx["pin"].tracer.flush() From c1bdbab21d587bffb196ea5257a594e5bc0a9b45 Mon Sep 17 00:00:00 2001 From: Jessica Gamio Date: Wed, 11 Sep 2024 11:20:02 -0700 Subject: [PATCH 06/16] refactoring rq --- ddtrace/contrib/rq/__init__.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/ddtrace/contrib/rq/__init__.py b/ddtrace/contrib/rq/__init__.py index 4b4d36385de..d986e56e38e 100644 --- a/ddtrace/contrib/rq/__init__.py +++ b/ddtrace/contrib/rq/__init__.py @@ -185,10 +185,10 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): # `perform_job` is executed in a freshly forked, short-lived instance job = get_argument_value(args, kwargs, 0, "job") - if config.rq_worker.distributed_tracing_enabled: - ctx = HTTPPropagator.extract(job.meta) - if ctx.trace_id: - pin.tracer.context_provider.activate(ctx) + # if config.rq_worker.distributed_tracing_enabled: + # ctx = HTTPPropagator.extract(job.meta) + # if ctx.trace_id: + # pin.tracer.context_provider.activate(ctx) try: with core.context_with_data( @@ -199,6 +199,9 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): span_type=SpanTypes.WORKER, resource=job.func_name, call_key="worker.perform_job", + distributed_headers_config=str(config.rq_worker.distributed_tracing_enabled), + distributed_headers=job.meta, + # distributed_context=, ) as ctx, ctx[ctx["call_key"]] as span: span.set_tag_str(COMPONENT, config.rq.integration_name) From 3150b114ef4f0ecb735a39375cd6beab7e2eb7e8 Mon Sep 17 00:00:00 2001 From: Jessica Gamio Date: Thu, 12 Sep 2024 16:12:40 -0700 Subject: [PATCH 07/16] refactored code and 
resolved distribution trace issue --- ddtrace/contrib/rq/__init__.py | 51 ++++++++++++---------------------- 1 file changed, 17 insertions(+), 34 deletions(-) diff --git a/ddtrace/contrib/rq/__init__.py b/ddtrace/contrib/rq/__init__.py index d986e56e38e..0a16da85820 100644 --- a/ddtrace/contrib/rq/__init__.py +++ b/ddtrace/contrib/rq/__init__.py @@ -88,6 +88,7 @@ from ddtrace.internal.schema.span_attribute_schema import SpanDirection from ddtrace.internal.utils import get_argument_value from ddtrace.internal.utils.formats import asbool +from ddtrace._trace.trace_handlers import _start_span from ...ext import SpanKind from ...ext import SpanTypes @@ -145,16 +146,9 @@ def traced_queue_enqueue_job(rq, pin, func, instance, args, kwargs): resource=resource, span_type=SpanTypes.WORKER, call_key="queue.enqueue_job", + tags={ COMPONENT: config.rq.integration_name, SPAN_KIND:SpanKind.PRODUCER, "queue.name":instance.name, "job.id":job.get_id(), "job.func_name":job.func_name }, ) as ctx, ctx[ctx["call_key"]] as span: - span.set_tag_str(COMPONENT, config.rq.integration_name) - - # set span.kind to the type of request being performed - span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER) - - span.set_tag_str("queue.name", instance.name) - span.set_tag_str("job.id", job.get_id()) - span.set_tag_str("job.func_name", job.func_name) - + # If the queue is_async then add distributed tracing headers to the job if instance.is_async and config.rq.distributed_tracing_enabled: HTTPPropagator.inject(span.context, job.meta) @@ -163,6 +157,7 @@ def traced_queue_enqueue_job(rq, pin, func, instance, args, kwargs): @trace_utils.with_traced_module def traced_queue_fetch_job(rq, pin, func, instance, args, kwargs): + job_id = get_argument_value(args, kwargs, 0, "job_id") with core.context_with_data( "rq.traced_queue_fetch_job", span_name=schematize_messaging_operation( @@ -171,11 +166,8 @@ def traced_queue_fetch_job(rq, pin, func, instance, args, kwargs): pin=pin, service=trace_utils.int_service(pin, 
config.rq), call_key="traced_queue_fetch_job", + tags={COMPONENT:config.rq.integration_name,"job.id":job_id}, ) as ctx, ctx[ctx["call_key"]] as span: - span.set_tag_str(COMPONENT, config.rq.integration_name) - - job_id = get_argument_value(args, kwargs, 0, "job_id") - span.set_tag_str("job.id", job_id) return func(*args, **kwargs) @@ -184,12 +176,7 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): """Trace rq.Worker.perform_job""" # `perform_job` is executed in a freshly forked, short-lived instance job = get_argument_value(args, kwargs, 0, "job") - - # if config.rq_worker.distributed_tracing_enabled: - # ctx = HTTPPropagator.extract(job.meta) - # if ctx.trace_id: - # pin.tracer.context_provider.activate(ctx) - + try: with core.context_with_data( "rq.worker.perform_job", @@ -199,15 +186,10 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): span_type=SpanTypes.WORKER, resource=job.func_name, call_key="worker.perform_job", - distributed_headers_config=str(config.rq_worker.distributed_tracing_enabled), + distributed_headers_config=config.rq_worker, distributed_headers=job.meta, - # distributed_context=, + tags={COMPONENT:config.rq.integration_name, SPAN_KIND:SpanKind.CONSUMER,"job.id":job.get_id()}, ) as ctx, ctx[ctx["call_key"]] as span: - span.set_tag_str(COMPONENT, config.rq.integration_name) - - # set span.kind to the type of request being performed - span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER) - span.set_tag_str("job.id", job.get_id()) try: return func(*args, **kwargs) @@ -217,6 +199,7 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): span.set_tag_str("job.origin", job.origin) if job.is_failed: span.error = 1 + finally: # Force flush to agent since the process `os.exit()`s # immediately after this method returns @@ -232,17 +215,20 @@ def traced_job_perform(rq, pin, func, instance, args, kwargs): # eg. in a worker, a perform_job parent span will exist with the worker # service. 
with core.context_with_data( - "rq.job.perform", span_name="rq.job.perform", resource=job.func_name, call_key="job.perform", pin=pin + "rq.job.perform", + span_name="rq.job.perform", + resource=job.func_name, + call_key="job.perform", + pin=pin, + tags={COMPONENT:config.rq.integration_name,"job.id":job.get_id()}, ) as ctx, ctx[ctx["call_key"]] as span: - span.set_tag_str(COMPONENT, config.rq.integration_name) - - span.set_tag("job.id", job.get_id()) return func(*args, **kwargs) @trace_utils.with_traced_module def traced_job_fetch_many(rq, pin, func, instance, args, kwargs): """Trace rq.Job.fetch_many(...)""" + job_ids = get_argument_value(args, kwargs, 0, "job_ids") with core.context_with_data( "rq.job.fetch_many", span_name=schematize_messaging_operation( @@ -251,11 +237,8 @@ def traced_job_fetch_many(rq, pin, func, instance, args, kwargs): service=trace_utils.ext_service(pin, config.rq_worker), call_key="job.fetch_many", pin=pin, + tags={COMPONENT:config.rq.integration_name,"job_ids":job_ids}, ) as ctx, ctx[ctx["call_key"]] as span: - span.set_tag_str(COMPONENT, config.rq.integration_name) - - job_ids = get_argument_value(args, kwargs, 0, "job_ids") - span.set_tag("job_ids", job_ids) return func(*args, **kwargs) From 5b174ceaacc94ec914e9068cba470f9823854652 Mon Sep 17 00:00:00 2001 From: Jessica Gamio Date: Thu, 12 Sep 2024 16:13:13 -0700 Subject: [PATCH 08/16] passed lint --- ddtrace/contrib/rq/__init__.py | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/ddtrace/contrib/rq/__init__.py b/ddtrace/contrib/rq/__init__.py index 0a16da85820..c4f3bce5b4c 100644 --- a/ddtrace/contrib/rq/__init__.py +++ b/ddtrace/contrib/rq/__init__.py @@ -80,6 +80,7 @@ from ddtrace import Pin from ddtrace import config +from ddtrace._trace.trace_handlers import _start_span from ddtrace.constants import SPAN_KIND from ddtrace.internal import core from ddtrace.internal.constants import COMPONENT @@ -88,7 +89,6 @@ from 
ddtrace.internal.schema.span_attribute_schema import SpanDirection from ddtrace.internal.utils import get_argument_value from ddtrace.internal.utils.formats import asbool -from ddtrace._trace.trace_handlers import _start_span from ...ext import SpanKind from ...ext import SpanTypes @@ -146,9 +146,14 @@ def traced_queue_enqueue_job(rq, pin, func, instance, args, kwargs): resource=resource, span_type=SpanTypes.WORKER, call_key="queue.enqueue_job", - tags={ COMPONENT: config.rq.integration_name, SPAN_KIND:SpanKind.PRODUCER, "queue.name":instance.name, "job.id":job.get_id(), "job.func_name":job.func_name }, + tags={ + COMPONENT: config.rq.integration_name, + SPAN_KIND: SpanKind.PRODUCER, + "queue.name": instance.name, + "job.id": job.get_id(), + "job.func_name": job.func_name, + }, ) as ctx, ctx[ctx["call_key"]] as span: - # If the queue is_async then add distributed tracing headers to the job if instance.is_async and config.rq.distributed_tracing_enabled: HTTPPropagator.inject(span.context, job.meta) @@ -166,7 +171,7 @@ def traced_queue_fetch_job(rq, pin, func, instance, args, kwargs): pin=pin, service=trace_utils.int_service(pin, config.rq), call_key="traced_queue_fetch_job", - tags={COMPONENT:config.rq.integration_name,"job.id":job_id}, + tags={COMPONENT: config.rq.integration_name, "job.id": job_id}, ) as ctx, ctx[ctx["call_key"]] as span: return func(*args, **kwargs) @@ -176,7 +181,7 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): """Trace rq.Worker.perform_job""" # `perform_job` is executed in a freshly forked, short-lived instance job = get_argument_value(args, kwargs, 0, "job") - + try: with core.context_with_data( "rq.worker.perform_job", @@ -188,9 +193,8 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): call_key="worker.perform_job", distributed_headers_config=config.rq_worker, distributed_headers=job.meta, - tags={COMPONENT:config.rq.integration_name, SPAN_KIND:SpanKind.CONSUMER,"job.id":job.get_id()}, + tags={COMPONENT: 
config.rq.integration_name, SPAN_KIND: SpanKind.CONSUMER, "job.id": job.get_id()}, ) as ctx, ctx[ctx["call_key"]] as span: - try: return func(*args, **kwargs) finally: @@ -199,7 +203,7 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): span.set_tag_str("job.origin", job.origin) if job.is_failed: span.error = 1 - + finally: # Force flush to agent since the process `os.exit()`s # immediately after this method returns @@ -215,12 +219,12 @@ def traced_job_perform(rq, pin, func, instance, args, kwargs): # eg. in a worker, a perform_job parent span will exist with the worker # service. with core.context_with_data( - "rq.job.perform", - span_name="rq.job.perform", - resource=job.func_name, - call_key="job.perform", + "rq.job.perform", + span_name="rq.job.perform", + resource=job.func_name, + call_key="job.perform", pin=pin, - tags={COMPONENT:config.rq.integration_name,"job.id":job.get_id()}, + tags={COMPONENT: config.rq.integration_name, "job.id": job.get_id()}, ) as ctx, ctx[ctx["call_key"]] as span: return func(*args, **kwargs) @@ -237,7 +241,7 @@ def traced_job_fetch_many(rq, pin, func, instance, args, kwargs): service=trace_utils.ext_service(pin, config.rq_worker), call_key="job.fetch_many", pin=pin, - tags={COMPONENT:config.rq.integration_name,"job_ids":job_ids}, + tags={COMPONENT: config.rq.integration_name, "job_ids": job_ids}, ) as ctx, ctx[ctx["call_key"]] as span: return func(*args, **kwargs) From b4556a248abffab1930bfff380b36ad64ff9a2a1 Mon Sep 17 00:00:00 2001 From: Jessica Gamio Date: Fri, 13 Sep 2024 09:47:38 -0700 Subject: [PATCH 09/16] refactored rq integration for core api --- ddtrace/_trace/trace_handlers.py | 11 +++++++++++ ddtrace/contrib/rq/__init__.py | 7 ++----- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/ddtrace/_trace/trace_handlers.py b/ddtrace/_trace/trace_handlers.py index 813f38e3575..863d09994bc 100644 --- a/ddtrace/_trace/trace_handlers.py +++ b/ddtrace/_trace/trace_handlers.py @@ -691,6 +691,16 @@ def 
_on_botocore_patched_bedrock_api_call_success(ctx, reqid, latency, input_tok span.set_tag_str("bedrock.usage.completion_tokens", output_token_count) +def _after_perform_job(ctx, job): + """sets job.status and job.origin span tags after job is performed""" + # get_status() returns None when ttl=0 + span = ctx[ctx["call_key"]] + span.set_tag_str("job.status", job.get_status() or "None") + span.set_tag_str("job.origin", job.origin) + if job.is_failed: + span.error = 1 + + def _on_botocore_bedrock_process_response( ctx: core.ExecutionContext, formatted_response: Dict[str, Any], @@ -830,6 +840,7 @@ def listen(): core.on("test_visibility.enable", _on_test_visibility_enable) core.on("test_visibility.disable", _on_test_visibility_disable) core.on("test_visibility.is_enabled", _on_test_visibility_is_enabled, "is_enabled") + core.on("rq.worker.perform_job", _after_perform_job) for context_name in ( "flask.call", diff --git a/ddtrace/contrib/rq/__init__.py b/ddtrace/contrib/rq/__init__.py index c4f3bce5b4c..7c436a59dfb 100644 --- a/ddtrace/contrib/rq/__init__.py +++ b/ddtrace/contrib/rq/__init__.py @@ -198,11 +198,8 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): try: return func(*args, **kwargs) finally: - # get_status() returns None when ttl=0 - span.set_tag_str("job.status", job.get_status() or "None") - span.set_tag_str("job.origin", job.origin) - if job.is_failed: - span.error = 1 + # call _after_perform_job handler for job status and origin + core.dispatch("rq.worker.perform_job", [ctx, job]) finally: # Force flush to agent since the process `os.exit()`s From 6e49d6bd1cd9e8fe6f3014f74b398246890ae0a9 Mon Sep 17 00:00:00 2001 From: Jessica Gamio Date: Fri, 13 Sep 2024 14:29:12 -0700 Subject: [PATCH 10/16] added additional handlers for rq.worker.perform.job and rq.traced_queue_fetch_job spans. 
--- ddtrace/_trace/trace_handlers.py | 18 ++++++++++++++++-- ddtrace/contrib/rq/__init__.py | 6 ++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/ddtrace/_trace/trace_handlers.py b/ddtrace/_trace/trace_handlers.py index 863d09994bc..585620bffb7 100644 --- a/ddtrace/_trace/trace_handlers.py +++ b/ddtrace/_trace/trace_handlers.py @@ -691,7 +691,12 @@ def _on_botocore_patched_bedrock_api_call_success(ctx, reqid, latency, input_tok span.set_tag_str("bedrock.usage.completion_tokens", output_token_count) -def _after_perform_job(ctx, job): +def _on_rq_queue_enqueue_job_is_async(ctx, job): + span = ctx[ctx["call_key"]] + HTTPPropagator.inject(span.context, job.meta) + + +def _on_rq_after_perform_job(ctx, job): """sets job.status and job.origin span tags after job is performed""" # get_status() returns None when ttl=0 span = ctx[ctx["call_key"]] @@ -701,6 +706,13 @@ def _on_rq_after_perform_job(ctx, job): span.error = 1 +def _on_rq_end_of_traced_method_in_fork(ctx, job): + """Force flush to agent since the process `os.exit()`s + immediately after this method returns + """ + ctx["pin"].tracer.flush() + + def _on_botocore_bedrock_process_response( ctx: core.ExecutionContext, formatted_response: Dict[str, Any], @@ -840,7 +852,9 @@ def listen(): core.on("test_visibility.enable", _on_test_visibility_enable) core.on("test_visibility.disable", _on_test_visibility_disable) core.on("test_visibility.is_enabled", _on_test_visibility_is_enabled, "is_enabled") - core.on("rq.worker.perform_job", _after_perform_job) + core.on("rq.worker.perform_job", _on_rq_after_perform_job) + core.on("rq.worker.after.perform.job", _on_rq_end_of_traced_method_in_fork) + core.on("rq.queue.enqueue_job", _on_rq_queue_enqueue_job_is_async) for context_name in ( "flask.call", diff --git a/ddtrace/contrib/rq/__init__.py b/ddtrace/contrib/rq/__init__.py index 7c436a59dfb..b1a1d1b3d3b 100644 --- a/ddtrace/contrib/rq/__init__.py +++ b/ddtrace/contrib/rq/__init__.py @@ -156,7 +156,7 @@ def 
traced_queue_enqueue_job(rq, pin, func, instance, args, kwargs): ) as ctx, ctx[ctx["call_key"]] as span: # If the queue is_async then add distributed tracing headers to the job if instance.is_async and config.rq.distributed_tracing_enabled: - HTTPPropagator.inject(span.context, job.meta) + core.dispatch("rq.queue.enqueue_job", [ctx, job]) return func(*args, **kwargs) @@ -204,7 +204,9 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): finally: # Force flush to agent since the process `os.exit()`s # immediately after this method returns - ctx["pin"].tracer.flush() + + core.context_with_data("rq.worker.after.perform.job") + core.dispatch("rq.worker.after.perform.job", [ctx, job]) @trace_utils.with_traced_module From b4e2903b1e1c5d5d196a1c779616efb983c673a1 Mon Sep 17 00:00:00 2001 From: Jessica Gamio Date: Mon, 16 Sep 2024 17:10:35 -0700 Subject: [PATCH 11/16] generalizing handlers rq.worker.perform_job and rq.queue.enqueue_job --- ddtrace/_trace/trace_handlers.py | 34 +++++++++++++++++++++----------- ddtrace/contrib/rq/__init__.py | 16 ++++++++------- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/ddtrace/_trace/trace_handlers.py b/ddtrace/_trace/trace_handlers.py index 585620bffb7..8cddbdfdaf6 100644 --- a/ddtrace/_trace/trace_handlers.py +++ b/ddtrace/_trace/trace_handlers.py @@ -35,6 +35,7 @@ from ddtrace.internal.logger import get_logger from ddtrace.internal.schema.span_attribute_schema import SpanDirection from ddtrace.internal.utils import http as http_utils +from ddtrace.internal.utils import get_argument_value from ddtrace.propagation.http import HTTPPropagator @@ -691,22 +692,33 @@ def _on_botocore_patched_bedrock_api_call_success(ctx, reqid, latency, input_tok span.set_tag_str("bedrock.usage.completion_tokens", output_token_count) -def _on_rq_queue_enqueue_job_is_async(ctx, job): - span = ctx[ctx["call_key"]] - HTTPPropagator.inject(span.context, job.meta) +def _on_propagate_context(ctx, headers): + config = 
ctx.get_item("integration_config") + distributed_tracing_enabled = config.distributed_tracing_enabled + call_key = ctx.get_item("call_key") + if call_key is None: + log.warning("call_key not found in ctx") + if distributed_tracing_enabled and call_key: + span = ctx[ctx["call_key"]] + HTTPPropagator.inject(span.context, headers) -def _on_rq_after_perform_job(ctx, job): +def _on_rq_after_perform_job(ctx, job_failed, span_tags): """sets job.status and job.origin span tags after job is performed""" # get_status() returns None when ttl=0 - span = ctx[ctx["call_key"]] - span.set_tag_str("job.status", job.get_status() or "None") - span.set_tag_str("job.origin", job.origin) - if job.is_failed: + call_key = ctx.get_item("call_key") + if call_key: + span = ctx[ctx["call_key"]] + + if job_failed and span: span.error = 1 + if span_tags: + for k in span_tags.keys(): + span.set_tag_str(k, span_tags[k]) + -def _on_rq_end_of_traced_method_in_fork(ctx, job): +def _on_end_of_traced_method_in_fork(ctx): """Force flush to agent since the process `os.exit()`s immediately after this method returns """ @@ -853,8 +865,8 @@ def listen(): core.on("test_visibility.enable", _on_test_visibility_enable) core.on("test_visibility.disable", _on_test_visibility_disable) core.on("test_visibility.is_enabled", _on_test_visibility_is_enabled, "is_enabled") core.on("rq.worker.perform_job", _on_rq_after_perform_job) - core.on("rq.worker.after.perform.job", _on_rq_end_of_traced_method_in_fork) - core.on("rq.queue.enqueue_job", _on_rq_queue_enqueue_job_is_async) + core.on("rq.worker.after.perform.job", _on_end_of_traced_method_in_fork) + core.on("rq.queue.enqueue_job", _on_propagate_context) for context_name in ( "flask.call", diff --git a/ddtrace/contrib/rq/__init__.py b/ddtrace/contrib/rq/__init__.py index b1a1d1b3d3b..d6128a45afd 100644 --- a/ddtrace/contrib/rq/__init__.py +++ b/ddtrace/contrib/rq/__init__.py @@ -126,7 +126,7 @@ def get_version(): @trace_utils.with_traced_module def traced_queue_enqueue_job(rq, pin, func, instance, args, kwargs): job = 
get_argument_value(args, kwargs, 0, "f") - + func_name = job.func_name job_inst = job.instance job_inst_str = "%s.%s" % (job_inst.__module__, job_inst.__class__.__name__) if job_inst else "" @@ -146,6 +146,7 @@ def traced_queue_enqueue_job(rq, pin, func, instance, args, kwargs): resource=resource, span_type=SpanTypes.WORKER, call_key="queue.enqueue_job", + integration_config=config.rq_worker, tags={ COMPONENT: config.rq.integration_name, SPAN_KIND: SpanKind.PRODUCER, @@ -155,8 +156,8 @@ def traced_queue_enqueue_job(rq, pin, func, instance, args, kwargs): }, ) as ctx, ctx[ctx["call_key"]] as span: # If the queue is_async then add distributed tracing headers to the job - if instance.is_async and config.rq.distributed_tracing_enabled: - core.dispatch("rq.queue.enqueue_job", [ctx, job]) + if instance.is_async: + core.dispatch("rq.queue.enqueue_job", [ctx, job.meta]) return func(*args, **kwargs) @@ -199,15 +200,16 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): return func(*args, **kwargs) finally: # call _after_perform_job handler for job status and origin - core.dispatch("rq.worker.perform_job", [ctx, job]) + # core.dispatch("rq.worker.perform_job", [ctx, job]) + span_tags = {"job.status":job.get_status() or "None", "job.origin":job.origin} + core.dispatch("rq.worker.perform_job", [ctx, job.is_failed, span_tags]) finally: # Force flush to agent since the process `os.exit()`s # immediately after this method returns - core.context_with_data("rq.worker.after.perform.job") - core.dispatch("rq.worker.after.perform.job", [ctx, job]) - + core.dispatch("rq.worker.after.perform.job", [ctx]) + @trace_utils.with_traced_module def traced_job_perform(rq, pin, func, instance, args, kwargs): From e8ae60d7220648f66738bf42016fb075059eb66c Mon Sep 17 00:00:00 2001 From: Jessica Gamio Date: Tue, 17 Sep 2024 12:05:02 -0700 Subject: [PATCH 12/16] renamed two of the handlers --- ddtrace/_trace/trace_handlers.py | 18 ++++++++---------- ddtrace/contrib/rq/__init__.py | 6 
+++--- 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/ddtrace/_trace/trace_handlers.py b/ddtrace/_trace/trace_handlers.py index 8cddbdfdaf6..8ebe3ebd537 100644 --- a/ddtrace/_trace/trace_handlers.py +++ b/ddtrace/_trace/trace_handlers.py @@ -34,8 +34,8 @@ from ddtrace.internal.constants import RESPONSE_HEADERS from ddtrace.internal.logger import get_logger from ddtrace.internal.schema.span_attribute_schema import SpanDirection -from ddtrace.internal.utils import http as http_utils from ddtrace.internal.utils import get_argument_value +from ddtrace.internal.utils import http as http_utils from ddtrace.propagation.http import HTTPPropagator @@ -692,27 +692,25 @@ def _on_botocore_patched_bedrock_api_call_success(ctx, reqid, latency, input_tok span.set_tag_str("bedrock.usage.completion_tokens", output_token_count) -def _on_propagate_context(ctx, headers): +def _propagate_context(ctx, headers): config = ctx.get_item("integration_config") distributed_tracing_enabled = config.distributed_tracing_enabled call_key = ctx.get_item("call_key") if call_key is None: log.warning("call_key not found in ctx") - if distributed_tracing_enabled and call_key: - span = ctx[ctx["call_key"]] - HTTPPropagator.inject(span.context, headers) + if distributed_tracing_enabled and call_key: + span = ctx[ctx["call_key"]] + HTTPPropagator.inject(span.context, headers) -def _on_rq_after_perform_job(ctx, job_failed, span_tags): +def _after_job_execution(ctx, job_failed, span_tags): """sets job.status and job.origin span tags after job is performed""" # get_status() returns None when ttl=0 call_key = ctx.get_item("call_key") if call_key: span = ctx[ctx["call_key"]] - if job_failed and span: span.error = 1 - if span_tags: for k in span_tags.keys(): span.set_tag_str(k, span_tags[k]) @@ -864,9 +862,9 @@ def listen(): core.on("test_visibility.enable", _on_test_visibility_enable) core.on("test_visibility.disable", _on_test_visibility_disable) core.on("test_visibility.is_enabled", 
_on_test_visibility_is_enabled, "is_enabled") - core.on("rq.worker.perform_job", _on_rq_after_perform_job) + core.on("rq.worker.perform_job", _after_job_execution) core.on("rq.worker.after.perform.job", _on_end_of_traced_method_in_fork) - core.on("rq.queue.enqueue_job", _on_propagate_context) + core.on("rq.queue.enqueue_job", _propagate_context) for context_name in ( "flask.call", diff --git a/ddtrace/contrib/rq/__init__.py b/ddtrace/contrib/rq/__init__.py index d6128a45afd..2ace539ad56 100644 --- a/ddtrace/contrib/rq/__init__.py +++ b/ddtrace/contrib/rq/__init__.py @@ -126,7 +126,7 @@ def get_version(): @trace_utils.with_traced_module def traced_queue_enqueue_job(rq, pin, func, instance, args, kwargs): job = get_argument_value(args, kwargs, 0, "f") - + func_name = job.func_name job_inst = job.instance job_inst_str = "%s.%s" % (job_inst.__module__, job_inst.__class__.__name__) if job_inst else "" @@ -201,7 +201,7 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): finally: # call _after_perform_job handler for job status and origin # core.dispatch("rq.worker.perform_job", [ctx, job]) - span_tags = {"job.status":job.get_status() or "None", "job.origin":job.origin} + span_tags = {"job.status": job.get_status() or "None", "job.origin": job.origin} core.dispatch("rq.worker.perform_job", [ctx, job.is_failed, span_tags]) finally: @@ -209,7 +209,7 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): # immediately after this method returns core.context_with_data("rq.worker.after.perform.job") core.dispatch("rq.worker.after.perform.job", [ctx]) - + @trace_utils.with_traced_module def traced_job_perform(rq, pin, func, instance, args, kwargs): From 92154a7256fdf4398b7c31f28a8c42025b64cd1b Mon Sep 17 00:00:00 2001 From: Jessica Gamio Date: Tue, 17 Sep 2024 13:21:58 -0700 Subject: [PATCH 13/16] added constants for some key tags --- ddtrace/contrib/rq/__init__.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git 
a/ddtrace/contrib/rq/__init__.py b/ddtrace/contrib/rq/__init__.py index 2ace539ad56..48194c08fa4 100644 --- a/ddtrace/contrib/rq/__init__.py +++ b/ddtrace/contrib/rq/__init__.py @@ -116,6 +116,11 @@ ) +JOB_ID = "job.id" +QUEUE_NAME = "queue.name" +JOB_FUNC_NAME = "job.func_name" + + def get_version(): # type: () -> str import rq @@ -150,9 +155,9 @@ def traced_queue_enqueue_job(rq, pin, func, instance, args, kwargs): tags={ COMPONENT: config.rq.integration_name, SPAN_KIND: SpanKind.PRODUCER, - "queue.name": instance.name, - "job.id": job.get_id(), - "job.func_name": job.func_name, + QUEUE_NAME : instance.name, + JOB_ID: job.get_id(), + JOB_FUNC_NAME : job.func_name, }, ) as ctx, ctx[ctx["call_key"]] as span: # If the queue is_async then add distributed tracing headers to the job @@ -172,7 +177,7 @@ def traced_queue_fetch_job(rq, pin, func, instance, args, kwargs): pin=pin, service=trace_utils.int_service(pin, config.rq), call_key="traced_queue_fetch_job", - tags={COMPONENT: config.rq.integration_name, "job.id": job_id}, + tags={COMPONENT: config.rq.integration_name, JOB_ID : job_id}, ) as ctx, ctx[ctx["call_key"]] as span: return func(*args, **kwargs) @@ -194,15 +199,15 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): call_key="worker.perform_job", distributed_headers_config=config.rq_worker, distributed_headers=job.meta, - tags={COMPONENT: config.rq.integration_name, SPAN_KIND: SpanKind.CONSUMER, "job.id": job.get_id()}, + tags={COMPONENT: config.rq.integration_name, SPAN_KIND: SpanKind.CONSUMER, JOB_ID: job.get_id()}, ) as ctx, ctx[ctx["call_key"]] as span: try: return func(*args, **kwargs) finally: # call _after_perform_job handler for job status and origin - # core.dispatch("rq.worker.perform_job", [ctx, job]) span_tags = {"job.status": job.get_status() or "None", "job.origin": job.origin} - core.dispatch("rq.worker.perform_job", [ctx, job.is_failed, span_tags]) + job_failed = job.is_failed + core.dispatch("rq.worker.perform_job", [ctx, 
job_failed, span_tags]) finally: # Force flush to agent since the process `os.exit()`s @@ -225,7 +230,7 @@ def traced_job_perform(rq, pin, func, instance, args, kwargs): resource=job.func_name, call_key="job.perform", pin=pin, - tags={COMPONENT: config.rq.integration_name, "job.id": job.get_id()}, + tags={COMPONENT: config.rq.integration_name, JOB_ID: job.get_id()}, ) as ctx, ctx[ctx["call_key"]] as span: return func(*args, **kwargs) @@ -242,7 +247,7 @@ def traced_job_fetch_many(rq, pin, func, instance, args, kwargs): service=trace_utils.ext_service(pin, config.rq_worker), call_key="job.fetch_many", pin=pin, - tags={COMPONENT: config.rq.integration_name, "job_ids": job_ids}, + tags={COMPONENT: config.rq.integration_name, JOB_ID: job_ids}, ) as ctx, ctx[ctx["call_key"]] as span: return func(*args, **kwargs) From b0d713e532f836ca08261f83f2194da3b2a6d8f5 Mon Sep 17 00:00:00 2001 From: Jessica Gamio Date: Tue, 17 Sep 2024 15:03:08 -0700 Subject: [PATCH 14/16] added constants for span keys --- ddtrace/contrib/rq/__init__.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ddtrace/contrib/rq/__init__.py b/ddtrace/contrib/rq/__init__.py index 48194c08fa4..fb90a5f338f 100644 --- a/ddtrace/contrib/rq/__init__.py +++ b/ddtrace/contrib/rq/__init__.py @@ -155,9 +155,9 @@ def traced_queue_enqueue_job(rq, pin, func, instance, args, kwargs): tags={ COMPONENT: config.rq.integration_name, SPAN_KIND: SpanKind.PRODUCER, - QUEUE_NAME : instance.name, + QUEUE_NAME: instance.name, JOB_ID: job.get_id(), - JOB_FUNC_NAME : job.func_name, + JOB_FUNC_NAME: job.func_name, }, ) as ctx, ctx[ctx["call_key"]] as span: # If the queue is_async then add distributed tracing headers to the job @@ -177,7 +177,7 @@ def traced_queue_fetch_job(rq, pin, func, instance, args, kwargs): pin=pin, service=trace_utils.int_service(pin, config.rq), call_key="traced_queue_fetch_job", - tags={COMPONENT: config.rq.integration_name, JOB_ID : job_id}, + tags={COMPONENT: 
config.rq.integration_name, JOB_ID: job_id}, ) as ctx, ctx[ctx["call_key"]] as span: return func(*args, **kwargs) From 4614bd9e3c4aeb2877db63931f590ce6117d409f Mon Sep 17 00:00:00 2001 From: Jessica Gamio Date: Wed, 18 Sep 2024 10:56:25 -0700 Subject: [PATCH 15/16] fixed linting errors --- ddtrace/_trace/trace_handlers.py | 1 - ddtrace/contrib/rq/__init__.py | 12 +++++------- 2 files changed, 5 insertions(+), 8 deletions(-) diff --git a/ddtrace/_trace/trace_handlers.py b/ddtrace/_trace/trace_handlers.py index 74061387bcc..7917b02ce48 100644 --- a/ddtrace/_trace/trace_handlers.py +++ b/ddtrace/_trace/trace_handlers.py @@ -34,7 +34,6 @@ from ddtrace.internal.constants import RESPONSE_HEADERS from ddtrace.internal.logger import get_logger from ddtrace.internal.schema.span_attribute_schema import SpanDirection -from ddtrace.internal.utils import get_argument_value from ddtrace.internal.utils import http as http_utils from ddtrace.propagation.http import HTTPPropagator diff --git a/ddtrace/contrib/rq/__init__.py b/ddtrace/contrib/rq/__init__.py index fb90a5f338f..8175690d2e3 100644 --- a/ddtrace/contrib/rq/__init__.py +++ b/ddtrace/contrib/rq/__init__.py @@ -80,7 +80,6 @@ from ddtrace import Pin from ddtrace import config -from ddtrace._trace.trace_handlers import _start_span from ddtrace.constants import SPAN_KIND from ddtrace.internal import core from ddtrace.internal.constants import COMPONENT @@ -92,7 +91,6 @@ from ...ext import SpanKind from ...ext import SpanTypes -from ...propagation.http import HTTPPropagator from .. 
import trace_utils @@ -159,7 +157,7 @@ def traced_queue_enqueue_job(rq, pin, func, instance, args, kwargs): JOB_ID: job.get_id(), JOB_FUNC_NAME: job.func_name, }, - ) as ctx, ctx[ctx["call_key"]] as span: + ) as ctx, ctx[ctx["call_key"]] as _: # If the queue is_async then add distributed tracing headers to the job if instance.is_async: core.dispatch("rq.queue.enqueue_job", [ctx, job.meta]) @@ -178,7 +176,7 @@ def traced_queue_fetch_job(rq, pin, func, instance, args, kwargs): service=trace_utils.int_service(pin, config.rq), call_key="traced_queue_fetch_job", tags={COMPONENT: config.rq.integration_name, JOB_ID: job_id}, - ) as ctx, ctx[ctx["call_key"]] as span: + ) as ctx, ctx[ctx["call_key"]] as _: return func(*args, **kwargs) @@ -200,7 +198,7 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): distributed_headers_config=config.rq_worker, distributed_headers=job.meta, tags={COMPONENT: config.rq.integration_name, SPAN_KIND: SpanKind.CONSUMER, JOB_ID: job.get_id()}, - ) as ctx, ctx[ctx["call_key"]] as span: + ) as ctx, ctx[ctx["call_key"]] as _: try: return func(*args, **kwargs) finally: @@ -231,7 +229,7 @@ def traced_job_perform(rq, pin, func, instance, args, kwargs): call_key="job.perform", pin=pin, tags={COMPONENT: config.rq.integration_name, JOB_ID: job.get_id()}, - ) as ctx, ctx[ctx["call_key"]] as span: + ) as ctx, ctx[ctx["call_key"]] as _: return func(*args, **kwargs) @@ -248,7 +246,7 @@ def traced_job_fetch_many(rq, pin, func, instance, args, kwargs): call_key="job.fetch_many", pin=pin, tags={COMPONENT: config.rq.integration_name, JOB_ID: job_ids}, - ) as ctx, ctx[ctx["call_key"]] as span: + ) as ctx, ctx[ctx["call_key"]] as _: return func(*args, **kwargs) From 50e34a6e205eade6b8b7d03034d273f067090c9f Mon Sep 17 00:00:00 2001 From: Jessica Gamio Date: Mon, 23 Sep 2024 11:28:09 -0700 Subject: [PATCH 16/16] removed as _ in context manager and retained ctx[ctx[call_key]] to maintain the automatic closing behavior. 
--- ddtrace/contrib/rq/__init__.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/ddtrace/contrib/rq/__init__.py b/ddtrace/contrib/rq/__init__.py index 8175690d2e3..5646a1bce0e 100644 --- a/ddtrace/contrib/rq/__init__.py +++ b/ddtrace/contrib/rq/__init__.py @@ -157,7 +157,7 @@ def traced_queue_enqueue_job(rq, pin, func, instance, args, kwargs): JOB_ID: job.get_id(), JOB_FUNC_NAME: job.func_name, }, - ) as ctx, ctx[ctx["call_key"]] as _: + ) as ctx, ctx[ctx["call_key"]]: # If the queue is_async then add distributed tracing headers to the job if instance.is_async: core.dispatch("rq.queue.enqueue_job", [ctx, job.meta]) @@ -176,7 +176,7 @@ def traced_queue_fetch_job(rq, pin, func, instance, args, kwargs): service=trace_utils.int_service(pin, config.rq), call_key="traced_queue_fetch_job", tags={COMPONENT: config.rq.integration_name, JOB_ID: job_id}, - ) as ctx, ctx[ctx["call_key"]] as _: + ) as ctx, ctx[ctx["call_key"]]: return func(*args, **kwargs) @@ -198,7 +198,7 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs): distributed_headers_config=config.rq_worker, distributed_headers=job.meta, tags={COMPONENT: config.rq.integration_name, SPAN_KIND: SpanKind.CONSUMER, JOB_ID: job.get_id()}, - ) as ctx, ctx[ctx["call_key"]] as _: + ) as ctx, ctx[ctx["call_key"]]: try: return func(*args, **kwargs) finally: @@ -229,7 +229,7 @@ def traced_job_perform(rq, pin, func, instance, args, kwargs): call_key="job.perform", pin=pin, tags={COMPONENT: config.rq.integration_name, JOB_ID: job.get_id()}, - ) as ctx, ctx[ctx["call_key"]] as _: + ) as ctx, ctx[ctx["call_key"]]: return func(*args, **kwargs) @@ -246,7 +246,7 @@ def traced_job_fetch_many(rq, pin, func, instance, args, kwargs): call_key="job.fetch_many", pin=pin, tags={COMPONENT: config.rq.integration_name, JOB_ID: job_ids}, - ) as ctx, ctx[ctx["call_key"]] as _: + ) as ctx, ctx[ctx["call_key"]]: return func(*args, **kwargs)