feat: add optimize command in python binding (#1313)
# Description
This is an implementation of the optimize command for the Python binding.

# Related Issue(s)
#622

---------

Co-authored-by: dengkai02 <dengkai02@appledeMacBook-Pro-3.local>
loleek and dengkai02 authored May 4, 2023
1 parent 338208d commit fdb5e7b
Showing 4 changed files with 90 additions and 1 deletion.
23 changes: 23 additions & 0 deletions python/deltalake/table.py
@@ -313,6 +313,29 @@ def vacuum(

return self._table.vacuum(dry_run, retention_hours, enforce_retention_duration)

def optimize(
self,
partition_filters: Optional[List[Tuple[str, str, Any]]] = None,
target_size: Optional[int] = None,
) -> Dict[str, Any]:
"""
Compacts small files to reduce the total number of files in the table.
This operation is idempotent; if run twice on the same table (assuming it has
not been updated) it will do nothing the second time.
If this operation happens concurrently with any operations other than append,
it will fail.
:param partition_filters: the partition filters used to select the files to optimize
:param target_size: desired file size after bin-packing files, in bytes. If not
provided, will attempt to read the table configuration value ``delta.targetFileSize``.
If that value isn't set, a default of 256MB is used.
:return: the metrics from optimize
"""
metrics = self._table.optimize(partition_filters, target_size)
return json.loads(metrics)

def pyarrow_schema(self) -> pyarrow.Schema:
"""
Get the current schema of the DeltaTable with the Parquet PyArrow format.
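For quick reference, a minimal sketch of calling the new method; the table path and the ``date`` partition column here are hypothetical, and both arguments are optional:

```python
from deltalake import DeltaTable

dt = DeltaTable("path/to/table")  # hypothetical table location

# Compact only one partition, targeting ~128 MiB output files.
metrics = dt.optimize(
    partition_filters=[("date", "=", "2023-05-04")],
    target_size=128 * 1024 * 1024,
)
print(metrics["numFilesAdded"], metrics["numFilesRemoved"])
```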
18 changes: 17 additions & 1 deletion python/docs/source/usage.rst
@@ -399,7 +399,23 @@ only list the files to be deleted. Pass ``dry_run=False`` to actually delete files.
Optimizing tables
~~~~~~~~~~~~~~~~~

Optimizing tables is not currently supported.
Optimizing a table performs bin-packing on a Delta table, merging small files into larger ones.
Bin-packing reduces the number of API calls required for read operations.
Optimizing increments the table's version and creates remove actions for the files that were compacted.
Optimize does not delete files from storage; to delete the removed files, call :meth:`DeltaTable.vacuum`.

Use :meth:`DeltaTable.optimize` to perform the optimize operation. Note that this method will fail if a
concurrent writer performs an operation that removes any files (such as an overwrite).

.. code-block:: python

    >>> dt = DeltaTable("../rust/tests/data/simple_table")
    >>> dt.optimize()
    {'numFilesAdded': 1, 'numFilesRemoved': 5,
    'filesAdded': {'min': 555, 'max': 555, 'avg': 555.0, 'totalFiles': 1, 'totalSize': 555},
    'filesRemoved': {'min': 262, 'max': 429, 'avg': 362.2, 'totalFiles': 5, 'totalSize': 1811},
    'partitionsOptimized': 1, 'numBatches': 1, 'totalConsideredFiles': 5,
    'totalFilesSkipped': 0, 'preserveInsertionOrder': True}

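Because optimize only logically removes files, it pairs naturally with vacuum. A sketch of the combined workflow follows; the table path is hypothetical, and the zero retention window is for illustration only:

.. code-block:: python

    from deltalake import DeltaTable

    dt = DeltaTable("path/to/table")  # hypothetical table location
    dt.optimize()  # rewrites small files and logs remove actions

    # Physically delete the files optimize logically removed.
    # dry_run=False performs the deletion; the shortened retention
    # window requires disabling the safety check.
    dt.vacuum(retention_hours=0, dry_run=False, enforce_retention_duration=False)
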
Writing Delta Tables
--------------------
23 changes: 23 additions & 0 deletions python/src/lib.rs
@@ -16,6 +16,7 @@ use deltalake::builder::DeltaTableBuilder;
use deltalake::checkpoints::create_checkpoint;
use deltalake::datafusion::prelude::SessionContext;
use deltalake::delta_datafusion::DeltaDataChecker;
use deltalake::operations::optimize::OptimizeBuilder;
use deltalake::operations::transaction::commit;
use deltalake::operations::vacuum::VacuumBuilder;
use deltalake::partitions::PartitionFilter;
@@ -314,6 +314,28 @@ impl RawDeltaTable {
Ok(metrics.files_deleted)
}

// Run the optimize command on the Delta Table: merge small files into a large file by bin-packing.
#[pyo3(signature = (partition_filters = None, target_size = None))]
pub fn optimize(
&mut self,
partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
target_size: Option<DeltaDataTypeLong>,
) -> PyResult<String> {
let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone());
if let Some(size) = target_size {
cmd = cmd.with_target_size(size);
}
let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
.map_err(PyDeltaTableError::from_raw)?;
cmd = cmd.with_filters(&converted_filters);

let (table, metrics) = rt()?
.block_on(async { cmd.await })
.map_err(PyDeltaTableError::from_raw)?;
self._table.state = table.state;
Ok(serde_json::to_string(&metrics).unwrap())
}

// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table.
pub fn history(&mut self, limit: Option<usize>) -> PyResult<Vec<String>> {
let history = rt()?
27 changes: 27 additions & 0 deletions python/tests/test_optimize.py
@@ -0,0 +1,27 @@
import pathlib

import pyarrow as pa
import pytest

from deltalake import DeltaTable, write_deltalake


@pytest.mark.parametrize("use_relative", [True, False])
def test_optimize_run_table(
tmp_path: pathlib.Path, sample_data: pa.Table, monkeypatch, use_relative: bool
):
if use_relative:
monkeypatch.chdir(tmp_path) # Make tmp_path the working directory
(tmp_path / "path/to/table").mkdir(parents=True)
table_path = "./path/to/table"
else:
table_path = str(tmp_path)

write_deltalake(table_path, sample_data, mode="append")
write_deltalake(table_path, sample_data, mode="append")
write_deltalake(table_path, sample_data, mode="append")

dt = DeltaTable(table_path)
dt.optimize()
last_action = dt.history(1)[0]
assert last_action["operation"] == "OPTIMIZE"
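
A natural follow-up (not part of this commit) would be asserting on the returned metrics as well. A sketch, reusing the repo's ``sample_data`` fixture and assuming each small append above produces exactly one file:

```python
def test_optimize_metrics(tmp_path: pathlib.Path, sample_data: pa.Table):
    table_path = str(tmp_path)
    write_deltalake(table_path, sample_data, mode="append")
    write_deltalake(table_path, sample_data, mode="append")

    dt = DeltaTable(table_path)
    metrics = dt.optimize()
    # Assumes each append produced exactly one small file,
    # so optimize compacts two files into one.
    assert metrics["numFilesRemoved"] == 2
    assert metrics["numFilesAdded"] == 1
    assert dt.history(1)[0]["operation"] == "OPTIMIZE"
```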
