From 26bc1cc9958157e17f88d1e314c9c3a7ffa7a294 Mon Sep 17 00:00:00 2001 From: zilto Date: Wed, 17 Apr 2024 16:24:00 -0400 Subject: [PATCH] added pyarrow resultbuilder; updated dlt example --- examples/dlt/dlt_plugin.ipynb | 200 ++++++++++++++++++++++++++------ examples/dlt/requirements.txt | 1 + hamilton/plugins/h_pyarrow.py | 39 +++++++ tests/plugins/test_h_pyarrow.py | 25 ++++ 4 files changed, 232 insertions(+), 33 deletions(-) create mode 100644 hamilton/plugins/h_pyarrow.py create mode 100644 tests/plugins/test_h_pyarrow.py diff --git a/examples/dlt/dlt_plugin.ipynb b/examples/dlt/dlt_plugin.ipynb index 032bff17c..2cc75a0cc 100644 --- a/examples/dlt/dlt_plugin.ipynb +++ b/examples/dlt/dlt_plugin.ipynb @@ -41,15 +41,15 @@ "\n", "\n", - "\n", - "\n", + "\n", + "\n", "%3\n", - "\n", + "\n", "\n", "cluster__legend\n", - "\n", - "Legend\n", + "\n", + "Legend\n", "\n", "\n", "\n", @@ -58,43 +58,50 @@ "table\n", "DataFrame\n", "\n", - "\n", + "\n", "\n", + "polars_table\n", + "\n", + "polars_table\n", + "DataFrame\n", + "\n", + "\n", + "\n", "print_df_head\n", - "\n", - "print_df_head\n", - "DataFrame\n", + "\n", + "print_df_head\n", + "DataFrame\n", "\n", "\n", - "\n", + "\n", "_print_df_head_inputs\n", - "\n", - "external\n", - "DataFrame\n", + "\n", + "external\n", + "DataFrame\n", "\n", "\n", "\n", "_print_df_head_inputs->print_df_head\n", - "\n", - "\n", + "\n", + "\n", "\n", "\n", - "\n", + "\n", "input\n", - "\n", - "input\n", + "\n", + "input\n", "\n", "\n", - "\n", + "\n", "function\n", - "\n", - "function\n", + "\n", + "function\n", "\n", "\n", "\n" ], "text/plain": [ - "" + "" ] }, "execution_count": 2, @@ -105,10 +112,14 @@ "source": [ "%%cell_to_module -m my_module -d\n", "import pandas as pd\n", + "import polars as pl\n", "\n", "def table() -> pd.DataFrame:\n", " return pd.DataFrame([{\"C\": 1}, {\"C\": 2}])\n", "\n", + "def polars_table() -> pl.DataFrame:\n", + " return pl.DataFrame([{\"C\": 1}, {\"C\": 2}])\n", + "\n", "def print_df_head(external: 
pd.DataFrame) -> pd.DataFrame:\n", " print(\"from print_df_head:\\n\", external.head())" ] @@ -161,7 +172,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "{'dlt_metadata': {'pipeline': {'pipeline_name': 'saver_pipe'}, 'metrics': [{'started_at': DateTime(2024, 4, 15, 20, 18, 40, 423767, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2024, 4, 15, 20, 18, 40, 587988, tzinfo=Timezone('UTC')), 'load_id': '1713212320.0496533'}], 'destination_type': 'dlt.destinations.duckdb', 'destination_displayable_credentials': 'duckdb:////home/tjean/projects/dagworks/hamilton/examples/dlt/saver_pipe.duckdb', 'destination_name': 'duckdb', 'environment': None, 'staging_type': None, 'staging_name': None, 'staging_displayable_credentials': None, 'destination_fingerprint': '', 'dataset_name': 'saver_pipe_dataset', 'loads_ids': ['1713212320.0496533'], 'load_packages': [{'load_id': '1713212320.0496533', 'package_path': '/home/tjean/.dlt/pipelines/saver_pipe/load/loaded/1713212320.0496533', 'state': 'loaded', 'completed_at': DateTime(2024, 4, 15, 20, 18, 40, 570607, tzinfo=Timezone('UTC')), 'jobs': [{'state': 'completed_jobs', 'file_path': '/home/tjean/.dlt/pipelines/saver_pipe/load/loaded/1713212320.0496533/completed_jobs/my_table.7352fcd48a.0.parquet', 'file_size': 574, 'created_at': DateTime(2024, 4, 15, 20, 18, 40, 60607, tzinfo=Timezone('UTC')), 'elapsed': 0.5100002288818359, 'failed_message': None, 'table_name': 'my_table', 'file_id': '7352fcd48a', 'retry_count': 0, 'file_format': 'parquet'}], 'schema_hash': 'UE8l1iVz3xnHM+zYpjm8Bqd+3m6rDG++zNubWIUyecg=', 'schema_name': 'saver_pipe', 'tables': []}], 'first_run': False, 'started_at': DateTime(2024, 4, 15, 20, 18, 40, 423767, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2024, 4, 15, 20, 18, 40, 587988, tzinfo=Timezone('UTC'))}}\n" + "{'dlt_metadata': {'pipeline': {'pipeline_name': 'saver_pipe'}, 'metrics': [{'started_at': DateTime(2024, 4, 17, 20, 22, 7, 283298, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2024, 4, 
17, 20, 22, 7, 453053, tzinfo=Timezone('UTC')), 'load_id': '1713385326.9071813'}], 'destination_type': 'dlt.destinations.duckdb', 'destination_displayable_credentials': 'duckdb:////home/tjean/projects/dagworks/hamilton/examples/dlt/saver_pipe.duckdb', 'destination_name': 'duckdb', 'environment': None, 'staging_type': None, 'staging_name': None, 'staging_displayable_credentials': None, 'destination_fingerprint': '', 'dataset_name': 'saver_pipe_dataset', 'loads_ids': ['1713385326.9071813'], 'load_packages': [{'load_id': '1713385326.9071813', 'package_path': '/home/tjean/.dlt/pipelines/saver_pipe/load/loaded/1713385326.9071813', 'state': 'loaded', 'completed_at': DateTime(2024, 4, 17, 20, 22, 7, 435481, tzinfo=Timezone('UTC')), 'jobs': [{'state': 'completed_jobs', 'file_path': '/home/tjean/.dlt/pipelines/saver_pipe/load/loaded/1713385326.9071813/completed_jobs/my_table.777bd2e418.0.parquet', 'file_size': 574, 'created_at': DateTime(2024, 4, 17, 20, 22, 6, 915481, tzinfo=Timezone('UTC')), 'elapsed': 0.5199999809265137, 'failed_message': None, 'table_name': 'my_table', 'file_id': '777bd2e418', 'retry_count': 0, 'file_format': 'parquet'}], 'schema_hash': 'UE8l1iVz3xnHM+zYpjm8Bqd+3m6rDG++zNubWIUyecg=', 'schema_name': 'saver_pipe', 'tables': []}], 'first_run': False, 'started_at': DateTime(2024, 4, 17, 20, 22, 7, 283298, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2024, 4, 17, 20, 22, 7, 453053, tzinfo=Timezone('UTC'))}}\n" ] }, { @@ -227,7 +238,7 @@ "\n" ], "text/plain": [ - "" + "" ] }, "execution_count": 5, @@ -249,6 +260,129 @@ "dr.visualize_materialization(*materializers)" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Alternative dataframe libraries\n", + "By default, dlt only supports Python `Iterable` of records (e.g., JSON objects), pandas (`pd.DataFrame`) and pyarrow (`pyarrow.Table`, `pyarrow.BatchedRecords`). 
To save a polars, dask, vaex, velox, or duckdb object, you would need to convert it to a supported type first.\n", + "\n", + "Hamilton provides adapter to make the process easy! Simply add the adapter to the `combine=` keyword of the data saver." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'dlt_metadata': {'pipeline': {'pipeline_name': 'saver_pipe'}, 'metrics': [{'started_at': DateTime(2024, 4, 17, 20, 22, 24, 280884, tzinfo=Timezone('UTC')), 'finished_at': DateTime(2024, 4, 17, 20, 22, 24, 447750, tzinfo=Timezone('UTC')), 'load_id': '1713385343.9070144'}], 'destination_type': 'dlt.destinations.duckdb', 'destination_displayable_credentials': 'duckdb:////home/tjean/projects/dagworks/hamilton/examples/dlt/saver_pipe.duckdb', 'destination_name': 'duckdb', 'environment': None, 'staging_type': None, 'staging_name': None, 'staging_displayable_credentials': None, 'destination_fingerprint': '', 'dataset_name': 'saver_pipe_dataset', 'loads_ids': ['1713385343.9070144'], 'load_packages': [{'load_id': '1713385343.9070144', 'package_path': '/home/tjean/.dlt/pipelines/saver_pipe/load/loaded/1713385343.9070144', 'state': 'loaded', 'completed_at': DateTime(2024, 4, 17, 20, 22, 24, 425481, tzinfo=Timezone('UTC')), 'jobs': [{'state': 'completed_jobs', 'file_path': '/home/tjean/.dlt/pipelines/saver_pipe/load/loaded/1713385343.9070144/completed_jobs/my_polars_table.a4e2d05d46.0.parquet', 'file_size': 574, 'created_at': DateTime(2024, 4, 17, 20, 22, 23, 915481, tzinfo=Timezone('UTC')), 'elapsed': 0.5099999904632568, 'failed_message': None, 'table_name': 'my_polars_table', 'file_id': 'a4e2d05d46', 'retry_count': 0, 'file_format': 'parquet'}], 'schema_hash': '4ezuw/Ke94mRLdyi/MbomA4EPL+AciFUjmfshpA07dU=', 'schema_name': 'saver_pipe', 'tables': []}], 'first_run': False, 'started_at': DateTime(2024, 4, 17, 20, 22, 24, 280884, tzinfo=Timezone('UTC')), 'finished_at': 
DateTime(2024, 4, 17, 20, 22, 24, 447750, tzinfo=Timezone('UTC'))}}\n" + ] + }, + { + "data": { + "image/svg+xml": [ + "\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "%3\n", + "\n", + "\n", + "cluster__legend\n", + "\n", + "Legend\n", + "\n", + "\n", + "\n", + "polars_saver_node_build_result\n", + "\n", + "polars_saver_node_build_result\n", + "Table\n", + "\n", + "\n", + "\n", + "polars_saver_node\n", + "\n", + "\n", + "polars_saver_node\n", + "DltDestinationSaver\n", + "\n", + "\n", + "\n", + "polars_saver_node_build_result->polars_saver_node\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "polars_table\n", + "\n", + "polars_table\n", + "DataFrame\n", + "\n", + "\n", + "\n", + "polars_table->polars_saver_node_build_result\n", + "\n", + "\n", + "\n", + "\n", + "\n", + "function\n", + "\n", + "function\n", + "\n", + "\n", + "\n", + "output\n", + "\n", + "output\n", + "\n", + "\n", + "\n", + "materializer\n", + "\n", + "\n", + "materializer\n", + "\n", + "\n", + "\n" + ], + "text/plain": [ + "" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "from hamilton.plugins import h_pyarrow\n", + "\n", + "materializers = [\n", + " to.dlt(\n", + " id=\"polars_saver_node\",\n", + " dependencies=[\"polars_table\"],\n", + " combine=h_pyarrow.PyarrowTableResult(),\n", + " table_name=\"my_polars_table\",\n", + " pipeline=saver_pipeline,\n", + " )\n", + "]\n", + "results, _ = dr.materialize(*materializers)\n", + "print(results[\"polars_saver_node\"])\n", + "dr.visualize_materialization(*materializers)" + ] + }, { "cell_type": "markdown", "metadata": {}, @@ -261,7 +395,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 8, "metadata": {}, "outputs": [], "source": [ @@ -289,7 +423,7 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 9, "metadata": {}, "outputs": [ { @@ -297,12 +431,12 @@ "output_type": "stream", "text": [ "from print_df_head:\n", - " col _dlt_load_id _dlt_id\n", 
from typing import Any, Type

import pyarrow
from pyarrow.interchange import from_dataframe

from hamilton.lifecycle.api import ResultBuilder


class PyarrowTableResult(ResultBuilder):
    """Add this result builder to a materializer's `combine` statement to convert your
    dataframe object to a pyarrow representation and make it compatible with pyarrow
    DataSavers.

    It implicitly supports ``input_type == Any``, but it expects dataframe objects
    implementing the dataframe interchange protocol
    (ref: https://arrow.apache.org/docs/python/interchange_protocol.html),
    for example:

    - pandas
    - polars
    - dask
    - vaex
    - ibis
    - duckdb results
    """

    def output_type(self) -> Type:
        """Type produced by :meth:`build_result` — a ``pyarrow.Table``."""
        return pyarrow.Table

    def build_result(self, **outputs: Any) -> Any:
        """Convert a single object implementing the ``__dataframe__`` protocol to
        a pyarrow Table.

        Receiving multiple outputs is not supported because this builder has no
        joining logic.

        ref: https://arrow.apache.org/docs/python/interchange_protocol.html

        :param outputs: mapping of node name to node result; must contain exactly
            one item whose value implements the dataframe interchange protocol.
        :return: the single output converted to a ``pyarrow.Table``.
        :raises AssertionError: if zero or more than one output is received.
        """
        if len(outputs) != 1:
            # NOTE: the original message concatenated two literals with no separator,
            # yielding "...dependencies=[])`It received ..."; fixed with ". " between them.
            raise AssertionError(
                "PyarrowTableResult can only receive 1 output, i.e., only one item "
                f"in `to.SAVER(dependencies=[])`. It received {len(outputs)} outputs."
            )
        return from_dataframe(next(iter(outputs.values())))