Skip to content

Commit

Permalink
wrap DeltaFileSystemHandler
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed May 10, 2024
1 parent 31d207b commit 3fd6dbf
Show file tree
Hide file tree
Showing 6 changed files with 146 additions and 27 deletions.
8 changes: 7 additions & 1 deletion python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -722,10 +722,16 @@ class DeltaFileSystemHandler:
def __init__(
self,
table_uri: str,
table: Optional[RawDeltaTable] = None,
options: dict[str, str] | None = None,
known_sizes: dict[str, int] | None = None,
) -> None: ...
@classmethod
def from_table(
cls,
table: RawDeltaTable,
options: dict[str, str] | None = None,
known_sizes: dict[str, int] | None = None,
) -> "DeltaFileSystemHandler": ...
def get_type_name(self) -> str: ...
def copy_file(self, src: str, dst: str) -> None:
"""Copy a file.
Expand Down
106 changes: 96 additions & 10 deletions python/deltalake/fs.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,102 @@
from typing import Dict, List, Optional
from typing import Any, Dict, List, Optional

import pyarrow as pa
from pyarrow.fs import FileInfo, FileSelector, FileSystemHandler

from ._internal import DeltaFileSystemHandler
from ._internal import DeltaFileSystemHandler, RawDeltaTable


# NOTE we need to inherit form FileSystemHandler to pass pyarrow's internal type checks.
class DeltaStorageHandler(DeltaFileSystemHandler, FileSystemHandler):
class DeltaStorageHandler(FileSystemHandler):
"""
DeltaStorageHandler is a concrete implementations of a PyArrow FileSystemHandler.
"""

def __init__(
self,
table_uri: str,
options: dict[str, str] | None = None,
known_sizes: dict[str, int] | None = None,
):
self._handler = DeltaFileSystemHandler(
table_uri=table_uri, options=options, known_sizes=known_sizes
)

@classmethod
def from_table(
cls,
table: RawDeltaTable,
options: dict[str, str] | None = None,
known_sizes: dict[str, int] | None = None,
) -> "DeltaStorageHandler":
self = cls.__new__(cls)
self._handler = DeltaFileSystemHandler.from_table(table, options, known_sizes)
return self

def get_type_name(self) -> str:
return self._handler.get_type_name()

def copy_file(self, src: str, dst: str) -> None:
"""Copy a file.
If the destination exists and is a directory, an error is returned. Otherwise, it is replaced.
"""
return self._handler.copy_file(src=src, dst=dst)

def create_dir(self, path: str, recursive: bool = True) -> None:
"""Create a directory and subdirectories.
This function succeeds if the directory already exists.
"""
return self._handler.create_dir(path, recursive)

def delete_dir(self, path: str) -> None:
"""Delete a directory and its contents, recursively."""
return self._handler.delete_dir(path)

def delete_file(self, path: str) -> None:
"""Delete a file."""
return self._handler.delete_file(path)

def equals(self, other: Any) -> bool:
return self._handler.equals(other)

def delete_dir_contents(
self, path: str, *, accept_root_dir: bool = False, missing_dir_ok: bool = False
) -> None:
"""Delete a directory's contents, recursively.
Like delete_dir, but doesn't delete the directory itself.
"""
return self._handler.delete_dir_contents(
path=path, accept_root_dir=accept_root_dir, missing_dir_ok=missing_dir_ok
)

def delete_root_dir_contents(self) -> None:
"""Delete the root directory contents, recursively."""
return self._handler.delete_root_dir_contents()

def get_file_info(self, paths: list[str]) -> list[FileInfo]:
"""Get info for the given files.
A non-existing or unreachable file returns a FileStat object and has a FileType of value NotFound.
An exception indicates a truly exceptional condition (low-level I/O error, etc.).
"""
return self._handler.get_file_info(paths)

def move(self, src: str, dest: str) -> None:
"""Move / rename a file or directory.
If the destination exists: - if it is a non-empty directory, an error is returned - otherwise,
if it has the same type as the source, it is replaced - otherwise, behavior is
unspecified (implementation-dependent).
"""
self._handler.move_file(src=src, dest=dest)

def normalize_path(self, path: str) -> str:
"""Normalize filesystem path."""
return self._handler.normalize_path(path)

def open_input_file(self, path: str) -> pa.PythonFile:
"""
Open an input file for random access reading.
Expand All @@ -22,7 +107,7 @@ def open_input_file(self, path: str) -> pa.PythonFile:
Returns:
NativeFile
"""
return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path))
return pa.PythonFile(self._handler.open_input_file(path))

def open_input_stream(self, path: str) -> pa.PythonFile:
"""
Expand All @@ -34,7 +119,7 @@ def open_input_stream(self, path: str) -> pa.PythonFile:
Returns:
NativeFile
"""
return pa.PythonFile(DeltaFileSystemHandler.open_input_file(self, path))
return pa.PythonFile(self._handler.open_input_file(path))

def open_output_stream(
self, path: str, metadata: Optional[Dict[str, str]] = None
Expand All @@ -51,9 +136,7 @@ def open_output_stream(
Returns:
NativeFile
"""
return pa.PythonFile(
DeltaFileSystemHandler.open_output_stream(self, path, metadata)
)
return pa.PythonFile(self._handler.open_output_stream(path, metadata))

