From 1b89be792d7c764dbc4d99a2146d232be35936f3 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris Date: Tue, 19 Sep 2023 19:51:44 +0200 Subject: [PATCH 01/10] Expose min_commit_interval to python api --- python/deltalake/table.py | 9 ++++++++- python/src/lib.rs | 14 ++++++++++++-- python/tests/test_optimize.py | 18 ++++++++++++++++++ 3 files changed, 38 insertions(+), 3 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index add8342555..d947043702 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -688,6 +688,7 @@ def compact( partition_filters: Optional[FilterType] = None, target_size: Optional[int] = None, max_concurrent_tasks: Optional[int] = None, + min_commit_interval: Optional[int] = None, ) -> Dict[str, Any]: """ Compacts small files to reduce the total number of files in the table. @@ -705,10 +706,12 @@ def compact( :param max_concurrent_tasks: the maximum number of concurrent tasks to use for file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction faster, but will also use more memory. + :param min_commit_interval: minimum interval in seconds before a new commit is created used for + long running executions. :return: the metrics from optimize """ metrics = self.table._table.compact_optimize( - partition_filters, target_size, max_concurrent_tasks + partition_filters, target_size, max_concurrent_tasks, min_commit_interval ) self.table.update_incremental() return json.loads(metrics) @@ -720,6 +723,7 @@ def z_order( target_size: Optional[int] = None, max_concurrent_tasks: Optional[int] = None, max_spill_size: int = 20 * 1024 * 1024 * 1024, + min_commit_interval: Optional[int] = None, ) -> Dict[str, Any]: """ Reorders the data using a Z-order curve to improve data skipping. @@ -735,6 +739,8 @@ def z_order( file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction faster, but will also use more memory. :param max_spill_size: the maximum number of bytes to spill to disk. Defaults to 20GB. + :param min_commit_interval: minimum interval in seconds before a new commit is created used for + long running executions. :return: the metrics from optimize """ metrics = self.table._table.z_order_optimize( @@ -743,6 +749,7 @@ def z_order( target_size, max_concurrent_tasks, max_spill_size, + min_commit_interval, ) self.table.update_incremental() return json.loads(metrics) diff --git a/python/src/lib.rs b/python/src/lib.rs index eed4558342..3c9706b4a9 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -9,6 +9,7 @@ use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; use std::future::IntoFuture; use std::sync::Arc; +use std::time; use std::time::{SystemTime, UNIX_EPOCH}; use arrow::pyarrow::PyArrowType; @@ -271,18 +272,22 @@ impl RawDeltaTable { } /// Run the optimize command on the Delta Table: merge small files into a large file by bin-packing. - #[pyo3(signature = (partition_filters = None, target_size = None, max_concurrent_tasks = None))] + #[pyo3(signature = (partition_filters = None, target_size = None, max_concurrent_tasks = None, min_commit_interval = None))] pub fn compact_optimize( &mut self, partition_filters: Option>, target_size: Option, max_concurrent_tasks: Option, + min_commit_interval: Option, ) -> PyResult { let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone()) .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get)); if let Some(size) = target_size { cmd = cmd.with_target_size(size); } + if let Some(commit_interval) = min_commit_interval { + cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval)); + } let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default()) .map_err(PythonError::from)?; cmd = cmd.with_filters(&converted_filters); @@ -295,7 +300,7 @@ impl RawDeltaTable { } /// Run z-order variation of optimize - #[pyo3(signature = (z_order_columns, partition_filters = None, target_size = None, max_concurrent_tasks = None, max_spill_size = 20 * 1024 * 1024 * 1024))] + #[pyo3(signature = (z_order_columns, partition_filters = None, target_size = None, max_concurrent_tasks = None, max_spill_size = 20 * 1024 * 1024 * 1024, min_commit_interval = None))] pub fn z_order_optimize( &mut self, z_order_columns: Vec, @@ -303,6 +308,7 @@ impl RawDeltaTable { target_size: Option, max_concurrent_tasks: Option, max_spill_size: usize, + min_commit_interval: Option, ) -> PyResult { let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone()) .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get)) @@ -311,6 +317,10 @@ impl RawDeltaTable { if let Some(size) = target_size { cmd = cmd.with_target_size(size); } + if let Some(commit_interval) = min_commit_interval { + cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval)); + } + let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default()) .map_err(PythonError::from)?; cmd = cmd.with_filters(&converted_filters); diff --git a/python/tests/test_optimize.py b/python/tests/test_optimize.py index 665aaaec8f..4de8f7c498 100644 --- a/python/tests/test_optimize.py +++ b/python/tests/test_optimize.py @@ -46,3 +46,21 @@ def test_z_order_optimize( last_action = dt.history(1)[0] assert last_action["operation"] == "OPTIMIZE" assert dt.version() == old_version + 1 + + +def test_optimize_min_commit_interval( + tmp_path: pathlib.Path, + sample_data: pa.Table, +): + write_deltalake(tmp_path, sample_data, partition_by="utf8", mode="append") + write_deltalake(tmp_path, sample_data, partition_by="utf8", mode="append") + write_deltalake(tmp_path, sample_data, partition_by="utf8", mode="append") + + dt = DeltaTable(tmp_path) + old_version = dt.version() + + dt.optimize.z_order(["date32", "timestamp"], min_commit_interval=0) + + last_action = dt.history(1)[0] + assert last_action["operation"] == "OPTIMIZE" + assert dt.version() == old_version + 5 From d3282a1231dd4a4377eaa289342b618704bbc85d Mon Sep 17 00:00:00 2001 From: Ion Koutsouris Date: Tue, 19 Sep 2023 19:56:05 +0200 Subject: [PATCH 02/10] Rephrase doc --- python/deltalake/table.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index d947043702..f9db002e0b 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -706,8 +706,8 @@ def compact( :param max_concurrent_tasks: the maximum number of concurrent tasks to use for file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction faster, but will also use more memory. - :param min_commit_interval: minimum interval in seconds before a new commit is created used for - long running executions. + :param min_commit_interval: minimum interval in seconds before a new commit is created. Interval is + useful for long running executions. :return: the metrics from optimize """ metrics = self.table._table.compact_optimize( @@ -739,8 +739,8 @@ def z_order( file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction faster, but will also use more memory. :param max_spill_size: the maximum number of bytes to spill to disk. Defaults to 20GB. - :param min_commit_interval: minimum interval in seconds before a new commit is created used for - long running executions. + :param min_commit_interval: minimum interval in seconds before a new commit is created. Interval is + useful for long running executions. :return: the metrics from optimize """ metrics = self.table._table.z_order_optimize( From 3d69f1a3ae5fe4831fb9ec92a306e7d0a45f78ab Mon Sep 17 00:00:00 2001 From: Ion Koutsouris Date: Wed, 20 Sep 2023 07:55:00 +0200 Subject: [PATCH 03/10] Update python/tests/test_optimize.py Co-authored-by: Will Jones --- python/tests/test_optimize.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/tests/test_optimize.py b/python/tests/test_optimize.py index 4de8f7c498..d5b3571647 100644 --- a/python/tests/test_optimize.py +++ b/python/tests/test_optimize.py @@ -63,4 +63,7 @@ def test_optimize_min_commit_interval( last_action = dt.history(1)[0] assert last_action["operation"] == "OPTIMIZE" + # The table has 5 distinct partitions, each of which are Z-ordered + # independently. So with min_commit_interval=0, each will get its + # own commit. assert dt.version() == old_version + 5 From 685cb9ee977b1ff23189a10185ad53a413b22ce1 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris Date: Wed, 20 Sep 2023 08:23:19 +0200 Subject: [PATCH 04/10] Add timedelta als optional dtype, add example and improve doc string --- python/deltalake/table.py | 48 +++++++++++++++++++++++++++++++-------- 1 file changed, 39 insertions(+), 9 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 37e5d9479b..bbc560a754 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -2,7 +2,7 @@ import operator import warnings from dataclasses import dataclass -from datetime import datetime +from datetime import datetime, timedelta from functools import reduce from pathlib import Path from typing import ( @@ -691,7 +691,7 @@ def compact( partition_filters: Optional[FilterType] = None, target_size: Optional[int] = None, max_concurrent_tasks: Optional[int] = None, - min_commit_interval: Optional[int] = None, + min_commit_interval: Optional[int | timedelta] = None, ) -> Dict[str, Any]: """ Compacts small files to reduce the total number of files in the table. @@ -709,12 +709,27 @@ def compact( :param max_concurrent_tasks: the maximum number of concurrent tasks to use for file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction faster, but will also use more memory. - :param min_commit_interval: minimum interval in seconds before a new commit is created. Interval is - useful for long running executions. + :param min_commit_interval: minimum interval in seconds or as timedeltas before a new commit is + created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you + want a commit per partition. :return: the metrics from optimize + + Examples: + + Use a timedelta object to specify the seconds, minutes or hours of the interval. + >>> from deltalake import DeltaTable + >>> from datetime import timedelta + >>> dt = DeltaTable("tmp") + >>> time_delta = timedelta(minutes=10) + >>> dt.optimize.z_order(["timestamp"], min_commit_interval=time_delta) """ + if isinstance(min_commit_interval, timedelta): + commit_interval = timedelta.seconds + else: + commit_interval = min_commit_interval + metrics = self.table._table.compact_optimize( - partition_filters, target_size, max_concurrent_tasks, min_commit_interval + partition_filters, target_size, max_concurrent_tasks, commit_interval ) self.table.update_incremental() return json.loads(metrics) @@ -726,7 +741,7 @@ def z_order( target_size: Optional[int] = None, max_concurrent_tasks: Optional[int] = None, max_spill_size: int = 20 * 1024 * 1024 * 1024, - min_commit_interval: Optional[int] = None, + min_commit_interval: Optional[int | timedelta] = None, ) -> Dict[str, Any]: """ Reorders the data using a Z-order curve to improve data skipping. @@ -742,17 +757,32 @@ def z_order( file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction faster, but will also use more memory. :param max_spill_size: the maximum number of bytes to spill to disk. Defaults to 20GB. - :param min_commit_interval: minimum interval in seconds before a new commit is created. Interval is - useful for long running executions. + :param min_commit_interval: minimum interval in seconds or as timedeltas before a new commit is + created. Interval is useful for long running executions. Set to 0 or timedelta(0), if you + want a commit per partition. :return: the metrics from optimize + + Examples: + + Use a timedelta object to specify the seconds, minutes or hours of the interval. + >>> from deltalake import DeltaTable + >>> from datetime import timedelta + >>> dt = DeltaTable("tmp") + >>> time_delta = timedelta(minutes=10) + >>> dt.optimize.compact(min_commit_interval=time_delta) """ + if isinstance(min_commit_interval, timedelta): + commit_interval = timedelta.seconds + else: + commit_interval = min_commit_interval + metrics = self.table._table.z_order_optimize( list(columns), partition_filters, target_size, max_concurrent_tasks, max_spill_size, - min_commit_interval, + commit_interval, ) self.table.update_incremental() return json.loads(metrics) From 04f6e8eb38e26912a2a6794d77b5d246a510eb33 Mon Sep 17 00:00:00 2001 From: Ion Koutsouris Date: Wed, 20 Sep 2023 08:36:48 +0200 Subject: [PATCH 05/10] Fix typo :) --- python/deltalake/table.py | 4 ++-- python/tests/test_optimize.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index bbc560a754..225d26dc18 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -724,7 +724,7 @@ def compact( >>> dt.optimize.z_order(["timestamp"], min_commit_interval=time_delta) """ if isinstance(min_commit_interval, timedelta): - commit_interval = timedelta.seconds + commit_interval = min_commit_interval.seconds else: commit_interval = min_commit_interval @@ -772,7 +772,7 @@ def z_order( >>> dt.optimize.compact(min_commit_interval=time_delta) """ if isinstance(min_commit_interval, timedelta): - commit_interval = timedelta.seconds + commit_interval = min_commit_interval.seconds else: commit_interval = min_commit_interval diff --git a/python/tests/test_optimize.py b/python/tests/test_optimize.py index d5b3571647..ffd3a1a3a5 100644 --- a/python/tests/test_optimize.py +++ b/python/tests/test_optimize.py @@ -2,7 +2,7 @@ import pyarrow as pa import pytest - +from datetime import timedelta from deltalake import DeltaTable, write_deltalake @@ -59,7 +59,7 @@ def test_optimize_min_commit_interval( dt = DeltaTable(tmp_path) old_version = dt.version() - dt.optimize.z_order(["date32", "timestamp"], min_commit_interval=0) + dt.optimize.z_order(["date32", "timestamp"], min_commit_interval=timedelta(0)) last_action = dt.history(1)[0] assert last_action["operation"] == "OPTIMIZE" From 407be9e64ebea63282ac9bd1132ef7091a71ba1c Mon Sep 17 00:00:00 2001 From: Ion Koutsouris Date: Wed, 20 Sep 2023 08:37:01 +0200 Subject: [PATCH 06/10] formatting --- python/tests/test_optimize.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/tests/test_optimize.py b/python/tests/test_optimize.py index ffd3a1a3a5..4b746b1434 100644 --- a/python/tests/test_optimize.py +++ b/python/tests/test_optimize.py @@ -1,8 +1,9 @@ import pathlib +from datetime import timedelta import pyarrow as pa import pytest -from datetime import timedelta + from deltalake import DeltaTable, write_deltalake From 7fe829de53e5bd9f7372ebab493825f5f5856cca Mon Sep 17 00:00:00 2001 From: ion-elgreco Date: Fri, 22 Sep 2023 07:57:03 +0200 Subject: [PATCH 07/10] Fix --- python/deltalake/table.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 225d26dc18..b0f6191d7b 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -691,7 +691,7 @@ def compact( partition_filters: Optional[FilterType] = None, target_size: Optional[int] = None, max_concurrent_tasks: Optional[int] = None, - min_commit_interval: Optional[int | timedelta] = None, + min_commit_interval: Optional[Union[int, timedelta]] = None, ) -> Dict[str, Any]: """ Compacts small files to reduce the total number of files in the table. @@ -724,7 +724,7 @@ def compact( >>> dt.optimize.z_order(["timestamp"], min_commit_interval=time_delta) """ if isinstance(min_commit_interval, timedelta): - commit_interval = min_commit_interval.seconds + commit_interval = int(min_commit_interval.total_seconds()) else: commit_interval = min_commit_interval @@ -741,7 +741,7 @@ def z_order( target_size: Optional[int] = None, max_concurrent_tasks: Optional[int] = None, max_spill_size: int = 20 * 1024 * 1024 * 1024, - min_commit_interval: Optional[int | timedelta] = None, + min_commit_interval: Optional[Union[int, timedelta]] = None, ) -> Dict[str, Any]: """ Reorders the data using a Z-order curve to improve data skipping. @@ -772,7 +772,7 @@ def z_order( >>> dt.optimize.compact(min_commit_interval=time_delta) """ if isinstance(min_commit_interval, timedelta): - commit_interval = min_commit_interval.seconds + commit_interval = int(min_commit_interval.total_seconds()) else: commit_interval = min_commit_interval From 2c30cb2edc71a7244172b38c4bb89362f811dee9 Mon Sep 17 00:00:00 2001 From: ion-elgreco Date: Fri, 22 Sep 2023 08:22:42 +0200 Subject: [PATCH 08/10] Fix mypy instead of pyright --- python/deltalake/table.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index b0f6191d7b..b0638a2f72 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -725,8 +725,10 @@ def compact( """ if isinstance(min_commit_interval, timedelta): commit_interval = int(min_commit_interval.total_seconds()) - else: + elif isinstance(min_commit_interval, int): commit_interval = min_commit_interval + else: + commit_interval = None metrics = self.table._table.compact_optimize( partition_filters, target_size, max_concurrent_tasks, commit_interval @@ -773,8 +775,10 @@ def z_order( """ if isinstance(min_commit_interval, timedelta): commit_interval = int(min_commit_interval.total_seconds()) - else: + elif isinstance(min_commit_interval, int): commit_interval = min_commit_interval + else: + commit_interval = None metrics = self.table._table.z_order_optimize( list(columns), From b369da53d60993d1cf3ee4d47915382f14492256 Mon Sep 17 00:00:00 2001 From: ion-elgreco Date: Fri, 22 Sep 2023 08:53:40 +0200 Subject: [PATCH 09/10] Reduce lines code --- python/deltalake/table.py | 16 ++++------------ 1 file changed, 4 insertions(+), 12 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index b0638a2f72..367debbf18 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -724,14 +724,10 @@ def compact( >>> dt.optimize.z_order(["timestamp"], min_commit_interval=time_delta) """ if isinstance(min_commit_interval, timedelta): - commit_interval = int(min_commit_interval.total_seconds()) - elif isinstance(min_commit_interval, int): - commit_interval = min_commit_interval - else: - commit_interval = None + min_commit_interval = int(min_commit_interval.total_seconds()) metrics = self.table._table.compact_optimize( - partition_filters, target_size, max_concurrent_tasks, commit_interval + partition_filters, target_size, max_concurrent_tasks, min_commit_interval ) self.table.update_incremental() return json.loads(metrics) @@ -774,11 +770,7 @@ def z_order( >>> dt.optimize.compact(min_commit_interval=time_delta) """ if isinstance(min_commit_interval, timedelta): - commit_interval = int(min_commit_interval.total_seconds()) - elif isinstance(min_commit_interval, int): - commit_interval = min_commit_interval - else: - commit_interval = None + min_commit_interval = int(min_commit_interval.total_seconds()) metrics = self.table._table.z_order_optimize( list(columns), @@ -786,7 +778,7 @@ def z_order( target_size, max_concurrent_tasks, max_spill_size, - commit_interval, + min_commit_interval, ) self.table.update_incremental() return json.loads(metrics) From 64c1025a1679f4cd37b286aee14b58faa4d2996a Mon Sep 17 00:00:00 2001 From: ion-elgreco Date: Fri, 22 Sep 2023 09:30:56 +0200 Subject: [PATCH 10/10] Add timedelta in ignore --- python/docs/source/conf.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/docs/source/conf.py b/python/docs/source/conf.py index 72a6cd929d..c11b808659 100644 --- a/python/docs/source/conf.py +++ b/python/docs/source/conf.py @@ -66,6 +66,7 @@ def get_release_version() -> str: ("py:class", "pyarrow._dataset_parquet.ParquetFileWriteOptions"), ("py:class", "pathlib.Path"), ("py:class", "datetime.datetime"), + ("py:class", "datetime.timedelta"), ] # Add any paths that contain templates here, relative to this directory.