Skip to content
This repository has been archived by the owner on Jul 3, 2023. It is now read-only.

Commit

Permalink
Implements cleaner spec of reuse_functions, moves and renames
Browse files Browse the repository at this point in the history
This is a lot nicer. See discussion here for full context:
#86.
  • Loading branch information
elijahbenizzy committed Feb 18, 2023
1 parent dcfb43c commit 2fed9f8
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 202 deletions.
5 changes: 2 additions & 3 deletions examples/reusing_functions/reusable_subdags.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import pandas as pd
import unique_users

from hamilton.experimental.decorators import reuse
from hamilton.experimental.decorators.reuse import reuse_functions
from hamilton.function_modifiers import value
from hamilton.function_modifiers import reuse, value
from hamilton.function_modifiers.reuse import reuse_functions


def website_interactions() -> pd.DataFrame:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import inspect
from types import ModuleType
from typing import Any, Callable, Collection, Dict, List, Tuple, Type, Union

Expand All @@ -8,15 +7,6 @@
from hamilton.function_modifiers import base, dependencies


class MultiOutput:
def __init__(self, **mapping: Type):
self._mapping = mapping

@property
def mapping(self) -> Dict[str, Type]:
return self._mapping


def assign_namespace(node_name: str, namespace: str) -> str:
    """Builds the name of a node once it has been placed under a namespace.

    :param node_name: Name of the node, without a namespace.
    :param namespace: Namespace to place the node under.
    :return: The namespace followed by the node name, separated by a ".".
    """
    return "{}.{}".format(namespace, node_name)

Expand Down Expand Up @@ -92,14 +82,14 @@ def node_fn(_value=value):
return node.Node(name=name, typ=typ, callabl=node_fn, input_types={}, namespace=namespace)


class reuse_functions(base.NodeCreator):
class subdag(base.NodeCreator):
def __init__(
self,
with_inputs: Dict[str, dependencies.ParametrizedDependency],
namespace: str,
outputs: Dict[str, str],
with_config: Dict[str, Any],
load_from: Union[Collection[ModuleType], Collection[Callable]],
*load_from: Union[ModuleType, Callable],
inputs: Dict[
str, Union[dependencies.ParametrizedDependency, dependencies.LiteralDependency]
],
config: Dict[str, Any] = None,
):
"""Initializes a replay decorator. This decorator replays a subdag with a specified configuration.
Expand All @@ -110,11 +100,9 @@ def __init__(
:param outputs: A dictionary of original node name -> output node name that forms the output of this DAG.
:param with_config: A configuration dictionary for *just* this subDAG. Note that this passed in value takes precedence.
"""
self.subdag_functions = reuse_functions.collect_functions(load_from)
self.with_inputs = with_inputs
self.namespace = namespace
self.outputs = outputs
self.with_config = with_config
self.subdag_functions = subdag.collect_functions(load_from)
self.inputs = inputs if inputs is not None else {}
self.config = config if config is not None else {}

