diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index d2462e546c..97e98c000a 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -313,6 +313,29 @@ def vacuum(
 
         return self._table.vacuum(dry_run, retention_hours, enforce_retention_duration)
 
+    def optimize(
+        self,
+        partition_filters: Optional[List[Tuple[str, str, Any]]] = None,
+        target_size: Optional[int] = None,
+    ) -> Dict[str, Any]:
+        """
+        Compacts small files to reduce the total number of files in the table.
+
+        This operation is idempotent; if run twice on the same table (assuming it has
+        not been updated), it will do nothing the second time.
+
+        If this operation happens concurrently with any operations other than append,
+        it will fail.
+
+        :param partition_filters: the partition filters that will be used for getting the matched files
+        :param target_size: desired file size after bin-packing files, in bytes. If not
+            provided, will attempt to read the table configuration value ``delta.targetFileSize``.
+            If that value isn't set, will use the default value of 256MB.
+        :return: the metrics from optimize
+        """
+        metrics = self._table.optimize(partition_filters, target_size)
+        return json.loads(metrics)
+
     def pyarrow_schema(self) -> pyarrow.Schema:
         """
         Get the current schema of the DeltaTable with the Parquet PyArrow format.
diff --git a/python/docs/source/usage.rst b/python/docs/source/usage.rst
index debd3e50d9..3282481d7d 100644
--- a/python/docs/source/usage.rst
+++ b/python/docs/source/usage.rst
@@ -399,7 +399,23 @@ only list the files to be deleted. Pass ``dry_run=False`` to actually delete fil
 Optimizing tables
 ~~~~~~~~~~~~~~~~~
 
-Optimizing tables is not currently supported.
+Optimizing a table performs bin-packing on a Delta Table, merging small files
+into larger files. Bin-packing reduces the number of API calls required for read operations.
+Optimizing increments the table's version and creates remove actions for optimized files.
+Optimize does not delete files from storage. To delete files that were removed, call :meth:`DeltaTable.vacuum`.
+
+Use :meth:`DeltaTable.optimize` to perform the optimize operation. Note that this method will fail if a
+concurrent writer performs an operation that removes any files (such as an overwrite).
+
+.. code-block:: python
+
+    >>> dt = DeltaTable("../rust/tests/data/simple_table")
+    >>> dt.optimize()
+    {'numFilesAdded': 1, 'numFilesRemoved': 5,
+     'filesAdded': {'min': 555, 'max': 555, 'avg': 555.0, 'totalFiles': 1, 'totalSize': 555},
+     'filesRemoved': {'min': 262, 'max': 429, 'avg': 362.2, 'totalFiles': 5, 'totalSize': 1811},
+     'partitionsOptimized': 1, 'numBatches': 1, 'totalConsideredFiles': 5,
+     'totalFilesSkipped': 0, 'preserveInsertionOrder': True}
 
 Writing Delta Tables
 --------------------
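For illustration, the API added above supports calls like the following. This is a minimal sketch, not part of the diff: the table path, the "date" partition column, and the filter value are hypothetical, while partition_filters and target_size are the parameters introduced in table.py above.

    from deltalake import DeltaTable

    # Hypothetical table partitioned by a "date" column (illustrative only)
    dt = DeltaTable("path/to/partitioned_table")

    # Compact a single partition, aiming for ~128MB output files instead of
    # the 256MB default (or the table's delta.targetFileSize, if set)
    metrics = dt.optimize(
        partition_filters=[("date", "=", "2023-01-01")],
        target_size=128 * 1024 * 1024,
    )
    print(metrics["numFilesAdded"], metrics["numFilesRemoved"])
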
diff --git a/python/src/lib.rs b/python/src/lib.rs
index 98d8c0a92a..bf4c307dba 100644
--- a/python/src/lib.rs
+++ b/python/src/lib.rs
@@ -16,6 +16,7 @@ use deltalake::builder::DeltaTableBuilder;
 use deltalake::checkpoints::create_checkpoint;
 use deltalake::datafusion::prelude::SessionContext;
 use deltalake::delta_datafusion::DeltaDataChecker;
+use deltalake::operations::optimize::OptimizeBuilder;
 use deltalake::operations::transaction::commit;
 use deltalake::operations::vacuum::VacuumBuilder;
 use deltalake::partitions::PartitionFilter;
@@ -314,6 +315,28 @@ impl RawDeltaTable {
         Ok(metrics.files_deleted)
     }
 
+    /// Run the optimize command on the Delta Table: merge small files into a large file by bin-packing.
+    #[pyo3(signature = (partition_filters = None, target_size = None))]
+    pub fn optimize(
+        &mut self,
+        partition_filters: Option<Vec<(&str, &str, PartitionFilterValue)>>,
+        target_size: Option<i64>,
+    ) -> PyResult<String> {
+        let mut cmd = OptimizeBuilder::new(self._table.object_store(), self._table.state.clone());
+        if let Some(size) = target_size {
+            cmd = cmd.with_target_size(size);
+        }
+        let converted_filters = convert_partition_filters(partition_filters.unwrap_or_default())
+            .map_err(PyDeltaTableError::from_raw)?;
+        cmd = cmd.with_filters(&converted_filters);
+
+        let (table, metrics) = rt()?
+            .block_on(async { cmd.await })
+            .map_err(PyDeltaTableError::from_raw)?;
+        self._table.state = table.state;
+        Ok(serde_json::to_string(&metrics).unwrap())
+    }
+
     /// Run the History command on the Delta Table: Returns provenance information, including the operation, user, and so on, for each write to a table.
     pub fn history(&mut self, limit: Option<usize>) -> PyResult<Vec<String>> {
         let history = rt()?
diff --git a/python/tests/test_optimize.py b/python/tests/test_optimize.py
new file mode 100644
index 0000000000..1c04aa5545
--- /dev/null
+++ b/python/tests/test_optimize.py
@@ -0,0 +1,27 @@
+import pathlib
+
+import pyarrow as pa
+import pytest
+
+from deltalake import DeltaTable, write_deltalake
+
+
+@pytest.mark.parametrize("use_relative", [True, False])
+def test_optimize_run_table(
+    tmp_path: pathlib.Path, sample_data: pa.Table, monkeypatch, use_relative: bool
+):
+    if use_relative:
+        monkeypatch.chdir(tmp_path)  # Make tmp_path the working directory
+        (tmp_path / "path/to/table").mkdir(parents=True)
+        table_path = "./path/to/table"
+    else:
+        table_path = str(tmp_path)
+
+    write_deltalake(table_path, sample_data, mode="append")
+    write_deltalake(table_path, sample_data, mode="append")
+    write_deltalake(table_path, sample_data, mode="append")
+
+    dt = DeltaTable(table_path)
+    dt.optimize()
+    last_action = dt.history(1)[0]
+    assert last_action["operation"] == "OPTIMIZE"
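
The usage documentation above notes that optimize only writes remove actions and that DeltaTable.vacuum is what deletes the superseded files from storage. A minimal sketch of that pairing, assuming an existing table at an illustrative path; the zero-hour retention with enforcement disabled is shown only to make the effect immediate and is unsafe if other readers may still reference the old files.

    from deltalake import DeltaTable

    dt = DeltaTable("path/to/table")
    dt.optimize()  # bin-packs small files and bumps the table version

    # Optimize only tombstones the compacted source files; vacuum physically
    # deletes them. enforce_retention_duration=False permits a 0-hour
    # retention window (use with care outside tests).
    deleted = dt.vacuum(retention_hours=0, dry_run=False, enforce_retention_duration=False)
    print(f"vacuum deleted {len(deleted)} files")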