feat: expose min_commit_interval to optimize.compact and optimize.z_order #1645

Merged
python/deltalake/table.py (9 changes: 8 additions & 1 deletion)

```diff
@@ -691,6 +691,7 @@ def compact(
         partition_filters: Optional[FilterType] = None,
         target_size: Optional[int] = None,
         max_concurrent_tasks: Optional[int] = None,
+        min_commit_interval: Optional[int] = None,
     ) -> Dict[str, Any]:
         """
         Compacts small files to reduce the total number of files in the table.
```
```diff
@@ -708,10 +709,12 @@ def compact(
         :param max_concurrent_tasks: the maximum number of concurrent tasks to use for
             file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
             faster, but will also use more memory.
+        :param min_commit_interval: minimum interval in seconds before a new commit is created.
+            Useful for long-running executions.
         :return: the metrics from optimize
         """
         metrics = self.table._table.compact_optimize(
-            partition_filters, target_size, max_concurrent_tasks
+            partition_filters, target_size, max_concurrent_tasks, min_commit_interval
         )
         self.table.update_incremental()
         return json.loads(metrics)
```
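With the parameter exposed, a long-running compaction can publish intermediate commits instead of a single commit at the very end. A minimal sketch of the user-facing call, assuming an existing Delta table at an illustrative path:

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")  # hypothetical table location

# Allow an intermediate commit roughly every 10 minutes (600 s) while
# compaction runs; omit the argument to keep the single-commit behavior.
metrics = dt.optimize.compact(min_commit_interval=600)
print(metrics)  # JSON-decoded OPTIMIZE metrics
```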
```diff
@@ -723,6 +726,7 @@ def z_order(
         target_size: Optional[int] = None,
         max_concurrent_tasks: Optional[int] = None,
         max_spill_size: int = 20 * 1024 * 1024 * 1024,
+        min_commit_interval: Optional[int] = None,
     ) -> Dict[str, Any]:
         """
         Reorders the data using a Z-order curve to improve data skipping.
```
```diff
@@ -738,6 +742,8 @@ def z_order(
             file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
             faster, but will also use more memory.
         :param max_spill_size: the maximum number of bytes to spill to disk. Defaults to 20GB.
+        :param min_commit_interval: minimum interval in seconds before a new commit is created.
+            Useful for long-running executions.
         :return: the metrics from optimize
         """
         metrics = self.table._table.z_order_optimize(
@@ -746,6 +752,7 @@ def z_order(
             target_size,
             max_concurrent_tasks,
             max_spill_size,
+            min_commit_interval,
         )
         self.table.update_incremental()
         return json.loads(metrics)
```
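`z_order` threads the same keyword through to the Rust builder. A sketch under the same assumptions (the column names here are illustrative):

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")  # hypothetical table location

# Reorder by two columns; with min_commit_interval=3600 a long job can
# surface an OPTIMIZE commit roughly once an hour instead of only at the end.
metrics = dt.optimize.z_order(["user_id", "event_time"], min_commit_interval=3600)
print(metrics)
```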
python/src/lib.rs (14 changes: 12 additions & 2 deletions)

```diff
@@ -9,6 +9,7 @@ use std::collections::{HashMap, HashSet};
 use std::convert::TryFrom;
 use std::future::IntoFuture;
 use std::sync::Arc;
+use std::time;
 use std::time::{SystemTime, UNIX_EPOCH};
 
 use arrow::pyarrow::PyArrowType;
```
```diff
@@ -271,18 +272,22 @@ impl RawDeltaTable {
     }
 
     /// Run the optimize command on the Delta Table: merge small files into a large file by bin-packing.
-    #[pyo3(signature = (partition_filters = None, target_size = None, max_concurrent_tasks = None))]
+    #[pyo3(signature = (partition_filters = None, target_size = None, max_concurrent_tasks = None, min_commit_interval = None))]
     pub fn compact_optimize(
         &mut self,
         partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
         target_size: Option<i64>,
         max_concurrent_tasks: Option<usize>,
+        min_commit_interval: Option<u64>,
     ) -> PyResult<String> {
         let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone())
             .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get));
         if let Some(size) = target_size {
             cmd = cmd.with_target_size(size);
         }
+        if let Some(commit_interval) = min_commit_interval {
+            cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
+        }
         let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
             .map_err(PythonError::from)?;
         cmd = cmd.with_filters(&converted_filters);
```
```diff
@@ -295,14 +300,15 @@ impl RawDeltaTable {
     }
 
     /// Run z-order variation of optimize
-    #[pyo3(signature = (z_order_columns, partition_filters = None, target_size = None, max_concurrent_tasks = None, max_spill_size = 20 * 1024 * 1024 * 1024))]
+    #[pyo3(signature = (z_order_columns, partition_filters = None, target_size = None, max_concurrent_tasks = None, max_spill_size = 20 * 1024 * 1024 * 1024, min_commit_interval = None))]
     pub fn z_order_optimize(
         &mut self,
         z_order_columns: Vec<String>,
         partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
         target_size: Option<i64>,
         max_concurrent_tasks: Option<usize>,
         max_spill_size: usize,
+        min_commit_interval: Option<u64>,
     ) -> PyResult<String> {
         let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone())
             .with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get))
```
```diff
@@ -311,6 +317,10 @@ impl RawDeltaTable {
         if let Some(size) = target_size {
             cmd = cmd.with_target_size(size);
         }
+        if let Some(commit_interval) = min_commit_interval {
+            cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
+        }
+
         let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
             .map_err(PythonError::from)?;
         cmd = cmd.with_filters(&converted_filters);
```
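On the Rust side the integer is wrapped in `time::Duration::from_secs`, so the Python parameter has whole-second resolution. A caller holding a `datetime.timedelta` would round it down first; a small sketch (the helper is hypothetical, not part of the API):

```python
from datetime import timedelta

from deltalake import DeltaTable

def as_commit_interval(interval: timedelta) -> int:
    # Hypothetical helper: the binding takes Option<u64> seconds, so any
    # fractional part of the timedelta is truncated here.
    return int(interval.total_seconds())

dt = DeltaTable("path/to/table")  # hypothetical table location
dt.optimize.compact(min_commit_interval=as_commit_interval(timedelta(minutes=5)))
```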
python/tests/test_optimize.py (18 changes: 18 additions & 0 deletions)

```diff
@@ -46,3 +46,21 @@ def test_z_order_optimize(
     last_action = dt.history(1)[0]
     assert last_action["operation"] == "OPTIMIZE"
     assert dt.version() == old_version + 1
+
+
+def test_optimize_min_commit_interval(
+    tmp_path: pathlib.Path,
+    sample_data: pa.Table,
+):
+    write_deltalake(tmp_path, sample_data, partition_by="utf8", mode="append")
+    write_deltalake(tmp_path, sample_data, partition_by="utf8", mode="append")
+    write_deltalake(tmp_path, sample_data, partition_by="utf8", mode="append")
+
+    dt = DeltaTable(tmp_path)
+    old_version = dt.version()
+
+    dt.optimize.z_order(["date32", "timestamp"], min_commit_interval=0)
+
+    last_action = dt.history(1)[0]
+    assert last_action["operation"] == "OPTIMIZE"
+    assert dt.version() == old_version + 5
```
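The `old_version + 5` assertion works because `min_commit_interval=0` lets every optimized bin commit as soon as it finishes (presumably one commit per `utf8` partition of `sample_data`). Outside the test suite, the effect can be observed by counting versions around the call; a sketch assuming a partitioned table at an illustrative path:

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")  # hypothetical partitioned table
before = dt.version()

# Interval of zero: commit as soon as each bin finishes.
dt.optimize.compact(min_commit_interval=0)

print(f"OPTIMIZE produced {dt.version() - before} commits")
```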