From d22c6c1a744d9d33dfd75fab7659008c8453395b Mon Sep 17 00:00:00 2001 From: Rachel Yang Date: Tue, 13 Aug 2024 15:01:14 -0400 Subject: [PATCH] chore(integrations): move httplib,httpx,jinja2,kafka,kombu to internal (#10166) - Moves all integration internals in ddtrace/contrib/(integration name)/ to ddtrace/contrib/internal/(integration name)/ for httplib, httpx, jinja2, kafka, and kombu - Ensures ddtrace/contrib/(integration name)/ and ddtrace/contrib/internal/(integration name)/ continue to expose the same functions, classes, imports, and module level variables (via from ..internal.integration.module import * imports). - Logs a deprecation warning if internal modules in ddtrace/contrib/(integration name)/ are used directly. Only patch and unpatch methods should be exposed by these packages. - https://github.com/DataDog/dd-trace-py/pull/9996 ## Checklist - [x] PR author has checked that all the criteria below are met - The PR description includes an overview of the change - The PR description articulates the motivation for the change - The change includes tests OR the PR description describes a testing strategy - The PR description notes risks associated with the change, if any - Newly-added code is easy to change - The change follows the [library release note guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html) - The change includes or references documentation updates if necessary - Backport labels are set (if [applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)) ## Reviewer Checklist - [x] Reviewer has checked that all the criteria below are met - Title is accurate - All changes are related to the pull request's stated goal - Avoids breaking [API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces) changes - Testing strategy adequately addresses listed risks - Newly-added code is easy to change - Release note makes sense to a user of the library - If necessary, author has acknowledged 
and discussed the performance implications of this PR as reported in the benchmarks PR comment - Backport labels are set in a manner that is consistent with the [release branch maintenance policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting) --- ddtrace/contrib/httplib/__init__.py | 15 +- ddtrace/contrib/httplib/patch.py | 241 +------------- ddtrace/contrib/httpx/__init__.py | 10 +- ddtrace/contrib/httpx/patch.py | 204 +----------- ddtrace/contrib/internal/httplib/patch.py | 240 ++++++++++++++ ddtrace/contrib/internal/httpx/patch.py | 204 ++++++++++++ ddtrace/contrib/internal/jinja2/constants.py | 1 + ddtrace/contrib/internal/jinja2/patch.py | 110 ++++++ ddtrace/contrib/internal/kafka/patch.py | 312 ++++++++++++++++++ ddtrace/contrib/internal/kombu/constants.py | 1 + ddtrace/contrib/internal/kombu/patch.py | 167 ++++++++++ ddtrace/contrib/internal/kombu/utils.py | 49 +++ ddtrace/contrib/jinja2/__init__.py | 10 +- ddtrace/contrib/jinja2/constants.py | 16 +- ddtrace/contrib/jinja2/patch.py | 110 +----- ddtrace/contrib/kafka/__init__.py | 10 +- ddtrace/contrib/kafka/patch.py | 312 +----------------- ddtrace/contrib/kombu/__init__.py | 8 +- ddtrace/contrib/kombu/constants.py | 16 +- ddtrace/contrib/kombu/patch.py | 167 +--------- ddtrace/contrib/kombu/utils.py | 56 +--- ...-to-internal-httplib-475d071a277b18cf.yaml | 12 + tests/.suitespec.json | 11 +- 23 files changed, 1194 insertions(+), 1088 deletions(-) create mode 100644 ddtrace/contrib/internal/httplib/patch.py create mode 100644 ddtrace/contrib/internal/httpx/patch.py create mode 100644 ddtrace/contrib/internal/jinja2/constants.py create mode 100644 ddtrace/contrib/internal/jinja2/patch.py create mode 100644 ddtrace/contrib/internal/kafka/patch.py create mode 100644 ddtrace/contrib/internal/kombu/constants.py create mode 100644 ddtrace/contrib/internal/kombu/patch.py create mode 100644 ddtrace/contrib/internal/kombu/utils.py create mode 100644 
releasenotes/notes/move-integrations-to-internal-httplib-475d071a277b18cf.yaml diff --git a/ddtrace/contrib/httplib/__init__.py b/ddtrace/contrib/httplib/__init__.py index b3cdaf9ab6b..f935befd7b3 100644 --- a/ddtrace/contrib/httplib/__init__.py +++ b/ddtrace/contrib/httplib/__init__.py @@ -58,9 +58,18 @@ :ref:`Headers tracing ` is supported for this integration. """ -from .patch import get_version -from .patch import patch -from .patch import unpatch +from ...internal.utils.importlib import require_modules + + +required_modules = ["http.client"] + +with require_modules(required_modules) as missing_modules: + if not missing_modules: + from . import patch as _ # noqa: F401, I001 + + from ..internal.httplib.patch import get_version + from ..internal.httplib.patch import patch + from ..internal.httplib.patch import unpatch __all__ = ["patch", "unpatch", "get_version"] diff --git a/ddtrace/contrib/httplib/patch.py b/ddtrace/contrib/httplib/patch.py index 6d7f695c1e7..a86043c05f4 100644 --- a/ddtrace/contrib/httplib/patch.py +++ b/ddtrace/contrib/httplib/patch.py @@ -1,241 +1,4 @@ -import functools -import os -import sys +from ..internal.httplib.patch import * # noqa: F401,F403 -from ddtrace import config -from ddtrace.appsec._common_module_patches import wrapped_request_D8CB81E472AF98A2 as _wrap_request_asm -from ddtrace.internal.constants import COMPONENT -from ddtrace.internal.schema.span_attribute_schema import SpanDirection -from ddtrace.settings.asm import config as asm_config -from ddtrace.vendor import wrapt -from ...constants import ANALYTICS_SAMPLE_RATE_KEY -from ...constants import SPAN_KIND -from ...ext import SpanKind -from ...ext import SpanTypes -from ...internal.compat import httplib -from ...internal.compat import parse -from ...internal.constants import _HTTPLIB_NO_TRACE_REQUEST -from ...internal.logger import get_logger -from ...internal.schema import schematize_url_operation -from ...internal.utils.formats import asbool -from ...pin import Pin -from 
...propagation.http import HTTPPropagator -from .. import trace_utils -from ..trace_utils import unwrap as _u - - -span_name = "http.client.request" -span_name = schematize_url_operation(span_name, protocol="http", direction=SpanDirection.OUTBOUND) - -log = get_logger(__name__) - - -config._add( - "httplib", - { - "distributed_tracing": asbool(os.getenv("DD_HTTPLIB_DISTRIBUTED_TRACING", default=True)), - "default_http_tag_query_string": os.getenv("DD_HTTP_CLIENT_TAG_QUERY_STRING", "true"), - }, -) - - -def get_version(): - # type: () -> str - return "" - - -def _wrap_init(func, instance, args, kwargs): - Pin(service=None, _config=config.httplib).onto(instance) - return func(*args, **kwargs) - - -def _wrap_getresponse(func, instance, args, kwargs): - # Use any attached tracer if available, otherwise use the global tracer - pin = Pin.get_from(instance) - if not pin or not pin.enabled(): - return func(*args, **kwargs) - - resp = None - try: - resp = func(*args, **kwargs) - return resp - finally: - try: - # Get the span attached to this instance, if available - span = getattr(instance, "_datadog_span", None) - if span: - if resp: - trace_utils.set_http_meta( - span, config.httplib, status_code=resp.status, response_headers=resp.getheaders() - ) - - span.finish() - delattr(instance, "_datadog_span") - except Exception: - log.debug("error applying request tags", exc_info=True) - - -def _call_asm_wrap(func, instance, *args, **kwargs): - _wrap_request_asm(func, instance, args, kwargs) - - -def _wrap_request(func, instance, args, kwargs): - # Use any attached tracer if available, otherwise use the global tracer - if asm_config._iast_enabled or asm_config._asm_enabled: - func_to_call = functools.partial(_call_asm_wrap, func, instance) - else: - func_to_call = func - - pin = Pin.get_from(instance) - if should_skip_request(pin, instance): - return func_to_call(*args, **kwargs) - - cfg = config.get_from(instance) - - try: - # Create a new span and attach to this instance (so we 
can retrieve/update/close later on the response) - span = pin.tracer.trace(span_name, span_type=SpanTypes.HTTP) - - span.set_tag_str(COMPONENT, config.httplib.integration_name) - - # set span.kind to the type of operation being performed - span.set_tag_str(SPAN_KIND, SpanKind.CLIENT) - - instance._datadog_span = span - - # propagate distributed tracing headers - if cfg.get("distributed_tracing"): - if len(args) > 3: - headers = args[3] - else: - headers = kwargs.setdefault("headers", {}) - HTTPPropagator.inject(span.context, headers) - except Exception: - log.debug("error configuring request", exc_info=True) - span = getattr(instance, "_datadog_span", None) - if span: - span.finish() - - try: - return func_to_call(*args, **kwargs) - except Exception: - span = getattr(instance, "_datadog_span", None) - exc_info = sys.exc_info() - if span: - span.set_exc_info(*exc_info) - span.finish() - raise - - -def _wrap_putrequest(func, instance, args, kwargs): - # Use any attached tracer if available, otherwise use the global tracer - pin = Pin.get_from(instance) - if should_skip_request(pin, instance): - return func(*args, **kwargs) - - try: - if hasattr(instance, "_datadog_span"): - # Reuse an existing span set in _wrap_request - span = instance._datadog_span - else: - # Create a new span and attach to this instance (so we can retrieve/update/close later on the response) - span = pin.tracer.trace(span_name, span_type=SpanTypes.HTTP) - - span.set_tag_str(COMPONENT, config.httplib.integration_name) - - # set span.kind to the type of operation being performed - span.set_tag_str(SPAN_KIND, SpanKind.CLIENT) - - instance._datadog_span = span - - method, path = args[:2] - scheme = "https" if isinstance(instance, httplib.HTTPSConnection) else "http" - port = ":{port}".format(port=instance.port) - - if (scheme == "http" and instance.port == 80) or (scheme == "https" and instance.port == 443): - port = "" - url = "{scheme}://{host}{port}{path}".format(scheme=scheme, host=instance.host, 
port=port, path=path) - - # sanitize url - parsed = parse.urlparse(url) - sanitized_url = parse.urlunparse( - (parsed.scheme, parsed.netloc, parsed.path, parsed.params, None, parsed.fragment) # drop query - ) - trace_utils.set_http_meta( - span, config.httplib, method=method, url=sanitized_url, target_host=instance.host, query=parsed.query - ) - - # set analytics sample rate - span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, config.httplib.get_analytics_sample_rate()) - except Exception: - log.debug("error applying request tags", exc_info=True) - - # Close the span to prevent memory leaks. - span = getattr(instance, "_datadog_span", None) - if span: - span.finish() - - try: - return func(*args, **kwargs) - except Exception: - span = getattr(instance, "_datadog_span", None) - exc_info = sys.exc_info() - if span: - span.set_exc_info(*exc_info) - span.finish() - raise - - -def _wrap_putheader(func, instance, args, kwargs): - span = getattr(instance, "_datadog_span", None) - if span: - request_headers = {args[0]: args[1]} - trace_utils.set_http_meta(span, config.httplib, request_headers=request_headers) - - return func(*args, **kwargs) - - -def should_skip_request(pin, request): - """Helper to determine if the provided request should be traced""" - if getattr(request, _HTTPLIB_NO_TRACE_REQUEST, False): - return True - - if not pin or not pin.enabled(): - return True - - # httplib is used to send apm events (profiling,di, tracing, etc.) to the datadog agent - # Tracing these requests introduces a significant noise and instability in ddtrace tests. - # TO DO: Avoid tracing requests to APM internal services (ie: extend this functionality to agentless products). 
- agent_url = pin.tracer.agent_trace_url - if agent_url: - parsed = parse.urlparse(agent_url) - return request.host == parsed.hostname and request.port == parsed.port - return False - - -def patch(): - """patch the built-in urllib/httplib/httplib.client methods for tracing""" - if getattr(httplib, "__datadog_patch", False): - return - httplib.__datadog_patch = True - - # Patch the desired methods - httplib.HTTPConnection.__init__ = wrapt.FunctionWrapper(httplib.HTTPConnection.__init__, _wrap_init) - httplib.HTTPConnection.getresponse = wrapt.FunctionWrapper(httplib.HTTPConnection.getresponse, _wrap_getresponse) - httplib.HTTPConnection.request = wrapt.FunctionWrapper(httplib.HTTPConnection.request, _wrap_request) - httplib.HTTPConnection.putrequest = wrapt.FunctionWrapper(httplib.HTTPConnection.putrequest, _wrap_putrequest) - httplib.HTTPConnection.putheader = wrapt.FunctionWrapper(httplib.HTTPConnection.putheader, _wrap_putheader) - - -def unpatch(): - """unpatch any previously patched modules""" - if not getattr(httplib, "__datadog_patch", False): - return - httplib.__datadog_patch = False - - _u(httplib.HTTPConnection, "__init__") - _u(httplib.HTTPConnection, "getresponse") - _u(httplib.HTTPConnection, "request") - _u(httplib.HTTPConnection, "putrequest") - _u(httplib.HTTPConnection, "putheader") +# TODO: deprecate and remove this module diff --git a/ddtrace/contrib/httpx/__init__.py b/ddtrace/contrib/httpx/__init__.py index 509847ca3f1..b9116afb861 100644 --- a/ddtrace/contrib/httpx/__init__.py +++ b/ddtrace/contrib/httpx/__init__.py @@ -84,8 +84,12 @@ with require_modules(required_modules) as missing_modules: if not missing_modules: - from .patch import get_version - from .patch import patch - from .patch import unpatch + # Required to allow users to import from `ddtrace.contrib.httpx.patch` directly + from . 
import patch as _ # noqa: F401, I001 + + # Expose public methods + from ..internal.httpx.patch import get_version + from ..internal.httpx.patch import patch + from ..internal.httpx.patch import unpatch __all__ = ["patch", "unpatch", "get_version"] diff --git a/ddtrace/contrib/httpx/patch.py b/ddtrace/contrib/httpx/patch.py index 11647da55e1..3fd6483a186 100644 --- a/ddtrace/contrib/httpx/patch.py +++ b/ddtrace/contrib/httpx/patch.py @@ -1,204 +1,4 @@ -import os +from ..internal.httpx.patch import * # noqa: F401,F403 -import httpx -from ddtrace import config -from ddtrace.constants import ANALYTICS_SAMPLE_RATE_KEY -from ddtrace.constants import SPAN_KIND -from ddtrace.constants import SPAN_MEASURED_KEY -from ddtrace.contrib.trace_utils import distributed_tracing_enabled -from ddtrace.contrib.trace_utils import ext_service -from ddtrace.contrib.trace_utils import set_http_meta -from ddtrace.ext import SpanKind -from ddtrace.ext import SpanTypes -from ddtrace.internal.compat import ensure_binary -from ddtrace.internal.compat import ensure_text -from ddtrace.internal.constants import COMPONENT -from ddtrace.internal.schema import schematize_url_operation -from ddtrace.internal.schema.span_attribute_schema import SpanDirection -from ddtrace.internal.utils import get_argument_value -from ddtrace.internal.utils.formats import asbool -from ddtrace.internal.utils.version import parse_version -from ddtrace.internal.utils.wrappers import unwrap as _u -from ddtrace.pin import Pin -from ddtrace.propagation.http import HTTPPropagator -from ddtrace.vendor.wrapt import BoundFunctionWrapper -from ddtrace.vendor.wrapt import wrap_function_wrapper as _w - - -HTTPX_VERSION = parse_version(httpx.__version__) - - -def get_version(): - # type: () -> str - return getattr(httpx, "__version__", "") - - -config._add( - "httpx", - { - "distributed_tracing": asbool(os.getenv("DD_HTTPX_DISTRIBUTED_TRACING", default=True)), - "split_by_domain": asbool(os.getenv("DD_HTTPX_SPLIT_BY_DOMAIN", 
default=False)), - "default_http_tag_query_string": os.getenv("DD_HTTP_CLIENT_TAG_QUERY_STRING", "true"), - }, -) - - -def _url_to_str(url): - # type: (httpx.URL) -> str - """ - Helper to convert the httpx.URL parts from bytes to a str - """ - # httpx==0.13.0 added URL.raw, removed in httpx==0.23.1. Otherwise, must construct manually - if HTTPX_VERSION < (0, 13, 0): - # Manually construct the same way httpx==0.13 does it: - # https://github.com/encode/httpx/blob/2c2c6a71a9ff520d237f8283a586df2753f01f5e/httpx/_models.py#L161 - scheme = url.scheme.encode("ascii") - host = url.host.encode("ascii") - port = url.port - raw_path = url.full_path.encode("ascii") - elif HTTPX_VERSION < (0, 23, 1): - scheme, host, port, raw_path = url.raw - else: - scheme = url.raw_scheme - host = url.raw_host - port = url.port - raw_path = url.raw_path - url = scheme + b"://" + host - if port is not None: - url += b":" + ensure_binary(str(port)) - url += raw_path - return ensure_text(url) - - -def _get_service_name(pin, request): - # type: (Pin, httpx.Request) -> typing.Text - if config.httpx.split_by_domain: - if hasattr(request.url, "netloc"): - return ensure_text(request.url.netloc, errors="backslashreplace") - else: - service = ensure_binary(request.url.host) - if request.url.port: - service += b":" + ensure_binary(str(request.url.port)) - return ensure_text(service, errors="backslashreplace") - return ext_service(pin, config.httpx) - - -def _init_span(span, request): - # type: (Span, httpx.Request) -> None - span.set_tag(SPAN_MEASURED_KEY) - - if distributed_tracing_enabled(config.httpx): - HTTPPropagator.inject(span.context, request.headers) - - sample_rate = config.httpx.get_analytics_sample_rate(use_global_config=True) - if sample_rate is not None: - span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, sample_rate) - - -def _set_span_meta(span, request, response): - # type: (Span, httpx.Request, httpx.Response) -> None - set_http_meta( - span, - config.httpx, - method=request.method, - 
url=_url_to_str(request.url), - target_host=request.url.host, - status_code=response.status_code if response else None, - query=request.url.query, - request_headers=request.headers, - response_headers=response.headers if response else None, - ) - - -async def _wrapped_async_send( - wrapped: BoundFunctionWrapper, - instance, # type: httpx.AsyncClient - args, # type: typing.Tuple[httpx.Request] - kwargs, # type: typing.Dict[typing.Str, typing.Any] -): - # type: (...) -> typing.Coroutine[None, None, httpx.Response] - req = get_argument_value(args, kwargs, 0, "request") - - pin = Pin.get_from(instance) - if not pin or not pin.enabled(): - return await wrapped(*args, **kwargs) - - operation_name = schematize_url_operation("http.request", protocol="http", direction=SpanDirection.OUTBOUND) - with pin.tracer.trace(operation_name, service=_get_service_name(pin, req), span_type=SpanTypes.HTTP) as span: - span.set_tag_str(COMPONENT, config.httpx.integration_name) - - # set span.kind to the operation type being performed - span.set_tag_str(SPAN_KIND, SpanKind.CLIENT) - - _init_span(span, req) - resp = None - try: - resp = await wrapped(*args, **kwargs) - return resp - finally: - _set_span_meta(span, req, resp) - - -def _wrapped_sync_send( - wrapped: BoundFunctionWrapper, - instance, # type: httpx.AsyncClient - args, # type: typing.Tuple[httpx.Request] - kwargs, # type: typing.Dict[typing.Str, typing.Any] -): - # type: (...) 
-> httpx.Response - pin = Pin.get_from(instance) - if not pin or not pin.enabled(): - return wrapped(*args, **kwargs) - - req = get_argument_value(args, kwargs, 0, "request") - - operation_name = schematize_url_operation("http.request", protocol="http", direction=SpanDirection.OUTBOUND) - with pin.tracer.trace(operation_name, service=_get_service_name(pin, req), span_type=SpanTypes.HTTP) as span: - span.set_tag_str(COMPONENT, config.httpx.integration_name) - - # set span.kind to the operation type being performed - span.set_tag_str(SPAN_KIND, SpanKind.CLIENT) - - _init_span(span, req) - resp = None - try: - resp = wrapped(*args, **kwargs) - return resp - finally: - _set_span_meta(span, req, resp) - - -def patch(): - # type: () -> None - if getattr(httpx, "_datadog_patch", False): - return - - httpx._datadog_patch = True - - pin = Pin() - - if HTTPX_VERSION >= (0, 11): - # httpx==0.11 created synchronous Client class separate from AsyncClient - _w(httpx.Client, "send", _wrapped_sync_send) - _w(httpx.AsyncClient, "send", _wrapped_async_send) - pin.onto(httpx.AsyncClient) - else: - # httpx==0.9 Client class was asynchronous, httpx==0.10 made Client synonymous with AsyncClient - _w(httpx.Client, "send", _wrapped_async_send) - - pin.onto(httpx.Client) - - -def unpatch(): - # type: () -> None - if not getattr(httpx, "_datadog_patch", False): - return - - httpx._datadog_patch = False - - if HTTPX_VERSION >= (0, 11): - # See above patching code for when this patching occurred - _u(httpx.AsyncClient, "send") - - _u(httpx.Client, "send") +# TODO: deprecate and remove this module diff --git a/ddtrace/contrib/internal/httplib/patch.py b/ddtrace/contrib/internal/httplib/patch.py new file mode 100644 index 00000000000..6821839fc46 --- /dev/null +++ b/ddtrace/contrib/internal/httplib/patch.py @@ -0,0 +1,240 @@ +import functools +import os +import sys + +from ddtrace import config +from ddtrace.appsec._common_module_patches import wrapped_request_D8CB81E472AF98A2 as 
_wrap_request_asm +from ddtrace.constants import ANALYTICS_SAMPLE_RATE_KEY +from ddtrace.constants import SPAN_KIND +from ddtrace.contrib import trace_utils +from ddtrace.contrib.trace_utils import unwrap as _u +from ddtrace.ext import SpanKind +from ddtrace.ext import SpanTypes +from ddtrace.internal.compat import httplib +from ddtrace.internal.compat import parse +from ddtrace.internal.constants import _HTTPLIB_NO_TRACE_REQUEST +from ddtrace.internal.constants import COMPONENT +from ddtrace.internal.logger import get_logger +from ddtrace.internal.schema import schematize_url_operation +from ddtrace.internal.schema.span_attribute_schema import SpanDirection +from ddtrace.internal.utils.formats import asbool +from ddtrace.pin import Pin +from ddtrace.propagation.http import HTTPPropagator +from ddtrace.settings.asm import config as asm_config +from ddtrace.vendor import wrapt + + +span_name = "http.client.request" +span_name = schematize_url_operation(span_name, protocol="http", direction=SpanDirection.OUTBOUND) + +log = get_logger(__name__) + + +config._add( + "httplib", + { + "distributed_tracing": asbool(os.getenv("DD_HTTPLIB_DISTRIBUTED_TRACING", default=True)), + "default_http_tag_query_string": os.getenv("DD_HTTP_CLIENT_TAG_QUERY_STRING", "true"), + }, +) + + +def get_version(): + # type: () -> str + return "" + + +def _wrap_init(func, instance, args, kwargs): + Pin(service=None, _config=config.httplib).onto(instance) + return func(*args, **kwargs) + + +def _wrap_getresponse(func, instance, args, kwargs): + # Use any attached tracer if available, otherwise use the global tracer + pin = Pin.get_from(instance) + if not pin or not pin.enabled(): + return func(*args, **kwargs) + + resp = None + try: + resp = func(*args, **kwargs) + return resp + finally: + try: + # Get the span attached to this instance, if available + span = getattr(instance, "_datadog_span", None) + if span: + if resp: + trace_utils.set_http_meta( + span, config.httplib, 
status_code=resp.status, response_headers=resp.getheaders() + ) + + span.finish() + delattr(instance, "_datadog_span") + except Exception: + log.debug("error applying request tags", exc_info=True) + + +def _call_asm_wrap(func, instance, *args, **kwargs): + _wrap_request_asm(func, instance, args, kwargs) + + +def _wrap_request(func, instance, args, kwargs): + # Use any attached tracer if available, otherwise use the global tracer + if asm_config._iast_enabled or asm_config._asm_enabled: + func_to_call = functools.partial(_call_asm_wrap, func, instance) + else: + func_to_call = func + + pin = Pin.get_from(instance) + if should_skip_request(pin, instance): + return func_to_call(*args, **kwargs) + + cfg = config.get_from(instance) + + try: + # Create a new span and attach to this instance (so we can retrieve/update/close later on the response) + span = pin.tracer.trace(span_name, span_type=SpanTypes.HTTP) + + span.set_tag_str(COMPONENT, config.httplib.integration_name) + + # set span.kind to the type of operation being performed + span.set_tag_str(SPAN_KIND, SpanKind.CLIENT) + + instance._datadog_span = span + + # propagate distributed tracing headers + if cfg.get("distributed_tracing"): + if len(args) > 3: + headers = args[3] + else: + headers = kwargs.setdefault("headers", {}) + HTTPPropagator.inject(span.context, headers) + except Exception: + log.debug("error configuring request", exc_info=True) + span = getattr(instance, "_datadog_span", None) + if span: + span.finish() + + try: + return func_to_call(*args, **kwargs) + except Exception: + span = getattr(instance, "_datadog_span", None) + exc_info = sys.exc_info() + if span: + span.set_exc_info(*exc_info) + span.finish() + raise + + +def _wrap_putrequest(func, instance, args, kwargs): + # Use any attached tracer if available, otherwise use the global tracer + pin = Pin.get_from(instance) + if should_skip_request(pin, instance): + return func(*args, **kwargs) + + try: + if hasattr(instance, "_datadog_span"): + # 
Reuse an existing span set in _wrap_request + span = instance._datadog_span + else: + # Create a new span and attach to this instance (so we can retrieve/update/close later on the response) + span = pin.tracer.trace(span_name, span_type=SpanTypes.HTTP) + + span.set_tag_str(COMPONENT, config.httplib.integration_name) + + # set span.kind to the type of operation being performed + span.set_tag_str(SPAN_KIND, SpanKind.CLIENT) + + instance._datadog_span = span + + method, path = args[:2] + scheme = "https" if isinstance(instance, httplib.HTTPSConnection) else "http" + port = ":{port}".format(port=instance.port) + + if (scheme == "http" and instance.port == 80) or (scheme == "https" and instance.port == 443): + port = "" + url = "{scheme}://{host}{port}{path}".format(scheme=scheme, host=instance.host, port=port, path=path) + + # sanitize url + parsed = parse.urlparse(url) + sanitized_url = parse.urlunparse( + (parsed.scheme, parsed.netloc, parsed.path, parsed.params, None, parsed.fragment) # drop query + ) + trace_utils.set_http_meta( + span, config.httplib, method=method, url=sanitized_url, target_host=instance.host, query=parsed.query + ) + + # set analytics sample rate + span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, config.httplib.get_analytics_sample_rate()) + except Exception: + log.debug("error applying request tags", exc_info=True) + + # Close the span to prevent memory leaks. 
+ span = getattr(instance, "_datadog_span", None) + if span: + span.finish() + + try: + return func(*args, **kwargs) + except Exception: + span = getattr(instance, "_datadog_span", None) + exc_info = sys.exc_info() + if span: + span.set_exc_info(*exc_info) + span.finish() + raise + + +def _wrap_putheader(func, instance, args, kwargs): + span = getattr(instance, "_datadog_span", None) + if span: + request_headers = {args[0]: args[1]} + trace_utils.set_http_meta(span, config.httplib, request_headers=request_headers) + + return func(*args, **kwargs) + + +def should_skip_request(pin, request): + """Helper to determine if the provided request should be traced""" + if getattr(request, _HTTPLIB_NO_TRACE_REQUEST, False): + return True + + if not pin or not pin.enabled(): + return True + + # httplib is used to send apm events (profiling,di, tracing, etc.) to the datadog agent + # Tracing these requests introduces a significant noise and instability in ddtrace tests. + # TO DO: Avoid tracing requests to APM internal services (ie: extend this functionality to agentless products). 
+ agent_url = pin.tracer.agent_trace_url + if agent_url: + parsed = parse.urlparse(agent_url) + return request.host == parsed.hostname and request.port == parsed.port + return False + + +def patch(): + """patch the built-in urllib/httplib/httplib.client methods for tracing""" + if getattr(httplib, "__datadog_patch", False): + return + httplib.__datadog_patch = True + + # Patch the desired methods + httplib.HTTPConnection.__init__ = wrapt.FunctionWrapper(httplib.HTTPConnection.__init__, _wrap_init) + httplib.HTTPConnection.getresponse = wrapt.FunctionWrapper(httplib.HTTPConnection.getresponse, _wrap_getresponse) + httplib.HTTPConnection.request = wrapt.FunctionWrapper(httplib.HTTPConnection.request, _wrap_request) + httplib.HTTPConnection.putrequest = wrapt.FunctionWrapper(httplib.HTTPConnection.putrequest, _wrap_putrequest) + httplib.HTTPConnection.putheader = wrapt.FunctionWrapper(httplib.HTTPConnection.putheader, _wrap_putheader) + + +def unpatch(): + """unpatch any previously patched modules""" + if not getattr(httplib, "__datadog_patch", False): + return + httplib.__datadog_patch = False + + _u(httplib.HTTPConnection, "__init__") + _u(httplib.HTTPConnection, "getresponse") + _u(httplib.HTTPConnection, "request") + _u(httplib.HTTPConnection, "putrequest") + _u(httplib.HTTPConnection, "putheader") diff --git a/ddtrace/contrib/internal/httpx/patch.py b/ddtrace/contrib/internal/httpx/patch.py new file mode 100644 index 00000000000..11647da55e1 --- /dev/null +++ b/ddtrace/contrib/internal/httpx/patch.py @@ -0,0 +1,204 @@ +import os + +import httpx + +from ddtrace import config +from ddtrace.constants import ANALYTICS_SAMPLE_RATE_KEY +from ddtrace.constants import SPAN_KIND +from ddtrace.constants import SPAN_MEASURED_KEY +from ddtrace.contrib.trace_utils import distributed_tracing_enabled +from ddtrace.contrib.trace_utils import ext_service +from ddtrace.contrib.trace_utils import set_http_meta +from ddtrace.ext import SpanKind +from ddtrace.ext import SpanTypes 
from ddtrace.internal.compat import ensure_binary
from ddtrace.internal.compat import ensure_text
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal.schema import schematize_url_operation
from ddtrace.internal.schema.span_attribute_schema import SpanDirection
from ddtrace.internal.utils import get_argument_value
from ddtrace.internal.utils.formats import asbool
from ddtrace.internal.utils.version import parse_version
from ddtrace.internal.utils.wrappers import unwrap as _u
from ddtrace.pin import Pin
from ddtrace.propagation.http import HTTPPropagator
from ddtrace.vendor.wrapt import BoundFunctionWrapper
from ddtrace.vendor.wrapt import wrap_function_wrapper as _w


HTTPX_VERSION = parse_version(httpx.__version__)


def get_version():
    # type: () -> str
    """Return the installed ``httpx`` version string ("" if it cannot be read)."""
    return getattr(httpx, "__version__", "")


config._add(
    "httpx",
    {
        "distributed_tracing": asbool(os.getenv("DD_HTTPX_DISTRIBUTED_TRACING", default=True)),
        "split_by_domain": asbool(os.getenv("DD_HTTPX_SPLIT_BY_DOMAIN", default=False)),
        "default_http_tag_query_string": os.getenv("DD_HTTP_CLIENT_TAG_QUERY_STRING", "true"),
    },
)


def _url_to_str(url):
    # type: (httpx.URL) -> str
    """
    Helper to convert the httpx.URL parts from bytes to a str.

    httpx==0.13.0 added ``URL.raw``; it was removed again in httpx==0.23.1,
    so the raw parts must be gathered differently per installed version.
    """
    if HTTPX_VERSION < (0, 13, 0):
        # Manually construct the same way httpx==0.13 does it:
        # https://github.com/encode/httpx/blob/2c2c6a71a9ff520d237f8283a586df2753f01f5e/httpx/_models.py#L161
        scheme = url.scheme.encode("ascii")
        host = url.host.encode("ascii")
        port = url.port
        raw_path = url.full_path.encode("ascii")
    elif HTTPX_VERSION < (0, 23, 1):
        scheme, host, port, raw_path = url.raw
    else:
        scheme = url.raw_scheme
        host = url.raw_host
        port = url.port
        raw_path = url.raw_path
    url = scheme + b"://" + host
    if port is not None:
        url += b":" + ensure_binary(str(port))
    url += raw_path
    return ensure_text(url)


def _get_service_name(pin, request):
    # type: (Pin, httpx.Request) -> typing.Text
    """Service for the span: the request's host[:port] when split-by-domain is on,
    otherwise the integration's configured/external service name."""
    if config.httpx.split_by_domain:
        if hasattr(request.url, "netloc"):
            return ensure_text(request.url.netloc, errors="backslashreplace")
        else:
            service = ensure_binary(request.url.host)
            if request.url.port:
                service += b":" + ensure_binary(str(request.url.port))
            return ensure_text(service, errors="backslashreplace")
    return ext_service(pin, config.httpx)


def _init_span(span, request):
    # type: (Span, httpx.Request) -> None
    """Mark the span measured, inject distributed-tracing headers into the
    outgoing request (when enabled), and set the analytics sample rate."""
    span.set_tag(SPAN_MEASURED_KEY)

    if distributed_tracing_enabled(config.httpx):
        HTTPPropagator.inject(span.context, request.headers)

    sample_rate = config.httpx.get_analytics_sample_rate(use_global_config=True)
    if sample_rate is not None:
        span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, sample_rate)


