Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(rq): integrated core api to rq integration #10607

Merged
merged 19 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions ddtrace/_trace/trace_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -691,6 +691,28 @@ def _on_botocore_patched_bedrock_api_call_success(ctx, reqid, latency, input_tok
span.set_tag_str("bedrock.usage.completion_tokens", output_token_count)


def _on_rq_queue_enqueue_job_is_async(ctx, job):
    """Inject distributed-tracing headers into ``job.meta``.

    Looks up the enqueue span stored on the execution context and writes its
    trace context into the job's metadata, so the worker process that later
    performs the job can extract it and continue the same trace.
    """
    enqueue_span = ctx[ctx["call_key"]]
    HTTPPropagator.inject(enqueue_span.context, job.meta)


def _on_rq_after_perform_job(ctx, job):
mabdinur marked this conversation as resolved.
Show resolved Hide resolved
"""sets job.status and job.origin span tags after job is performed"""
# get_status() returns None when ttl=0
span = ctx[ctx["call_key"]]
span.set_tag_str("job.status", job.get_status() or "None")
span.set_tag_str("job.origin", job.origin)
if job.is_failed:
span.error = 1


def _on_rq_end_of_traced_method_in_fork(ctx, job):
jessicagamio marked this conversation as resolved.
Show resolved Hide resolved
"""Force flush to agent since the process `os.exit()`s
immediately after this method returnsf
jessicagamio marked this conversation as resolved.
Show resolved Hide resolved
"""
ctx["pin"].tracer.flush()


def _on_botocore_bedrock_process_response(
ctx: core.ExecutionContext,
formatted_response: Dict[str, Any],
Expand Down Expand Up @@ -830,6 +852,9 @@ def listen():
core.on("test_visibility.enable", _on_test_visibility_enable)
core.on("test_visibility.disable", _on_test_visibility_disable)
core.on("test_visibility.is_enabled", _on_test_visibility_is_enabled, "is_enabled")
core.on("rq.worker.perform_job", _on_rq_after_perform_job)
core.on("rq.worker.after.perform.job", _on_rq_end_of_traced_method_in_fork)
core.on("rq.queue.enqueue_job", _on_rq_queue_enqueue_job_is_async)

for context_name in (
"flask.call",
Expand All @@ -848,6 +873,11 @@ def listen():
"botocore.patched_stepfunctions_api_call",
"botocore.patched_bedrock_api_call",
"redis.command",
"rq.queue.enqueue_job",
"rq.traced_queue_fetch_job",
"rq.worker.perform_job",
"rq.job.perform",
"rq.job.fetch_many",
):
core.on(f"context.started.start_span.{context_name}", _start_span)

Expand Down
109 changes: 60 additions & 49 deletions ddtrace/contrib/rq/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,9 @@

from ddtrace import Pin
from ddtrace import config
from ddtrace._trace.trace_handlers import _start_span
from ddtrace.constants import SPAN_KIND
from ddtrace.internal import core
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal.schema import schematize_messaging_operation
from ddtrace.internal.schema import schematize_service_name
Expand Down Expand Up @@ -134,37 +136,43 @@ def traced_queue_enqueue_job(rq, pin, func, instance, args, kwargs):
else:
resource = func_name

with pin.tracer.trace(
schematize_messaging_operation("rq.queue.enqueue_job", provider="rq", direction=SpanDirection.OUTBOUND),
with core.context_with_data(
"rq.queue.enqueue_job",
span_name=schematize_messaging_operation(
"rq.queue.enqueue_job", provider="rq", direction=SpanDirection.OUTBOUND
),
pin=pin,
service=trace_utils.int_service(pin, config.rq),
resource=resource,
span_type=SpanTypes.WORKER,
) as span:
span.set_tag_str(COMPONENT, config.rq.integration_name)

# set span.kind to the type of request being performed
span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER)

span.set_tag_str("queue.name", instance.name)
span.set_tag_str("job.id", job.get_id())
span.set_tag_str("job.func_name", job.func_name)

call_key="queue.enqueue_job",
tags={
COMPONENT: config.rq.integration_name,
SPAN_KIND: SpanKind.PRODUCER,
"queue.name": instance.name,
mabdinur marked this conversation as resolved.
Show resolved Hide resolved
"job.id": job.get_id(),
"job.func_name": job.func_name,
},
) as ctx, ctx[ctx["call_key"]] as span:
# If the queue is_async then add distributed tracing headers to the job
if instance.is_async and config.rq.distributed_tracing_enabled:
HTTPPropagator.inject(span.context, job.meta)
core.dispatch("rq.queue.enqueue_job", [ctx, job])
mabdinur marked this conversation as resolved.
Show resolved Hide resolved
return func(*args, **kwargs)


@trace_utils.with_traced_module
def traced_queue_fetch_job(rq, pin, func, instance, args, kwargs):
with pin.tracer.trace(
schematize_messaging_operation("rq.queue.fetch_job", provider="rq", direction=SpanDirection.PROCESSING),
job_id = get_argument_value(args, kwargs, 0, "job_id")
with core.context_with_data(
"rq.traced_queue_fetch_job",
span_name=schematize_messaging_operation(
"rq.queue.fetch_job", provider="rq", direction=SpanDirection.PROCESSING
),
pin=pin,
service=trace_utils.int_service(pin, config.rq),
) as span:
span.set_tag_str(COMPONENT, config.rq.integration_name)

job_id = get_argument_value(args, kwargs, 0, "job_id")
span.set_tag_str("job.id", job_id)
call_key="traced_queue_fetch_job",
tags={COMPONENT: config.rq.integration_name, "job.id": job_id},
) as ctx, ctx[ctx["call_key"]] as span:
return func(*args, **kwargs)


Expand All @@ -174,35 +182,31 @@ def traced_perform_job(rq, pin, func, instance, args, kwargs):
# `perform_job` is executed in a freshly forked, short-lived instance
job = get_argument_value(args, kwargs, 0, "job")

if config.rq_worker.distributed_tracing_enabled:
ctx = HTTPPropagator.extract(job.meta)
if ctx.trace_id:
pin.tracer.context_provider.activate(ctx)

try:
with pin.tracer.trace(
with core.context_with_data(
"rq.worker.perform_job",
span_name="rq.worker.perform_job",
service=trace_utils.int_service(pin, config.rq_worker),
pin=pin,
span_type=SpanTypes.WORKER,
resource=job.func_name,
) as span:
span.set_tag_str(COMPONENT, config.rq.integration_name)

# set span.kind to the type of request being performed
span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)
span.set_tag_str("job.id", job.get_id())
call_key="worker.perform_job",
distributed_headers_config=config.rq_worker,
distributed_headers=job.meta,
tags={COMPONENT: config.rq.integration_name, SPAN_KIND: SpanKind.CONSUMER, "job.id": job.get_id()},
) as ctx, ctx[ctx["call_key"]] as span:
try:
return func(*args, **kwargs)
finally:
# get_status() returns None when ttl=0
span.set_tag_str("job.status", job.get_status() or "None")
span.set_tag_str("job.origin", job.origin)
if job.is_failed:
span.error = 1
# call _after_perform_job handler for job status and origin
core.dispatch("rq.worker.perform_job", [ctx, job])