def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]: # type: ignore
"""
Expand All @@ -65,6 +148,9 @@ def get_file_info_selector(self, selector: FileSelector) -> List[FileInfo]: # t
Returns:
list of file info objects
"""
return DeltaFileSystemHandler.get_file_info_selector(
self, selector.base_dir, selector.allow_not_found, selector.recursive
return self._handler.get_file_info_selector(
selector.base_dir, selector.allow_not_found, selector.recursive
)

def open_append_stream(path: str, metadata: dict):
raise NotImplementedError
3 changes: 1 addition & 2 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1095,8 +1095,7 @@ def to_pyarrow_dataset(
x: y for x, y in zip(file_sizes["path"], file_sizes["size_bytes"])
}
filesystem = pa_fs.PyFileSystem(
DeltaStorageHandler(
self._table.table_uri(),
DeltaStorageHandler.from_table(
self._table,
self._storage_options,
file_sizes,
Expand Down
4 changes: 2 additions & 2 deletions python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,8 @@ def sort_arrow_schema(schema: pa.schema) -> pa.schema:

if table: # already exists
filesystem = pa_fs.PyFileSystem(
DeltaStorageHandler(
table_uri, table=table._table, options=storage_options
DeltaStorageHandler.from_table(
table=table._table, options=storage_options
)
)

Expand Down
38 changes: 26 additions & 12 deletions python/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use deltalake::storage::{DynObjectStore, ListResult, MultipartId, ObjectStoreErr
use deltalake::DeltaTableBuilder;
use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError};
use pyo3::prelude::*;
use pyo3::types::{IntoPyDict, PyBytes};
use pyo3::types::{IntoPyDict, PyBytes, PyType};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
Expand Down Expand Up @@ -41,22 +41,17 @@ impl DeltaFileSystemHandler {
#[pymethods]
impl DeltaFileSystemHandler {
#[new]
#[pyo3(signature = (table_uri, table = None, options = None, known_sizes = None))]
#[pyo3(signature = (table_uri, options = None, known_sizes = None))]
fn new(
table_uri: String,
table: Option<&RawDeltaTable>,
options: Option<HashMap<String, String>>,
known_sizes: Option<HashMap<String, i64>>,
) -> PyResult<Self> {
let storage = if let Some(table) = table {
table._table.object_store()
} else {
DeltaTableBuilder::from_uri(&table_uri)
.with_storage_options(options.clone().unwrap_or_default())
.build_storage()
.map_err(PythonError::from)?
.object_store()
};
let storage = DeltaTableBuilder::from_uri(&table_uri)
.with_storage_options(options.clone().unwrap_or_default())
.build_storage()
.map_err(PythonError::from)?
.object_store();

Ok(Self {
inner: storage,
Expand All @@ -68,6 +63,25 @@ impl DeltaFileSystemHandler {
})
}

#[classmethod]
#[pyo3(signature = (table, options = None, known_sizes = None))]
fn from_table(
_cls: &PyType,
table: &RawDeltaTable,
options: Option<HashMap<String, String>>,
known_sizes: Option<HashMap<String, i64>>,
) -> PyResult<Self> {
let storage = table._table.object_store();
Ok(Self {
inner: storage,
config: FsConfig {
root_url: table._table.table_uri(),
options: options.unwrap_or_default(),
},
known_sizes,
})
}

fn get_type_name(&self) -> String {
"object-store".into()
}
Expand Down
14 changes: 14 additions & 0 deletions python/tests/test_fs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import pickle
import urllib

import pyarrow as pa
Expand Down Expand Up @@ -240,3 +241,16 @@ def test_roundtrip_azure_decoded_sas(azurite_sas_creds, sample_data: pa.Table):
table = dt.to_pyarrow_table()
assert table == sample_data
assert dt.version() == 0


def test_pickle_roundtrip(tmp_path):
store = DeltaStorageHandler(str(tmp_path.absolute()))

with (tmp_path / "asd.pkl").open("wb") as handle:
pickle.dump(store, handle)

with (tmp_path / "asd.pkl").open("rb") as handle:
store_pkl = pickle.load(handle)

infos = store_pkl.get_file_info(["asd.pkl"])
assert infos[0].size > 0

0 comments on commit 3fd6dbf

Please sign in to comment.