def _set_span_meta(span, request, response):
    # type: (Span, httpx.Request, httpx.Response) -> None
    """Tag the span with standard HTTP metadata; ``response`` is None when
    the request raised before a response was received."""
    set_http_meta(
        span,
        config.httpx,
        method=request.method,
        url=_url_to_str(request.url),
        target_host=request.url.host,
        status_code=response.status_code if response else None,
        query=request.url.query,
        request_headers=request.headers,
        response_headers=response.headers if response else None,
    )


async def _wrapped_async_send(
    wrapped: BoundFunctionWrapper,
    instance,  # type: httpx.AsyncClient
    args,  # type: typing.Tuple[httpx.Request]
    kwargs,  # type: typing.Dict[str, typing.Any]
):
    # type: (...) -> typing.Coroutine[None, None, httpx.Response]
    """Trace ``AsyncClient.send``; HTTP meta is tagged in ``finally`` so the
    span is completed even when ``send`` raises."""
    req = get_argument_value(args, kwargs, 0, "request")

    pin = Pin.get_from(instance)
    if not pin or not pin.enabled():
        return await wrapped(*args, **kwargs)

    operation_name = schematize_url_operation("http.request", protocol="http", direction=SpanDirection.OUTBOUND)
    with pin.tracer.trace(operation_name, service=_get_service_name(pin, req), span_type=SpanTypes.HTTP) as span:
        span.set_tag_str(COMPONENT, config.httpx.integration_name)

        # set span.kind to the operation type being performed
        span.set_tag_str(SPAN_KIND, SpanKind.CLIENT)

        _init_span(span, req)
        resp = None
        try:
            resp = await wrapped(*args, **kwargs)
            return resp
        finally:
            # resp stays None if send raised, so meta is tagged without a status code
            _set_span_meta(span, req, resp)


