Commit
Driver-level materialization #235
See issue for more detailed notes. Overall design:

1. Add a .materialize(...) function
2. Materializers are dynamically registered with the same mechanism as
   data savers
3. This manipulates the DAG and calls the materialization node
4. The materialization node can also have a results builder associated
   with it
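As a rough sketch, the dynamic registration in step (2) can be thought of as a keyed registry of saver functions. The names below (`MATERIALIZER_REGISTRY`, `register_materializer`, `save_csv`) are illustrative assumptions, not the actual internals:

```python
from typing import Any, Callable, Dict

# Illustrative registry -- not Hamilton's actual internals.
MATERIALIZER_REGISTRY: Dict[str, Callable] = {}


def register_materializer(key: str) -> Callable:
    """Registers a materializer function under a short key (e.g. "csv")."""

    def wrapper(fn: Callable) -> Callable:
        MATERIALIZER_REGISTRY[key] = fn
        return fn

    return wrapper


@register_materializer("csv")
def save_csv(data: Any, path: str) -> Dict[str, Any]:
    # A real materializer would write the file; this sketch just returns metadata.
    return {"type": "csv", "path": path, "rows": len(data)}
```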

Left todo:
1. Documentation
2. More work on data savers/loaders
elijahbenizzy committed Aug 5, 2023
1 parent 5236429 commit d370f28
Showing 17 changed files with 890 additions and 132 deletions.
1 change: 1 addition & 0 deletions .circleci/config.yml
@@ -10,6 +10,7 @@ jobs:
- image: cimg/python:<< parameters.python-version >>
environment:
TASK: << parameters.task >>
CI: true
steps:
- checkout
- run:
29 changes: 29 additions & 0 deletions docs/concepts/driver-capabilities.rst
@@ -114,6 +114,35 @@ There are two ways to use ``execute()``:
We recommend using option (1) where possible. Option (2) only makes sense if you want to reuse the dataflow created for
different data sets, or to chunk over large data or iterate over objects, e.g. images or text.
Materializing DAG Nodes
#######################
The driver comes with the ability to materialize DAG nodes -- this adds side effects to the DAG that save the data it produces to various places.
These are fully customizable and utilize the same set of constructs as :doc:`reference/decorators/save_to/`.
Materialization can save data to a file, an external data store, or a database -- it's a flexible construct that comes with a few built-in options
but is highly pluggable.
In the following example, we join `foo` and `bar` into a dataframe, then save it to a CSV file:
.. code-block:: python

    from hamilton import driver, base
    from hamilton.io.materialize import to

    dr = driver.Driver({}, my_module)
    # foo, bar are pd.Series
    metadata, result = dr.materialize(
        to.csv(
            path="./output.csv",
            id="foo_bar_csv",
            dependencies=["foo", "bar"],
            combine=base.PandasDataFrameResult(),
        ),
        additional_vars=["foo", "bar"],
    )
For more information, see :doc:`/reference/drivers/Driver/#hamilton.driver.Driver.materialize`.
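Conceptually, the call above is shorthand for combining the requested outputs and saving them yourself. Here is a simplified sketch of that equivalence (the series values are stand-ins; the combine/save split mirrors the ``combine=`` and ``to.csv`` arguments above):

```python
import pandas as pd

# Stand-ins for the computed node values.
foo = pd.Series([1, 2, 3], name="foo")
bar = pd.Series([4.0, 5.0, 6.0], name="bar")

# Combine step -- roughly what base.PandasDataFrameResult().build_result(...) does.
df = pd.DataFrame({"foo": foo, "bar": bar})

# Save step -- roughly what to.csv(path="./output.csv") performs.
df.to_csv("./output.csv", index=False)

# A materializer also returns metadata describing the artifact it wrote.
metadata = {"path": "./output.csv", "rows": len(df)}
```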
Visualizing Execution
#####################
6 changes: 3 additions & 3 deletions examples/data_quality/pandera/README.md
@@ -31,9 +31,9 @@ should happily run on top of Dask, and Ray.
* feature_logic_spark.py - this module contains some feature transformation code specific to running on top of Pandas on Spark.
Specifically, note that the data types checked against are different than in `feature_logic.py`.
* run.py - this is the default Hamilton way of materializing features.
* run_dask.py - this shows how one would materialize features using Dask.
* run_ray.py - this shows how one would materialize features using Ray.
* run_spark.py - this shows how one would materialize features using Pandas on Spark.

Each file should have some documentation at the top to help identify what it is and how you can use it.

46 changes: 43 additions & 3 deletions hamilton/base.py
@@ -6,7 +6,7 @@
import collections
import inspect
import logging
from typing import Any, Dict, List, Tuple, Type, Union
from typing import Any, Dict, List, Optional, Tuple, Type, Union

import numpy as np
import pandas as pd
@@ -24,10 +24,14 @@
class ResultMixin(object):
"""Abstract base class housing the static function.
Why a static function? That's because certain frameworks can only pickle a static function, not an entire
object.
Why a static function? That's because certain frameworks can only pickle a static function,
not an entire object. # TODO -- fix this so this can carry state/act as a standard object.
All result builders should inherit from this class and implement the build_result function.
Note that applicable_input_type and output_type are optional but recommended, for backwards
compatibility. They let us type-check the result builder against the DAG. They default to Any,
which means they'll connect to anything.
"""

@staticmethod
Expand All @@ -36,6 +40,20 @@ def build_result(**outputs: Dict[str, Any]) -> Any:
"""This function builds the result given the computed values."""
pass

def inputs_types(self) -> Optional[List[Type[Type]]]:
"""Gives the applicable types to this result builder.
This is optional for backwards compatibility, but is recommended.
:return: A list of types that this can apply to.
"""
return [Any]

def output_type(self) -> Type:
"""Returns the output type of this result builder
:return: the type that this creates
"""
return Any


class DictResult(ResultMixin):
"""Simple function that returns the dict of column -> value results.
@@ -70,6 +88,12 @@ def build_result(**outputs: Dict[str, Any]) -> Dict:
"""This function builds a simple dict of output -> computed values."""
return outputs

def inputs_types(self) -> Optional[List[Type[Type]]]:
return [Any]

def output_type(self) -> Type:
return Dict[str, Any]


class PandasDataFrameResult(ResultMixin):
"""Mixin for building a pandas dataframe from the result.
@@ -275,8 +299,17 @@ def get_output_name(output_name: str, column_name: str) -> str:
f"being added; it may be coming from a dataframe that is being unpacked."
)
flattened_outputs[name] = output

return pd.DataFrame(flattened_outputs)

def inputs_types(self) -> Optional[List[Type[Type]]]:
"""Currently this just shoves anything into a dataframe. We should probably
tighten this up."""
return [Any]

def output_type(self) -> Type:
return pd.DataFrame


class StrictIndexTypePandasDataFrameResult(PandasDataFrameResult):
"""A ResultBuilder that produces a dataframe only if the index types match exactly.
@@ -371,6 +404,13 @@ def build_result(**outputs: Dict[str, Any]) -> np.matrix:
# Create the matrix with columns as rows and then transpose
return np.asmatrix(list_of_columns).T

def inputs_types(self) -> List[Type[Type]]:
"""Currently returns anything as numpy types are relatively new and"""
return [Any] # Typing

def output_type(self) -> Type:
return np.matrix


class HamiltonGraphAdapter(ResultMixin):
"""Any GraphAdapters should implement this interface to adapt the HamiltonGraph for that particular context.
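To illustrate the two new hooks, here is a minimal sketch of a custom result builder implementing `inputs_types` / `output_type`. The `JSONResult` class and its JSON-serializing behavior are illustrative assumptions, not part of this commit; the base class is a trimmed-down stand-in for `hamilton.base.ResultMixin`:

```python
import json
from typing import Any, Dict, List, Type


class ResultMixin:
    """Trimmed-down stand-in for hamilton.base.ResultMixin (sketch only)."""

    @staticmethod
    def build_result(**outputs: Dict[str, Any]) -> Any:
        raise NotImplementedError

    def inputs_types(self) -> List[Type]:
        return [Any]

    def output_type(self) -> Type:
        return Any


class JSONResult(ResultMixin):
    """Hypothetical custom builder: serializes all outputs to one JSON string."""

    @staticmethod
    def build_result(**outputs: Dict[str, Any]) -> str:
        # Deterministic key order so the result is stable across runs.
        return json.dumps(outputs, sort_keys=True)

    def inputs_types(self) -> List[Type]:
        return [Any]  # accepts any upstream output

    def output_type(self) -> Type:
        return str
```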