@staticmethod
def collect_functions(
Expand All @@ -126,9 +114,7 @@ def collect_functions(
:return: a list of callables to use to create a DAG.
"""
if len(load_from) == 0:
raise ValueError(
f"No functions were passed to {reuse_functions.__name__}(load_from=...)"
)
raise ValueError(f"No functions were passed to {subdag.__name__}(load_from=...)")
out = []
for item in load_from:
if isinstance(item, Callable):
Expand All @@ -139,29 +125,29 @@ def collect_functions(
return out

def _collect_nodes(self, original_config: Dict[str, Any]):
combined_config = dict(original_config, **self.with_config)
combined_config = dict(original_config, **self.config)
nodes = []
for fn in self.subdag_functions:
nodes.extend(base.resolve_nodes(fn, combined_config))
return nodes

def _create_additional_static_nodes(
self, nodes: Collection[node.Node]
self, nodes: Collection[node.Node], namespace: str
) -> Collection[node.Node]:
# These already have the namespace on them
# This allows us to inject values into the replayed subdag
node_types = extract_all_known_types(nodes)
out = []
for key, value in self.with_inputs.items():
for key, value in self.inputs.items():
# TODO -- fix type derivation. Currently we don't use the specified type as we don't really know what it should be...
new_node_name = assign_namespace(key, self.namespace)
new_node_name = assign_namespace(key, namespace)
if value.get_dependency_type() == dependencies.ParametrizedDependencySource.LITERAL:
out.append(
create_static_node(
typ=derive_type(value),
name=key,
value=value.value,
namespace=(self.namespace,),
namespace=(namespace,),
)
)
elif value.get_dependency_type() == dependencies.ParametrizedDependencySource.UPSTREAM:
Expand All @@ -170,12 +156,12 @@ def _create_additional_static_nodes(
from_=value.source,
typ=node_types[new_node_name],
name=key,
namespace=(self.namespace,),
namespace=(namespace,),
)
)
return out

def _add_namespace(self, nodes: List[node.Node]) -> List[node.Node]:
def _add_namespace(self, nodes: List[node.Node], namespace: str) -> List[node.Node]:
"""Utility function to add a namespace to nodes. Note that this is
:param nodes:
Expand All @@ -186,15 +172,15 @@ def _add_namespace(self, nodes: List[node.Node]) -> List[node.Node]:
new_name_map = {}
# First pass we validate + collect names so we can alter dependencies
for node_ in nodes:
new_name = assign_namespace(node_.name, self.namespace)
new_name = assign_namespace(node_.name, namespace)
new_name_map[node_.name] = new_name
current_node_namespaces = node_.namespace
if current_node_namespaces:
already_namespaced_nodes.append(node_)
for dep, value in self.with_inputs.items():
for dep, value in self.inputs.items():
# We create nodes for both namespace assignment and source assignment
# Why? Cause we need unique parameter names, and with source() some can share params
new_name_map[dep] = assign_namespace(dep, self.namespace)
new_name_map[dep] = assign_namespace(dep, namespace)
if already_namespaced_nodes:
raise ValueError(
f"The following nodes are already namespaced: {already_namespaced_nodes}. "
Expand Down Expand Up @@ -229,55 +215,70 @@ def fn(
)
return new_nodes

def _add_output_nodes(self, nodes: List[node.Node]) -> List[node.Node]:
nodes_by_name = {node_.name: node_ for node_ in nodes}
new_nodes = []
for from_node, to_node in self.outputs.items():
from_node_namespaced = assign_namespace(from_node, self.namespace)
new_nodes.append(
create_identity_node(
from_=assign_namespace(from_node, self.namespace),
name=to_node,
typ=nodes_by_name[from_node_namespaced].type,
namespace=(),
)
)
return nodes + new_nodes
# def _add_output_nodes(self, nodes: List[node.Node]) -> List[node.Node]:
# nodes_by_name = {node_.name: node_ for node_ in nodes}
# new_nodes = []
# for from_node, to_node in self.outputs.items():
# from_node_namespaced = assign_namespace(from_node, self.namespace)
# new_nodes.append(
# create_identity_node(
# from_=assign_namespace(from_node, self.namespace),
# name=to_node,
# typ=nodes_by_name[from_node_namespaced].type,
# namespace=(),
# )
# )
# return nodes + new_nodes

def add_final_node(self, fn: Callable, namespace: str):
    """Creates the node for the decorated function, with its inputs moved into the namespace.

    A node is built from ``fn``; it is then copied so that every input name is replaced
    by its namespaced form, and so that its callable accepts those namespaced names.

    :param fn: Function to build the node from.
    :param namespace: Namespace to apply to each of the function's input names.
    :return: A copy of the node for ``fn`` with namespaced inputs and a wrapping callable.
    """
    base_node = node.Node.from_fn(fn)
    original_name_of = {}
    namespaced_types = {}
    for input_name, input_type in base_node.input_types.items():
        namespaced_name = assign_namespace(input_name, namespace)
        original_name_of[namespaced_name] = input_name
        namespaced_types[namespaced_name] = input_type

    def new_function(**kwargs):
        # fn only knows its own parameter names, so map the namespaced keys back first.
        return fn(**{original_name_of[name]: arg for name, arg in kwargs.items()})

    return base_node.copy_with(input_types=namespaced_types, callabl=new_function)

def _derive_namespace(self, fn: Callable) -> str:
"""Utility function to derive a namespace from a function.
The user will be able to likely pass this in as an override, but
we have not exposed it yet.
:param fn: Function we're decorating.
:return: The function we're outputting.
"""
return fn.__name__

# def _derive_outputs_from_function(self, fn: Callable, nodes_produced: nodes) -> :

def generate_nodes(self, fn: Callable, configuration: Dict[str, Any]) -> Collection[node.Node]:
# Resolve all nodes from passed in functions
nodes = self._collect_nodes(original_config=configuration)
# Derive the namespace under which all these nodes will live
namespace = self._derive_namespace(fn)
# Rename them all to have the right namespace
nodes = self._add_namespace(nodes)
# Add output nodes (these are identity nodes that assign the desired name
nodes = self._add_output_nodes(nodes)
# Create any static input nodes we need
nodes += self._create_additional_static_nodes(nodes)
nodes = self._add_namespace(nodes, namespace)
# Create any static input nodes we need to translate
nodes += self._create_additional_static_nodes(nodes, namespace)
# Add the final node that does the translation
nodes += [self.add_final_node(fn, namespace)]
return nodes

def _validate_function_output(self, fn: Callable):
"""Validates that the function outputs a MultiNodeOutput that contains all the nodes that are mapped in output.
:param fn: Function to inspect.
:raises InvalidDecoratorException: If the function does not supply the right outputs.
"""
return_type = inspect.signature(fn).return_annotation
if not isinstance(return_type, MultiOutput):
raise base.InvalidDecoratorException(
f"Output of function {fn.__name__} must be 'MultiOutput'. Instead got {return_type}"
)
output_mapping_nodes = set(self.outputs.values())
type_spec_keys = set(return_type.mapping.keys())
if len(output_mapping_nodes.symmetric_difference(type_spec_keys)) != 0:
raise base.InvalidDecoratorException(
"The mapping of outputs to types "
"must exactly match the assignment of outputs in a "
"subDAG to corresponding names in the overall DAG."
)

def _validate_parameterization(self):
invalid_values = []
for key, value in self.with_inputs.items():
for key, value in self.inputs.items():
if not isinstance(value, dependencies.ParametrizedDependency):
invalid_values.append(value)
if invalid_values:
Expand All @@ -293,5 +294,4 @@ def validate(self, fn):
:raises InvalidDecoratorException: if this is not a valid decorator
"""

self._validate_function_output(fn)
self._validate_parameterization()
12 changes: 12 additions & 0 deletions hamilton/storage/loader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import abc
from typing import Type


class DataLoader(abc.ABC):
    """Abstract base class for data loaders.

    Subclasses must implement :meth:`applies_to`; the base class cannot be
    instantiated directly.
    """

    @abc.abstractmethod
    def applies_to(self, type_: Type[Type]) -> bool:
        """This tells whether this data loader can load in the format required.
        Say, for instance, you want to load into a pandas dataframe. This would
        have to return True for pandas.DataFrame.

        Declared abstract so that a subclass which forgets to implement it fails
        at instantiation, rather than silently returning ``None`` from a method
        annotated as returning ``bool``.

        :param type_: Type to test against
        :return: Whether or not this data loader can produce data in the desired format.
        """
Loading

0 comments on commit 2fed9f8

Please sign in to comment.