Driver-level materialization #235
See issue for more detailed notes. Overall design:

1. Add a .materialize(...) function
2. Materializers are dynamically registered with the same mechanism as
   data savers (see the sketch after these notes)
3. This manipulates the DAG and calls the materialization node
4. The materialization node can also have a results builder associated
   with it

Left todo:
1. Documentation
2. More work on data savers/loaders
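
A rough sketch of design point (2) above -- registering a custom materializer through the same
mechanism as data savers -- might look like the following. The DataSaver base class, its
save_data/applicable_types/name methods, and registry.register_adapter are assumptions about the
interface this commit builds on, not something shown in the diff below:

    import dataclasses
    import json
    from typing import Any, Collection, Dict, Type

    from hamilton import registry  # assumption: registration entry point
    from hamilton.io.data_adapters import DataSaver  # assumption: saver base class


    @dataclasses.dataclass
    class JSONSaver(DataSaver):
        """Hypothetical saver that writes a dict out as JSON."""

        path: str

        @classmethod
        def applicable_types(cls) -> Collection[Type]:
            # Types this saver knows how to persist.
            return [dict]

        def save_data(self, data: dict) -> Dict[str, Any]:
            with open(self.path, "w") as f:
                json.dump(data, f)
            # The metadata returned here is what materialize() surfaces back to the caller.
            return {"path": self.path}

        @classmethod
        def name(cls) -> str:
            # The key the saver is looked up by, e.g. to.json(...)
            return "json"


    # Assumption: dynamic registration mirrors how data savers are wired in.
    registry.register_adapter(JSONSaver)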
elijahbenizzy committed Aug 6, 2023
1 parent 5236429 commit f342444
Showing 17 changed files with 903 additions and 153 deletions.
1 change: 1 addition & 0 deletions .circleci/config.yml
@@ -10,6 +10,7 @@ jobs:
- image: cimg/python:<< parameters.python-version >>
environment:
TASK: << parameters.task >>
CI: true
steps:
- checkout
- run:
29 changes: 29 additions & 0 deletions docs/concepts/driver-capabilities.rst
@@ -114,6 +114,35 @@ There are two ways to use ``execute()``:
We recommend using option (1) where possible. Option (2) only makes sense if you want to reuse the dataflow created for
different data sets, or to chunk over large data or iterate over objects, e.g. images or text.
Materializing DAG Nodes
#######################
The driver comes with the ability to materialize DAG nodes -- this adds side effects to the DAG that save the data it produces to various places.
These are fully customizable, and utilize the same set of constructs as :doc:`/reference/decorators/save_to/`.
Materialization can save data to a file, an external data store, or a database -- it's a flexible construct that comes with a few built-in options,
but is highly pluggable.

In the following example, we join `foo` and `bar` into a dataframe, then save it to a CSV file:
.. code-block:: python

    from hamilton import driver, base
    from hamilton.io.materialize import to

    dr = driver.Driver(my_module, {})
    # foo, bar are pd.Series
    metadata, result = dr.materialize(
        to.csv(
            path="./output.csv",
            id="foo_bar_csv",
            dependencies=["foo", "bar"],
            combine=base.PandasDataFrameResult(),
        ),
        additional_vars=["foo", "bar"],
    )
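
As a rough orientation only -- the exact shapes of the two return values are not spelled out here, so
treat the keys below as assumptions -- you might inspect what came back like this:

.. code-block:: python

    # Assumption: `result` is keyed by the names passed to additional_vars, and
    # `metadata` describes what the "foo_bar_csv" materializer wrote (e.g. the file path).
    print(result["foo"].head())
    print(metadata)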
For more information, see `materialize` in the :doc:`driver </reference/drivers/Driver>`.
Visualizing Execution
#####################
4 changes: 2 additions & 2 deletions docs/reference/drivers/Driver.rst
@@ -29,9 +29,9 @@ it is here purely for documentation, and you should never need to instantiate it
TaskBasedGraphExecutor
-----------------------

This is a task based graph executor. It can handle parallelism with `Parallelizable`/`Collect[]`,
This is a task based graph executor. It can handle parallelism with the `Parallelizable`/`Collect` constructs,
allowing it to spawn dynamic tasks and execute them as a group. Note that this is only
exposed through the `Builder` when called with `enable_dynamic_execution(allow_experimental_mode: bool) --
exposed through the `Builder` when called with `enable_dynamic_execution(allow_experimental_mode: bool)` --
it is here purely for documentation, and you should never need to instantiate it directly.
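
To make the prose above concrete, here is a hedged sketch of how `Parallelizable`/`Collect` and
`enable_dynamic_execution` typically fit together; the module paths and executor class below are
assumptions, not something this diff establishes.

.. code-block:: python

    from typing import List

    from hamilton import driver
    from hamilton.execution import executors  # assumed location of the executors
    from hamilton.htypes import Collect, Parallelizable  # assumed location of the constructs


    # These functions would live in my_module:
    def url(urls: List[str]) -> Parallelizable[str]:
        # Each yielded value becomes its own dynamically spawned task.
        for u in urls:
            yield u


    def url_length(url: str) -> int:
        return len(url)


    def total_length(url_length: Collect[int]) -> int:
        # Collect gathers the per-task results back into a single node.
        return sum(url_length)


    dr = (
        driver.Builder()
        .with_modules(my_module)
        .enable_dynamic_execution(allow_experimental_mode=True)
        .with_local_executor(executors.SynchronousLocalTaskExecutor())
        .build()
    )
    dr.execute(["total_length"], inputs={"urls": ["https://a.example", "https://b.example"]})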

.. autoclass:: hamilton.driver.TaskBasedGraphExecutor
47 changes: 44 additions & 3 deletions hamilton/base.py
@@ -6,7 +6,7 @@
import collections
import inspect
import logging
from typing import Any, Dict, List, Tuple, Type, Union
from typing import Any, Dict, List, Optional, Tuple, Type, Union

import numpy as np
import pandas as pd
@@ -24,10 +24,14 @@
class ResultMixin(object):
"""Abstract base class housing the static function.
Why a static function? That's because certain frameworks can only pickle a static function, not an entire
object.
Why a static function? That's because certain frameworks can only pickle a static function,
not an entire object. # TODO -- fix this so this can carry state/act as a standard object.
All result builders should inherit from this class and implement the build_result function.
Note that input_types and output_type are optional, but recommended, for backwards
compatibility. They let us type-check the result builder. They will default to Any, which means that they'll
connect to anything.
"""

@staticmethod
@@ -36,6 +40,20 @@ def build_result(**outputs: Dict[str, Any]) -> Any:
"""This function builds the result given the computed values."""
pass

def input_types(self) -> List[Type[Type]]:
"""Gives the applicable types to this result builder.
This is optional for backwards compatibility, but is recommended.
:return: A list of types that this can apply to.
"""
return [Any]

def output_type(self) -> Type:
"""Returns the output type of this result builder
:return: the type that this creates
"""
return Any


class DictResult(ResultMixin):
"""Simple function that returns the dict of column -> value results.
@@ -62,6 +80,7 @@ class DictResult(ResultMixin):
DefaultAdapter
.. code-block:: python
adapter = base.DefaultAdapter()
"""

@@ -70,6 +89,12 @@ def build_result(**outputs: Dict[str, Any]) -> Dict:
"""This function builds a simple dict of output -> computed values."""
return outputs

def input_types(self) -> Optional[List[Type[Type]]]:
return [Any]

def output_type(self) -> Type:
return Dict[str, Any]


class PandasDataFrameResult(ResultMixin):
"""Mixin for building a pandas dataframe from the result.
@@ -275,8 +300,17 @@ def get_output_name(output_name: str, column_name: str) -> str:
f"being added; it may be coming from a dataframe that is being unpacked."
)
flattened_outputs[name] = output

return pd.DataFrame(flattened_outputs)

def input_types(self) -> List[Type[Type]]:
"""Currently this just shoves anything into a dataframe. We should probably
tighten this up."""
return [Any]

def output_type(self) -> Type:
return pd.DataFrame


class StrictIndexTypePandasDataFrameResult(PandasDataFrameResult):
"""A ResultBuilder that produces a dataframe only if the index types match exactly.
@@ -371,6 +405,13 @@ def build_result(**outputs: Dict[str, Any]) -> np.matrix:
# Create the matrix with columns as rows and then transpose
return np.asmatrix(list_of_columns).T

def input_types(self) -> List[Type[Type]]:
"""Currently returns anything as numpy types are relatively new and"""
return [Any] # Typing

def output_type(self) -> Type:
return np.matrix


class HamiltonGraphAdapter(ResultMixin):
"""Any GraphAdapters should implement this interface to adapt the HamiltonGraph for that particular context.
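Tying together the ResultMixin changes in this file: a minimal sketch of a custom result builder
that implements build_result and opts into the new input_types/output_type hooks. The class below
is a hypothetical illustration, not part of this commit:

    from typing import Any, Dict, List, Type

    import pandas as pd

    from hamilton import base


    class SeriesDictResult(base.ResultMixin):
        """Hypothetical builder: returns the outputs as a dict of pandas Series."""

        @staticmethod
        def build_result(**outputs: Dict[str, Any]) -> Dict[str, pd.Series]:
            # Coerce every computed value into a Series, keyed by node name.
            return {name: pd.Series(value) for name, value in outputs.items()}

        def input_types(self) -> List[Type[Type]]:
            # Only claim the types we can actually coerce.
            return [pd.Series, list, dict]

        def output_type(self) -> Type:
            return Dict[str, pd.Series]

Something like this could presumably then be passed wherever a result builder is accepted -- for
example as the combine= argument of a materializer, as in the docs example above.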
