diff --git a/python/deltalake/_internal.pyi b/python/deltalake/_internal.pyi index 4aae9b9872..0f8eda3d07 100644 --- a/python/deltalake/_internal.pyi +++ b/python/deltalake/_internal.pyi @@ -722,10 +722,16 @@ class DeltaFileSystemHandler: def __init__( self, table_uri: str, - table: Optional[RawDeltaTable] = None, options: dict[str, str] | None = None, known_sizes: dict[str, int] | None = None, ) -> None: ... + @classmethod + def from_table( + cls, + table: RawDeltaTable, + options: dict[str, str] | None = None, + known_sizes: dict[str, int] | None = None, + ) -> "DeltaFileSystemHandler": ... def get_type_name(self) -> str: ... def copy_file(self, src: str, dst: str) -> None: """Copy a file. diff --git a/python/deltalake/fs.py b/python/deltalake/fs.py index 12e33f40e3..46b8ab9252 100644 --- a/python/deltalake/fs.py +++ b/python/deltalake/fs.py @@ -1,17 +1,102 @@ -from typing import Dict, List, Optional +from typing import Any, Dict, List, Optional import pyarrow as pa from pyarrow.fs import FileInfo, FileSelector, FileSystemHandler -from ._internal import DeltaFileSystemHandler +from ._internal import DeltaFileSystemHandler, RawDeltaTable # NOTE we need to inherit form FileSystemHandler to pass pyarrow's internal type checks. -class DeltaStorageHandler(DeltaFileSystemHandler, FileSystemHandler): +class DeltaStorageHandler(FileSystemHandler): """ DeltaStorageHandler is a concrete implementations of a PyArrow FileSystemHandler. """ + def __init__( + self, + table_uri: str, + options: dict[str, str] | None = None, + known_sizes: dict[str, int] | None = None, + ): + self._handler = DeltaFileSystemHandler( + table_uri=table_uri, options=options, known_sizes=known_sizes + ) + + @classmethod + def from_table( + cls, + table: RawDeltaTable, + options: dict[str, str] | None = None, + known_sizes: dict[str, int] | None = None, + ) -> "DeltaStorageHandler": + self = cls.__new__(cls) + self._handler = DeltaFileSystemHandler.from_table(table, options, known_sizes) + return self + + def get_type_name(self) -> str: + return self._handler.get_type_name() + + def copy_file(self, src: str, dst: str) -> None: + """Copy a file. + + If the destination exists and is a directory, an error is returned. Otherwise, it is replaced. + """ + return self._handler.copy_file(src=src, dst=dst) + + def create_dir(self, path: str, recursive: bool = True) -> None: + """Create a directory and subdirectories. + + This function succeeds if the directory already exists. + """ + return self._handler.create_dir(path, recursive) + + def delete_dir(self, path: str) -> None: + """Delete a directory and its contents, recursively.""" + return self._handler.delete_dir(path) + + def delete_file(self, path: str) -> None: + """Delete a file.""" + return self._handler.delete_file(path) + + def equals(self, other: Any) -> bool: + return self._handler.equals(other) + + def delete_dir_contents( + self, path: str, *, accept_root_dir: bool = False, missing_dir_ok: bool = False + ) -> None: + """Delete a directory's contents, recursively. + + Like delete_dir, but doesn't delete the directory itself. + """ + return self._handler.delete_dir_contents( + path=path, accept_root_dir=accept_root_dir, missing_dir_ok=missing_dir_ok + ) + + def delete_root_dir_contents(self) -> None: + """Delete the root directory contents, recursively.""" + return self._handler.delete_root_dir_contents() + + def get_file_info(self, paths: list[str]) -> list[FileInfo]: + """Get info for the given files. + + A non-existing or unreachable file returns a FileStat object and has a FileType of value NotFound. + An exception indicates a truly exceptional condition (low-level I/O error, etc.). + """ + return self._handler.get_file_info(paths) + + def move(self, src: str, dest: str) -> None: + """Move / rename a file or directory. + + If the destination exists: - if it is a non-empty directory, an error is returned - otherwise, + if it has the same type as the source, it is replaced - otherwise, behavior is + unspecified (implementation-dependent). + """ + self._handler.move_file(src=src, dest=dest) + + def normalize_path(self, path: str) -> str: + """Normalize filesystem path.""" + return self._handler.normalize_path(path) + def open_input_file(self, path: str) -> pa.PythonFile: """ Open an input file for random access reading. @@ -22,7 +107,7 @@ def open_input_file(self, path: str) -> pa.PythonFile: Returns: NativeFile """ - return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path)) + return pa.PythonFile(self._handler.open_input_file(path)) def open_input_stream(self, path: str) -> pa.PythonFile: """ @@ -34,7 +119,7 @@ def open_input_stream(self, path: str) -> pa.PythonFile: Returns: NativeFile """ - return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path)) + return pa.PythonFile(self._handler.open_input_file(path)) def open_output_stream( self, path: str, metadata: Optional[Dict[str, str]] = None @@ -51,9 +136,7 @@ def open_output_stream( Returns: NativeFile """ - return pa.PythonFile( - DeltaFileSystemHandler.open_output_stream(self, path, metadata) - ) + return pa.PythonFile(self._handler.open_output_stream(path, metadata)) def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]: # type: ignore """ @@ -65,6 +148,9 @@ def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]: # t Returns: list of file info objects """ - return DeltaFileSystemHandler.get_file_info_selector( - self, selector.base_dir, selector.allow_not_found, selector.recursive + return self._handler.get_file_info_selector( + selector.base_dir, selector.allow_not_found, selector.recursive ) + + def open_append_stream(path: str, metadata: dict): + raise NotImplementedError diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 505f14e308..31e71e8fc6 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -1095,8 +1095,7 @@ def to_pyarrow_dataset( x: y for x, y in zip(file_sizes["path"], file_sizes["size_bytes"]) } filesystem = pa_fs.PyFileSystem( - DeltaStorageHandler( - self._table.table_uri(), + DeltaStorageHandler.from_table( self._table, self._storage_options, file_sizes, diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 2e10075e53..eaf95650ea 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -346,8 +346,8 @@ def sort_arrow_schema(schema: pa.schema) -> pa.schema: if table: # already exists filesystem = pa_fs.PyFileSystem( - DeltaStorageHandler( - table_uri, table=table._table, options=storage_options + DeltaStorageHandler.from_table( + table=table._table, options=storage_options ) ) diff --git a/python/src/filesystem.rs b/python/src/filesystem.rs index ed831b92b6..2825bf9092 100644 --- a/python/src/filesystem.rs +++ b/python/src/filesystem.rs @@ -5,7 +5,7 @@ use deltalake::storage::{DynObjectStore, ListResult, MultipartId, ObjectStoreErr use deltalake::DeltaTableBuilder; use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError}; use pyo3::prelude::*; -use pyo3::types::{IntoPyDict, PyBytes}; +use pyo3::types::{IntoPyDict, PyBytes, PyType}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::sync::Arc; @@ -41,22 +41,17 @@ impl DeltaFileSystemHandler { #[pymethods] impl DeltaFileSystemHandler { #[new] - #[pyo3(signature = (table_uri, table = None, options = None, known_sizes = None))] + #[pyo3(signature = (table_uri, options = None, known_sizes = None))] fn new( table_uri: String, - table: Option<&RawDeltaTable>, options: Option>, known_sizes: Option>, ) -> PyResult { - let storage = if let Some(table) = table { - table._table.object_store() - } else { - DeltaTableBuilder::from_uri(&table_uri) - .with_storage_options(options.clone().unwrap_or_default()) - .build_storage() - .map_err(PythonError::from)? - .object_store() - }; + let storage = DeltaTableBuilder::from_uri(&table_uri) + .with_storage_options(options.clone().unwrap_or_default()) + .build_storage() + .map_err(PythonError::from)? + .object_store(); Ok(Self { inner: storage, @@ -68,6 +63,25 @@ impl DeltaFileSystemHandler { }) } + #[classmethod] + #[pyo3(signature = (table, options = None, known_sizes = None))] + fn from_table( + _cls: &PyType, + table: &RawDeltaTable, + options: Option>, + known_sizes: Option>, + ) -> PyResult { + let storage = table._table.object_store(); + Ok(Self { + inner: storage, + config: FsConfig { + root_url: table._table.table_uri(), + options: options.unwrap_or_default(), + }, + known_sizes, + }) + } + fn get_type_name(&self) -> String { "object-store".into() } diff --git a/python/tests/test_fs.py b/python/tests/test_fs.py index d23ddbf3e9..245d97a89a 100644 --- a/python/tests/test_fs.py +++ b/python/tests/test_fs.py @@ -1,3 +1,4 @@ +import pickle import urllib import pyarrow as pa @@ -240,3 +241,16 @@ def test_roundtrip_azure_decoded_sas(azurite_sas_creds, sample_data: pa.Table): table = dt.to_pyarrow_table() assert table == sample_data assert dt.version() == 0 + + +def test_pickle_roundtrip(tmp_path): + store = DeltaStorageHandler(str(tmp_path.absolute())) + + with (tmp_path / "asd.pkl").open("wb") as handle: + pickle.dump(store, handle) + + with (tmp_path / "asd.pkl").open("rb") as handle: + store_pkl = pickle.load(handle) + + infos = store_pkl.get_file_info(["asd.pkl"]) + assert infos[0].size > 0