From 1b4f69b430a1ccdb39c6b1fc5cd1f49bc908d897 Mon Sep 17 00:00:00 2001 From: Ryan Whitten Date: Mon, 2 Dec 2024 13:23:51 -0500 Subject: [PATCH 1/4] Add AsyncDDOGTracer --- hamilton/plugins/h_ddog.py | 209 ++++++++++++++++++++++++++++++++++++- 1 file changed, 207 insertions(+), 2 deletions(-) diff --git a/hamilton/plugins/h_ddog.py b/hamilton/plugins/h_ddog.py index 43260a52a..7ba134204 100644 --- a/hamilton/plugins/h_ddog.py +++ b/hamilton/plugins/h_ddog.py @@ -1,7 +1,10 @@ import logging -from typing import Any, Dict, Optional +from types import ModuleType +from typing import Any, Dict, List, Optional -from hamilton import lifecycle +from hamilton import graph as h_graph +from hamilton import lifecycle, node +from hamilton.lifecycle import base logger = logging.getLogger(__name__) try: @@ -267,3 +270,205 @@ def run_after_task_execution( exc_value = error tb = error.__traceback__ span.__exit__(exc_type, exc_value, tb) + + +class AsyncDDOGTracer( + base.BasePostGraphConstructAsync, + base.BasePreGraphExecuteAsync, + base.BasePreNodeExecuteAsync, + base.BasePostNodeExecuteAsync, + base.BasePostGraphExecuteAsync, +): + def __init__( + self, root_name: str, include_causal_links: bool = False, service: str | None = None + ): + """Creates a AsyncDDOGTracer. This has the option to specify some parameters. + + :param root_name: Name of the root trace/span. Due to the way datadog inherits, this will inherit an active span. + :param include_causal_links: Whether or not to include span causal links. Note that there are some edge-cases here, and + This is in beta for datadog, and actually broken in the current client, but it has been fixed and will be released shortly: + https://github.com/DataDog/dd-trace-py/issues/8049. Furthermore, the query on datadog is slow for displaying causal links. 
+ We've disabled this by default, but feel free to test it out -- its likely they'll be improving the docum + :param service: Service name -- will pick it up from the environment through DDOG if not available. + """ + self.root_name = root_name + self.service = service + self.include_causal_links = include_causal_links + self.run_span_cache: dict[str, Any] = {} # Cache of run_id -> span tuples + self.task_span_cache: dict[ + str, Any + ] = {} # cache of run_iod -> task_id -> span. Note that we will prune this after task execution + self.node_span_cache: dict[ + str, Any + ] = {} # Cache of run_id -> [task_id, node_id] -> span. We use this to open/close general traces + + @staticmethod + def _serialize_span_dict(span_dict: Dict[str, Span]) -> Dict[str, dict]: + """Serializes to a readable format. We're not propogating span links (see note above on causal links), + but that's fine (for now). We have to do this as passing spans back and forth is frowned upon. + + :param span_dict: A key -> span dictionary + :return: The serialized representation. + """ + # For some reason this doesn't use the right ser/deser for dask + # Or for some reason it has contexts instead of spans. Well, we can serialize them both! + return { + key: { + "trace_id": span.context.trace_id if isinstance(span, Span) else span.trace_id, + "span_id": span.context.span_id if isinstance(span, Span) else span.span_id, + } + for key, span in span_dict.items() + } + + @staticmethod + def _deserialize_span_dict(serialized_repr: Dict[str, dict]) -> Dict[str, context.Context]: + """Note that we deserialize as contexts, as passing spans is not supported + (the child should never terminate the parent span). 
+ + :param span_dict: Dict of str -> dict params for contexts + :return: A dictionary of contexts + """ + return {key: context.Context(**params) for key, params in serialized_repr.items()} + + def __getstate__(self) -> dict[str, Any]: + """Gets the state for serialization.""" + return dict( + root_trace_name=self.root_name, + service=self.service, + include_causal_links=self.include_causal_links, + run_span_cache=self._serialize_span_dict(self.run_span_cache), + task_span_cache={ + key: self._serialize_span_dict(value) for key, value in self.task_span_cache.items() + }, + # this is unnecessary, but leaving it here for now + # to remove it, we need to add a default check in the one that adds to the nodes + node_span_cache={ + key: self._serialize_span_dict(value) for key, value in self.task_span_cache.items() + }, # Nothing here, we can just wipe it for a new task + ) + + def __setstate__(self, state: dict) -> None: + """Sets the state for serialization.""" + self.service = state["service"] + self.root_name = state["root_trace_name"] + self.include_causal_links = state["include_causal_links"] + # TODO -- move this out/consider doing it to the others + self.run_span_cache = self._deserialize_span_dict(state["run_span_cache"]) + # We only really need this if we log the stuff before submitting... + # This shouldn't happen but it leaves flexibility for the future + self.task_span_cache = { + key: self._deserialize_span_dict(value) + for key, value in state["task_span_cache"].items() + } + self.node_span_cache = { + key: self._deserialize_span_dict(value) + for key, value in state["node_span_cache"].items() + } + + @staticmethod + def _sanitize_tags(tags: Dict[str, Any]) -> Dict[str, str]: + """Sanitizes tags to be strings, just in case. + + :param tags: Node tags. 
+ :return: The string -> string representation of tags + """ + return {f"hamilton.{key}": str(value) for key, value in tags.items()} + + async def post_graph_construct( + self, graph: h_graph.FunctionGraph, modules: List[ModuleType], config: Dict[str, Any] + ) -> None: + pass + + async def pre_graph_execute( + self, + run_id: str, + graph: h_graph.FunctionGraph, + final_vars: List[str], + inputs: Dict[str, Any], + overrides: Dict[str, Any], + ) -> None: + current_context = ( + tracer.current_trace_context() + ) # Get the current ctx, if available, otherwise returns None + span = tracer.start_span( + name=self.root_name, child_of=current_context, activate=True, service=self.service + ) + self.run_span_cache[run_id] = span # we save this as a root span + self.node_span_cache[run_id] = {} + self.task_span_cache[run_id] = {} + + async def pre_node_execute( + self, run_id: str, node_: node.Node, kwargs: Dict[str, Any], task_id: Optional[str] = None + ) -> None: + """Runs before a node's execution. Sets up/stores spans. + + :param node_name: Name of the node. + :param node_kwargs: Keyword arguments of the node. + :param node_tags: Tags of the node (they'll get stored as datadog tags) + :param task_id: Task ID that spawned the node + :param run_id: ID of the run. + :param future_kwargs: reserved for future keyword arguments/backwards compatibility. + """ + # We need to do this on launching tasks and we have not yet exposed it. + # TODO -- do pre-task and post-task execution. 
+ parent_span = self.task_span_cache[run_id].get(task_id) or self.run_span_cache[run_id] + new_span_name = f"{task_id}:" if task_id is not None else "" + new_span_name += node_.name + new_span = tracer.start_span( + name=new_span_name, child_of=parent_span, activate=True, service=self.service + ) + if self.include_causal_links: + prior_spans = {key: self.node_span_cache[run_id].get((task_id, key)) for key in kwargs} + for input_node, span in prior_spans.items(): + if span is not None: + new_span.link_span( + context=span.context, + attributes={ + "link.name": f"{input_node}_to_{node_.name}", + }, + ) + tags = node_.tags.copy() + tags["hamilton.node_name"] = node_.name + new_span.set_tags(AsyncDDOGTracer._sanitize_tags(tags=tags)) # type: ignore[arg-type] + self.node_span_cache[run_id][(task_id, node_.name)] = new_span + + async def post_node_execute( + self, + run_id: str, + node_: node.Node, + success: bool, + error: Optional[Exception], + result: Any, + task_id: Optional[str] = None, + **future_kwargs: dict, + ) -> None: + span = self.node_span_cache[run_id][(task_id, node_.name)] + exc_type = None + exc_value = None + tb = None + if error is not None: + exc_type = type(error) + exc_value = error + tb = error.__traceback__ + span.__exit__(exc_type, exc_value, tb) + + async def post_graph_execute( + self, + run_id: str, + graph: h_graph.FunctionGraph, + success: bool, + error: Optional[Exception], + results: Optional[Dict[str, Any]], + ) -> None: + span = self.run_span_cache[run_id] + exc_type = None + exc_value = None + tb = None + if error is not None: + exc_type = type(error) + exc_value = error + tb = error.__traceback__ + span.__exit__(exc_type, exc_value, tb) + del self.run_span_cache[run_id] + del self.node_span_cache[run_id] + del self.task_span_cache[run_id] From 37f7d65ea8710b0298016ce7f558551653866957 Mon Sep 17 00:00:00 2001 From: Ryan Whitten Date: Mon, 2 Dec 2024 13:40:07 -0500 Subject: [PATCH 2/4] Refactor DDOG tracers to use common impl class --- 
docs/reference/lifecycle-hooks/DDOGTracer.rst | 5 + hamilton/plugins/h_ddog.py | 351 +++++++++--------- 2 files changed, 190 insertions(+), 166 deletions(-) diff --git a/docs/reference/lifecycle-hooks/DDOGTracer.rst b/docs/reference/lifecycle-hooks/DDOGTracer.rst index 7f781dd02..f4a10c59f 100644 --- a/docs/reference/lifecycle-hooks/DDOGTracer.rst +++ b/docs/reference/lifecycle-hooks/DDOGTracer.rst @@ -7,3 +7,8 @@ plugins.h_ddog.DDOGTracer :special-members: __init__ :members: :inherited-members: + +.. autoclass:: hamilton.plugins.h_ddog.AsyncDDOGTracer + :special-members: __init__ + :members: + :inherited-members: diff --git a/hamilton/plugins/h_ddog.py b/hamilton/plugins/h_ddog.py index 7ba134204..204122cbb 100644 --- a/hamilton/plugins/h_ddog.py +++ b/hamilton/plugins/h_ddog.py @@ -19,35 +19,8 @@ raise -class DDOGTracer( - lifecycle.NodeExecutionHook, lifecycle.GraphExecutionHook, lifecycle.TaskExecutionHook -): - """Lifecycle adapter to use datadog to run tracing on node execution. This works with the following execution environments: - 1. Vanilla Hamilton -- no task-based computation, just nodes - 2. Task-based, synchronous - 3. Task-based with Multithreading, Ray, and Dask - It will likely work with others, although we have not yet tested them. This does not work with async (yet). - - Note that this is not a typical use of Datadog if you're not using hamilton for a microservice. It does work quite nicely, however! - Monitoring ETLs is not a typical datadog case (you can't see relationships between nodes/tasks or data summaries), - but it is easy enough to work with and gives some basic information. - - This tracer bypasses context management so we can more accurately track relationships between nodes/tags. Also, we plan to - get this working with OpenTelemetry, and use that for datadog integration. 
- - To use this, you'll want to run `pip install sf-hamilton[ddog]` (or `pip install "sf-hamilton[ddog]"` if using zsh) - """ - +class _DDOGTracerImpl: def __init__(self, root_name: str, include_causal_links: bool = False, service: str = None): - """Creates a DDOGTracer. This has the option to specify some parameters. - - :param root_name: Name of the root trace/span. Due to the way datadog inherits, this will inherit an active span. - :param include_causal_links: Whether or not to include span causal links. Note that there are some edge-cases here, and - This is in beta for datadog, and actually broken in the current client, but it has been fixed and will be released shortly: - https://github.com/DataDog/dd-trace-py/issues/8049. Furthermore, the query on datadog is slow for displaying causal links. - We've disabled this by default, but feel free to test it out -- its likely they'll be improving the docum - :param service: Service name -- will pick it up from the environment through DDOG if not available. - """ self.root_name = root_name self.service = service self.include_causal_links = include_causal_links @@ -133,7 +106,10 @@ def run_before_graph_execution(self, *, run_id: str, **future_kwargs: Any): :param run_id: ID of the run :param future_kwargs: reserved for future keyword arguments/backwards compatibility. 
""" - span = tracer.start_span(name=self.root_name, activate=True, service=self.service) + current_context = tracer.current_trace_context() + span = tracer.start_span( + name=self.root_name, child_of=current_context, activate=True, service=self.service + ) self.run_span_cache[run_id] = span # we save this as a root span self.node_span_cache[run_id] = {} self.task_span_cache[run_id] = {} @@ -272,17 +248,27 @@ def run_after_task_execution( span.__exit__(exc_type, exc_value, tb) -class AsyncDDOGTracer( - base.BasePostGraphConstructAsync, - base.BasePreGraphExecuteAsync, - base.BasePreNodeExecuteAsync, - base.BasePostNodeExecuteAsync, - base.BasePostGraphExecuteAsync, +class DDOGTracer( + lifecycle.NodeExecutionHook, lifecycle.GraphExecutionHook, lifecycle.TaskExecutionHook ): - def __init__( - self, root_name: str, include_causal_links: bool = False, service: str | None = None - ): - """Creates a AsyncDDOGTracer. This has the option to specify some parameters. + """Lifecycle adapter to use datadog to run tracing on node execution. This works with the following execution environments: + 1. Vanilla Hamilton -- no task-based computation, just nodes + 2. Task-based, synchronous + 3. Task-based with Multithreading, Ray, and Dask + It will likely work with others, although we have not yet tested them. This does not work with async (yet). + + Note that this is not a typical use of Datadog if you're not using hamilton for a microservice. It does work quite nicely, however! + Monitoring ETLs is not a typical datadog case (you can't see relationships between nodes/tasks or data summaries), + but it is easy enough to work with and gives some basic information. + + This tracer bypasses context management so we can more accurately track relationships between nodes/tags. Also, we plan to + get this working with OpenTelemetry, and use that for datadog integration. 
+ + To use this, you'll want to run `pip install sf-hamilton[ddog]` (or `pip install "sf-hamilton[ddog]"` if using zsh) + """ + + def __init__(self, root_name: str, include_causal_links: bool = False, service: str = None): + """Creates a DDOGTracer. This has the option to specify some parameters. :param root_name: Name of the root trace/span. Due to the way datadog inherits, this will inherit an active span. :param include_causal_links: Whether or not to include span causal links. Note that there are some edge-cases here, and @@ -291,92 +277,141 @@ def __init__( We've disabled this by default, but feel free to test it out -- its likely they'll be improving the docum :param service: Service name -- will pick it up from the environment through DDOG if not available. """ - self.root_name = root_name - self.service = service - self.include_causal_links = include_causal_links - self.run_span_cache: dict[str, Any] = {} # Cache of run_id -> span tuples - self.task_span_cache: dict[ - str, Any - ] = {} # cache of run_iod -> task_id -> span. Note that we will prune this after task execution - self.node_span_cache: dict[ - str, Any - ] = {} # Cache of run_id -> [task_id, node_id] -> span. We use this to open/close general traces + self._impl = _DDOGTracerImpl( + root_name=root_name, include_causal_links=include_causal_links, service=service + ) - @staticmethod - def _serialize_span_dict(span_dict: Dict[str, Span]) -> Dict[str, dict]: - """Serializes to a readable format. We're not propogating span links (see note above on causal links), - but that's fine (for now). We have to do this as passing spans back and forth is frowned upon. + def run_before_graph_execution(self, *, run_id: str, **future_kwargs: Any): + """Runs before graph execution -- sets the state so future ones can reference it. - :param span_dict: A key -> span dictionary - :return: The serialized representation. 
+ :param run_id: ID of the run + :param future_kwargs: reserved for future keyword arguments/backwards compatibility. """ - # For some reason this doesn't use the right ser/deser for dask - # Or for some reason it has contexts instead of spans. Well, we can serialize them both! - return { - key: { - "trace_id": span.context.trace_id if isinstance(span, Span) else span.trace_id, - "span_id": span.context.span_id if isinstance(span, Span) else span.span_id, - } - for key, span in span_dict.items() - } + self._impl.run_before_graph_execution(run_id=run_id, **future_kwargs) - @staticmethod - def _deserialize_span_dict(serialized_repr: Dict[str, dict]) -> Dict[str, context.Context]: - """Note that we deserialize as contexts, as passing spans is not supported - (the child should never terminate the parent span). + def run_before_node_execution( + self, + *, + node_name: str, + node_kwargs: Dict[str, Any], + node_tags: Dict[str, Any], + task_id: Optional[str], + run_id: str, + **future_kwargs: Any, + ): + """Runs before a node's execution. Sets up/stores spans. - :param span_dict: Dict of str -> dict params for contexts - :return: A dictionary of contexts + :param node_name: Name of the node. + :param node_kwargs: Keyword arguments of the node. + :param node_tags: Tags of the node (they'll get stored as datadog tags) + :param task_id: Task ID that spawned the node + :param run_id: ID of the run. + :param future_kwargs: reserved for future keyword arguments/backwards compatibility. 
""" - return {key: context.Context(**params) for key, params in serialized_repr.items()} + self._impl.run_before_node_execution( + node_name=node_name, + node_kwargs=node_kwargs, + node_tags=node_tags, + task_id=task_id, + run_id=run_id, + **future_kwargs, + ) - def __getstate__(self) -> dict[str, Any]: - """Gets the state for serialization.""" - return dict( - root_trace_name=self.root_name, - service=self.service, - include_causal_links=self.include_causal_links, - run_span_cache=self._serialize_span_dict(self.run_span_cache), - task_span_cache={ - key: self._serialize_span_dict(value) for key, value in self.task_span_cache.items() - }, - # this is unnecessary, but leaving it here for now - # to remove it, we need to add a default check in the one that adds to the nodes - node_span_cache={ - key: self._serialize_span_dict(value) for key, value in self.task_span_cache.items() - }, # Nothing here, we can just wipe it for a new task + def run_after_node_execution( + self, + *, + node_name: str, + error: Optional[Exception], + task_id: Optional[str], + run_id: str, + **future_kwargs: Any, + ): + """Runs after a node's execution -- completes the span. + + :param node_name: Name of the node + :param error: Error that the node raised, if any + :param task_id: Task ID that spawned the node + :param run_id: ID of the run. + :param future_kwargs: reserved for future keyword arguments/backwards compatibility. + """ + self._impl.run_after_node_execution( + node_name=node_name, error=error, task_id=task_id, run_id=run_id, **future_kwargs ) - def __setstate__(self, state: dict) -> None: - """Sets the state for serialization.""" - self.service = state["service"] - self.root_name = state["root_trace_name"] - self.include_causal_links = state["include_causal_links"] - # TODO -- move this out/consider doing it to the others - self.run_span_cache = self._deserialize_span_dict(state["run_span_cache"]) - # We only really need this if we log the stuff before submitting... 
- # This shouldn't happen but it leaves flexibility for the future - self.task_span_cache = { - key: self._deserialize_span_dict(value) - for key, value in state["task_span_cache"].items() - } - self.node_span_cache = { - key: self._deserialize_span_dict(value) - for key, value in state["node_span_cache"].items() - } + def run_after_graph_execution( + self, *, error: Optional[Exception], run_id: str, **future_kwargs: Any + ): + """Runs after graph execution. Garbage collects + finishes the root span. - @staticmethod - def _sanitize_tags(tags: Dict[str, Any]) -> Dict[str, str]: - """Sanitizes tags to be strings, just in case. + :param error: Error the graph raised when running, if any + :param run_id: ID of the run + :param future_kwargs: reserved for future keyword arguments/backwards compatibility. + """ + self._impl.run_after_graph_execution(error=error, run_id=run_id, **future_kwargs) - :param tags: Node tags. - :return: The string -> string representation of tags + def run_before_task_execution(self, *, task_id: str, run_id: str, **future_kwargs): + """Runs before task execution. Sets up the task span. + + :param task_id: ID of the task + :param run_id: ID of the run + :param future_kwargs: reserved for future keyword arguments/backwards compatibility. """ - return {f"hamilton.{key}": str(value) for key, value in tags.items()} + self._impl.run_before_task_execution(task_id=task_id, run_id=run_id, **future_kwargs) + + def run_after_task_execution( + self, + *, + task_id: str, + run_id: str, + error: Exception, + **future_kwargs, + ): + """Runs after task execution. Finishes task-level spans. + + :param task_id: ID of the task
+ :param run_id: ID of the run + :param error: Error the task raised when running, if any + :param future_kwargs: Future keyword arguments for backwards compatibility + """ + self._impl.run_after_task_execution( + task_id=task_id, run_id=run_id, error=error, **future_kwargs + ) + + +class AsyncDDOGTracer( + base.BasePostGraphConstructAsync, + base.BasePreGraphExecuteAsync, + base.BasePreNodeExecuteAsync, + base.BasePostNodeExecuteAsync, + base.BasePostGraphExecuteAsync, +): + def __init__( + self, root_name: str, include_causal_links: bool = False, service: str | None = None + ): + """Creates an AsyncDDOGTracer, the asyncio-friendly version of DDOGTracer. + + This has the option to specify some parameters: + + :param root_name: Name of the root trace/span. Due to the way datadog inherits, this will inherit an active span. + :param include_causal_links: Whether or not to include span causal links. Note that there are some edge-cases here, and + This is in beta for datadog, and actually broken in the current client, but it has been fixed and will be released shortly: + https://github.com/DataDog/dd-trace-py/issues/8049. Furthermore, the query on datadog is slow for displaying causal links. + We've disabled this by default, but feel free to test it out -- it's likely they'll be improving the documentation + :param service: Service name -- will pick it up from the environment through DDOG if not available. + """ + self._impl = _DDOGTracerImpl( + root_name=root_name, include_causal_links=include_causal_links, service=service + ) async def post_graph_construct( self, graph: h_graph.FunctionGraph, modules: List[ModuleType], config: Dict[str, Any] ) -> None: + """Runs after graph construction. This is a no-op for this plugin. + + :param graph: Graph that has been constructed.
+ :param modules: Modules passed into the graph + :param config: Config passed into the graph + """ pass async def pre_graph_execute( @@ -387,50 +422,33 @@ async def pre_graph_execute( inputs: Dict[str, Any], overrides: Dict[str, Any], ) -> None: - current_context = ( - tracer.current_trace_context() - ) # Get the current ctx, if available, otherwise returns None - span = tracer.start_span( - name=self.root_name, child_of=current_context, activate=True, service=self.service - ) - self.run_span_cache[run_id] = span # we save this as a root span - self.node_span_cache[run_id] = {} - self.task_span_cache[run_id] = {} + """Runs before graph execution -- sets the state so future ones can reference it. + + :param run_id: ID of the run, unique in scope of the driver. + :param graph: Graph that is being executed + :param final_vars: Variables we are extracting from the graph + :param inputs: Inputs to the graph + :param overrides: Overrides to graph execution + """ + self._impl.run_before_graph_execution(run_id=run_id) async def pre_node_execute( self, run_id: str, node_: node.Node, kwargs: Dict[str, Any], task_id: Optional[str] = None ) -> None: """Runs before a node's execution. Sets up/stores spans. - :param node_name: Name of the node. - :param node_kwargs: Keyword arguments of the node. - :param node_tags: Tags of the node (they'll get stored as datadog tags) - :param task_id: Task ID that spawned the node - :param run_id: ID of the run. - :param future_kwargs: reserved for future keyword arguments/backwards compatibility. + :param run_id: ID of the run, unique in scope of the driver. + :param node_: Node that is being executed + :param kwargs: Keyword arguments that are being passed into the node + :param task_id: ID of the task, defaults to None if not in a task setting """ - # We need to do this on launching tasks and we have not yet exposed it. - # TODO -- do pre-task and post-task execution. 
- parent_span = self.task_span_cache[run_id].get(task_id) or self.run_span_cache[run_id] - new_span_name = f"{task_id}:" if task_id is not None else "" - new_span_name += node_.name - new_span = tracer.start_span( - name=new_span_name, child_of=parent_span, activate=True, service=self.service + self._impl.run_before_node_execution( + node_name=node_.name, + node_kwargs=kwargs, + node_tags=node_.tags, + task_id=task_id, + run_id=run_id, ) - if self.include_causal_links: - prior_spans = {key: self.node_span_cache[run_id].get((task_id, key)) for key in kwargs} - for input_node, span in prior_spans.items(): - if span is not None: - new_span.link_span( - context=span.context, - attributes={ - "link.name": f"{input_node}_to_{node_.name}", - }, - ) - tags = node_.tags.copy() - tags["hamilton.node_name"] = node_.name - new_span.set_tags(AsyncDDOGTracer._sanitize_tags(tags=tags)) # type: ignore[arg-type] - self.node_span_cache[run_id][(task_id, node_.name)] = new_span async def post_node_execute( self, @@ -442,15 +460,19 @@ async def post_node_execute( task_id: Optional[str] = None, **future_kwargs: dict, ) -> None: - span = self.node_span_cache[run_id][(task_id, node_.name)] - exc_type = None - exc_value = None - tb = None - if error is not None: - exc_type = type(error) - exc_value = error - tb = error.__traceback__ - span.__exit__(exc_type, exc_value, tb) + """Runs after a node's execution -- completes the span. + + :param run_id: ID of the run, unique in scope of the driver. 
+ :param node_: Node that is being executed + :param success: Whether or not the node executed successfully + :param error: The error that was raised, if any + :param result: The result of the node execution, if no error was raised + :param task_id: ID of the task, defaults to None if not in a task-based execution + :param future_kwargs: reserved for future keyword arguments/backwards compatibility. + """ + self._impl.run_after_node_execution( + node_name=node_.name, error=error, task_id=task_id, run_id=run_id + ) async def post_graph_execute( self, @@ -460,15 +482,12 @@ async def post_graph_execute( error: Optional[Exception], results: Optional[Dict[str, Any]], ) -> None: - span = self.run_span_cache[run_id] - exc_type = None - exc_value = None - tb = None - if error is not None: - exc_type = type(error) - exc_value = error - tb = error.__traceback__ - span.__exit__(exc_type, exc_value, tb) - del self.run_span_cache[run_id] - del self.node_span_cache[run_id] - del self.task_span_cache[run_id] + """Runs after graph execution. Garbage collects + finishes the root span. + + :param run_id: ID of the run, unique in scope of the driver.
+ :param graph: Graph that was executed + :param success: Whether or not the graph executed successfully + :param error: Error that was raised, if any + :param results: Results of the graph execution + """ + self._impl.run_after_graph_execution(error=error, run_id=run_id) From 580bb5688ac4f28ae29b2473f111e710b62a9f2a Mon Sep 17 00:00:00 2001 From: Ryan Whitten Date: Tue, 3 Dec 2024 09:53:17 -0500 Subject: [PATCH 3/4] Fix ref to _sanitize_tags method --- hamilton/plugins/h_ddog.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hamilton/plugins/h_ddog.py b/hamilton/plugins/h_ddog.py index 204122cbb..748b9cbbc 100644 --- a/hamilton/plugins/h_ddog.py +++ b/hamilton/plugins/h_ddog.py @@ -155,7 +155,7 @@ def run_before_node_execution( ) tags = node_tags.copy() tags["hamilton.node_name"] = node_name - new_span.set_tags(DDOGTracer._sanitize_tags(tags=tags)) + new_span.set_tags(self._sanitize_tags(tags=tags)) self.node_span_cache[run_id][(task_id, node_name)] = new_span def run_after_node_execution( From b1815e4bc7eb5e8a98010a52580f045a91f66f01 Mon Sep 17 00:00:00 2001 From: Ryan Whitten Date: Wed, 4 Dec 2024 10:20:45 -0500 Subject: [PATCH 4/4] Add DDOGTracer comments to explain impl class & assigning parent context --- hamilton/plugins/h_ddog.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/hamilton/plugins/h_ddog.py b/hamilton/plugins/h_ddog.py index 748b9cbbc..aacd1c1cf 100644 --- a/hamilton/plugins/h_ddog.py +++ b/hamilton/plugins/h_ddog.py @@ -20,6 +20,15 @@ class _DDOGTracerImpl: + """Implementation class for DDOGTracer and AsyncDDOGTracer functionality. + + This class encapsulates the core logic for Datadog tracing within Hamilton's lifecycle hooks. + It provides methods to handle the tracing operations required before and after the execution + of graphs, nodes, and tasks. The DDOGTracer and AsyncDDOGTracer classes are composed of this implementation class, + due to the differences in their base lifecycle classes (sync vs async). 
This class allows sharing logic between the + two without duplicating code or interfering with the base classes they inherit from. + """ + def __init__(self, root_name: str, include_causal_links: bool = False, service: str = None): self.root_name = root_name self.service = service @@ -106,6 +115,7 @@ def run_before_graph_execution(self, *, run_id: str, **future_kwargs: Any): :param run_id: ID of the run :param future_kwargs: reserved for future keyword arguments/backwards compatibility. """ + # This returns None if there's no active context and works as a no-op, otherwise we tie this span to a parent. current_context = tracer.current_trace_context() span = tracer.start_span( name=self.root_name, child_of=current_context, activate=True, service=self.service