def _wrapped_sync_send(
    wrapped: BoundFunctionWrapper,
    instance,  # type: httpx.Client
    args,  # type: typing.Tuple[httpx.Request]
    kwargs,  # type: typing.Dict[str, typing.Any]
):
    # type: (...) -> httpx.Response
    """Trace ``Client.send``; mirrors ``_wrapped_async_send``."""
    pin = Pin.get_from(instance)
    if not pin or not pin.enabled():
        return wrapped(*args, **kwargs)

    req = get_argument_value(args, kwargs, 0, "request")

    operation_name = schematize_url_operation("http.request", protocol="http", direction=SpanDirection.OUTBOUND)
    with pin.tracer.trace(operation_name, service=_get_service_name(pin, req), span_type=SpanTypes.HTTP) as span:
        span.set_tag_str(COMPONENT, config.httpx.integration_name)

        # set span.kind to the operation type being performed
        span.set_tag_str(SPAN_KIND, SpanKind.CLIENT)

        _init_span(span, req)
        resp = None
        try:
            resp = wrapped(*args, **kwargs)
            return resp
        finally:
            _set_span_meta(span, req, resp)


def patch():
    # type: () -> None
    """Wrap httpx client ``send`` methods (idempotent)."""
    if getattr(httpx, "_datadog_patch", False):
        return

    httpx._datadog_patch = True

    pin = Pin()

    if HTTPX_VERSION >= (0, 11):
        # httpx==0.11 created synchronous Client class separate from AsyncClient
        _w(httpx.Client, "send", _wrapped_sync_send)
        _w(httpx.AsyncClient, "send", _wrapped_async_send)
        pin.onto(httpx.AsyncClient)
    else:
        # httpx==0.9 Client class was asynchronous, httpx==0.10 made Client synonymous with AsyncClient
        _w(httpx.Client, "send", _wrapped_async_send)

    pin.onto(httpx.Client)


