From ec3185e98fc77f4b0c6aee85f47b5dfe543cceed Mon Sep 17 00:00:00 2001
From: Richard Pelgrim <68642378+rrpelgrim@users.noreply.github.com>
Date: Fri, 26 Jan 2024 11:55:48 +0000
Subject: [PATCH 1/3] Create delta-lake-dask.md

---
 docs/integrations/delta-lake-dask.md | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 create mode 100644 docs/integrations/delta-lake-dask.md

diff --git a/docs/integrations/delta-lake-dask.md b/docs/integrations/delta-lake-dask.md
new file mode 100644
index 0000000000..e69de29bb2

From 65c27cba9b070283f75f06caddf97ec66ff7feb1 Mon Sep 17 00:00:00 2001
From: Richard Pelgrim <68642378+rrpelgrim@users.noreply.github.com>
Date: Fri, 26 Jan 2024 11:57:01 +0000
Subject: [PATCH 2/3] Update delta-lake-dask.md

---
 docs/integrations/delta-lake-dask.md | 62 ++++++++++++++++++++++++++++
 1 file changed, 62 insertions(+)

diff --git a/docs/integrations/delta-lake-dask.md b/docs/integrations/delta-lake-dask.md
index e69de29bb2..79be96568f 100644
--- a/docs/integrations/delta-lake-dask.md
+++ b/docs/integrations/delta-lake-dask.md
@@ -0,0 +1,62 @@
+# Using Delta Lake with Dask
+
+Delta Lake is a great storage format for Dask analyses. This page explains why and how to use Delta Lake with Dask.
+
+You will learn how to read Delta tables into Dask DataFrames, how to query Delta tables with Dask, and the unique advantages Delta Lake offers the Dask community.
+
+Here are some of the benefits that Delta Lake provides Dask users:
+
+- better performance with file skipping
+- enhanced file skipping via Z Ordering
+- ACID transactions for reliable writes
+- easy time-travel functionality
+
+> ❗️ dask-deltatable currently works with deltalake<=0.13.0. See https://github.com/dask-contrib/dask-deltatable/issues/65
+
+## Install Dask-Deltatable
+
+To use Delta Lake with Dask, first install the library using:
+
+```
+pip install dask-deltatable
+```
+
+## Reading Delta Tables into a Dask DataFrame
+
+You can read data stored in a Delta Lake into a Dask DataFrame using `dask_deltatable.read_deltalake`. For example:
+
+```
+import dask_deltatable as ddt
+
+delta_path = "../../data/people_countries_delta_dask/"
+ddf = ddt.read_deltalake(delta_path)
+```
+
+Dask is a library for efficient distributed computing that works with lazy evaluation. Function calls to `dask.dataframe` build a task graph in the background. To trigger computation, call `.compute()`:
+
+```
+ddf.compute()
+```
+
+You can read in specific versions of Delta tables by specifying a `version` number or a timestamp:
+
+```
+# with specific version
+ddf = ddt.read_deltalake(delta_path, version=3)
+
+# with specific datetime
+ddt.read_deltalake(delta_path, datetime="2018-12-19T16:39:57-08:00")
+```
+
+`dask-deltatable` also supports reading from remote sources like S3 with:
+
+```
+ddt.read_deltalake("s3://bucket_name/delta_path", version=3)
+```
+
+> To read data from remote sources you'll need to make sure the credentials are properly configured in environment variables or config files. Refer to your cloud provider documentation to configure these; see the sketch further down this page for one possible way to pass credentials explicitly.
+
+## What can I do with a Dask Deltatable?
+
+Reading a Delta Lake in with `dask-deltatable` returns a regular Dask DataFrame. You can perform [all the regular Dask operations](https://docs.dask.org/en/stable/dataframe.html) on this DataFrame.
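+
+As mentioned in the note above, remote reads need credentials. The following is a minimal sketch of passing them explicitly rather than via environment variables. It assumes that `read_deltalake` accepts a `storage_options` dict which is forwarded to the underlying `deltalake` reader; the key names follow `deltalake`'s S3 convention and the values are placeholders, so check the `dask-deltatable` and `deltalake` docs for the exact options your versions support:
+
+```
+import dask_deltatable as ddt
+
+# assumption: storage_options is forwarded to the deltalake reader;
+# replace the placeholder values with your own credentials
+storage_options = {
+    "AWS_ACCESS_KEY_ID": "<access-key-id>",
+    "AWS_SECRET_ACCESS_KEY": "<secret-access-key>",
+    "AWS_REGION": "<region>",
+}
+ddf = ddt.read_deltalake(
+    "s3://bucket_name/delta_path",
+    storage_options=storage_options,
+)
+```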
+
+

From b5b61612a8c2099489fe7c672e76c0467b89d913 Mon Sep 17 00:00:00 2001
From: Richard Pelgrim <68642378+rrpelgrim@users.noreply.github.com>
Date: Fri, 26 Jan 2024 14:30:29 +0000
Subject: [PATCH 3/3] Update delta-lake-dask.md

---
 docs/integrations/delta-lake-dask.md | 125 ++++++++++++++++++++++++++-
 1 file changed, 122 insertions(+), 3 deletions(-)

diff --git a/docs/integrations/delta-lake-dask.md b/docs/integrations/delta-lake-dask.md
index 79be96568f..d3f0ebaf18 100644
--- a/docs/integrations/delta-lake-dask.md
+++ b/docs/integrations/delta-lake-dask.md
@@ -23,21 +23,34 @@ pip install dask-deltatable

 ## Reading Delta Tables into a Dask DataFrame

-You can read data stored in a Delta Lake into a Dask DataFrame using `dask_deltatable.read_deltalake`. For example:
+You can read data stored in a Delta Lake into a Dask DataFrame using `dask_deltatable.read_deltalake`.
+
+Let's read in a toy dataset to see what we can do with Delta Lake and Dask. You can access the data stored as a Delta Lake [on GitHub](https://github.com/rrpelgrim/delta-examples/tree/master/data):

 ```
 import dask_deltatable as ddt

-delta_path = "../../data/people_countries_delta_dask/"
+# read delta table into Dask DataFrame
+delta_path = "path/to/data/people_countries_delta_dask"
 ddf = ddt.read_deltalake(delta_path)
 ```

 Dask is a library for efficient distributed computing that works with lazy evaluation. Function calls to `dask.dataframe` build a task graph in the background. To trigger computation, call `.compute()`:

 ```
-ddf.compute()
+> ddf.compute()
+
+|    | first_name   | last_name   | country   | continent   |
+|---:|:-------------|:------------|:----------|:------------|
+|  0 | Ernesto      | Guevara     | Argentina | NaN         |
+|  0 | Bruce        | Lee         | China     | Asia        |
+|  1 | Jack         | Ma          | China     | Asia        |
+|  0 | Wolfgang     | Manche      | Germany   | NaN         |
+|  1 | Soraya       | Jala        | Germany   | NaN         |
 ```

 You can read in specific versions of Delta tables by specifying a `version` number or a timestamp:
@@ -59,4 +72,110 @@ ddt.read_deltalake("s3://bucket_name/delta_path", version=3)

 Reading a Delta Lake in with `dask-deltatable` returns a regular Dask DataFrame. You can perform [all the regular Dask operations](https://docs.dask.org/en/stable/dataframe.html) on this DataFrame.

+Let's take a look at the first few rows:
+
+```
+> ddf.head(n=3)
+
+|    | first_name   | last_name   | country   |   continent |
+|---:|:-------------|:------------|:----------|------------:|
+|  0 | Ernesto      | Guevara     | Argentina |         nan |
+```
+
+`dask.dataframe.head()` shows you the first rows of the first partition in the DataFrame. In this case, the first partition only has 1 row.
+
+This is because the Delta Lake has been partitioned by country:
+
+```
+> !ls path/to/data/people_countries_delta_dask
+_delta_log  country=Argentina  country=China  country=Germany
+```
+
+`dask-deltatable` neatly reads the partitioned Delta Lake into corresponding Dask DataFrame partitions:
+
+```
+> # see number of partitions
+> ddf.npartitions
+3
+```
+
+You can inspect a single partition using `dask.dataframe.get_partition()`:
+
+```
+> ddf.get_partition(n=1).compute()
+
+|    | first_name   | last_name   | country   | continent   |
+|---:|:-------------|:------------|:----------|:------------|
+|  0 | Bruce        | Lee         | China     | Asia        |
+|  1 | Jack         | Ma          | China     | Asia        |
+```
+
+## Perform Dask Operations
+
+Let's perform some basic computations over the Delta Lake data that's now stored in our Dask DataFrame.
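+
+For example, you can count the number of distinct countries. This is a small sketch; the output shown assumes the toy dataset used above:
+
+```
+# count the distinct values in the country column
+> ddf["country"].nunique().compute()
+3
+```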
+
+Suppose you want to group the dataset by the `country` column:
+
+```
+> ddf.groupby(['country']).count().compute()
+
+| country   |   first_name |   last_name |   continent |
+|:----------|-------------:|------------:|------------:|
+| Argentina |            1 |           1 |           1 |
+| China     |            2 |           2 |           2 |
+| Germany   |            2 |           2 |           2 |
+```
+
+Dask executes this `groupby` operation in parallel across all available cores.
+
+## Map Functions over Partitions
+
+You can also use Dask's `map_partitions` method to map a custom Python function over all the partitions.
+
+Let's write a function that will replace the missing `continent` values with the right continent names:
+
+```
+# get the string that marks missing continent values
+df = ddf.get_partition(0).compute()
+na_string = df.iloc[0].continent
+
+# define custom function to fill in the missing continents
+def replace_proper(partition, na_string):
+    partition.loc[partition.country == "Argentina"] = partition.loc[partition.country == "Argentina"].replace(na_string, "South America")
+    partition.loc[partition.country == "Germany"] = partition.loc[partition.country == "Germany"].replace(na_string, "Europe")
+    return partition
+```
+
+Now map this over all partitions in the Dask DataFrame. The `meta` argument tells Dask which column names and dtypes to expect in the output, so it can build the task graph without running the function eagerly:
+
+```
+# define metadata and map function over partitions
+> meta = dict(ddf.dtypes)
+> ddf3 = ddf.map_partitions(replace_proper, na_string, meta=meta)
+> ddf3.compute()
+
+|    | first_name   | last_name   | country   | continent     |
+|---:|:-------------|:------------|:----------|:--------------|
+|  0 | Ernesto      | Guevara     | Argentina | South America |
+|  0 | Bruce        | Lee         | China     | Asia          |
+|  1 | Jack         | Ma          | China     | Asia          |
+|  0 | Wolfgang     | Manche      | Germany   | Europe        |
+|  1 | Soraya       | Jala        | Germany   | Europe        |
+```
+
+## Write to Delta Lake
+
+After doing your data processing in Dask, you can write the data back out to Delta Lake using `to_deltalake`:
+
+```
+ddt.to_deltalake(ddf, "tmp/test_write")
+```
+
+## Contribute to `dask-deltatable`
+
+To contribute, go to the [`dask-deltatable` GitHub repository](https://github.com/dask-contrib/dask-deltatable).
\ No newline at end of file