finally:
# Force flush to agent since the process `os.exit()`s
# immediately after this method returns
pin.tracer.flush()

jessicagamio marked this conversation as resolved.
Show resolved Hide resolved
core.context_with_data("rq.worker.after.perform.job")
jessicagamio marked this conversation as resolved.
Show resolved Hide resolved
core.dispatch("rq.worker.after.perform.job", [ctx, job])


@trace_utils.with_traced_module
Expand All @@ -213,24 +217,31 @@ def traced_job_perform(rq, pin, func, instance, args, kwargs):
# Inherit the service name from whatever parent exists.
# eg. in a worker, a perform_job parent span will exist with the worker
# service.
with pin.tracer.trace("rq.job.perform", resource=job.func_name) as span:
span.set_tag_str(COMPONENT, config.rq.integration_name)

span.set_tag("job.id", job.get_id())
with core.context_with_data(
"rq.job.perform",
span_name="rq.job.perform",
resource=job.func_name,
call_key="job.perform",
pin=pin,
tags={COMPONENT: config.rq.integration_name, "job.id": job.get_id()},
) as ctx, ctx[ctx["call_key"]] as span:
return func(*args, **kwargs)


@trace_utils.with_traced_module
def traced_job_fetch_many(rq, pin, func, instance, args, kwargs):
"""Trace rq.Job.fetch_many(...)"""
with pin.tracer.trace(
schematize_messaging_operation("rq.job.fetch_many", provider="rq", direction=SpanDirection.PROCESSING),
job_ids = get_argument_value(args, kwargs, 0, "job_ids")
with core.context_with_data(
"rq.job.fetch_many",
span_name=schematize_messaging_operation(
"rq.job.fetch_many", provider="rq", direction=SpanDirection.PROCESSING
),
service=trace_utils.ext_service(pin, config.rq_worker),
) as span:
span.set_tag_str(COMPONENT, config.rq.integration_name)

job_ids = get_argument_value(args, kwargs, 0, "job_ids")
span.set_tag("job_ids", job_ids)
call_key="job.fetch_many",
pin=pin,
tags={COMPONENT: config.rq.integration_name, "job_ids": job_ids},
) as ctx, ctx[ctx["call_key"]] as span:
return func(*args, **kwargs)


Expand Down
Loading