def unpatch():
    # type: () -> None
    """Undo ``patch()`` (idempotent)."""
    if not getattr(httpx, "_datadog_patch", False):
        return

    httpx._datadog_patch = False

    if HTTPX_VERSION >= (0, 11):
        # See above patching code for when this patching occurred
        _u(httpx.AsyncClient, "send")

    _u(httpx.Client, "send")
import os

import jinja2

from ddtrace import config
from ddtrace.constants import SPAN_MEASURED_KEY
from ddtrace.contrib.trace_utils import unwrap as _u
from ddtrace.ext import SpanTypes
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal.utils import ArgumentError
from ddtrace.internal.utils import get_argument_value
from ddtrace.pin import Pin
from ddtrace.vendor.wrapt import wrap_function_wrapper as _w

from .constants import DEFAULT_TEMPLATE_NAME


# Integration defaults.
config._add(
    "jinja2",
    {
        "service_name": os.getenv("DD_JINJA2_SERVICE_NAME"),
    },
)


def get_version():
    # type: () -> str
    """Installed jinja2 version, or "" when it cannot be determined."""
    return getattr(jinja2, "__version__", "")


def patch():
    """Instrument jinja2 template compiling, loading and rendering (idempotent)."""
    if getattr(jinja2, "__datadog_patch", False):
        # instrumentation already installed
        return
    jinja2.__datadog_patch = True
    Pin(
        service=config.jinja2["service_name"],
        _config=config.jinja2,
    ).onto(jinja2.environment.Environment)
    for attribute, wrapper in (
        ("environment.Template.render", _wrap_render),
        ("environment.Template.generate", _wrap_render),
        ("environment.Environment.compile", _wrap_compile),
        ("environment.Environment._load_template", _wrap_load_template),
    ):
        _w(jinja2, attribute, wrapper)


def unpatch():
    """Remove the instrumentation installed by ``patch`` (idempotent)."""
    if not getattr(jinja2, "__datadog_patch", False):
        return
    jinja2.__datadog_patch = False
    for klass, method in (
        (jinja2.Template, "render"),
        (jinja2.Template, "generate"),
        (jinja2.Environment, "compile"),
        (jinja2.Environment, "_load_template"),
    ):
        _u(klass, method)


def _wrap_render(wrapped, instance, args, kwargs):
    """Trace ``Template.render()`` and ``Template.generate()``."""
    pin = Pin.get_from(instance.environment)
    if not pin or not pin.enabled():
        return wrapped(*args, **kwargs)

    name = str(instance.name or DEFAULT_TEMPLATE_NAME)
    with pin.tracer.trace("jinja2.render", pin.service, span_type=SpanTypes.TEMPLATE) as span:
        span.set_tag_str(COMPONENT, config.jinja2.integration_name)

        span.set_tag(SPAN_MEASURED_KEY)
        try:
            return wrapped(*args, **kwargs)
        finally:
            # tagged in finally so failed renders are still attributed to the template
            span.resource = name
            span.set_tag_str("jinja2.template_name", name)


def _wrap_compile(wrapped, instance, args, kwargs):
    """Trace ``Environment.compile()``."""
    pin = Pin.get_from(instance)
    if not pin or not pin.enabled():
        return wrapped(*args, **kwargs)

    try:
        name = get_argument_value(args, kwargs, 1, "name")
    except ArgumentError:
        # anonymous source strings have no template name
        name = DEFAULT_TEMPLATE_NAME

    with pin.tracer.trace("jinja2.compile", pin.service, span_type=SpanTypes.TEMPLATE) as span:
        try:
            return wrapped(*args, **kwargs)
        finally:
            span.set_tag_str(COMPONENT, config.jinja2.integration_name)

            span.resource = name
            span.set_tag_str("jinja2.template_name", name)


def _wrap_load_template(wrapped, instance, args, kwargs):
    """Trace ``Environment._load_template()``."""
    pin = Pin.get_from(instance)
    if not pin or not pin.enabled():
        return wrapped(*args, **kwargs)

    name = get_argument_value(args, kwargs, 0, "name")
    with pin.tracer.trace("jinja2.load", pin.service, span_type=SpanTypes.TEMPLATE) as span:
        loaded = None
        try:
            loaded = wrapped(*args, **kwargs)
            return loaded
        finally:
            span.resource = name
            span.set_tag_str("jinja2.template_name", name)
            if loaded:
                span.set_tag_str("jinja2.template_path", loaded.filename)
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal.constants import MESSAGING_SYSTEM
from ddtrace.internal.logger import get_logger
from ddtrace.internal.schema import schematize_messaging_operation
from ddtrace.internal.schema import schematize_service_name
from ddtrace.internal.schema.span_attribute_schema import SpanDirection
from ddtrace.internal.utils import ArgumentError
from ddtrace.internal.utils import get_argument_value
from ddtrace.internal.utils import set_argument_value
from ddtrace.internal.utils.formats import asbool
from ddtrace.internal.utils.version import parse_version
from ddtrace.pin import Pin
from ddtrace.propagation.http import HTTPPropagator as Propagator


# Original (unpatched) classes, kept so unpatch() can restore them.
_Producer = confluent_kafka.Producer
_Consumer = confluent_kafka.Consumer
# Serializing/deserializing variants only exist on newer confluent_kafka releases.
_SerializingProducer = confluent_kafka.SerializingProducer if hasattr(confluent_kafka, "SerializingProducer") else None
_DeserializingConsumer = (
    confluent_kafka.DeserializingConsumer if hasattr(confluent_kafka, "DeserializingConsumer") else None
)


log = get_logger(__name__)


config._add(
    "kafka",
    dict(
        _default_service=schematize_service_name("kafka"),
        distributed_tracing_enabled=asbool(os.getenv("DD_KAFKA_PROPAGATION_ENABLED", default=False)),
        trace_empty_poll_enabled=asbool(os.getenv("DD_KAFKA_EMPTY_POLL_ENABLED", default=True)),
    ),
)


def get_version():
    # type: () -> str
    """Installed confluent_kafka version, or "" when unavailable."""
    return getattr(confluent_kafka, "__version__", "")


KAFKA_VERSION_TUPLE = parse_version(get_version())


# Serialization helpers were added in confluent_kafka 1.4.0.
_SerializationContext = confluent_kafka.serialization.SerializationContext if KAFKA_VERSION_TUPLE >= (1, 4, 0) else None
_MessageField = confluent_kafka.serialization.MessageField if KAFKA_VERSION_TUPLE >= (1, 4, 0) else None


class TracedProducerMixin:
    def __init__(self, config, *args, **kwargs):
        super(TracedProducerMixin, self).__init__(config, *args, **kwargs)
        # Remember the broker list so produce spans can be tagged with it.
        self._dd_bootstrap_servers = (
            config.get("bootstrap.servers")
            if config.get("bootstrap.servers") is not None
            else config.get("metadata.broker.list")
        )

    # in older versions of confluent_kafka, bool(Producer()) evaluates to False,
    # which makes the Pin functionality ignore it.
    def __bool__(self):
        return True

    __nonzero__ = __bool__


class TracedConsumerMixin:
    def __init__(self, config, *args, **kwargs):
        super(TracedConsumerMixin, self).__init__(config, *args, **kwargs)
        # Cached so consume spans can be tagged without re-reading the config.
        self._group_id = config.get("group.id", "")
        self._auto_commit = asbool(config.get("enable.auto.commit", True))


class TracedConsumer(TracedConsumerMixin, confluent_kafka.Consumer):
    pass


class TracedProducer(TracedProducerMixin, confluent_kafka.Producer):
    pass


class TracedDeserializingConsumer(TracedConsumerMixin, confluent_kafka.DeserializingConsumer):
    pass


class TracedSerializingProducer(TracedProducerMixin, confluent_kafka.SerializingProducer):
    pass


