Skip to content

Commit

Permalink
chore: migrate to pyo3 Bounds API (#2596)
Browse files Browse the repository at this point in the history
# Description
This migrates the Python package to use the new pyo3 bounds-based API,
which allows more control over memory management on the library side and
theoretical performance improvements (I benchmarked, and didn't notice
anything substantial). The old API will be removed in 0.22.

# Related Issue(s)
<!---
For example:

- closes #106
--->

# Documentation

<!---
Share links to useful documentation
--->
  • Loading branch information
abhiaagarwal authored Jun 14, 2024
1 parent f041692 commit 5317aeb
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 167 deletions.
1 change: 1 addition & 0 deletions python/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ __pycache__/
# Unit test / coverage reports
.coverage
.pytest_cache/
.benchmarks/

# mypy
.mypy_cache/
Expand Down
2 changes: 1 addition & 1 deletion python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ deltalake-mount = { path = "../crates/mount" }

[dependencies.pyo3]
version = "0.21.1"
features = ["extension-module", "abi3", "abi3-py38", "gil-refs"]
features = ["extension-module", "abi3", "abi3-py38"]

[dependencies.deltalake]
path = "../crates/deltalake"
Expand Down
48 changes: 30 additions & 18 deletions python/src/filesystem.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl DeltaFileSystemHandler {
#[classmethod]
#[pyo3(signature = (table, options = None, known_sizes = None))]
fn from_table(
_cls: &PyType,
_cls: &Bound<'_, PyType>,
table: &RawDeltaTable,
options: Option<HashMap<String, String>>,
known_sizes: Option<HashMap<String, i64>>,
Expand Down Expand Up @@ -123,12 +123,20 @@ impl DeltaFileSystemHandler {
Ok(format!("{self:?}") == format!("{other:?}"))
}

fn get_file_info<'py>(&self, paths: Vec<String>, py: Python<'py>) -> PyResult<Vec<&'py PyAny>> {
let fs = PyModule::import(py, "pyarrow.fs")?;
fn get_file_info<'py>(
&self,
paths: Vec<String>,
py: Python<'py>,
) -> PyResult<Vec<Bound<'py, PyAny>>> {
let fs = PyModule::import_bound(py, "pyarrow.fs")?;
let file_types = fs.getattr("FileType")?;

let to_file_info = |loc: &str, type_: &PyAny, kwargs: &HashMap<&str, i64>| {
fs.call_method("FileInfo", (loc, type_), Some(kwargs.into_py_dict(py)))
let to_file_info = |loc: &str, type_: &Bound<'py, PyAny>, kwargs: &HashMap<&str, i64>| {
fs.call_method(
"FileInfo",
(loc, type_),
Some(&kwargs.into_py_dict_bound(py)),
)
};

let mut infos = Vec::new();
Expand All @@ -155,14 +163,14 @@ impl DeltaFileSystemHandler {
]);
infos.push(to_file_info(
meta.location.as_ref(),
file_types.getattr("File")?,
&file_types.getattr("File")?,
&kwargs,
)?);
}
Err(ObjectStoreError::NotFound { .. }) => {
infos.push(to_file_info(
path.as_ref(),
file_types.getattr("NotFound")?,
&file_types.getattr("NotFound")?,
&HashMap::new(),
)?);
}
Expand All @@ -173,7 +181,7 @@ impl DeltaFileSystemHandler {
} else {
infos.push(to_file_info(
path.as_ref(),
file_types.getattr("Directory")?,
&file_types.getattr("Directory")?,
&HashMap::new(),
)?);
}
Expand All @@ -189,12 +197,16 @@ impl DeltaFileSystemHandler {
allow_not_found: bool,
recursive: bool,
py: Python<'py>,
) -> PyResult<Vec<&'py PyAny>> {
let fs = PyModule::import(py, "pyarrow.fs")?;
) -> PyResult<Vec<Bound<'py, PyAny>>> {
let fs = PyModule::import_bound(py, "pyarrow.fs")?;
let file_types = fs.getattr("FileType")?;

let to_file_info = |loc: String, type_: &PyAny, kwargs: HashMap<&str, i64>| {
fs.call_method("FileInfo", (loc, type_), Some(kwargs.into_py_dict(py)))
let to_file_info = |loc: String, type_: &Bound<'py, PyAny>, kwargs: HashMap<&str, i64>| {
fs.call_method(
"FileInfo",
(loc, type_),
Some(&kwargs.into_py_dict_bound(py)),
)
};

let path = Self::parse_path(&base_dir);
Expand Down Expand Up @@ -222,7 +234,7 @@ impl DeltaFileSystemHandler {
.map(|p| {
to_file_info(
p.to_string(),
file_types.getattr("Directory")?,
&file_types.getattr("Directory")?,
HashMap::new(),
)
})
Expand All @@ -244,7 +256,7 @@ impl DeltaFileSystemHandler {
]);
to_file_info(
meta.location.to_string(),
file_types.getattr("File")?,
&file_types.getattr("File")?,
kwargs,
)
})
Expand Down Expand Up @@ -438,7 +450,7 @@ impl ObjectInputFile {
}

#[pyo3(signature = (nbytes = None))]
fn read(&mut self, nbytes: Option<i64>, py: Python<'_>) -> PyResult<Py<PyBytes>> {
fn read<'py>(&mut self, nbytes: Option<i64>, py: Python<'py>) -> PyResult<Bound<'py, PyBytes>> {
self.check_closed()?;
let range = match nbytes {
Some(len) => {
Expand Down Expand Up @@ -466,7 +478,7 @@ impl ObjectInputFile {
// TODO: PyBytes copies the buffer. If we move away from the limited CPython
// API (the stable C API), we could implement the buffer protocol for
// bytes::Bytes and return this zero-copy.
Ok(PyBytes::new(py, data.as_ref()).into_py(py))
Ok(PyBytes::new_bound(py, data.as_ref()))
}

fn fileno(&self) -> PyResult<()> {
Expand Down Expand Up @@ -580,7 +592,7 @@ impl ObjectOutputStream {
Err(PyNotImplementedError::new_err("'read' not implemented"))
}

fn write(&mut self, data: &PyBytes) -> PyResult<i64> {
fn write(&mut self, data: &Bound<'_, PyBytes>) -> PyResult<i64> {
self.check_closed()?;
let py = data.py();
let bytes = data.as_bytes();
Expand All @@ -598,7 +610,7 @@ impl ObjectOutputStream {
Ok(_) => Ok(()),
Err(err) => {
rt().block_on(self.upload.abort())
.map_err(|err| PythonError::from(err))?;
.map_err(PythonError::from)?;
Err(PyIOError::new_err(err.to_string()))
}
})
Expand Down
Loading

0 comments on commit 5317aeb

Please sign in to comment.