Skip to content

Commit

Permalink
Sets up is_expander and is_collect
Browse files Browse the repository at this point in the history
This preps us for handling failure through our GracefulErrorAdapter
better as we can react to the node type.
  • Loading branch information
elijahbenizzy committed Jul 9, 2024
1 parent 3763ae8 commit d9e8ef1
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 3 deletions.
2 changes: 1 addition & 1 deletion hamilton/execution/executors.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ def new_callable(*args, _callable=None, **kwargs):

def _modify_callable(node_source: node.NodeType, callabl: Callable):
"""This is a bit of a shortcut -- we modify the callable here as
we want to allow `Parallelizable[]` nodes to return a generaot
we want to allow `Parallelizable[]` nodes to return a generator
:param node_source:
:param callabl:
Expand Down
4 changes: 4 additions & 0 deletions hamilton/htypes.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ class Parallelizable(typing.Generator[U, None, None], ABC):
pass


def is_parallelizable_type(type_: Type) -> bool:
return issubclass(type_, Parallelizable)


class Collect(Generator[V, None, None], ABC):
pass

Expand Down
6 changes: 6 additions & 0 deletions hamilton/lifecycle/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,8 @@ def do_node_execute(
node_callable=node_.callable,
node_kwargs=kwargs,
task_id=task_id,
is_expand=node_.node_role == node.NodeType.EXPAND,
is_collect=node_.node_role == node.NodeType.COLLECT,
)

@abc.abstractmethod
Expand All @@ -531,6 +533,8 @@ def run_to_execute_node(
node_callable: Any,
node_kwargs: Dict[str, Any],
task_id: Optional[str],
is_expand: bool,
is_collect: bool,
**future_kwargs: Any,
) -> Any:
"""This method is responsible for executing the node and returning the result.
Expand All @@ -540,6 +544,8 @@ def run_to_execute_node(
:param node_callable: Callable of the node.
:param node_kwargs: Keyword arguments to pass to the node.
:param task_id: The ID of the task, none if not in a task-based environment
:param is_expand: Whether the node is parallelizable.
:param is_collect: Whether the node is a collect node.
:param future_kwargs: Additional keyword arguments -- this is kept for backwards compatibility
:return: The result of the node execution -- up to you to return this.
"""
Expand Down
13 changes: 12 additions & 1 deletion hamilton/lifecycle/default.py
Original file line number Diff line number Diff line change
Expand Up @@ -570,8 +570,19 @@ def never_reached(wont_proceed: int) -> int:
self.sentinel_value = sentinel_value

def run_to_execute_node(
self, *, node_callable: Any, node_kwargs: Dict[str, Any], **future_kwargs: Any
self,
*,
node_callable: Any,
node_kwargs: Dict[str, Any],
is_expand: bool,
is_collect: bool,
**future_kwargs: Any,
) -> Any:
# You can use the `is_expand` to see if the node is parallelizable
# You can use the `is_collect` to see if the node is a collect node
# TODO -- if it is parallelizable, run the generator in a special way (E.G. loop through the node callable
# and truncate it/provide sentinels for every failure)
# TODO -- decide what to do with collect
"""Executes a node. If the node fails, returns the sentinel value."""
for key, value in node_kwargs.items():
if value == self.sentinel_value: # == versus is
Expand Down
2 changes: 1 addition & 1 deletion hamilton/version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
VERSION = (1, 69, 0)
VERSION = (1, 70, "0rc0")

0 comments on commit d9e8ef1

Please sign in to comment.