def patch():
    """Replace confluent_kafka client classes with traced subclasses (idempotent)."""
    if getattr(confluent_kafka, "_datadog_patch", False):
        return
    confluent_kafka._datadog_patch = True

    confluent_kafka.Producer = TracedProducer
    confluent_kafka.Consumer = TracedConsumer
    if _SerializingProducer is not None:
        confluent_kafka.SerializingProducer = TracedSerializingProducer
    if _DeserializingConsumer is not None:
        confluent_kafka.DeserializingConsumer = TracedDeserializingConsumer

    for producer in (TracedProducer, TracedSerializingProducer):
        trace_utils.wrap(producer, "produce", traced_produce)
    for consumer in (TracedConsumer, TracedDeserializingConsumer):
        trace_utils.wrap(consumer, "poll", traced_poll_or_consume)
        trace_utils.wrap(consumer, "commit", traced_commit)

    # Consume is not implemented in deserializing consumers
    trace_utils.wrap(TracedConsumer, "consume", traced_poll_or_consume)
    Pin().onto(confluent_kafka.Producer)
    Pin().onto(confluent_kafka.Consumer)
    # Fix: only pin the serializing/deserializing classes when they exist --
    # the previously unguarded attribute access raised AttributeError on old
    # confluent_kafka builds (mirrors the replacement guards above).
    if _SerializingProducer is not None:
        Pin().onto(confluent_kafka.SerializingProducer)
    if _DeserializingConsumer is not None:
        Pin().onto(confluent_kafka.DeserializingConsumer)


def unpatch():
    """Unwrap traced methods and restore the original confluent_kafka classes."""
    if getattr(confluent_kafka, "_datadog_patch", False):
        confluent_kafka._datadog_patch = False

    for producer in (TracedProducer, TracedSerializingProducer):
        if trace_utils.iswrapped(producer.produce):
            trace_utils.unwrap(producer, "produce")
    for consumer in (TracedConsumer, TracedDeserializingConsumer):
        if trace_utils.iswrapped(consumer.poll):
            trace_utils.unwrap(consumer, "poll")
        if trace_utils.iswrapped(consumer.commit):
            trace_utils.unwrap(consumer, "commit")

    # Consume is not implemented in deserializing consumers
    if trace_utils.iswrapped(TracedConsumer.consume):
        trace_utils.unwrap(TracedConsumer, "consume")

    confluent_kafka.Producer = _Producer
    confluent_kafka.Consumer = _Consumer
    if _SerializingProducer is not None:
        confluent_kafka.SerializingProducer = _SerializingProducer
    if _DeserializingConsumer is not None:
        confluent_kafka.DeserializingConsumer = _DeserializingConsumer


def traced_produce(func, instance, args, kwargs):
    """Trace ``Producer.produce``, injecting propagation headers when enabled."""
    pin = Pin.get_from(instance)
    if not pin or not pin.enabled():
        return func(*args, **kwargs)

    topic = get_argument_value(args, kwargs, 0, "topic") or ""
    core.set_item("kafka_topic", topic)
    try:
        value = get_argument_value(args, kwargs, 1, "value")
    except ArgumentError:
        value = None
    # NOTE(review): key/partition are only read from kwargs here; a positionally
    # passed key/partition would not be tagged -- confirm against produce()'s signature.
    message_key = kwargs.get("key", "") or ""
    partition = kwargs.get("partition", -1)
    headers = get_argument_value(args, kwargs, 6, "headers", optional=True) or {}
    with pin.tracer.trace(
        schematize_messaging_operation(kafkax.PRODUCE, provider="kafka", direction=SpanDirection.OUTBOUND),
        service=trace_utils.ext_service(pin, config.kafka),
        span_type=SpanTypes.WORKER,
    ) as span:
        # Fix: ``isinstance(x, None)`` raises TypeError, so guard the check the
        # same way the key-serialization branch below does.
        core.dispatch(
            "kafka.produce.start",
            (
                instance,
                args,
                kwargs,
                _SerializingProducer is not None and isinstance(instance, _SerializingProducer),
                span,
            ),
        )
        span.set_tag_str(MESSAGING_SYSTEM, kafkax.SERVICE)
        span.set_tag_str(COMPONENT, config.kafka.integration_name)
        span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER)
        span.set_tag_str(kafkax.TOPIC, topic)

        if _SerializingProducer is not None and isinstance(instance, _SerializingProducer):
            serialized_key = serialize_key(instance, topic, message_key, headers)
            if serialized_key is not None:
                span.set_tag_str(kafkax.MESSAGE_KEY, serialized_key)
        else:
            span.set_tag_str(kafkax.MESSAGE_KEY, message_key)

        span.set_tag(kafkax.PARTITION, partition)
        span.set_tag_str(kafkax.TOMBSTONE, str(value is None))
        span.set_tag(SPAN_MEASURED_KEY)
        if instance._dd_bootstrap_servers is not None:
            span.set_tag_str(kafkax.HOST_LIST, instance._dd_bootstrap_servers)
        rate = config.kafka.get_analytics_sample_rate()
        if rate is not None:
            span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, rate)

        # inject headers with Datadog tags if trace propagation is enabled
        if config.kafka.distributed_tracing_enabled:
            headers = get_argument_value(args, kwargs, 6, "headers", True) or {}
            Propagator.inject(span.context, headers)
            args, kwargs = set_argument_value(args, kwargs, 6, "headers", headers, override_unset=True)
        return func(*args, **kwargs)


def traced_poll_or_consume(func, instance, args, kwargs):
    """Trace ``Consumer.poll``/``Consumer.consume``; the span is created after
    the call so distributed context from the received message can parent it."""
    pin = Pin.get_from(instance)
    if not pin or not pin.enabled():
        return func(*args, **kwargs)

    # we must get start time now since we execute before starting a span in order
    # to get distributed context if it exists
    start_ns = time_ns()
    err = None
    result = None
    try:
        result = func(*args, **kwargs)
    except Exception as e:
        err = e
        # bare re-raise preserves the original traceback
        raise
    finally:
        if isinstance(result, confluent_kafka.Message):
            # poll returns a single message
            _instrument_message([result], pin, start_ns, instance, err)
        elif isinstance(result, list):
            # consume returns a list of messages
            _instrument_message(result, pin, start_ns, instance, err)
        elif config.kafka.trace_empty_poll_enabled:
            _instrument_message([None], pin, start_ns, instance, err)

    return result


def _instrument_message(messages, pin, start_ns, instance, err):
    """Create one consume span covering ``messages`` (possibly ``[None]`` for
    an empty poll), back-dated to ``start_ns``."""
    ctx = None
    # First message is used to extract context and enrich datadog spans
    # This approach aligns with the opentelemetry confluent kafka semantics
    first_message = messages[0] if len(messages) else None
    if first_message is not None and config.kafka.distributed_tracing_enabled and first_message.headers():
        ctx = Propagator.extract(dict(first_message.headers()))
    with pin.tracer.start_span(
        name=schematize_messaging_operation(kafkax.CONSUME, provider="kafka", direction=SpanDirection.PROCESSING),
        service=trace_utils.ext_service(pin, config.kafka),
        span_type=SpanTypes.WORKER,
        child_of=ctx if ctx is not None else pin.tracer.context_provider.active(),
        activate=True,
    ) as span:
        # reset span start time to before function call
        span.start_ns = start_ns

        # NOTE(review): this dispatches first_message (not each message) once per
        # entry in the batch -- looks odd but is preserved as-is; confirm intent.
        for message in messages:
            if message is not None and first_message is not None:
                core.set_item("kafka_topic", first_message.topic())
                core.dispatch("kafka.consume.start", (instance, first_message, span))

        span.set_tag_str(MESSAGING_SYSTEM, kafkax.SERVICE)
        span.set_tag_str(COMPONENT, config.kafka.integration_name)
        span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)
        span.set_tag_str(kafkax.RECEIVED_MESSAGE, str(first_message is not None))
        span.set_tag_str(kafkax.GROUP_ID, instance._group_id)
        if first_message is not None:
            message_key = first_message.key() or ""
            message_offset = first_message.offset() or -1
            span.set_tag_str(kafkax.TOPIC, first_message.topic())

            # If this is a deserializing consumer, do not set the key as a tag since we
            # do not have the serialization function
            if (
                (_DeserializingConsumer is not None and not isinstance(instance, _DeserializingConsumer))
                or isinstance(message_key, str)
                or isinstance(message_key, bytes)
            ):
                span.set_tag_str(kafkax.MESSAGE_KEY, message_key)
            span.set_tag(kafkax.PARTITION, first_message.partition())
            is_tombstone = False
            try:
                is_tombstone = len(first_message) == 0
            except TypeError:  # https://github.com/confluentinc/confluent-kafka-python/issues/1192
                pass
            span.set_tag_str(kafkax.TOMBSTONE, str(is_tombstone))
            span.set_tag(kafkax.MESSAGE_OFFSET, message_offset)
        span.set_tag(SPAN_MEASURED_KEY)
        rate = config.kafka.get_analytics_sample_rate()
        if rate is not None:
            span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, rate)

        if err is not None:
            span.set_exc_info(*sys.exc_info())


def traced_commit(func, instance, args, kwargs):
    """Dispatch the commit event (for DSM) and delegate to the real commit."""
    pin = Pin.get_from(instance)
    if not pin or not pin.enabled():
        return func(*args, **kwargs)

    core.dispatch("kafka.commit.start", (instance, args, kwargs))

    return func(*args, **kwargs)


def serialize_key(instance, topic, key, headers):
    """Serialize ``key`` with the producer's configured key serializer, or
    return None (with a debug/warning log) when that is not possible."""
    if _SerializationContext is not None and _MessageField is not None:
        ctx = _SerializationContext(topic, _MessageField.KEY, headers)
        if hasattr(instance, "_key_serializer") and instance._key_serializer is not None:
            try:
                key = instance._key_serializer(key, ctx)
                return key
            except Exception:
                # NOTE(review): log text says "Consumer" although this is the
                # producer key path -- preserved as-is; confirm before changing.
                log.debug("Failed to set Kafka Consumer key tag: %s", str(key))
                return None
        else:
            log.warning("Failed to set Kafka Consumer key tag, no method available to serialize key: %s", str(key))
            return None
from ddtrace.ext import SpanTypes
from ddtrace.ext import kombu as kombux
from ddtrace.internal import core
from ddtrace.internal.constants import COMPONENT
from ddtrace.internal.schema import schematize_messaging_operation
from ddtrace.internal.schema import schematize_service_name
from ddtrace.internal.schema.span_attribute_schema import SpanDirection
from ddtrace.internal.utils import get_argument_value
from ddtrace.internal.utils.formats import asbool
from ddtrace.internal.utils.wrappers import unwrap
from ddtrace.pin import Pin
from ddtrace.propagation.http import HTTPPropagator
from ddtrace.vendor import wrapt

from .constants import DEFAULT_SERVICE
from .utils import HEADER_POS
from .utils import extract_conn_tags
from .utils import get_body_length_from_args
from .utils import get_exchange_from_args
from .utils import get_routing_key_from_args


def get_version():
    # type: () -> str
    """Installed kombu version as a string."""
    return str(kombu.__version__)


# kombu default settings
config._add(
    "kombu",
    {
        "distributed_tracing_enabled": asbool(os.getenv("DD_KOMBU_DISTRIBUTED_TRACING", default=True)),
        "service_name": config.service or os.getenv("DD_KOMBU_SERVICE_NAME", default=DEFAULT_SERVICE),
    },
)

propagator = HTTPPropagator


