-
Notifications
You must be signed in to change notification settings - Fork 408
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
chore(integrations): move httplib,httpx,jinja2,kafka,kombu to internal (
#10166) - Moves all integration internals in ddtrace/contrib/(integration name)/ to ddtrace/contrib/internal/(integration name)/ for httplib, httpx, jinja2, kafka, and kombu - Ensures ddtrace/contrib/(integration name)/ and ddtrace/contrib/(integration name)/ continue to expose the same functions, classes, imports, and module level variables (via from ..internal.integration.module import * imports). - Log a deprecation warning if internal modules in ddtrace/contrib/(integration name)/ and ddtrace/contrib/(integration name)/. Only patch and unpack methods should be exposed by these packages. - #9996 ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)
- Loading branch information
1 parent
cb13a4c
commit d22c6c1
Showing
23 changed files
with
1,194 additions
and
1,088 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,241 +1,4 @@ | ||
import functools | ||
import os | ||
import sys | ||
from ..internal.httplib.patch import * # noqa: F401,F403 | ||
|
||
from ddtrace import config | ||
from ddtrace.appsec._common_module_patches import wrapped_request_D8CB81E472AF98A2 as _wrap_request_asm | ||
from ddtrace.internal.constants import COMPONENT | ||
from ddtrace.internal.schema.span_attribute_schema import SpanDirection | ||
from ddtrace.settings.asm import config as asm_config | ||
from ddtrace.vendor import wrapt | ||
|
||
from ...constants import ANALYTICS_SAMPLE_RATE_KEY | ||
from ...constants import SPAN_KIND | ||
from ...ext import SpanKind | ||
from ...ext import SpanTypes | ||
from ...internal.compat import httplib | ||
from ...internal.compat import parse | ||
from ...internal.constants import _HTTPLIB_NO_TRACE_REQUEST | ||
from ...internal.logger import get_logger | ||
from ...internal.schema import schematize_url_operation | ||
from ...internal.utils.formats import asbool | ||
from ...pin import Pin | ||
from ...propagation.http import HTTPPropagator | ||
from .. import trace_utils | ||
from ..trace_utils import unwrap as _u | ||
|
||
|
||
span_name = "http.client.request" | ||
span_name = schematize_url_operation(span_name, protocol="http", direction=SpanDirection.OUTBOUND) | ||
|
||
log = get_logger(__name__) | ||
|
||
|
||
config._add( | ||
"httplib", | ||
{ | ||
"distributed_tracing": asbool(os.getenv("DD_HTTPLIB_DISTRIBUTED_TRACING", default=True)), | ||
"default_http_tag_query_string": os.getenv("DD_HTTP_CLIENT_TAG_QUERY_STRING", "true"), | ||
}, | ||
) | ||
|
||
|
||
def get_version(): | ||
# type: () -> str | ||
return "" | ||
|
||
|
||
def _wrap_init(func, instance, args, kwargs): | ||
Pin(service=None, _config=config.httplib).onto(instance) | ||
return func(*args, **kwargs) | ||
|
||
|
||
def _wrap_getresponse(func, instance, args, kwargs): | ||
# Use any attached tracer if available, otherwise use the global tracer | ||
pin = Pin.get_from(instance) | ||
if not pin or not pin.enabled(): | ||
return func(*args, **kwargs) | ||
|
||
resp = None | ||
try: | ||
resp = func(*args, **kwargs) | ||
return resp | ||
finally: | ||
try: | ||
# Get the span attached to this instance, if available | ||
span = getattr(instance, "_datadog_span", None) | ||
if span: | ||
if resp: | ||
trace_utils.set_http_meta( | ||
span, config.httplib, status_code=resp.status, response_headers=resp.getheaders() | ||
) | ||
|
||
span.finish() | ||
delattr(instance, "_datadog_span") | ||
except Exception: | ||
log.debug("error applying request tags", exc_info=True) | ||
|
||
|
||
def _call_asm_wrap(func, instance, *args, **kwargs): | ||
_wrap_request_asm(func, instance, args, kwargs) | ||
|
||
|
||
def _wrap_request(func, instance, args, kwargs): | ||
# Use any attached tracer if available, otherwise use the global tracer | ||
if asm_config._iast_enabled or asm_config._asm_enabled: | ||
func_to_call = functools.partial(_call_asm_wrap, func, instance) | ||
else: | ||
func_to_call = func | ||
|
||
pin = Pin.get_from(instance) | ||
if should_skip_request(pin, instance): | ||
return func_to_call(*args, **kwargs) | ||
|
||
cfg = config.get_from(instance) | ||
|
||
try: | ||
# Create a new span and attach to this instance (so we can retrieve/update/close later on the response) | ||
span = pin.tracer.trace(span_name, span_type=SpanTypes.HTTP) | ||
|
||
span.set_tag_str(COMPONENT, config.httplib.integration_name) | ||
|
||
# set span.kind to the type of operation being performed | ||
span.set_tag_str(SPAN_KIND, SpanKind.CLIENT) | ||
|
||
instance._datadog_span = span | ||
|
||
# propagate distributed tracing headers | ||
if cfg.get("distributed_tracing"): | ||
if len(args) > 3: | ||
headers = args[3] | ||
else: | ||
headers = kwargs.setdefault("headers", {}) | ||
HTTPPropagator.inject(span.context, headers) | ||
except Exception: | ||
log.debug("error configuring request", exc_info=True) | ||
span = getattr(instance, "_datadog_span", None) | ||
if span: | ||
span.finish() | ||
|
||
try: | ||
return func_to_call(*args, **kwargs) | ||
except Exception: | ||
span = getattr(instance, "_datadog_span", None) | ||
exc_info = sys.exc_info() | ||
if span: | ||
span.set_exc_info(*exc_info) | ||
span.finish() | ||
raise | ||
|
||
|
||
def _wrap_putrequest(func, instance, args, kwargs): | ||
# Use any attached tracer if available, otherwise use the global tracer | ||
pin = Pin.get_from(instance) | ||
if should_skip_request(pin, instance): | ||
return func(*args, **kwargs) | ||
|
||
try: | ||
if hasattr(instance, "_datadog_span"): | ||
# Reuse an existing span set in _wrap_request | ||
span = instance._datadog_span | ||
else: | ||
# Create a new span and attach to this instance (so we can retrieve/update/close later on the response) | ||
span = pin.tracer.trace(span_name, span_type=SpanTypes.HTTP) | ||
|
||
span.set_tag_str(COMPONENT, config.httplib.integration_name) | ||
|
||
# set span.kind to the type of operation being performed | ||
span.set_tag_str(SPAN_KIND, SpanKind.CLIENT) | ||
|
||
instance._datadog_span = span | ||
|
||
method, path = args[:2] | ||
scheme = "https" if isinstance(instance, httplib.HTTPSConnection) else "http" | ||
port = ":{port}".format(port=instance.port) | ||
|
||
if (scheme == "http" and instance.port == 80) or (scheme == "https" and instance.port == 443): | ||
port = "" | ||
url = "{scheme}://{host}{port}{path}".format(scheme=scheme, host=instance.host, port=port, path=path) | ||
|
||
# sanitize url | ||
parsed = parse.urlparse(url) | ||
sanitized_url = parse.urlunparse( | ||
(parsed.scheme, parsed.netloc, parsed.path, parsed.params, None, parsed.fragment) # drop query | ||
) | ||
trace_utils.set_http_meta( | ||
span, config.httplib, method=method, url=sanitized_url, target_host=instance.host, query=parsed.query | ||
) | ||
|
||
# set analytics sample rate | ||
span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, config.httplib.get_analytics_sample_rate()) | ||
except Exception: | ||
log.debug("error applying request tags", exc_info=True) | ||
|
||
# Close the span to prevent memory leaks. | ||
span = getattr(instance, "_datadog_span", None) | ||
if span: | ||
span.finish() | ||
|
||
try: | ||
return func(*args, **kwargs) | ||
except Exception: | ||
span = getattr(instance, "_datadog_span", None) | ||
exc_info = sys.exc_info() | ||
if span: | ||
span.set_exc_info(*exc_info) | ||
span.finish() | ||
raise | ||
|
||
|
||
def _wrap_putheader(func, instance, args, kwargs): | ||
span = getattr(instance, "_datadog_span", None) | ||
if span: | ||
request_headers = {args[0]: args[1]} | ||
trace_utils.set_http_meta(span, config.httplib, request_headers=request_headers) | ||
|
||
return func(*args, **kwargs) | ||
|
||
|
||
def should_skip_request(pin, request): | ||
"""Helper to determine if the provided request should be traced""" | ||
if getattr(request, _HTTPLIB_NO_TRACE_REQUEST, False): | ||
return True | ||
|
||
if not pin or not pin.enabled(): | ||
return True | ||
|
||
# httplib is used to send apm events (profiling,di, tracing, etc.) to the datadog agent | ||
# Tracing these requests introduces a significant noise and instability in ddtrace tests. | ||
# TO DO: Avoid tracing requests to APM internal services (ie: extend this functionality to agentless products). | ||
agent_url = pin.tracer.agent_trace_url | ||
if agent_url: | ||
parsed = parse.urlparse(agent_url) | ||
return request.host == parsed.hostname and request.port == parsed.port | ||
return False | ||
|
||
|
||
def patch(): | ||
"""patch the built-in urllib/httplib/httplib.client methods for tracing""" | ||
if getattr(httplib, "__datadog_patch", False): | ||
return | ||
httplib.__datadog_patch = True | ||
|
||
# Patch the desired methods | ||
httplib.HTTPConnection.__init__ = wrapt.FunctionWrapper(httplib.HTTPConnection.__init__, _wrap_init) | ||
httplib.HTTPConnection.getresponse = wrapt.FunctionWrapper(httplib.HTTPConnection.getresponse, _wrap_getresponse) | ||
httplib.HTTPConnection.request = wrapt.FunctionWrapper(httplib.HTTPConnection.request, _wrap_request) | ||
httplib.HTTPConnection.putrequest = wrapt.FunctionWrapper(httplib.HTTPConnection.putrequest, _wrap_putrequest) | ||
httplib.HTTPConnection.putheader = wrapt.FunctionWrapper(httplib.HTTPConnection.putheader, _wrap_putheader) | ||
|
||
|
||
def unpatch(): | ||
"""unpatch any previously patched modules""" | ||
if not getattr(httplib, "__datadog_patch", False): | ||
return | ||
httplib.__datadog_patch = False | ||
|
||
_u(httplib.HTTPConnection, "__init__") | ||
_u(httplib.HTTPConnection, "getresponse") | ||
_u(httplib.HTTPConnection, "request") | ||
_u(httplib.HTTPConnection, "putrequest") | ||
_u(httplib.HTTPConnection, "putheader") | ||
# TODO: deprecate and remove this module |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.