Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: expose min_commit_interval to optimize.compact and optimize.z_order #1645

Merged
Merged
41 changes: 39 additions & 2 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import operator
import warnings
from dataclasses import dataclass
from datetime import datetime
from datetime import datetime, timedelta
from functools import reduce
from pathlib import Path
from typing import (
Expand Down Expand Up @@ -691,6 +691,7 @@ def compact(
partition_filters: Optional[FilterType] = None,
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
min_commit_interval: Optional[int | timedelta] = None,
) -> Dict[str, Any]:
"""
Compacts small files to reduce the total number of files in the table.
Expand All @@ -708,10 +709,27 @@ def compact(
:param max_concurrent_tasks: the maximum number of concurrent tasks to use for
file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
faster, but will also use more memory.
:param min_commit_interval: minimum interval, given in seconds or as a timedelta, before a new
commit is created. An interval is useful for long-running executions. Set to 0 or timedelta(0)
if you want a commit per partition.
:return: the metrics from optimize

Examples:

Use a timedelta object to specify the seconds, minutes or hours of the interval.
>>> from deltalake import DeltaTable
>>> from datetime import timedelta
>>> dt = DeltaTable("tmp")
>>> time_delta = timedelta(minutes=10)
>>> dt.optimize.compact(min_commit_interval=time_delta)
"""
if isinstance(min_commit_interval, timedelta):
commit_interval = min_commit_interval.seconds
else:
commit_interval = min_commit_interval

metrics = self.table._table.compact_optimize(
partition_filters, target_size, max_concurrent_tasks
partition_filters, target_size, max_concurrent_tasks, commit_interval
)
self.table.update_incremental()
return json.loads(metrics)
Expand All @@ -723,6 +741,7 @@ def z_order(
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
max_spill_size: int = 20 * 1024 * 1024 * 1024,
min_commit_interval: Optional[int | timedelta] = None,
) -> Dict[str, Any]:
"""
Reorders the data using a Z-order curve to improve data skipping.
Expand All @@ -738,14 +757,32 @@ def z_order(
file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
faster, but will also use more memory.
:param max_spill_size: the maximum number of bytes to spill to disk. Defaults to 20GB.
:param min_commit_interval: minimum interval, given in seconds or as a timedelta, before a new
commit is created. An interval is useful for long-running executions. Set to 0 or timedelta(0)
if you want a commit per partition.
:return: the metrics from optimize

Examples:

Use a timedelta object to specify the seconds, minutes or hours of the interval.
>>> from deltalake import DeltaTable
>>> from datetime import timedelta
>>> dt = DeltaTable("tmp")
>>> time_delta = timedelta(minutes=10)
>>> dt.optimize.z_order(["timestamp"], min_commit_interval=time_delta)
"""
if isinstance(min_commit_interval, timedelta):
commit_interval = min_commit_interval.seconds
ion-elgreco marked this conversation as resolved.
Show resolved Hide resolved
else:
commit_interval = min_commit_interval

metrics = self.table._table.z_order_optimize(
list(columns),
partition_filters,
target_size,
max_concurrent_tasks,
max_spill_size,
commit_interval,
)
self.table.update_incremental()
return json.loads(metrics)
14 changes: 12 additions & 2 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::future::IntoFuture;
use std::sync::Arc;
use std::time;
use std::time::{SystemTime, UNIX_EPOCH};

use arrow::pyarrow::PyArrowType;
Expand Down Expand Up @@ -273,18 +274,22 @@ impl RawDeltaTable {
}

/// Run the optimize command on the Delta Table: merge small files into a large file by bin-packing.
#[pyo3(signature = (partition_filters = None, target_size = None, max_concurrent_tasks = None))]
#[pyo3(signature = (partition_filters = None, target_size = None, max_concurrent_tasks = None, min_commit_interval = None))]
pub fn compact_optimize(
&mut self,
partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
target_size: Option<i64>,
max_concurrent_tasks: Option<usize>,
min_commit_interval: Option<u64>,
) -> PyResult<String> {
let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone())
.with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get));
if let Some(size) = target_size {
cmd = cmd.with_target_size(size);
}
if let Some(commit_interval) = min_commit_interval {
cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
}
let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
.map_err(PythonError::from)?;
cmd = cmd.with_filters(&converted_filters);
Expand All @@ -297,14 +302,15 @@ impl RawDeltaTable {
}

/// Run z-order variation of optimize
#[pyo3(signature = (z_order_columns, partition_filters = None, target_size = None, max_concurrent_tasks = None, max_spill_size = 20 * 1024 * 1024 * 1024))]
#[pyo3(signature = (z_order_columns, partition_filters = None, target_size = None, max_concurrent_tasks = None, max_spill_size = 20 * 1024 * 1024 * 1024, min_commit_interval = None))]
pub fn z_order_optimize(
&mut self,
z_order_columns: Vec<String>,
partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
target_size: Option<i64>,
max_concurrent_tasks: Option<usize>,
max_spill_size: usize,
min_commit_interval: Option<u64>,
) -> PyResult<String> {
let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone())
.with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get))
Expand All @@ -313,6 +319,10 @@ impl RawDeltaTable {
if let Some(size) = target_size {
cmd = cmd.with_target_size(size);
}
if let Some(commit_interval) = min_commit_interval {
cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
}

let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
.map_err(PythonError::from)?;
cmd = cmd.with_filters(&converted_filters);
Expand Down
22 changes: 22 additions & 0 deletions python/tests/test_optimize.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import pathlib
from datetime import timedelta

import pyarrow as pa
import pytest
Expand Down Expand Up @@ -46,3 +47,24 @@ def test_z_order_optimize(
last_action = dt.history(1)[0]
assert last_action["operation"] == "OPTIMIZE"
assert dt.version() == old_version + 1


def test_optimize_min_commit_interval(
    tmp_path: pathlib.Path,
    sample_data: pa.Table,
):
    """Z-order with a zero ``min_commit_interval`` and check the number of commits produced."""
    # Append the same sample data three times, partitioned by the "utf8" column.
    for _ in range(3):
        write_deltalake(tmp_path, sample_data, partition_by="utf8", mode="append")

    dt = DeltaTable(tmp_path)
    version_before = dt.version()

    dt.optimize.z_order(["date32", "timestamp"], min_commit_interval=timedelta(0))

    latest_entry = dt.history(1)[0]
    assert latest_entry["operation"] == "OPTIMIZE"
    # The table has 5 distinct partitions, each of which is Z-ordered
    # independently. With min_commit_interval=0 every partition gets its
    # own commit, so the version advances by 5.
    assert dt.version() == version_before + 5
ion-elgreco marked this conversation as resolved.
Show resolved Hide resolved