def patch():
    """Install the kombu instrumentation (idempotent).

    ``Producer._publish`` (rather than ``publish``) is wrapped because
    ``publish`` defines defaults in its kwargs, potentially overrides kwargs
    with values from ``self`` and extracts/normalizes things like the
    exchange. An ObjectProxy over kombu would be nicer, but then plain
    ``import kombu.Connection`` usage would not be instrumented.
    """
    if getattr(kombu, "_datadog_patch", False):
        return
    kombu._datadog_patch = True

    _w = wrapt.wrap_function_wrapper
    _w("kombu", "Producer._publish", traced_publish)
    _w("kombu", "Consumer.receive", traced_receive)

    # Producer spans represent calls out to another service, so when a global
    # service is configured they carry none of their own and inherit the
    # parent's. The env var is honored for backwards compatibility otherwise.
    prod_service = None if config.service else os.getenv("DD_KOMBU_SERVICE_NAME", default=DEFAULT_SERVICE)

    Pin(
        service=schematize_service_name(prod_service),
    ).onto(kombu.messaging.Producer)

    Pin(service=schematize_service_name(config.kombu["service_name"])).onto(kombu.messaging.Consumer)


def unpatch():
    """Remove the kombu instrumentation (idempotent)."""
    if getattr(kombu, "_datadog_patch", False):
        kombu._datadog_patch = False
        unwrap(kombu.Producer, "_publish")
        unwrap(kombu.Consumer, "receive")


#
# tracing functions
#


def traced_receive(func, instance, args, kwargs):
    """Trace ``Consumer.receive``, continuing any distributed trace found in
    the message headers."""
    pin = Pin.get_from(instance)
    if not pin or not pin.enabled():
        return func(*args, **kwargs)

    # Signature only takes 2 args: (body, message)
    message = get_argument_value(args, kwargs, 1, "message")

    trace_utils.activate_distributed_headers(pin.tracer, request_headers=message.headers, int_config=config.kombu)

    operation = schematize_messaging_operation(kombux.RECEIVE_NAME, provider="kombu", direction=SpanDirection.PROCESSING)
    with pin.tracer.trace(operation, service=pin.service, span_type=SpanTypes.WORKER) as span:
        span.set_tag_str(COMPONENT, config.kombu.integration_name)

        # set span.kind to the type of operation being performed
        span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER)

        span.set_tag(SPAN_MEASURED_KEY)

        delivered_exchange = message.delivery_info["exchange"]
        span.resource = delivered_exchange
        span.set_tag_str(kombux.EXCHANGE, delivered_exchange)

        span.set_tags(extract_conn_tags(message.channel.connection))
        span.set_tag_str(kombux.ROUTING_KEY, message.delivery_info["routing_key"])
        # set analytics sample rate
        span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, config.kombu.get_analytics_sample_rate())

        # run the command
        result = func(*args, **kwargs)
        core.dispatch("kombu.amqp.receive.post", [instance, message, span])
        return result


def traced_publish(func, instance, args, kwargs):
    """Trace ``Producer._publish``, injecting propagation headers when enabled."""
    pin = Pin.get_from(instance)
    if not pin or not pin.enabled():
        return func(*args, **kwargs)

    operation = schematize_messaging_operation(kombux.PUBLISH_NAME, provider="kombu", direction=SpanDirection.OUTBOUND)
    with pin.tracer.trace(operation, service=pin.service, span_type=SpanTypes.WORKER) as span:
        span.set_tag_str(COMPONENT, config.kombu.integration_name)

        # set span.kind to the type of operation being performed
        span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER)

        span.set_tag(SPAN_MEASURED_KEY)

        exchange_name = get_exchange_from_args(args)
        span.resource = exchange_name
        span.set_tag_str(kombux.EXCHANGE, exchange_name)
        if pin.tags:
            span.set_tags(pin.tags)
        span.set_tag_str(kombux.ROUTING_KEY, get_routing_key_from_args(args))
        span.set_tags(extract_conn_tags(instance.channel.connection))
        span.set_metric(kombux.BODY_LEN, get_body_length_from_args(args))
        # set analytics sample rate
        span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, config.kombu.get_analytics_sample_rate())

        if config.kombu.distributed_tracing_enabled:
            propagator.inject(span.context, args[HEADER_POS])
        # Has to happen after trace injection for actual payload size
        core.dispatch("kombu.amqp.publish.pre", [args, kwargs, span])
        # run the command
        return func(*args, **kwargs)
"""
Helpers used by the Datadog kombu integration.
"""
from ddtrace.ext import kombu as kombux
from ddtrace.ext import net


# Positional indexes into the argument tuple kombu passes to ``Producer._publish``.
PUBLISH_BODY_IDX = 0
PUBLISH_ROUTING_KEY = 6
PUBLISH_EXCHANGE_IDX = 9

HEADER_POS = 4


def extract_conn_tags(connection):
    """Transform kombu connection info into Datadog span tags.

    Returns an empty dict when the connection does not expose the expected
    attributes or its ``host`` is not in ``"host:port"`` form.
    """
    try:
        host, port = connection.host.split(":")
        return {
            net.TARGET_HOST: host,
            net.TARGET_PORT: port,
            kombux.VHOST: connection.virtual_host,
        }
    except (AttributeError, ValueError):
        # AttributeError: connection lacks .host/.virtual_host.
        # ValueError (fix): a host without a ":port" suffix previously crashed
        # the tuple unpacking instead of failing soft as intended.
        # Unlikely, but let's not die over it.
        return {}


def get_exchange_from_args(args):
    """Extract the exchange name.

    The publish method extracts the name and hands it off to ``_publish``
    (the method that is patched).
    """
    return args[PUBLISH_EXCHANGE_IDX]


def get_routing_key_from_args(args):
    """Extract the routing key."""
    return args[PUBLISH_ROUTING_KEY]


def get_body_length_from_args(args):
    """Extract the length of the message body."""
    return len(args[PUBLISH_BODY_IDX])
from ddtrace.internal.utils.deprecations import DDTraceDeprecationWarning
from ddtrace.vendor.debtcollector import deprecate

from ..internal.jinja2.constants import *  # noqa: F401,F403


def __getattr__(name):
    """Module-level attribute hook (PEP 562).

    Emits a deprecation warning for any attribute access on this shim module,
    then resolves the name from the re-exported internal constants.

    :raises AttributeError: when the name is not re-exported by this module.
    """
    deprecate(
        ("%s.%s is deprecated" % (__name__, name)),
        category=DDTraceDeprecationWarning,
    )

    if name in globals():
        return globals()[name]
    # Fix: the original passed a printf-style format string plus args straight
    # to AttributeError, so the message was never formatted (the exception
    # carried the raw tuple). Format explicitly instead.
    raise AttributeError("%s has no attribute %s" % (__name__, name))
