Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Polars Lazyframe Support #775

Merged
merged 30 commits into from
Mar 28, 2024
Merged
Show file tree
Hide file tree
Changes from 26 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
51b875a
temp updates
Mar 21, 2024
ec37b32
initial prototype migration to shared polars for lazyframe support
Mar 22, 2024
ef90eb3
clean up log lines
Mar 22, 2024
604d136
fix function refactor
Mar 22, 2024
ae70b41
Implement materializer so that it returns a dataframe after processing
Mar 22, 2024
760fe21
fix linting
Mar 22, 2024
ec8a11d
fix linting
Mar 22, 2024
bf34df2
fix linting
Mar 22, 2024
1eb3929
update linting
Mar 22, 2024
3eaa10a
update linting
Mar 22, 2024
a32c04a
update linting
Mar 22, 2024
5cb77d4
fix linting
Mar 22, 2024
3ff84b0
fix linting
Mar 22, 2024
cfbc088
update for example
Mar 25, 2024
7a9eb83
update PR prototype code
Mar 26, 2024
7513060
update tests
Mar 27, 2024
25662b2
update tests
Mar 27, 2024
0abb830
update tests
Mar 27, 2024
bed412a
finish tests for other parsers
Mar 27, 2024
f302b14
Merge branch 'main' of github.com:buggtb/hamilton
Mar 27, 2024
de9cd80
Merge branch 'DAGWorks-Inc:main' into main
buggtb Mar 27, 2024
5d48c6e
Add lazyframe implementation
buggtb Mar 27, 2024
3ccdde2
Extended applicable types for Polars writers
buggtb Mar 27, 2024
d65bace
Updated PolarsLazyFrameResult and data writers
buggtb Mar 27, 2024
8aa9695
Extended support for LazyFrame in Polars extensions
buggtb Mar 27, 2024
fd1c011
fix test
buggtb Mar 27, 2024
2e7ff6c
Added Polars LazyFrame example
buggtb Mar 28, 2024
557ec54
Updated data loading method in tests
buggtb Mar 28, 2024
4a8c27b
Merge branch 'DAGWorks-Inc:main' into main
buggtb Mar 28, 2024
2c32470
update to force new build cause why not
buggtb Mar 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions hamilton/function_modifiers/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
"pandas",
"plotly",
"polars",
"polars_lazyframe",
"pyspark_pandas",
"spark",
"dask",
Expand Down
29 changes: 21 additions & 8 deletions hamilton/io/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,27 @@ def get_dataframe_metadata(df: pd.DataFrame) -> Dict[str, Any]:
- the column names
- the data types
"""
return {
DATAFRAME_METADATA: {
"rows": len(df),
"columns": len(df.columns),
"column_names": list(df.columns),
"datatypes": [str(t) for t in list(df.dtypes)], # for serialization purposes
}
}
metadata = {}
try:
metadata["rows"] = len(df)
except TypeError:
metadata["rows"] = None

try:
metadata["columns"] = len(df.columns)
except (AttributeError, TypeError):
metadata["columns"] = None

try:
metadata["column_names"] = list(df.columns)
except (AttributeError, TypeError):
metadata["column_names"] = None

try:
metadata["datatypes"] = [str(t) for t in list(df.dtypes)]
except (AttributeError, TypeError):
metadata["datatypes"] = None
return {DATAFRAME_METADATA: metadata}


def get_file_and_dataframe_metadata(path: str, df: pd.DataFrame) -> Dict[str, Any]:
Expand Down
2 changes: 2 additions & 0 deletions hamilton/plugins/h_polars.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ def build_result(
(value,) = outputs.values() # this works because it's length 1.
if isinstance(value, pl.DataFrame): # it's a dataframe
return value
if isinstance(value, pl.LazyFrame): # it's a lazyframe
return value.collect()
elif not isinstance(value, pl.Series): # it's a single scalar/object
key, value = outputs.popitem()
return pl.DataFrame({key: [value]})
Expand Down
46 changes: 46 additions & 0 deletions hamilton/plugins/h_polars_lazyframe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from typing import Any, Dict, Type, Union

import polars as pl

from hamilton import base


class PolarsLazyFrameResult(base.ResultMixin):
"""A ResultBuilder that produces a polars dataframe.
Use this when you want to create a polars dataframe from the outputs. Caveat: you need to ensure that the length
of the outputs is the same, otherwise you will get an error; mixed outputs aren't that well handled.
To use:
.. code-block:: python
from hamilton import base, driver
from hamilton.plugins import polars_extensions
polars_builder = polars_extensions.PolarsLazyFrameResult()
adapter = base.SimplePythonGraphAdapter(polars_builder)
dr = driver.Driver(config, *modules, adapter=adapter)
df = dr.execute([...], inputs=...) # returns polars dataframe
Note: this is just a first attempt at something for Polars. Think it should handle more? Come chat/open a PR!
"""

def build_result(
self, **outputs: Dict[str, Union[pl.Series, pl.LazyFrame, Any]]
) -> pl.LazyFrame:
"""This is the method that Hamilton will call to build the final result. It will pass in the results
of the requested outputs that you passed in to the execute() method.
Note: this function could do smarter things; looking for contributions here!
:param outputs: The results of the requested outputs.
:return: a polars DataFrame.
"""
if len(outputs) == 1:
(value,) = outputs.values() # this works because it's length 1.
if isinstance(value, pl.LazyFrame): # it's a lazyframe
return value
return pl.LazyFrame(outputs)

def output_type(self) -> Type:
return pl.LazyFrame
56 changes: 35 additions & 21 deletions hamilton/plugins/polars_extensions.py
buggtb marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,6 @@ def _get_loading_kwargs(self):
kwargs["row_count_name"] = self.row_count_name
if self.row_count_offset is not None:
kwargs["row_count_offset"] = self.row_count_offset
if self.sample_size is not None:
kwargs["sample_size"] = self.sample_size
if self.eol_char is not None:
kwargs["eol_char"] = self.eol_char
if self.raise_if_empty is not None:
Expand All @@ -176,6 +174,7 @@ def _get_loading_kwargs(self):

def load_data(self, type_: Type) -> Tuple[DATAFRAME_TYPE, Dict[str, Any]]:
df = pl.read_csv(self.file, **self._get_loading_kwargs())

metadata = utils.get_file_and_dataframe_metadata(self.file, df)
return df, metadata

Expand Down Expand Up @@ -206,7 +205,7 @@ class PolarsCSVWriter(DataSaver):

@classmethod
def applicable_types(cls) -> Collection[Type]:
return [DATAFRAME_TYPE]
return [DATAFRAME_TYPE, pl.LazyFrame]
buggtb marked this conversation as resolved.
Show resolved Hide resolved

def _get_saving_kwargs(self):
kwargs = {}
Expand Down Expand Up @@ -236,15 +235,12 @@ def _get_saving_kwargs(self):
kwargs["quote_style"] = self.quote_style
return kwargs

def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
if isinstance(data, pl.LazyFrame):
data = data.collect()
data.write_csv(self.file, **self._get_saving_kwargs())
return utils.get_file_and_dataframe_metadata(self.file, data)

def load_data(self, type_: Type) -> Tuple[DATAFRAME_TYPE, Dict[str, Any]]:
df = pl.read_csv(self.file, **self._get_loading_kwargs())
metadata = utils.get_file_and_dataframe_metadata(self.file, df)
return df, metadata

@classmethod
def name(cls) -> str:
return "csv"
Expand Down Expand Up @@ -330,7 +326,7 @@ class PolarsParquetWriter(DataSaver):

@classmethod
def applicable_types(cls) -> Collection[Type]:
return [DATAFRAME_TYPE]
return [DATAFRAME_TYPE, pl.LazyFrame]

def _get_saving_kwargs(self):
kwargs = {}
Expand All @@ -348,8 +344,12 @@ def _get_saving_kwargs(self):
kwargs["pyarrow_options"] = self.pyarrow_options
return kwargs

def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
if isinstance(data, pl.LazyFrame):
data = data.collect()

data.write_parquet(self.file, **self._get_saving_kwargs())

return utils.get_file_and_dataframe_metadata(self.file, data)

@classmethod
Expand Down Expand Up @@ -422,15 +422,17 @@ class PolarsFeatherWriter(DataSaver):

@classmethod
def applicable_types(cls) -> Collection[Type]:
return [DATAFRAME_TYPE]
return [DATAFRAME_TYPE, pl.LazyFrame]

def _get_saving_kwargs(self):
kwargs = {}
if self.compression is not None:
kwargs["compression"] = self.compression
return kwargs

def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
if isinstance(data, pl.LazyFrame):
data = data.collect()
data.write_ipc(self.file, **self._get_saving_kwargs())
return utils.get_file_and_dataframe_metadata(self.file, data)

Expand Down Expand Up @@ -484,15 +486,18 @@ class PolarsAvroWriter(DataSaver):

@classmethod
def applicable_types(cls) -> Collection[Type]:
return [DATAFRAME_TYPE]
return [DATAFRAME_TYPE, pl.LazyFrame]

def _get_saving_kwargs(self):
kwargs = {}
if self.compression is not None:
kwargs["compression"] = self.compression
return kwargs

def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
if isinstance(data, pl.LazyFrame):
data = data.collect()

data.write_avro(self.file, **self._get_saving_kwargs())
return utils.get_file_and_dataframe_metadata(self.file, data)

Expand Down Expand Up @@ -547,7 +552,7 @@ class PolarsJSONWriter(DataSaver):

@classmethod
def applicable_types(cls) -> Collection[Type]:
return [DATAFRAME_TYPE]
return [DATAFRAME_TYPE, pl.LazyFrame]

def _get_saving_kwargs(self):
kwargs = {}
Expand All @@ -557,7 +562,10 @@ def _get_saving_kwargs(self):
kwargs["row_oriented"] = self.row_oriented
return kwargs

def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
if isinstance(data, pl.LazyFrame):
data = data.collect()

data.write_json(self.file, **self._get_saving_kwargs())
return utils.get_file_and_dataframe_metadata(self.file, data)

Expand Down Expand Up @@ -665,7 +673,7 @@ class PolarsSpreadsheetWriter(DataSaver):

@classmethod
def applicable_types(cls) -> Collection[Type]:
return [DATAFRAME_TYPE]
return [DATAFRAME_TYPE, pl.LazyFrame]

def _get_saving_kwargs(self):
kwargs = {}
Expand Down Expand Up @@ -713,7 +721,10 @@ def _get_saving_kwargs(self):
kwargs["freeze_panes"] = self.freeze_panes
return kwargs

def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
if isinstance(data, pl.LazyFrame):
data = data.collect()

data.write_excel(self.workbook, self.worksheet, **self._get_saving_kwargs())
return utils.get_file_and_dataframe_metadata(self.workbook, data)

Expand Down Expand Up @@ -782,7 +793,7 @@ class PolarsDatabaseWriter(DataSaver):

@classmethod
def applicable_types(cls) -> Collection[Type]:
return [DATAFRAME_TYPE]
return [DATAFRAME_TYPE, pl.LazyFrame]

def _get_saving_kwargs(self):
kwargs = {}
Expand All @@ -792,7 +803,10 @@ def _get_saving_kwargs(self):
kwargs["engine"] = self.engine
return kwargs

def save_data(self, data: DATAFRAME_TYPE) -> Dict[str, Any]:
def save_data(self, data: Union[DATAFRAME_TYPE, pl.LazyFrame]) -> Dict[str, Any]:
if isinstance(data, pl.LazyFrame):
data = data.collect()

data.write_database(
table_name=self.table_name,
connection=self.connection,
Expand Down
Loading