feat: expose min_commit_interval to optimize.compact and `optimize.z_order` (#1645)

# Description
Exposes `min_commit_interval` in the Python API to `optimize.compact` and
`optimize.z_order`. Adds one test case to verify `min_commit_interval`.
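
A quick usage sketch (the table path and column name are placeholders; both methods accept a plain int in seconds or a `datetime.timedelta`):

from datetime import timedelta

from deltalake import DeltaTable

dt = DeltaTable("path/to/table")  # placeholder path

# Create intermediate commits no more often than every 10 minutes
# during a long-running compaction.
dt.optimize.compact(min_commit_interval=timedelta(minutes=10))

# z_order takes the same keyword; a plain int is interpreted as seconds.
dt.optimize.z_order(["timestamp"], min_commit_interval=600)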

# Related Issue(s)
closes #1640

---------

Co-authored-by: Will Jones <willjones127@gmail.com>
ion-elgreco and wjones127 authored Sep 22, 2023
1 parent 81a5451 commit 72f8531
Showing 4 changed files with 70 additions and 4 deletions.
37 changes: 35 additions & 2 deletions python/deltalake/table.py
@@ -2,7 +2,7 @@
import operator
import warnings
from dataclasses import dataclass
from datetime import datetime, timedelta
from functools import reduce
from pathlib import Path
from typing import (
@@ -691,6 +691,7 @@ def compact(
partition_filters: Optional[FilterType] = None,
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
min_commit_interval: Optional[Union[int, timedelta]] = None,
) -> Dict[str, Any]:
"""
Compacts small files to reduce the total number of files in the table.
@@ -708,10 +709,25 @@
:param max_concurrent_tasks: the maximum number of concurrent tasks to use for
file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
faster, but will also use more memory.
:param min_commit_interval: minimum interval in seconds or as a timedelta before a new commit is
created. The interval is useful for long-running executions. Set to 0 or timedelta(0) if you
want a commit per partition.
:return: the metrics from optimize
Examples:
Use a timedelta object to specify the seconds, minutes or hours of the interval.
>>> from deltalake import DeltaTable
>>> from datetime import timedelta
>>> dt = DeltaTable("tmp")
>>> time_delta = timedelta(minutes=10)
>>> dt.optimize.compact(min_commit_interval=time_delta)
"""
if isinstance(min_commit_interval, timedelta):
min_commit_interval = int(min_commit_interval.total_seconds())

metrics = self.table._table.compact_optimize(
partition_filters, target_size, max_concurrent_tasks, min_commit_interval
)
self.table.update_incremental()
return json.loads(metrics)
@@ -723,6 +739,7 @@ def z_order(
target_size: Optional[int] = None,
max_concurrent_tasks: Optional[int] = None,
max_spill_size: int = 20 * 1024 * 1024 * 1024,
min_commit_interval: Optional[Union[int, timedelta]] = None,
) -> Dict[str, Any]:
"""
Reorders the data using a Z-order curve to improve data skipping.
@@ -738,14 +755,30 @@
file compaction. Defaults to number of CPUs. More concurrent tasks can make compaction
faster, but will also use more memory.
:param max_spill_size: the maximum number of bytes to spill to disk. Defaults to 20GB.
:param min_commit_interval: minimum interval in seconds or as a timedelta before a new commit is
created. The interval is useful for long-running executions. Set to 0 or timedelta(0) if you
want a commit per partition.
:return: the metrics from optimize
Examples:
Use a timedelta object to specify the seconds, minutes or hours of the interval.
>>> from deltalake import DeltaTable
>>> from datetime import timedelta
>>> dt = DeltaTable("tmp")
>>> time_delta = timedelta(minutes=10)
>>> dt.optimize.z_order(["timestamp"], min_commit_interval=time_delta)
"""
if isinstance(min_commit_interval, timedelta):
min_commit_interval = int(min_commit_interval.total_seconds())

metrics = self.table._table.z_order_optimize(
list(columns),
partition_filters,
target_size,
max_concurrent_tasks,
max_spill_size,
min_commit_interval,
)
self.table.update_incremental()
return json.loads(metrics)
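
For reference, the timedelta handling above truncates to whole seconds before the value crosses into the Rust binding; a standalone sketch of that normalization:

from datetime import timedelta

# Mirrors the isinstance check in compact()/z_order(): a timedelta is
# truncated to whole seconds, while a plain int passes through unchanged.
min_commit_interval = timedelta(minutes=10)
if isinstance(min_commit_interval, timedelta):
    min_commit_interval = int(min_commit_interval.total_seconds())
assert min_commit_interval == 600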
1 change: 1 addition & 0 deletions python/docs/source/conf.py
@@ -66,6 +66,7 @@ def get_release_version() -> str:
("py:class", "pyarrow._dataset_parquet.ParquetFileWriteOptions"),
("py:class", "pathlib.Path"),
("py:class", "datetime.datetime"),
("py:class", "datetime.timedelta"),
]

# Add any paths that contain templates here, relative to this directory.
14 changes: 12 additions & 2 deletions python/src/lib.rs
@@ -9,6 +9,7 @@ use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::future::IntoFuture;
use std::sync::Arc;
use std::time;
use std::time::{SystemTime, UNIX_EPOCH};

use arrow::pyarrow::PyArrowType;
@@ -273,18 +274,22 @@ impl RawDeltaTable {
}

/// Run the optimize command on the Delta Table: merge small files into a large file by bin-packing.
#[pyo3(signature = (partition_filters = None, target_size = None, max_concurrent_tasks = None, min_commit_interval = None))]
pub fn compact_optimize(
&mut self,
partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
target_size: Option<i64>,
max_concurrent_tasks: Option<usize>,
min_commit_interval: Option<u64>,
) -> PyResult<String> {
let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone())
.with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get));
if let Some(size) = target_size {
cmd = cmd.with_target_size(size);
}
if let Some(commit_interval) = min_commit_interval {
cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
}
let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
.map_err(PythonError::from)?;
cmd = cmd.with_filters(&converted_filters);
@@ -297,14 +302,15 @@ impl RawDeltaTable {
}

/// Run z-order variation of optimize
#[pyo3(signature = (z_order_columns, partition_filters = None, target_size = None, max_concurrent_tasks = None, max_spill_size = 20 * 1024 * 1024 * 1024, min_commit_interval = None))]
pub fn z_order_optimize(
&mut self,
z_order_columns: Vec<String>,
partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
target_size: Option<i64>,
max_concurrent_tasks: Option<usize>,
max_spill_size: usize,
min_commit_interval: Option<u64>,
) -> PyResult<String> {
let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone())
.with_max_concurrent_tasks(max_concurrent_tasks.unwrap_or_else(num_cpus::get))
@@ -313,6 +319,10 @@
if let Some(size) = target_size {
cmd = cmd.with_target_size(size);
}
if let Some(commit_interval) = min_commit_interval {
cmd = cmd.with_min_commit_interval(time::Duration::from_secs(commit_interval));
}

let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
.map_err(PythonError::from)?;
cmd = cmd.with_filters(&converted_filters);
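
Because the Rust side builds the interval with Duration::from_secs from whole seconds, a sub-second timedelta truncates to zero on the Python side, and zero means a commit per partition per the docstring; a small sketch of that edge case (an inference from the conversion above, not documented behavior):

from datetime import timedelta

# 500 ms truncates to 0 whole seconds, which the docstring defines as
# "commit per partition" rather than "no intermediate commits".
assert int(timedelta(milliseconds=500).total_seconds()) == 0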
22 changes: 22 additions & 0 deletions python/tests/test_optimize.py
@@ -1,4 +1,5 @@
import pathlib
from datetime import timedelta

import pyarrow as pa
import pytest
@@ -46,3 +47,24 @@ def test_z_order_optimize(
last_action = dt.history(1)[0]
assert last_action["operation"] == "OPTIMIZE"
assert dt.version() == old_version + 1


def test_optimize_min_commit_interval(
tmp_path: pathlib.Path,
sample_data: pa.Table,
):
write_deltalake(tmp_path, sample_data, partition_by="utf8", mode="append")
write_deltalake(tmp_path, sample_data, partition_by="utf8", mode="append")
write_deltalake(tmp_path, sample_data, partition_by="utf8", mode="append")

dt = DeltaTable(tmp_path)
old_version = dt.version()

dt.optimize.z_order(["date32", "timestamp"], min_commit_interval=timedelta(0))

last_action = dt.history(1)[0]
assert last_action["operation"] == "OPTIMIZE"
# The table has 5 distinct partitions, each of which is Z-ordered
# independently. So with min_commit_interval=0, each gets its
# own commit.
assert dt.version() == old_version + 5
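
A possible follow-on assertion, reusing the test's `dt` handle, would confirm that each of the five per-partition commits is an OPTIMIZE operation:

# Walk the five most recent history entries; with min_commit_interval=0,
# each partition's rewrite should appear as its own OPTIMIZE commit.
for entry in dt.history(5):
    assert entry["operation"] == "OPTIMIZE"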