service=config.jinja2["service_name"], - _config=config.jinja2, - ).onto(jinja2.environment.Environment) - _w(jinja2, "environment.Template.render", _wrap_render) - _w(jinja2, "environment.Template.generate", _wrap_render) - _w(jinja2, "environment.Environment.compile", _wrap_compile) - _w(jinja2, "environment.Environment._load_template", _wrap_load_template) - - -def unpatch(): - if not getattr(jinja2, "__datadog_patch", False): - return - jinja2.__datadog_patch = False - _u(jinja2.Template, "render") - _u(jinja2.Template, "generate") - _u(jinja2.Environment, "compile") - _u(jinja2.Environment, "_load_template") - - -def _wrap_render(wrapped, instance, args, kwargs): - """Wrap `Template.render()` or `Template.generate()`""" - pin = Pin.get_from(instance.environment) - if not pin or not pin.enabled(): - return wrapped(*args, **kwargs) - - template_name = str(instance.name or DEFAULT_TEMPLATE_NAME) - with pin.tracer.trace("jinja2.render", pin.service, span_type=SpanTypes.TEMPLATE) as span: - span.set_tag_str(COMPONENT, config.jinja2.integration_name) - - span.set_tag(SPAN_MEASURED_KEY) - try: - return wrapped(*args, **kwargs) - finally: - span.resource = template_name - span.set_tag_str("jinja2.template_name", template_name) - - -def _wrap_compile(wrapped, instance, args, kwargs): - pin = Pin.get_from(instance) - if not pin or not pin.enabled(): - return wrapped(*args, **kwargs) - - try: - template_name = get_argument_value(args, kwargs, 1, "name") - except ArgumentError: - template_name = DEFAULT_TEMPLATE_NAME - - with pin.tracer.trace("jinja2.compile", pin.service, span_type=SpanTypes.TEMPLATE) as span: - try: - return wrapped(*args, **kwargs) - finally: - span.set_tag_str(COMPONENT, config.jinja2.integration_name) - - span.resource = template_name - span.set_tag_str("jinja2.template_name", template_name) - - -def _wrap_load_template(wrapped, instance, args, kwargs): - pin = Pin.get_from(instance) - if not pin or not pin.enabled(): - return wrapped(*args, 
**kwargs) - - template_name = get_argument_value(args, kwargs, 0, "name") - with pin.tracer.trace("jinja2.load", pin.service, span_type=SpanTypes.TEMPLATE) as span: - template = None - try: - template = wrapped(*args, **kwargs) - return template - finally: - span.resource = template_name - span.set_tag_str("jinja2.template_name", template_name) - if template: - span.set_tag_str("jinja2.template_path", template.filename) +# TODO: deprecate and remove this module diff --git a/ddtrace/contrib/kafka/__init__.py b/ddtrace/contrib/kafka/__init__.py index f49a5cca732..05359f5c11d 100644 --- a/ddtrace/contrib/kafka/__init__.py +++ b/ddtrace/contrib/kafka/__init__.py @@ -48,8 +48,12 @@ with require_modules(required_modules) as missing_modules: if not missing_modules: - from .patch import get_version - from .patch import patch - from .patch import unpatch + # Required to allow users to import from `ddtrace.contrib.kafka.patch` directly + from . import patch as _ # noqa: F401, I001 + + # Expose public methods + from ..internal.kafka.patch import get_version + from ..internal.kafka.patch import patch + from ..internal.kafka.patch import unpatch __all__ = ["patch", "unpatch", "get_version"] diff --git a/ddtrace/contrib/kafka/patch.py b/ddtrace/contrib/kafka/patch.py index 60e5ed23379..6470048b748 100644 --- a/ddtrace/contrib/kafka/patch.py +++ b/ddtrace/contrib/kafka/patch.py @@ -1,312 +1,4 @@ -import os -import sys +from ..internal.kafka.patch import * # noqa: F401,F403 -import confluent_kafka -from ddtrace import config -from ddtrace.constants import ANALYTICS_SAMPLE_RATE_KEY -from ddtrace.constants import SPAN_KIND -from ddtrace.constants import SPAN_MEASURED_KEY -from ddtrace.contrib import trace_utils -from ddtrace.ext import SpanKind -from ddtrace.ext import SpanTypes -from ddtrace.ext import kafka as kafkax -from ddtrace.internal import core -from ddtrace.internal.compat import time_ns -from ddtrace.internal.constants import COMPONENT -from ddtrace.internal.constants 
import MESSAGING_SYSTEM -from ddtrace.internal.logger import get_logger -from ddtrace.internal.schema import schematize_messaging_operation -from ddtrace.internal.schema import schematize_service_name -from ddtrace.internal.schema.span_attribute_schema import SpanDirection -from ddtrace.internal.utils import ArgumentError -from ddtrace.internal.utils import get_argument_value -from ddtrace.internal.utils import set_argument_value -from ddtrace.internal.utils.formats import asbool -from ddtrace.internal.utils.version import parse_version -from ddtrace.pin import Pin -from ddtrace.propagation.http import HTTPPropagator as Propagator - - -_Producer = confluent_kafka.Producer -_Consumer = confluent_kafka.Consumer -_SerializingProducer = confluent_kafka.SerializingProducer if hasattr(confluent_kafka, "SerializingProducer") else None -_DeserializingConsumer = ( - confluent_kafka.DeserializingConsumer if hasattr(confluent_kafka, "DeserializingConsumer") else None -) - - -log = get_logger(__name__) - - -config._add( - "kafka", - dict( - _default_service=schematize_service_name("kafka"), - distributed_tracing_enabled=asbool(os.getenv("DD_KAFKA_PROPAGATION_ENABLED", default=False)), - trace_empty_poll_enabled=asbool(os.getenv("DD_KAFKA_EMPTY_POLL_ENABLED", default=True)), - ), -) - - -def get_version(): - # type: () -> str - return getattr(confluent_kafka, "__version__", "") - - -KAFKA_VERSION_TUPLE = parse_version(get_version()) - - -_SerializationContext = confluent_kafka.serialization.SerializationContext if KAFKA_VERSION_TUPLE >= (1, 4, 0) else None -_MessageField = confluent_kafka.serialization.MessageField if KAFKA_VERSION_TUPLE >= (1, 4, 0) else None - - -class TracedProducerMixin: - def __init__(self, config, *args, **kwargs): - super(TracedProducerMixin, self).__init__(config, *args, **kwargs) - self._dd_bootstrap_servers = ( - config.get("bootstrap.servers") - if config.get("bootstrap.servers") is not None - else config.get("metadata.broker.list") - ) - - # in 
older versions of confluent_kafka, bool(Producer()) evaluates to False, - # which makes the Pin functionality ignore it. - def __bool__(self): - return True - - __nonzero__ = __bool__ - - -class TracedConsumerMixin: - def __init__(self, config, *args, **kwargs): - super(TracedConsumerMixin, self).__init__(config, *args, **kwargs) - self._group_id = config.get("group.id", "") - self._auto_commit = asbool(config.get("enable.auto.commit", True)) - - -class TracedConsumer(TracedConsumerMixin, confluent_kafka.Consumer): - pass - - -class TracedProducer(TracedProducerMixin, confluent_kafka.Producer): - pass - - -class TracedDeserializingConsumer(TracedConsumerMixin, confluent_kafka.DeserializingConsumer): - pass - - -class TracedSerializingProducer(TracedProducerMixin, confluent_kafka.SerializingProducer): - pass - - -def patch(): - if getattr(confluent_kafka, "_datadog_patch", False): - return - confluent_kafka._datadog_patch = True - - confluent_kafka.Producer = TracedProducer - confluent_kafka.Consumer = TracedConsumer - if _SerializingProducer is not None: - confluent_kafka.SerializingProducer = TracedSerializingProducer - if _DeserializingConsumer is not None: - confluent_kafka.DeserializingConsumer = TracedDeserializingConsumer - - for producer in (TracedProducer, TracedSerializingProducer): - trace_utils.wrap(producer, "produce", traced_produce) - for consumer in (TracedConsumer, TracedDeserializingConsumer): - trace_utils.wrap(consumer, "poll", traced_poll_or_consume) - trace_utils.wrap(consumer, "commit", traced_commit) - - # Consume is not implemented in deserializing consumers - trace_utils.wrap(TracedConsumer, "consume", traced_poll_or_consume) - Pin().onto(confluent_kafka.Producer) - Pin().onto(confluent_kafka.Consumer) - Pin().onto(confluent_kafka.SerializingProducer) - Pin().onto(confluent_kafka.DeserializingConsumer) - - -def unpatch(): - if getattr(confluent_kafka, "_datadog_patch", False): - confluent_kafka._datadog_patch = False - - for producer in 
(TracedProducer, TracedSerializingProducer): - if trace_utils.iswrapped(producer.produce): - trace_utils.unwrap(producer, "produce") - for consumer in (TracedConsumer, TracedDeserializingConsumer): - if trace_utils.iswrapped(consumer.poll): - trace_utils.unwrap(consumer, "poll") - if trace_utils.iswrapped(consumer.commit): - trace_utils.unwrap(consumer, "commit") - - # Consume is not implemented in deserializing consumers - if trace_utils.iswrapped(TracedConsumer.consume): - trace_utils.unwrap(TracedConsumer, "consume") - - confluent_kafka.Producer = _Producer - confluent_kafka.Consumer = _Consumer - if _SerializingProducer is not None: - confluent_kafka.SerializingProducer = _SerializingProducer - if _DeserializingConsumer is not None: - confluent_kafka.DeserializingConsumer = _DeserializingConsumer - - -def traced_produce(func, instance, args, kwargs): - pin = Pin.get_from(instance) - if not pin or not pin.enabled(): - return func(*args, **kwargs) - - topic = get_argument_value(args, kwargs, 0, "topic") or "" - core.set_item("kafka_topic", topic) - try: - value = get_argument_value(args, kwargs, 1, "value") - except ArgumentError: - value = None - message_key = kwargs.get("key", "") or "" - partition = kwargs.get("partition", -1) - headers = get_argument_value(args, kwargs, 6, "headers", optional=True) or {} - with pin.tracer.trace( - schematize_messaging_operation(kafkax.PRODUCE, provider="kafka", direction=SpanDirection.OUTBOUND), - service=trace_utils.ext_service(pin, config.kafka), - span_type=SpanTypes.WORKER, - ) as span: - core.dispatch("kafka.produce.start", (instance, args, kwargs, isinstance(instance, _SerializingProducer), span)) - span.set_tag_str(MESSAGING_SYSTEM, kafkax.SERVICE) - span.set_tag_str(COMPONENT, config.kafka.integration_name) - span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER) - span.set_tag_str(kafkax.TOPIC, topic) - - if _SerializingProducer is not None and isinstance(instance, _SerializingProducer): - serialized_key = 
serialize_key(instance, topic, message_key, headers) - if serialized_key is not None: - span.set_tag_str(kafkax.MESSAGE_KEY, serialized_key) - else: - span.set_tag_str(kafkax.MESSAGE_KEY, message_key) - - span.set_tag(kafkax.PARTITION, partition) - span.set_tag_str(kafkax.TOMBSTONE, str(value is None)) - span.set_tag(SPAN_MEASURED_KEY) - if instance._dd_bootstrap_servers is not None: - span.set_tag_str(kafkax.HOST_LIST, instance._dd_bootstrap_servers) - rate = config.kafka.get_analytics_sample_rate() - if rate is not None: - span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, rate) - - # inject headers with Datadog tags if trace propagation is enabled - if config.kafka.distributed_tracing_enabled: - # inject headers with Datadog tags: - headers = get_argument_value(args, kwargs, 6, "headers", True) or {} - Propagator.inject(span.context, headers) - args, kwargs = set_argument_value(args, kwargs, 6, "headers", headers, override_unset=True) - return func(*args, **kwargs) - - -def traced_poll_or_consume(func, instance, args, kwargs): - pin = Pin.get_from(instance) - if not pin or not pin.enabled(): - return func(*args, **kwargs) - - # we must get start time now since execute before starting a span in order to get distributed context - # if it exists - start_ns = time_ns() - err = None - result = None - try: - result = func(*args, **kwargs) - except Exception as e: - err = e - raise err - finally: - if isinstance(result, confluent_kafka.Message): - # poll returns a single message - _instrument_message([result], pin, start_ns, instance, err) - elif isinstance(result, list): - # consume returns a list of messages, - _instrument_message(result, pin, start_ns, instance, err) - elif config.kafka.trace_empty_poll_enabled: - _instrument_message([None], pin, start_ns, instance, err) - - return result - - -def _instrument_message(messages, pin, start_ns, instance, err): - ctx = None - # First message is used to extract context and enrich datadog spans - # This approach aligns with the 
opentelemetry confluent kafka semantics - first_message = messages[0] if len(messages) else None - if first_message is not None and config.kafka.distributed_tracing_enabled and first_message.headers(): - ctx = Propagator.extract(dict(first_message.headers())) - with pin.tracer.start_span( - name=schematize_messaging_operation(kafkax.CONSUME, provider="kafka", direction=SpanDirection.PROCESSING), - service=trace_utils.ext_service(pin, config.kafka), - span_type=SpanTypes.WORKER, - child_of=ctx if ctx is not None else pin.tracer.context_provider.active(), - activate=True, - ) as span: - # reset span start time to before function call - span.start_ns = start_ns - - for message in messages: - if message is not None and first_message is not None: - core.set_item("kafka_topic", first_message.topic()) - core.dispatch("kafka.consume.start", (instance, first_message, span)) - - span.set_tag_str(MESSAGING_SYSTEM, kafkax.SERVICE) - span.set_tag_str(COMPONENT, config.kafka.integration_name) - span.set_tag_str(SPAN_KIND, SpanKind.CONSUMER) - span.set_tag_str(kafkax.RECEIVED_MESSAGE, str(first_message is not None)) - span.set_tag_str(kafkax.GROUP_ID, instance._group_id) - if first_message is not None: - message_key = first_message.key() or "" - message_offset = first_message.offset() or -1 - span.set_tag_str(kafkax.TOPIC, first_message.topic()) - - # If this is a deserializing consumer, do not set the key as a tag since we - # do not have the serialization function - if ( - (_DeserializingConsumer is not None and not isinstance(instance, _DeserializingConsumer)) - or isinstance(message_key, str) - or isinstance(message_key, bytes) - ): - span.set_tag_str(kafkax.MESSAGE_KEY, message_key) - span.set_tag(kafkax.PARTITION, first_message.partition()) - is_tombstone = False - try: - is_tombstone = len(first_message) == 0 - except TypeError: # https://github.com/confluentinc/confluent-kafka-python/issues/1192 - pass - span.set_tag_str(kafkax.TOMBSTONE, str(is_tombstone)) - 
span.set_tag(kafkax.MESSAGE_OFFSET, message_offset) - span.set_tag(SPAN_MEASURED_KEY) - rate = config.kafka.get_analytics_sample_rate() - if rate is not None: - span.set_tag(ANALYTICS_SAMPLE_RATE_KEY, rate) - - if err is not None: - span.set_exc_info(*sys.exc_info()) - - -def traced_commit(func, instance, args, kwargs): - pin = Pin.get_from(instance) - if not pin or not pin.enabled(): - return func(*args, **kwargs) - - core.dispatch("kafka.commit.start", (instance, args, kwargs)) - - return func(*args, **kwargs) - - -def serialize_key(instance, topic, key, headers): - if _SerializationContext is not None and _MessageField is not None: - ctx = _SerializationContext(topic, _MessageField.KEY, headers) - if hasattr(instance, "_key_serializer") and instance._key_serializer is not None: - try: - key = instance._key_serializer(key, ctx) - return key - except Exception: - log.debug("Failed to set Kafka Consumer key tag: %s", str(key)) - return None - else: - log.warning("Failed to set Kafka Consumer key tag, no method available to serialize key: %s", str(key)) - return None +# TODO: deprecate and remove this module diff --git a/ddtrace/contrib/kombu/__init__.py b/ddtrace/contrib/kombu/__init__.py index 009a4fe997f..c93edc54141 100644 --- a/ddtrace/contrib/kombu/__init__.py +++ b/ddtrace/contrib/kombu/__init__.py @@ -39,7 +39,11 @@ with require_modules(required_modules) as missing_modules: if not missing_modules: - from .patch import get_version - from .patch import patch + # Required to allow users to import from `ddtrace.contrib.kombu.patch` directly + from . 
import patch as _ # noqa: F401, I001 + + # Expose public methods + from ..internal.kombu.patch import get_version + from ..internal.kombu.patch import patch __all__ = ["patch", "get_version"] diff --git a/ddtrace/contrib/kombu/constants.py b/ddtrace/contrib/kombu/constants.py index bcada46cdfd..4a2067e511c 100644 --- a/ddtrace/contrib/kombu/constants.py +++ b/ddtrace/contrib/kombu/constants.py @@ -1 +1,15 @@ -DEFAULT_SERVICE = "kombu" +from ddtrace.internal.utils.deprecations import DDTraceDeprecationWarning +from ddtrace.vendor.debtcollector import deprecate + +from ..internal.kombu.constants import * # noqa: F401,F403 + + +def __getattr__(name): + deprecate( + ("%s.%s is deprecated" % (__name__, name)), + category=DDTraceDeprecationWarning, + ) + + if name in globals(): + return globals()[name] + raise AttributeError("%s has no attribute %s", __name__, name) diff --git a/ddtrace/contrib/kombu/patch.py b/ddtrace/contrib/kombu/patch.py index e1297c8d196..2324e765666 100644 --- a/ddtrace/contrib/kombu/patch.py +++ b/ddtrace/contrib/kombu/patch.py @@ -1,167 +1,4 @@ -import os +from ..internal.kombu.patch import * # noqa: F401,F403 -# 3p -import kombu -from ddtrace import config -from ddtrace.internal import core -from ddtrace.internal.constants import COMPONENT -from ddtrace.internal.schema import schematize_messaging_operation -from ddtrace.internal.schema import schematize_service_name -from ddtrace.internal.schema.span_attribute_schema import SpanDirection -from ddtrace.internal.utils.formats import asbool -from ddtrace.vendor import wrapt - -from ...constants import ANALYTICS_SAMPLE_RATE_KEY -from ...constants import SPAN_KIND -from ...constants import SPAN_MEASURED_KEY -from ...ext import SpanKind -from ...ext import SpanTypes -from ...ext import kombu as kombux -from ...internal.utils import get_argument_value -from ...internal.utils.wrappers import unwrap -from ...pin import Pin -from ...propagation.http import HTTPPropagator - -# project -from .. 
import trace_utils -from .constants import DEFAULT_SERVICE -from .utils import HEADER_POS -from .utils import extract_conn_tags -from .utils import get_body_length_from_args -from .utils import get_exchange_from_args -from .utils import get_routing_key_from_args - - -def get_version(): - # type: () -> str - return str(kombu.__version__) - - -# kombu default settings - -config._add( - "kombu", - { - "distributed_tracing_enabled": asbool(os.getenv("DD_KOMBU_DISTRIBUTED_TRACING", default=True)), - "service_name": config.service or os.getenv("DD_KOMBU_SERVICE_NAME", default=DEFAULT_SERVICE), - }, -) - -propagator = HTTPPropagator - - -def patch(): - """Patch the instrumented methods - - This duplicated doesn't look nice. The nicer alternative is to use an ObjectProxy on top - of Kombu. However, it means that any "import kombu.Connection" won't be instrumented. - """ - if getattr(kombu, "_datadog_patch", False): - return - kombu._datadog_patch = True - - _w = wrapt.wrap_function_wrapper - # We wrap the _publish method because the publish method: - # * defines defaults in its kwargs - # * potentially overrides kwargs with values from self - # * extracts/normalizes things like exchange - _w("kombu", "Producer._publish", traced_publish) - _w("kombu", "Consumer.receive", traced_receive) - - # We do not provide a service for producer spans since they represent - # external calls to another service. - # Instead the service should be inherited from the parent. 
- if config.service: - prod_service = None - # DEV: backwards-compatibility for users who set a kombu service - else: - prod_service = os.getenv("DD_KOMBU_SERVICE_NAME", default=DEFAULT_SERVICE) - - Pin( - service=schematize_service_name(prod_service), - ).onto(kombu.messaging.Producer) - - Pin(service=schematize_service_name(config.kombu["service_name"])).onto(kombu.messaging.Consumer) - - -def unpatch(): - if getattr(kombu, "_datadog_patch", False): - kombu._datadog_patch = False - unwrap(kombu.Producer, "_publish") - unwrap(kombu.Consumer, "receive") - - -# -# tracing functions -# - - -def traced_receive(func, instance, args, kwargs): - pin = Pin.get_from(instance) - if not pin or not pin.enabled(): - return func(*args, **kwargs) - - # Signature only takes 2 args: (body, message) - message = get_argument_value(args, kwargs, 1, "message") - - trace_utils.activate_distributed_headers(pin.tracer, request_headers=message.headers, int_config=config.kombu) - - with pin.tracer.trace( - schematize_messaging_operation(kombux.RECEIVE_NAME, provider="kombu", direction=SpanDirection.PROCESSING), - service=pin.service, - span_type=SpanTypes.WORKER, - ) as s: - s.set_tag_str(COMPONENT, config.kombu.integration_name) - - # set span.kind to the type of operation being performed - s.set_tag_str(SPAN_KIND, SpanKind.CONSUMER) - - s.set_tag(SPAN_MEASURED_KEY) - # run the command - exchange = message.delivery_info["exchange"] - s.resource = exchange - s.set_tag_str(kombux.EXCHANGE, exchange) - - s.set_tags(extract_conn_tags(message.channel.connection)) - s.set_tag_str(kombux.ROUTING_KEY, message.delivery_info["routing_key"]) - # set analytics sample rate - s.set_tag(ANALYTICS_SAMPLE_RATE_KEY, config.kombu.get_analytics_sample_rate()) - result = func(*args, **kwargs) - core.dispatch("kombu.amqp.receive.post", [instance, message, s]) - return result - - -def traced_publish(func, instance, args, kwargs): - pin = Pin.get_from(instance) - if not pin or not pin.enabled(): - return 
func(*args, **kwargs) - - with pin.tracer.trace( - schematize_messaging_operation(kombux.PUBLISH_NAME, provider="kombu", direction=SpanDirection.OUTBOUND), - service=pin.service, - span_type=SpanTypes.WORKER, - ) as s: - s.set_tag_str(COMPONENT, config.kombu.integration_name) - - # set span.kind to the type of operation being performed - s.set_tag_str(SPAN_KIND, SpanKind.PRODUCER) - - s.set_tag(SPAN_MEASURED_KEY) - exchange_name = get_exchange_from_args(args) - s.resource = exchange_name - s.set_tag_str(kombux.EXCHANGE, exchange_name) - if pin.tags: - s.set_tags(pin.tags) - s.set_tag_str(kombux.ROUTING_KEY, get_routing_key_from_args(args)) - s.set_tags(extract_conn_tags(instance.channel.connection)) - s.set_metric(kombux.BODY_LEN, get_body_length_from_args(args)) - # set analytics sample rate - s.set_tag(ANALYTICS_SAMPLE_RATE_KEY, config.kombu.get_analytics_sample_rate()) - # run the command - if config.kombu.distributed_tracing_enabled: - propagator.inject(s.context, args[HEADER_POS]) - core.dispatch( - "kombu.amqp.publish.pre", [args, kwargs, s] - ) # Has to happen after trace injection for actual payload size - return func(*args, **kwargs) +# TODO: deprecate and remove this module diff --git a/ddtrace/contrib/kombu/utils.py b/ddtrace/contrib/kombu/utils.py index fcce88a54ae..4686328da0a 100644 --- a/ddtrace/contrib/kombu/utils.py +++ b/ddtrace/contrib/kombu/utils.py @@ -1,49 +1,15 @@ -""" -Some utils used by the dogtrace kombu integration -""" -from ...ext import kombu as kombux -from ...ext import net +from ddtrace.internal.utils.deprecations import DDTraceDeprecationWarning +from ddtrace.vendor.debtcollector import deprecate +from ..internal.kombu.utils import * # noqa: F401,F403 -PUBLISH_BODY_IDX = 0 -PUBLISH_ROUTING_KEY = 6 -PUBLISH_EXCHANGE_IDX = 9 -HEADER_POS = 4 +def __getattr__(name): + deprecate( + ("%s.%s is deprecated" % (__name__, name)), + category=DDTraceDeprecationWarning, + ) - -def extract_conn_tags(connection): - """Transform kombu conn info 
into dogtrace metas""" - try: - host, port = connection.host.split(":") - return { - net.TARGET_HOST: host, - net.TARGET_PORT: port, - kombux.VHOST: connection.virtual_host, - } - except AttributeError: - # Unlikely that we don't have .host or .virtual_host but let's not die over it - return {} - - -def get_exchange_from_args(args): - """Extract the exchange - - The publish method extracts the name and hands that off to _publish (what we patch) - """ - - return args[PUBLISH_EXCHANGE_IDX] - - -def get_routing_key_from_args(args): - """Extract the routing key""" - - name = args[PUBLISH_ROUTING_KEY] - return name - - -def get_body_length_from_args(args): - """Extract the length of the body""" - - length = len(args[PUBLISH_BODY_IDX]) - return length + if name in globals(): + return globals()[name] + raise AttributeError("%s has no attribute %s", __name__, name) diff --git a/releasenotes/notes/move-integrations-to-internal-httplib-475d071a277b18cf.yaml b/releasenotes/notes/move-integrations-to-internal-httplib-475d071a277b18cf.yaml new file mode 100644 index 00000000000..31db92ee27e --- /dev/null +++ b/releasenotes/notes/move-integrations-to-internal-httplib-475d071a277b18cf.yaml @@ -0,0 +1,12 @@ +--- +deprecations: + - | + httplib: Deprecates all modules in the ``ddtrace.contrib.httplib`` package. Use attributes exposed in ``ddtrace.contrib.httplib.__all__`` instead. + - | + httpx: Deprecates all modules in the ``ddtrace.contrib.httpx`` package. Use attributes exposed in ``ddtrace.contrib.httpx.__all__`` instead. + - | + jinja2: Deprecates all modules in the ``ddtrace.contrib.jinja2`` package. Use attributes exposed in ``ddtrace.contrib.jinja2.__all__`` instead. + - | + kafka: Deprecates all modules in the ``ddtrace.contrib.kafka`` package. Use attributes exposed in ``ddtrace.contrib.kafka.__all__`` instead. + - | + kombu: Deprecates all modules in the ``ddtrace.contrib.kombu`` package. Use attributes exposed in ``ddtrace.contrib.kombu.__all__`` instead. 
\ No newline at end of file diff --git a/tests/.suitespec.json b/tests/.suitespec.json index bf016e9f3d4..666bd0cc46d 100644 --- a/tests/.suitespec.json +++ b/tests/.suitespec.json @@ -300,10 +300,12 @@ "ddtrace/contrib/mako/*" ], "jinja2": [ - "ddtrace/contrib/jinja2/*" + "ddtrace/contrib/jinja2/*", + "ddtrace/contrib/internal/jinja2/*" ], "kombu": [ "ddtrace/contrib/kombu/*", + "ddtrace/contrib/internal/kombu/*", "ddtrace/ext/kombu.py" ], "wsgi": [ @@ -318,6 +320,7 @@ ], "kafka": [ "ddtrace/contrib/kafka/*", + "ddtrace/contrib/internal/kafka/*", "ddtrace/ext/kafka.py" ], "graphql": [ @@ -332,10 +335,12 @@ "ddtrace/contrib/gunicorn/*" ], "httplib": [ - "ddtrace/contrib/httplib/*" + "ddtrace/contrib/httplib/*", + "ddtrace/contrib/internal/httplib/*" ], "httpx": [ - "ddtrace/contrib/httpx/*" + "ddtrace/contrib/httpx/*", + "ddtrace/contrib/internal/httpx/*" ], "mariadb": [ "ddtrace/contrib/mariadb/*"