diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index cf7d844e11..367debbf18 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -2,7 +2,7 @@
 import operator
 import warnings
 from dataclasses import dataclass
-from datetime import datetime
+from datetime import datetime, timedelta
 from functools import reduce
 from pathlib import Path
 from typing import (
@@ -691,6 +691,7 @@ def compact(
         partition_filters: Optional[FilterType] = None,
         target_size: Optional[int] = None,
         max_concurrent_tasks: Optional[int] = None,
+        min_commit_interval: Optional[Union[int, timedelta]] = None,
     ) -> Dict[str, Any]:
         """
         Compacts small files to reduce the total number of files in the table.
@@ -708,10 +709,25 @@ def compact(
         :param max_concurrent_tasks: the maximum number of concurrent tasks to use for
             file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
             faster, but will also use more memory.
+        :param min_commit_interval: minimum interval in seconds or as a timedelta before a new
+            commit is created. Useful for long-running executions. Set to 0 or timedelta(0) if
+            you want a commit per partition.
         :return: the metrics from optimize
+
+        Examples:
+
+        Use a timedelta object to specify the seconds, minutes or hours of the interval.
+        >>> from deltalake import DeltaTable
+        >>> from datetime import timedelta
+        >>> dt = DeltaTable("tmp")
+        >>> time_delta = timedelta(minutes=10)
+        >>> dt.optimize.compact(min_commit_interval=time_delta)
         """
+        if isinstance(min_commit_interval, timedelta):
+            min_commit_interval = int(min_commit_interval.total_seconds())
+
         metrics = self.table._table.compact_optimize(
-            partition_filters, target_size, max_concurrent_tasks
+            partition_filters, target_size, max_concurrent_tasks, min_commit_interval
         )
         self.table.update_incremental()
         return json.loads(metrics)
@@ -723,6 +739,7 @@ def z_order(
         target_size: Optional[int] = None,
         max_concurrent_tasks: Optional[int] = None,
         max_spill_size: int = 20 * 1024 * 1024 * 1024,
+        min_commit_interval: Optional[Union[int, timedelta]] = None,
     ) -> Dict[str, Any]:
         """
         Reorders the data using a Z-order curve to improve data skipping.
@@ -738,14 +755,30 @@ def z_order(
         :param max_concurrent_tasks: the maximum number of concurrent tasks to use for
             file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
             faster, but will also use more memory.
         :param max_spill_size: the maximum number of bytes to spill to disk. Defaults to 20GB.
+        :param min_commit_interval: minimum interval in seconds or as a timedelta before a new
+            commit is created. Useful for long-running executions. Set to 0 or timedelta(0) if
+            you want a commit per partition.
         :return: the metrics from optimize
+
+        Examples:
+
+        Use a timedelta object to specify the seconds, minutes or hours of the interval.
+        >>> from deltalake import DeltaTable
+        >>> from datetime import timedelta
+        >>> dt = DeltaTable("tmp")
+        >>> time_delta = timedelta(minutes=10)
+        >>> dt.optimize.z_order(["timestamp"], min_commit_interval=time_delta)
         """
+        if isinstance(min_commit_interval, timedelta):
+            min_commit_interval = int(min_commit_interval.total_seconds())
+
         metrics = self.table._table.z_order_optimize(
             list(columns),
             partition_filters,
             target_size,
             max_concurrent_tasks,
             max_spill_size,
+            min_commit_interval,
         )
         self.table.update_incremental()
         return json.loads(metrics)
diff --git a/python/docs/source/conf.py b/python/docs/source/conf.py
index 72a6cd929d..c11b808659 100644
--- a/python/docs/source/conf.py
+++ b/python/docs/source/conf.py
@@ -66,6 +66,7 @@ def get_release_version() -> str:
     ("py:class", "pyarrow._dataset_parquet.ParquetFileWriteOptions"),
     ("py:class", "pathlib.Path"),
     ("py:class", "datetime.datetime"),
+    ("py:class", "datetime.timedelta"),
 ]
 
 # Add any paths that contain templates here, relative to this directory.
diff --git a/python/src/lib.rs b/python/src/lib.rs
index b4fc515f2b..8d7cdb486d 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -9,6 +9,7 @@ use std::collections::{HashMap, HashSet};
 use std::convert::TryFrom;
 use std::future::IntoFuture;
 use std::sync::Arc;
+use std::time;
 use std::time::{SystemTime, UNIX_EPOCH};
 
 use arrow::pyarrow::PyArrowType;
@@ -273,18 +274,22 @@ impl RawDeltaTable {
     }
 
     /// Run the optimize command on the Delta Table: merge small files into a large file by bin-packing.
-    #[pyo3(signature = (partition_filters = None, target_size = None, max_concurrent_tasks = None))]
+    #[pyo3(signature = (partition_filters = None, target_size = None, max_concurrent_tasks = None, min_commit_interval = None))]
     pub fn compact_optimize(
         &mut self,
         partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
         target_size: Option<i64>,
         max_concurrent_tasks: Option<usize>,
+        min_commit_interval: Option<u64>,
     ) -> PyResult<String> {
         let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone())
             .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get));
         if let Some(size) = target_size {
             cmd = cmd.with_target_size(size);
         }
+        if let Some(commit_interval) = min_commit_interval {
+            cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
+        }
         let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
             .map_err(PythonError::from)?;
         cmd = cmd.with_filters(&converted_filters);
@@ -297,7 +302,7 @@
     }
 
     /// Run z-order variation of optimize
-    #[pyo3(signature = (z_order_columns, partition_filters = None, target_size = None, max_concurrent_tasks = None, max_spill_size = 20 * 1024 * 1024 * 1024))]
+    #[pyo3(signature = (z_order_columns, partition_filters = None, target_size = None, max_concurrent_tasks = None, max_spill_size = 20 * 1024 * 1024 * 1024, min_commit_interval = None))]
     pub fn z_order_optimize(
         &mut self,
         z_order_columns: Vec<String>,
@@ -305,6 +310,7 @@
         target_size: Option<i64>,
         max_concurrent_tasks: Option<usize>,
         max_spill_size: usize,
+        min_commit_interval: Option<u64>,
     ) -> PyResult<String> {
         let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone())
             .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get))
@@ -313,6 +319,10 @@
         if let Some(size) = target_size {
             cmd = cmd.with_target_size(size);
         }
+        if let Some(commit_interval) = min_commit_interval {
+            cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
+        }
+
         let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
            .map_err(PythonError::from)?;
         cmd = cmd.with_filters(&converted_filters);
diff --git a/python/tests/test_optimize.py b/python/tests/test_optimize.py
index 665aaaec8f..4b746b1434 100644
--- a/python/tests/test_optimize.py
+++ b/python/tests/test_optimize.py
@@ -1,4 +1,5 @@
 import pathlib
+from datetime import timedelta
 
 import pyarrow as pa
 import pytest
@@ -46,3 +47,24 @@ def test_z_order_optimize(
     last_action = dt.history(1)[0]
     assert last_action["operation"] == "OPTIMIZE"
     assert dt.version() == old_version + 1
+
+
+def test_optimize_min_commit_interval(
+    tmp_path: pathlib.Path,
+    sample_data: pa.Table,
+):
+    write_deltalake(tmp_path, sample_data, partition_by="utf8", mode="append")
+    write_deltalake(tmp_path, sample_data, partition_by="utf8", mode="append")
+    write_deltalake(tmp_path, sample_data, partition_by="utf8", mode="append")
+
+    dt = DeltaTable(tmp_path)
+    old_version = dt.version()
+
+    dt.optimize.z_order(["date32", "timestamp"], min_commit_interval=timedelta(0))
+
+    last_action = dt.history(1)[0]
+    assert last_action["operation"] == "OPTIMIZE"
+    # The table has 5 distinct partitions, each of which is Z-ordered
+    # independently. So with min_commit_interval=0, each will get its
+    # own commit.
+    assert dt.version() == old_version + 5
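
A minimal usage sketch of the new parameter, mirroring the docstring examples above (the table path "tmp" is hypothetical). Both a plain int (seconds) and a datetime.timedelta are accepted, because the Python wrapper converts timedeltas to whole seconds via int(total_seconds()) before passing them to the Rust builder:

>>> from datetime import timedelta
>>> from deltalake import DeltaTable
>>> dt = DeltaTable("tmp")  # hypothetical table path
>>> dt.optimize.compact(min_commit_interval=600)  # int: commit no more often than every 600 seconds
>>> dt.optimize.compact(min_commit_interval=timedelta(minutes=10))  # equivalent timedelta form
>>> dt.optimize.z_order(["timestamp"], min_commit_interval=timedelta(0))  # one commit per partition

One consequence of the int(total_seconds()) conversion worth noting: a sub-second timedelta truncates to 0, which gives the commit-per-partition behavior rather than a sub-second interval.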