From a90b31ae29c98baaff8c3702f0e5de4e4303bbdc Mon Sep 17 00:00:00 2001 From: ion-elgreco <15728914+ion-elgreco@users.noreply.github.com> Date: Mon, 5 Aug 2024 23:29:35 +0200 Subject: [PATCH] perf: grab file size in rust --- python/deltalake/_internal.pyi | 1 + python/deltalake/table.py | 6 +----- python/src/lib.rs | 14 ++++++++++++++ python/tests/test_table_read.py | 13 ++++++------- 4 files changed, 22 insertions(+), 12 deletions(-) diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index ff55d8f95e..27033cb9d8 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -40,6 +40,7 @@ class RawDeltaTable: ) -> bool: ... def table_uri(self) -> str: ... def version(self) -> int: ... + def get_add_file_sizes(self) -> Dict[str, int]: ... def get_latest_version(self) -> int: ... def get_num_index_cols(self) -> int: ... def get_stats_columns(self) -> Optional[List[str]]: ... diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 2ae17f5f9c..4c82b40cd0 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -1046,15 +1046,11 @@ def to_pyarrow_dataset( ) if not filesystem: - file_sizes = self.get_add_actions().to_pydict() - file_sizes = { - x: y for x, y in zip(file_sizes["path"], file_sizes["size_bytes"]) - } filesystem = pa_fs.PyFileSystem( DeltaStorageHandler.from_table( self._table, self._storage_options, - file_sizes, + self._table.get_add_file_sizes(), ) ) format = ParquetFileFormat( diff --git a/python/src/lib.rs b/python/src/lib.rs index 41cdb37ccb..8d0ff3cc0c 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -24,6 +24,7 @@ use deltalake::datafusion::datasource::memory::MemTable; use deltalake::datafusion::datasource::provider::TableProvider; use deltalake::datafusion::physical_plan::ExecutionPlan; use deltalake::datafusion::prelude::SessionContext; +use deltalake::delta_datafusion::cdf::FileAction; use deltalake::delta_datafusion::DeltaDataChecker; use deltalake::errors::DeltaTableError; use deltalake::kernel::{ @@ -1236,6 +1237,19 @@ impl RawDeltaTable { )) } + pub fn get_add_file_sizes(&self) -> PyResult> { + let actions = self + ._table + .snapshot() + .map_err(PythonError::from)? + .file_actions() + .map_err(PythonError::from)?; + + Ok(actions + .iter() + .map(|action| (action.path(), action.size() as i64)) + .collect::>()) + } /// Run the delete command on the delta table: delete records following a predicate and return the delete metrics. #[pyo3(signature = (predicate = None, writer_properties=None, custom_metadata=None, post_commithook_properties=None))] pub fn delete( diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index 49e6974f38..9fb644e285 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -3,7 +3,6 @@ from pathlib import Path from random import random from threading import Barrier, Thread -from types import SimpleNamespace from typing import Any, List, Tuple from unittest.mock import Mock @@ -169,18 +168,18 @@ def test_read_simple_table_update_incremental(): assert dt.to_pyarrow_dataset().to_table().to_pydict() == {"id": [5, 7, 9]} -def test_read_simple_table_file_sizes_failure(): +def test_read_simple_table_file_sizes_failure(mocker): table_path = "../crates/test/tests/data/simple_table" dt = DeltaTable(table_path) add_actions = dt.get_add_actions().to_pydict() # set all sizes to -1, the idea is to break the reading, to check # that input file sizes are actually used - add_actions_modified = { - x: [-1 for item in x] if x == "size_bytes" else y - for x, y in add_actions.items() - } - dt.get_add_actions = lambda: SimpleNamespace(to_pydict=lambda: add_actions_modified) # type:ignore + add_actions_modified = {x: -1 for x in add_actions["path"]} + mocker.patch( + "deltalake._internal.RawDeltaTable.get_add_file_sizes", + return_value=add_actions_modified, + ) with pytest.raises(OSError, match="Cannot seek past end of file."): dt.to_pyarrow_dataset().to_table().to_pydict()