chore(rq): integrated core api to rq integration #10607

Open · wants to merge 17 commits into main
39 changes: 39 additions & 0 deletions ddtrace/_trace/trace_handlers.py
@@ -691,6 +691,37 @@ def _on_botocore_patched_bedrock_api_call_success(ctx, reqid, latency, input_tok
span.set_tag_str("bedrock.usage.completion_tokens", output_token_count)


def _propagate_context(ctx, headers):
config = ctx.get_item("integration_config")
distributed_tracing_enabled = config.distributed_tracing_enabled
Collaborator (on lines +695 to +696):
Since this code is written with the (acceptable) assumption that integration_config will always exist, making that assumption more explicit would clarify the code. Example:

Suggested change:
-    config = ctx.get_item("integration_config")
-    distributed_tracing_enabled = config.distributed_tracing_enabled
+    distributed_tracing_enabled = ctx["integration_config"].distributed_tracing_enabled
This version throws a KeyError if the key isn't present, as opposed to the less clear 'NoneType' object has no attribute ... error that the current version would throw.
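
For illustration, a minimal sketch of the two failure modes, with a plain dict standing in for the execution context (names hypothetical):

    ctx = {}  # "integration_config" was never set

    try:
        ctx["integration_config"].distributed_tracing_enabled
    except KeyError as e:
        print(e)  # 'integration_config' -- names the missing key directly

    config = ctx.get("integration_config")  # returns None, like ctx.get_item(...)
    try:
        config.distributed_tracing_enabled
    except AttributeError as e:
        print(e)  # 'NoneType' object has no attribute 'distributed_tracing_enabled'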

call_key = ctx.get_item("call_key")
if call_key is None:
log.warning("call_key not found in ctx")
if distributed_tracing_enabled and call_key:
span = ctx[ctx["call_key"]]
HTTPPropagator.inject(span.context, headers)
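
For context, this is the inject/extract round trip the helper performs; a standalone sketch using the public propagator API:

    from ddtrace import tracer
    from ddtrace.propagation.http import HTTPPropagator

    carrier = {}
    with tracer.trace("producer") as span:
        # inject() writes the x-datadog-* propagation headers into the dict
        HTTPPropagator.inject(span.context, carrier)

    # A consumer later rebuilds the remote context from those headers
    remote_context = HTTPPropagator.extract(carrier)
    assert remote_context.trace_id == span.trace_id

Here the carrier is job.meta, so the trace continues in the worker that picks the job up.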


def _after_job_execution(ctx, job_failed, span_tags):
"""sets job.status and job.origin span tags after job is performed"""
# get_status() returns None when ttl=0
call_key = ctx.get_item("call_key")
if call_key:
span = ctx[ctx["call_key"]]
if job_failed and span:
span.error = 1
if span_tags:
for k in span_tags.keys():
span.set_tag_str(k, span_tags[k])
Collaborator:

This code fails if span is None
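
One possible guard, sketched here rather than taken from the PR: return early when either the call key or the span is missing, so neither lookup can dereference None.

    def _after_job_execution(ctx, job_failed, span_tags):
        call_key = ctx.get_item("call_key")
        if call_key is None:
            log.warning("call_key not found in ctx")
            return
        span = ctx.get_item(call_key)
        if span is None:
            return
        if job_failed:
            span.error = 1
        for key, value in (span_tags or {}).items():
            span.set_tag_str(key, value)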



def _on_end_of_traced_method_in_fork(ctx):
"""Force flush to agent since the process `os.exit()`s
immediately after this method returnsf
Collaborator:
Suggested change:
-    immediately after this method returnsf
+    immediately after this method returns

"""
ctx["pin"].tracer.flush()


def _on_botocore_bedrock_process_response(
ctx: core.ExecutionContext,
formatted_response: Dict[str, Any],
@@ -830,6 +861,9 @@ def listen():
core.on("test_visibility.enable", _on_test_visibility_enable)
core.on("test_visibility.disable", _on_test_visibility_disable)
core.on("test_visibility.is_enabled", _on_test_visibility_is_enabled, "is_enabled")
core.on("rq.worker.perform_job", _after_job_execution)
core.on("rq.worker.after.perform.job", _on_end_of_traced_method_in_fork)
core.on("rq.queue.enqueue_job", _propagate_context)

for context_name in (
"flask.call",
@@ -848,6 +882,11 @@
"botocore.patched_stepfunctions_api_call",
"botocore.patched_bedrock_api_call",
"redis.command",
"rq.queue.enqueue_job",
"rq.traced_queue_fetch_job",
"rq.worker.perform_job",
"rq.job.perform",
"rq.job.fetch_many",
):
core.on(f"context.started.start_span.{context_name}", _start_span)

118 changes: 67 additions & 51 deletions ddtrace/contrib/rq/__init__.py
@@ -81,6 +81,7 @@
from ddtrace import Pin
from ddtrace import config
from ddtrace.constants import SPAN_KIND
from ddtrace.internal import core
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal.schema import schematize_messaging_operation
from ddtrace.internal.schema import schematize_service_name
@@ -90,7 +91,6 @@

from ...ext import SpanKind
from ...ext import SpanTypes
from ...propagation.http import HTTPPropagator
from .. import trace_utils


@@ -114,6 +114,11 @@
)


JOB_ID = "job.id"
QUEUE_NAME = "queue.name"
JOB_FUNC_NAME = "job.func_name"


def get_version():
# type: () -> str
import rq
@@ -134,37 +139,44 @@ def traced_queue_enqueue_job(rq, pin, func, instance, args, kwargs):
else:
resource = func_name

with pin.tracer.trace(
schematize_messaging_operation("rq.queue.enqueue_job", provider="rq", direction=SpanDirection.OUTBOUND),
with core.context_with_data(
"rq.queue.enqueue_job",
span_name=schematize_messaging_operation(
"rq.queue.enqueue_job", provider="rq", direction=SpanDirection.OUTBOUND
),
pin=pin,
service=trace_utils.int_service(pin, config.rq),
resource=resource,
span_type=SpanTypes.WORKER,
) as span:
span.set_tag_str(COMPONENT, config.rq.integration_name)

# set span.kind to the type of request being performed
span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER)

span.set_tag_str("queue.name", instance.name)
span.set_tag_str("job.id", job.get_id())
span.set_tag_str("job.func_name", job.func_name)

call_key="queue.enqueue_job",
integration_config=config.rq_worker,
tags={
COMPONENT: config.rq.integration_name,
SPAN_KIND: SpanKind.PRODUCER,
QUEUE_NAME: instance.name,
JOB_ID: job.get_id(),
JOB_FUNC_NAME: job.func_name,
},
) as ctx, ctx[ctx["call_key"]]:
# If the queue is_async then add distributed tracing headers to the job
if instance.is_async and config.rq.distributed_tracing_enabled:
HTTPPropagator.inject(span.context, job.meta)
if instance.is_async:
core.dispatch("rq.queue.enqueue_job", [ctx, job.meta])
return func(*args, **kwargs)
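
As a usage sketch (not part of the PR), once the integration is patched, enqueueing onto an async queue emits an rq.queue.enqueue_job span, and the dispatch above injects the propagation headers into job.meta:

    import redis
    import rq
    from ddtrace import patch

    patch(rq=True)  # instrument rq before creating queues

    queue = rq.Queue("my-queue", connection=redis.Redis())
    job = queue.enqueue(print, "hello")
    # With distributed tracing enabled, job.meta now carries the
    # x-datadog-* headers injected by the rq.queue.enqueue_job handler.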


@trace_utils.with_traced_module
def traced_queue_fetch_job(rq, pin, func, instance, args, kwargs):
with pin.tracer.trace(
schematize_messaging_operation("rq.queue.fetch_job", provider="rq", direction=SpanDirection.PROCESSING),
job_id = get_argument_value(args, kwargs, 0, "job_id")
with core.context_with_data(
"rq.traced_queue_fetch_job",
span_name=schematize_messaging_operation(
"rq.queue.fetch_job", provider="rq", direction=SpanDirection.PROCESSING
),
pin=pin,
service=trace_utils.int_service(pin, config.rq),
) as span:
span.set_tag_str(COMPONENT, config.rq.integration_name)

job_id = get_argument_value(args, kwargs, 0, "job_id")
span.set_tag_str("job.id", job_id)
call_key="traced_queue_fetch_job",
tags={COMPONENT: config.rq.integration_name, JOB_ID: job_id},
) as ctx, ctx[ctx["call_key"]]:
return func(*args, **kwargs)


@@ -174,35 +186,32 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs):
# `perform_job` is executed in a freshly forked, short-lived instance
job = get_argument_value(args, kwargs, 0, "job")

if config.rq_worker.distributed_tracing_enabled:
ctx = HTTPPropagator.extract(job.meta)
if ctx.trace_id:
pin.tracer.context_provider.activate(ctx)

try:
with pin.tracer.trace(
with core.context_with_data(
"rq.worker.perform_job",
span_name="rq.worker.perform_job",
service=trace_utils.int_service(pin, config.rq_worker),
pin=pin,
span_type=SpanTypes.WORKER,
resource=job.func_name,
) as span:
span.set_tag_str(COMPONENT, config.rq.integration_name)

# set span.kind to the type of request being performed
span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)
span.set_tag_str("job.id", job.get_id())
call_key="worker.perform_job",
distributed_headers_config=config.rq_worker,
distributed_headers=job.meta,
tags={COMPONENT: config.rq.integration_name, SPAN_KIND: SpanKind.CONSUMER, JOB_ID: job.get_id()},
) as ctx, ctx[ctx["call_key"]]:
try:
return func(*args, **kwargs)
finally:
# get_status() returns None when ttl=0
span.set_tag_str("job.status", job.get_status() or "None")
span.set_tag_str("job.origin", job.origin)
if job.is_failed:
span.error = 1
# call _after_perform_job handler for job status and origin
span_tags = {"job.status": job.get_status() or "None", "job.origin": job.origin}
job_failed = job.is_failed
core.dispatch("rq.worker.perform_job", [ctx, job_failed, span_tags])

finally:
# Force flush to agent since the process `os.exit()`s
# immediately after this method returns
pin.tracer.flush()
core.context_with_data("rq.worker.after.perform.job")
Collaborator:
Why's this included? The dispatch on the next line seems like it does the job.

core.dispatch("rq.worker.after.perform.job", [ctx])


@trace_utils.with_traced_module
@@ -213,24 +222,31 @@ def traced_job_perform(rq, pin, func, instance, args, kwargs):
# Inherit the service name from whatever parent exists.
# eg. in a worker, a perform_job parent span will exist with the worker
# service.
with pin.tracer.trace("rq.job.perform", resource=job.func_name) as span:
span.set_tag_str(COMPONENT, config.rq.integration_name)

span.set_tag("job.id", job.get_id())
with core.context_with_data(
"rq.job.perform",
span_name="rq.job.perform",
resource=job.func_name,
call_key="job.perform",
pin=pin,
tags={COMPONENT: config.rq.integration_name, JOB_ID: job.get_id()},
) as ctx, ctx[ctx["call_key"]]:
return func(*args, **kwargs)


@trace_utils.with_traced_module
def traced_job_fetch_many(rq, pin, func, instance, args, kwargs):
"""Trace rq.Job.fetch_many(...)"""
with pin.tracer.trace(
schematize_messaging_operation("rq.job.fetch_many", provider="rq", direction=SpanDirection.PROCESSING),
job_ids = get_argument_value(args, kwargs, 0, "job_ids")
with core.context_with_data(
"rq.job.fetch_many",
span_name=schematize_messaging_operation(
"rq.job.fetch_many", provider="rq", direction=SpanDirection.PROCESSING
),
service=trace_utils.ext_service(pin, config.rq_worker),
) as span:
span.set_tag_str(COMPONENT, config.rq.integration_name)

job_ids = get_argument_value(args, kwargs, 0, "job_ids")
span.set_tag("job_ids", job_ids)
call_key="job.fetch_many",
pin=pin,
tags={COMPONENT: config.rq.integration_name, JOB_ID: job_ids},
) as ctx, ctx[ctx["call_key"]]:
return func(*args, **kwargs)
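
On the worker side the same spans appear when the worker process runs under the tracer; one common way (assuming the standard rq CLI, queue name hypothetical) is:

    ddtrace-run rq worker my-queue

traced_perform_job then continues the producer's trace from the headers stored in job.meta.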

