-
Notifications
You must be signed in to change notification settings - Fork 408
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore(rq): integrated core api to rq integration #10607
base: main
Are you sure you want to change the base?
Conversation
|
Datadog ReportBranch report: ✅ 0 Failed, 592 Passed, 694 Skipped, 39m 15.39s Total duration (34m 54.2s time saved) |
BenchmarksBenchmark execution time: 2024-09-23 19:06:06 Comparing candidate commit 50e34a6 in PR branch Found 0 performance improvements and 0 performance regressions! Performance is the same for 353 metrics, 47 unstable metrics. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Aside from the one note I left, this looks great. Nice work so far. Please also add a brief description to the pull request.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let some nits but overall this looks really good. Great job!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tests are passing: https://gitlab.ddbuild.io/DataDog/apm-reliability/dd-trace-py/-/jobs/642791801.
This change looks good to me. Thanks for sticking with it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks great. Seems like you've got your head around how the Core API works. There are a few bits that should be changed, but the main body of this change is ready to go.
config = ctx.get_item("integration_config") | ||
distributed_tracing_enabled = config.distributed_tracing_enabled |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this code is written with the (acceptable) assumption that integration_config
will always exist, making that assumption more explicit would clarify the code. Example:
config = ctx.get_item("integration_config") | |
distributed_tracing_enabled = config.distributed_tracing_enabled | |
distributed_tracing_enabled = ctx["integration_config"].distributed_tracing_enabled |
This code throws a KeyError
if that key isn't present, as opposed to the less-clear NoneType has no attribute...
error that the current version would throw.
span.error = 1 | ||
if span_tags: | ||
for k in span_tags.keys(): | ||
span.set_tag_str(k, span_tags[k]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This code fails if span is None
|
||
def _on_end_of_traced_method_in_fork(ctx): | ||
"""Force flush to agent since the process `os.exit()`s | ||
immediately after this method returnsf |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
immediately after this method returnsf | |
immediately after this method returns |
ddtrace/contrib/rq/__init__.py
Outdated
span.set_tag_str("job.id", job_id) | ||
call_key="traced_queue_fetch_job", | ||
tags={COMPONENT: config.rq.integration_name, JOB_ID: job_id}, | ||
) as ctx, ctx[ctx["call_key"]] as _: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
) as ctx, ctx[ctx["call_key"]] as _: | |
) as ctx, ctx[ctx["call_key"]]: |
This does the same thing
ddtrace/contrib/rq/__init__.py
Outdated
distributed_headers_config=config.rq_worker, | ||
distributed_headers=job.meta, | ||
tags={COMPONENT: config.rq.integration_name, SPAN_KIND: SpanKind.CONSUMER, JOB_ID: job.get_id()}, | ||
) as ctx, ctx[ctx["call_key"]] as _: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
) as ctx, ctx[ctx["call_key"]] as _: | |
) as ctx, ctx[ctx["call_key"]]: |
finally: | ||
# Force flush to agent since the process `os.exit()`s | ||
# immediately after this method returns | ||
pin.tracer.flush() | ||
core.context_with_data("rq.worker.after.perform.job") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why's this included? The dispatch on the next line seems like it does the job.
ddtrace/contrib/rq/__init__.py
Outdated
call_key="job.perform", | ||
pin=pin, | ||
tags={COMPONENT: config.rq.integration_name, JOB_ID: job.get_id()}, | ||
) as ctx, ctx[ctx["call_key"]] as _: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
) as ctx, ctx[ctx["call_key"]] as _: | |
) as ctx, ctx[ctx["call_key"]]: |
ddtrace/contrib/rq/__init__.py
Outdated
call_key="job.fetch_many", | ||
pin=pin, | ||
tags={COMPONENT: config.rq.integration_name, JOB_ID: job_ids}, | ||
) as ctx, ctx[ctx["call_key"]] as _: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
) as ctx, ctx[ctx["call_key"]] as _: | |
) as ctx, ctx[ctx["call_key"]]: |
…intain the automatic closing behavior.
Implementing the Core API into the existing rq integration.
Checklist
Reviewer Checklist