Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add AsyncDDOGTracer #1247

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/reference/lifecycle-hooks/DDOGTracer.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,8 @@ plugins.h_ddog.DDOGTracer
:special-members: __init__
:members:
:inherited-members:

.. autoclass:: hamilton.plugins.h_ddog.AsyncDDOGTracer
:special-members: __init__
:members:
:inherited-members:
292 changes: 263 additions & 29 deletions hamilton/plugins/h_ddog.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import logging
from typing import Any, Dict, Optional
from types import ModuleType
from typing import Any, Dict, List, Optional

from hamilton import lifecycle
from hamilton import graph as h_graph
from hamilton import lifecycle, node
from hamilton.lifecycle import base

logger = logging.getLogger(__name__)
try:
Expand All @@ -16,35 +19,17 @@
raise


class DDOGTracer(
lifecycle.NodeExecutionHook, lifecycle.GraphExecutionHook, lifecycle.TaskExecutionHook
):
"""Lifecycle adapter to use datadog to run tracing on node execution. This works with the following execution environments:
1. Vanilla Hamilton -- no task-based computation, just nodes
2. Task-based, synchronous
3. Task-based with Multithreading, Ray, and Dask
It will likely work with others, although we have not yet tested them. This does not work with async (yet).

Note that this is not a typical use of Datadog if you're not using hamilton for a microservice. It does work quite nicely, however!
Monitoring ETLs is not a typical datadog case (you can't see relationships between nodes/tasks or data summaries),
but it is easy enough to work with and gives some basic information.
class _DDOGTracerImpl:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a docstring for the reasoning of having a composition-based class (rather than inheritance) -- e.g. sync versus async. Good choice, IMO, but worth clarifying in the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, added!

"""Implementation class for DDOGTracer and AsyncDDOGTracer functionality.

This tracer bypasses context management so we can more accurately track relationships between nodes/tags. Also, we plan to
get this working with OpenTelemetry, and use that for datadog integration.

To use this, you'll want to run `pip install sf-hamilton[ddog]` (or `pip install "sf-hamilton[ddog]"` if using zsh)
This class encapsulates the core logic for Datadog tracing within Hamilton's lifecycle hooks.
It provides methods to handle the tracing operations required before and after the execution
of graphs, nodes, and tasks. The DDOGTracer and AsyncDDOGTracer classes are composed of this implementation class,
due to the differences in their base lifecycle classes (sync vs async). This class allows sharing logic between the
two without duplicating code or interfering with the base classes they inherit from.
"""

def __init__(self, root_name: str, include_causal_links: bool = False, service: str = None):
"""Creates a DDOGTracer. This has the option to specify some parameters.

:param root_name: Name of the root trace/span. Due to the way datadog inherits, this will inherit an active span.
:param include_causal_links: Whether or not to include span causal links. Note that there are some edge-cases here, and
This is in beta for datadog, and actually broken in the current client, but it has been fixed and will be released shortly:
https://github.com/DataDog/dd-trace-py/issues/8049. Furthermore, the query on datadog is slow for displaying causal links.
We've disabled this by default, but feel free to test it out -- its likely they'll be improving the docum
:param service: Service name -- will pick it up from the environment through DDOG if not available.
"""
self.root_name = root_name
self.service = service
self.include_causal_links = include_causal_links
Expand Down Expand Up @@ -130,7 +115,11 @@ def run_before_graph_execution(self, *, run_id: str, **future_kwargs: Any):
:param run_id: ID of the run
:param future_kwargs: reserved for future keyword arguments/backwards compatibility.
"""
span = tracer.start_span(name=self.root_name, activate=True, service=self.service)
# This returns None if there's no active context and works as a no-op, otherwise we tie this span to a parent.
current_context = tracer.current_trace_context()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This returns None if there's no active trace, so will be a safe no-op if there isn't one. child_of in start_span is None by default, so we're just assigning the parent if it exists.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Worth adding this as a comment

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea — added!

span = tracer.start_span(
name=self.root_name, child_of=current_context, activate=True, service=self.service
)
self.run_span_cache[run_id] = span # we save this as a root span
self.node_span_cache[run_id] = {}
self.task_span_cache[run_id] = {}
Expand Down Expand Up @@ -176,7 +165,7 @@ def run_before_node_execution(
)
tags = node_tags.copy()
tags["hamilton.node_name"] = node_name
new_span.set_tags(DDOGTracer._sanitize_tags(tags=tags))
new_span.set_tags(self._sanitize_tags(tags=tags))
self.node_span_cache[run_id][(task_id, node_name)] = new_span

def run_after_node_execution(
Expand Down Expand Up @@ -267,3 +256,248 @@ def run_after_task_execution(
exc_value = error
tb = error.__traceback__
span.__exit__(exc_type, exc_value, tb)


class DDOGTracer(
lifecycle.NodeExecutionHook, lifecycle.GraphExecutionHook, lifecycle.TaskExecutionHook
):
"""Lifecycle adapter to use datadog to run tracing on node execution. This works with the following execution environments:
skrawcz marked this conversation as resolved.
Show resolved Hide resolved
1. Vanilla Hamilton -- no task-based computation, just nodes
2. Task-based, synchronous
3. Task-based with Multithreading, Ray, and Dask
It will likely work with others, although we have not yet tested them. This does not work with async (yet).

Note that this is not a typical use of Datadog if you're not using hamilton for a microservice. It does work quite nicely, however!
Monitoring ETLs is not a typical datadog case (you can't see relationships between nodes/tasks or data summaries),
but it is easy enough to work with and gives some basic information.

This tracer bypasses context management so we can more accurately track relationships between nodes/tags. Also, we plan to
get this working with OpenTelemetry, and use that for datadog integration.

To use this, you'll want to run `pip install sf-hamilton[ddog]` (or `pip install "sf-hamilton[ddog]"` if using zsh)
"""

def __init__(self, root_name: str, include_causal_links: bool = False, service: str = None):
"""Creates a DDOGTracer. This has the option to specify some parameters.

:param root_name: Name of the root trace/span. Due to the way datadog inherits, this will inherit an active span.
:param include_causal_links: Whether or not to include span causal links. Note that there are some edge-cases here, and
This is in beta for datadog, and actually broken in the current client, but it has been fixed and will be released shortly:
https://github.com/DataDog/dd-trace-py/issues/8049. Furthermore, the query on datadog is slow for displaying causal links.
We've disabled this by default, but feel free to test it out -- its likely they'll be improving the docum
:param service: Service name -- will pick it up from the environment through DDOG if not available.
"""
self._impl = _DDOGTracerImpl(
root_name=root_name, include_causal_links=include_causal_links, service=service
)

def run_before_graph_execution(self, *, run_id: str, **future_kwargs: Any):
"""Runs before graph execution -- sets the state so future ones can reference it.

:param run_id: ID of the run
:param future_kwargs: reserved for future keyword arguments/backwards compatibility.
"""
self._impl.run_before_graph_execution(run_id=run_id, **future_kwargs)

def run_before_node_execution(
self,
*,
node_name: str,
node_kwargs: Dict[str, Any],
node_tags: Dict[str, Any],
task_id: Optional[str],
run_id: str,
**future_kwargs: Any,
):
"""Runs before a node's execution. Sets up/stores spans.

:param node_name: Name of the node.
:param node_kwargs: Keyword arguments of the node.
:param node_tags: Tags of the node (they'll get stored as datadog tags)
:param task_id: Task ID that spawned the node
:param run_id: ID of the run.
:param future_kwargs: reserved for future keyword arguments/backwards compatibility.
"""
self._impl.run_before_node_execution(
node_name=node_name,
node_kwargs=node_kwargs,
node_tags=node_tags,
task_id=task_id,
run_id=run_id,
**future_kwargs,
)

def run_after_node_execution(
self,
*,
node_name: str,
error: Optional[Exception],
task_id: Optional[str],
run_id: str,
**future_kwargs: Any,
):
"""Runs after a node's execution -- completes the span.

:param node_name: Name of the node
:param error: Error that the node raised, if any
:param task_id: Task ID that spawned the node
:param run_id: ID of the run.
:param future_kwargs: reserved for future keyword arguments/backwards compatibility.
"""
self._impl.run_after_node_execution(
node_name=node_name, error=error, task_id=task_id, run_id=run_id, **future_kwargs
)

def run_after_graph_execution(
self, *, error: Optional[Exception], run_id: str, **future_kwargs: Any
):
"""Runs after graph execution. Garbage collects + finishes the root span.

:param error: Error the graph raised when running, if any
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The run_after_task_execution method is not utilized in AsyncDDOGTracer. Consider implementing this method for async task execution as well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is rightly pointed out, but not sure if I need it: is async mode & dynamic/task mode compatible? I assume "task" here refers to the parallelize/collect flow.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is specifically not supported (right now). I think this is good for now -- add a note about it.

:param run_id: ID of the run
:param future_kwargs: reserved for future keyword arguments/backwards compatibility.
"""
self._impl.run_after_graph_execution(error=error, run_id=run_id, **future_kwargs)

def run_before_task_execution(self, *, task_id: str, run_id: str, **future_kwargs):
"""Runs before task execution. Sets up the task span.

:param task_id: ID of the task
:param run_id: ID of the run,
:param future_kwargs: reserved for future keyword arguments/backwards compatibility.
"""
self._impl.run_before_task_execution(task_id=task_id, run_id=run_id, **future_kwargs)

def run_after_task_execution(
self,
*,
task_id: str,
run_id: str,
error: Exception,
**future_kwargs,
):
"""Rusn after task execution. Finishes task-level spans.

:param task_id: ID of the task, ID of the run.
:param run_id: ID of the run
:param error: Error the graph raised when running, if any
:param future_kwargs: Future keyword arguments for backwards compatibility
"""
self._impl.run_after_task_execution(
task_id=task_id, run_id=run_id, error=error, **future_kwargs
)


class AsyncDDOGTracer(
base.BasePostGraphConstructAsync,
base.BasePreGraphExecuteAsync,
base.BasePreNodeExecuteAsync,
base.BasePostNodeExecuteAsync,
base.BasePostGraphExecuteAsync,
):
def __init__(
self, root_name: str, include_causal_links: bool = False, service: str | None = None
):
"""Creates a AsyncDDOGTracer, the asyncio-friendly version of DDOGTracer.

This has the option to specify some parameters:

:param root_name: Name of the root trace/span. Due to the way datadog inherits, this will inherit an active span.
:param include_causal_links: Whether or not to include span causal links. Note that there are some edge-cases here, and
This is in beta for datadog, and actually broken in the current client, but it has been fixed and will be released shortly:
https://github.com/DataDog/dd-trace-py/issues/8049. Furthermore, the query on datadog is slow for displaying causal links.
We've disabled this by default, but feel free to test it out -- its likely they'll be improving the docum
:param service: Service name -- will pick it up from the environment through DDOG if not available.
"""
self._impl = _DDOGTracerImpl(
root_name=root_name, include_causal_links=include_causal_links, service=service
)

async def post_graph_construct(
self, graph: h_graph.FunctionGraph, modules: List[ModuleType], config: Dict[str, Any]
) -> None:
"""Runs after graph construction. This is a no-op for this plugin.

:param graph: Graph that has been constructed.
:param modules: Modules passed into the graph
:param config: Config passed into the graph
"""
pass

async def pre_graph_execute(
self,
run_id: str,
graph: h_graph.FunctionGraph,
final_vars: List[str],
inputs: Dict[str, Any],
overrides: Dict[str, Any],
) -> None:
"""Runs before graph execution -- sets the state so future ones can reference it.

:param run_id: ID of the run, unique in scope of the driver.
:param graph: Graph that is being executed
:param final_vars: Variables we are extracting from the graph
:param inputs: Inputs to the graph
:param overrides: Overrides to graph execution
"""
self._impl.run_before_graph_execution(run_id=run_id)

async def pre_node_execute(
self, run_id: str, node_: node.Node, kwargs: Dict[str, Any], task_id: Optional[str] = None
) -> None:
"""Runs before a node's execution. Sets up/stores spans.

:param run_id: ID of the run, unique in scope of the driver.
:param node_: Node that is being executed
:param kwargs: Keyword arguments that are being passed into the node
:param task_id: ID of the task, defaults to None if not in a task setting
"""
self._impl.run_before_node_execution(
node_name=node_.name,
node_kwargs=kwargs,
node_tags=node_.tags,
task_id=task_id,
run_id=run_id,
)

async def post_node_execute(
self,
run_id: str,
node_: node.Node,
success: bool,
error: Optional[Exception],
result: Any,
task_id: Optional[str] = None,
**future_kwargs: dict,
) -> None:
"""Runs after a node's execution -- completes the span.

:param run_id: ID of the run, unique in scope of the driver.
:param node_: Node that is being executed
:param kwargs: Keyword arguments that are being passed into the node
:param success: Whether or not the node executed successfully
:param error: The error that was raised, if any
:param result: The result of the node execution, if no error was raised
:param task_id: ID of the task, defaults to None if not in a task-based execution
"""
self._impl.run_after_node_execution(
node_name=node_.name, error=error, task_id=task_id, run_id=run_id
)

async def post_graph_execute(
self,
run_id: str,
graph: h_graph.FunctionGraph,
success: bool,
error: Optional[Exception],
results: Optional[Dict[str, Any]],
) -> None:
"""Runs after graph execution. Garbage collects + finishes the root span.

:param run_id: ID of the run, unique in scope of the driver.
:param graph: Graph that was executed
:param success: Whether or not the graph executed successfully
:param error: Error that was raised, if any
:param results: Results of the graph execution
"""
self._impl.run_after_graph_execution(error=error, run_id=run_id)