Skip to content

Commit

Permalink
feat: explicit python exceptions (#1409)
Browse files Browse the repository at this point in the history
# Description

While working on an integration, I felt that our error handling in Python
was a bit opaque: handling errors on the Python side has to rely on
inspecting the error message, which can easily break silently if not
covered by tests.

The proposed approach is to introduce a few more Python exceptions
and - as we already do - rely on existing Python exceptions where
appropriate.

As I guess this would be a bigger breaking change on the Python side,
I am looking for feedback on whether you think this is worthwhile.

@wjones127 @fvaleye @rtyler 

# Related Issue(s)
<!---
For example:

- closes #106
--->

# Documentation

<!---
Share links to useful documentation
--->

---------

Co-authored-by: R. Tyler Croy <rtyler@brokenco.de>
  • Loading branch information
roeap and rtyler authored Jun 2, 2023
1 parent fa96a5a commit fc61ea7
Show file tree
Hide file tree
Showing 13 changed files with 243 additions and 194 deletions.
1 change: 1 addition & 0 deletions python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ lazy_static = "1"
regex = "1"
serde = "1"
serde_json = "1"
thiserror = "1"
tokio = { version = "1", features = ["rt-multi-thread"] }

# reqwest is pulled in by azure sdk, but not used by python binding itself
Expand Down
1 change: 0 additions & 1 deletion python/deltalake/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from ._internal import PyDeltaTableError as PyDeltaTableError
from ._internal import __version__ as __version__
from ._internal import rust_core_version as rust_core_version
from .data_catalog import DataCatalog as DataCatalog
Expand Down
22 changes: 20 additions & 2 deletions python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ __version__: str
RawDeltaTable: Any
rust_core_version: Callable[[], str]

class PyDeltaTableError(BaseException): ...

write_new_deltalake: Callable[
[
str,
Expand Down Expand Up @@ -266,3 +264,23 @@ class DeltaFileSystemHandler:
class DeltaDataChecker:
def __init__(self, invariants: List[Tuple[str, str]]) -> None: ...
def check_batch(self, batch: pa.RecordBatch) -> None: ...

class DeltaError(Exception):
    """Base class for every Delta-specific error.

    The more specific Delta exceptions derive from this class, so catching
    ``DeltaError`` also catches them.
    """

class TableNotFoundError(DeltaError):
    """Raised when no Delta table can be loaded from the given location."""

class CommitFailedError(DeltaError):
    """Raised when committing to a Delta table fails."""

class DeltaProtocolError(DeltaError):
    """Raised when a violation of the Delta protocol specification occurred."""
4 changes: 4 additions & 0 deletions python/deltalake/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
from ._internal import CommitFailedError as CommitFailedError
from ._internal import DeltaError as DeltaError
from ._internal import DeltaProtocolError as DeltaProtocolError
from ._internal import TableNotFoundError as TableNotFoundError
10 changes: 3 additions & 7 deletions python/deltalake/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,12 @@
if TYPE_CHECKING:
import pandas

from ._internal import PyDeltaTableError, RawDeltaTable
from ._internal import RawDeltaTable
from .data_catalog import DataCatalog
from .exceptions import DeltaProtocolError
from .fs import DeltaStorageHandler
from .schema import Schema


class DeltaTableProtocolError(PyDeltaTableError):
pass


MAX_SUPPORTED_READER_VERSION = 1
MAX_SUPPORTED_WRITER_VERSION = 2

Expand Down Expand Up @@ -487,7 +483,7 @@ def to_pyarrow_dataset(
:return: the PyArrow dataset in PyArrow
"""
if self.protocol().min_reader_version > MAX_SUPPORTED_READER_VERSION:
raise DeltaTableProtocolError(
raise DeltaProtocolError(
f"The table's minimum reader version is {self.protocol().min_reader_version}"
f"but deltalake only supports up to version {MAX_SUPPORTED_READER_VERSION}."
)
Expand Down
21 changes: 7 additions & 14 deletions python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,10 @@
from deltalake.schema import delta_arrow_schema_from_pandas

from ._internal import DeltaDataChecker as _DeltaDataChecker
from ._internal import PyDeltaTableError, batch_distinct
from ._internal import batch_distinct
from ._internal import write_new_deltalake as _write_new_deltalake
from .table import MAX_SUPPORTED_WRITER_VERSION, DeltaTable, DeltaTableProtocolError
from .exceptions import DeltaProtocolError, TableNotFoundError
from .table import MAX_SUPPORTED_WRITER_VERSION, DeltaTable

try:
import pandas as pd # noqa: F811
Expand Down Expand Up @@ -95,7 +96,7 @@ def write_deltalake(
This function only supports writer protocol version 2 currently. When
attempting to write to an existing table with a higher min_writer_version,
this function will throw DeltaTableProtocolError.
this function will throw DeltaProtocolError.
Note that this function does NOT register this table in a data catalog.
Expand Down Expand Up @@ -195,7 +196,7 @@ def write_deltalake(
partition_by = table.metadata().partition_columns

if table.protocol().min_writer_version > MAX_SUPPORTED_WRITER_VERSION:
raise DeltaTableProtocolError(
raise DeltaProtocolError(
"This table's min_writer_version is "
f"{table.protocol().min_writer_version}, "
"but this method only supports version 2."
Expand Down Expand Up @@ -403,16 +404,8 @@ def try_get_deltatable(
) -> Optional[DeltaTable]:
try:
return DeltaTable(table_uri, storage_options=storage_options)
except PyDeltaTableError as err:
# TODO: There has got to be a better way...
if "Not a Delta table" in str(err):
return None
elif "cannot find" in str(err):
return None
elif "No such file or directory" in str(err):
return None
else:
raise
except TableNotFoundError:
return None


def get_partitions_from_path(path: str) -> Tuple[str, Dict[str, Optional[str]]]:
Expand Down
96 changes: 96 additions & 0 deletions python/src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
use arrow_schema::ArrowError;
use deltalake::checkpoints::CheckpointError;
use deltalake::{DeltaTableError, ObjectStoreError};
use pyo3::exceptions::{
PyException, PyFileNotFoundError, PyIOError, PyNotImplementedError, PyValueError,
};
use pyo3::{create_exception, PyErr};

// Python exception types exported from the `_internal` extension module.
// `DeltaError` derives from Python's built-in `Exception`; the three more
// specific errors derive from `DeltaError`, so an `except DeltaError` clause
// on the Python side also catches each of them.
create_exception!(_internal, DeltaError, PyException);
create_exception!(_internal, TableNotFoundError, DeltaError);
create_exception!(_internal, DeltaProtocolError, DeltaError);
create_exception!(_internal, CommitFailedError, DeltaError);

/// Convert a [`DeltaTableError`] into the Python exception that best describes it.
///
/// Mapping:
/// * `NotATable` / `InvalidTableLocation` -> `TableNotFoundError`
/// * `InvalidJsonLog` / `InvalidStatsJson` / `InvalidData` -> `DeltaProtocolError`
/// * `Transaction` -> `CommitFailedError`
/// * `ObjectStore`, `Io` and `Arrow` are translated to built-in Python
///   exceptions (see `object_store_to_py` and `arrow_to_py`)
/// * every other variant -> the generic `DeltaError`
fn inner_to_py_err(err: DeltaTableError) -> PyErr {
    match err {
        DeltaTableError::NotATable(msg) => TableNotFoundError::new_err(msg),
        DeltaTableError::InvalidTableLocation(msg) => TableNotFoundError::new_err(msg),

        // protocol errors
        // (`{ .. }` binds nothing, so `err` is still intact for `to_string()`)
        DeltaTableError::InvalidJsonLog { .. } => DeltaProtocolError::new_err(err.to_string()),
        DeltaTableError::InvalidStatsJson { .. } => DeltaProtocolError::new_err(err.to_string()),
        DeltaTableError::InvalidData { violations } => {
            // Message text is user-facing; previously misspelled as "Inaviant".
            DeltaProtocolError::new_err(format!("Invariant violations: {:?}", violations))
        }

        // commit errors
        DeltaTableError::Transaction { source } => CommitFailedError::new_err(source.to_string()),

        // python exceptions
        DeltaTableError::ObjectStore { source } => object_store_to_py(source),
        DeltaTableError::Io { source } => PyIOError::new_err(source.to_string()),

        DeltaTableError::Arrow { source } => arrow_to_py(source),

        // catch all
        _ => DeltaError::new_err(err.to_string()),
    }
}

/// Translate an [`ObjectStoreError`] into a Python exception.
///
/// * `NotFound` becomes `FileNotFoundError`.
/// * A `Generic` error whose source message mentions
///   `AWS_S3_ALLOW_UNSAFE_RENAME` becomes `DeltaProtocolError`, carrying the
///   source's message.
/// * Everything else becomes `IOError` with the error's display text.
fn object_store_to_py(err: ObjectStoreError) -> PyErr {
    // Render the full message up front: the `Generic` arm below moves
    // `source` out of `err`, after which `err` can no longer be formatted.
    let message = err.to_string();
    match err {
        ObjectStoreError::NotFound { .. } => PyFileNotFoundError::new_err(message),
        ObjectStoreError::Generic { source, .. } => {
            let detail = source.to_string();
            if detail.contains("AWS_S3_ALLOW_UNSAFE_RENAME") {
                DeltaProtocolError::new_err(detail)
            } else {
                PyIOError::new_err(message)
            }
        }
        _ => PyIOError::new_err(message),
    }
}

/// Translate an [`ArrowError`] into a built-in Python exception.
///
/// I/O failures map to `IOError`, invalid arguments and division by zero to
/// `ValueError`, unimplemented features to `NotImplementedError`; any other
/// variant falls back to a plain `Exception` carrying the error's display text.
fn arrow_to_py(err: ArrowError) -> PyErr {
    match err {
        // ValueError family
        ArrowError::InvalidArgumentError(reason) => PyValueError::new_err(reason),
        ArrowError::DivideByZero => PyValueError::new_err("division by zero"),

        // direct one-to-one counterparts
        ArrowError::IoError(reason) => PyIOError::new_err(reason),
        ArrowError::NotYetImplemented(reason) => PyNotImplementedError::new_err(reason),

        // no dedicated Python exception for the remaining variants
        unmapped => PyException::new_err(unmapped.to_string()),
    }
}

/// Translate a [`CheckpointError`] into a Python exception.
///
/// Variants that wrap another error type for which a dedicated converter
/// exists are delegated to that converter; the rest are mapped directly to
/// `IOError`, `ValueError` or `DeltaProtocolError`.
fn checkpoint_to_py(err: CheckpointError) -> PyErr {
    match err {
        // wrapped errors with their own converter
        CheckpointError::DeltaTable { source: cause } => inner_to_py_err(cause),
        CheckpointError::ObjectStore { source: cause } => object_store_to_py(cause),
        CheckpointError::Arrow { source: cause } => arrow_to_py(cause),

        // I/O style failures
        CheckpointError::Io { source: cause } => PyIOError::new_err(cause.to_string()),
        CheckpointError::Parquet { source: cause } => PyIOError::new_err(cause.to_string()),

        // bad values
        CheckpointError::PartitionValueNotParseable(value) => PyValueError::new_err(value),
        CheckpointError::JSONSerialization { source: cause } => {
            PyValueError::new_err(cause.to_string())
        }

        // protocol violation
        CheckpointError::MissingMetaData => DeltaProtocolError::new_err("Table metadata missing"),
    }
}

/// Umbrella error for the Python bindings.
///
/// Each variant wraps one of the Rust error types handled by this module.
/// The `#[from]` attributes let callers convert with `PythonError::from`
/// (for example in `.map_err(PythonError::from)?`); the
/// `From<PythonError> for pyo3::PyErr` impl then picks the Python exception.
#[derive(thiserror::Error, Debug)]
pub enum PythonError {
    /// A failure reported by the Delta table implementation.
    #[error("Error in delta table")]
    DeltaTable(#[from] DeltaTableError),
    /// A failure reported by the underlying object store.
    #[error("Error in object store")]
    ObjectStore(#[from] ObjectStoreError),
    /// A failure reported by Arrow.
    #[error("Error in arrow")]
    Arrow(#[from] ArrowError),
    /// A failure reported while handling checkpoints.
    #[error("Error in checkpoint")]
    Checkpoint(#[from] CheckpointError),
}

impl From<PythonError> for pyo3::PyErr {
    /// Unwrap the inner error and hand it to the converter for its type.
    ///
    /// The resulting Python exception is built from the wrapped error, not
    /// from `PythonError`'s own display text.
    fn from(value: PythonError) -> Self {
        match value {
            PythonError::Checkpoint(inner) => checkpoint_to_py(inner),
            PythonError::Arrow(inner) => arrow_to_py(inner),
            PythonError::ObjectStore(inner) => object_store_to_py(inner),
            PythonError::DeltaTable(inner) => inner_to_py_err(inner),
        }
    }
}
40 changes: 20 additions & 20 deletions python/src/filesystem.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;

use crate::utils::{delete_dir, rt, walk_tree};
use crate::PyDeltaTableError;

use deltalake::storage::{DynObjectStore, ListResult, MultipartId, ObjectStoreError, Path};
use deltalake::DeltaTableBuilder;
use pyo3::exceptions::{PyIOError, PyNotImplementedError, PyValueError};
Expand All @@ -13,6 +10,9 @@ use serde::{Deserialize, Serialize};
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::runtime::Runtime;

use crate::error::PythonError;
use crate::utils::{delete_dir, rt, walk_tree};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct FsConfig {
pub(crate) root_url: String,
Expand All @@ -35,7 +35,7 @@ impl DeltaFileSystemHandler {
let storage = DeltaTableBuilder::from_uri(table_uri)
.with_storage_options(options.clone().unwrap_or_default())
.build_storage()
.map_err(PyDeltaTableError::from_raw)?;
.map_err(PythonError::from)?;
Ok(Self {
inner: storage,
rt: Arc::new(rt()?),
Expand All @@ -61,7 +61,7 @@ impl DeltaFileSystemHandler {
let to_path = Path::from(dest);
self.rt
.block_on(self.inner.copy(&from_path, &to_path))
.map_err(PyDeltaTableError::from_object_store)?;
.map_err(PythonError::from)?;
Ok(())
}

Expand All @@ -74,15 +74,15 @@ impl DeltaFileSystemHandler {
let path = Path::from(path);
self.rt
.block_on(delete_dir(self.inner.as_ref(), &path))
.map_err(PyDeltaTableError::from_object_store)?;
.map_err(PythonError::from)?;
Ok(())
}

fn delete_file(&self, path: String) -> PyResult<()> {
let path = Path::from(path);
self.rt
.block_on(self.inner.delete(&path))
.map_err(PyDeltaTableError::from_object_store)?;
.map_err(PythonError::from)?;
Ok(())
}

Expand All @@ -104,7 +104,7 @@ impl DeltaFileSystemHandler {
let listed = self
.rt
.block_on(self.inner.list_with_delimiter(Some(&path)))
.map_err(PyDeltaTableError::from_object_store)?;
.map_err(PythonError::from)?;

// TODO is there a better way to figure out if we are in a directory?
if listed.objects.is_empty() && listed.common_prefixes.is_empty() {
Expand All @@ -129,7 +129,7 @@ impl DeltaFileSystemHandler {
)?);
}
Err(err) => {
return Err(PyDeltaTableError::from_object_store(err));
return Err(PythonError::from(err).into());
}
}
} else {
Expand Down Expand Up @@ -177,7 +177,7 @@ impl DeltaFileSystemHandler {
}
Err(err) => Err(err),
}
.map_err(PyDeltaTableError::from_object_store)?;
.map_err(PythonError::from)?;

let mut infos = vec![];
infos.extend(
Expand Down Expand Up @@ -220,7 +220,7 @@ impl DeltaFileSystemHandler {
// TODO check the if not exists semantics
self.rt
.block_on(self.inner.rename(&from_path, &to_path))
.map_err(PyDeltaTableError::from_object_store)?;
.map_err(PythonError::from)?;
Ok(())
}

Expand All @@ -233,7 +233,7 @@ impl DeltaFileSystemHandler {
self.inner.clone(),
path,
))
.map_err(PyDeltaTableError::from_object_store)?;
.map_err(PythonError::from)?;
Ok(file)
}

Expand All @@ -251,7 +251,7 @@ impl DeltaFileSystemHandler {
self.inner.clone(),
path,
))
.map_err(PyDeltaTableError::from_object_store)?;
.map_err(PythonError::from)?;
Ok(file)
}

Expand Down Expand Up @@ -405,7 +405,7 @@ impl ObjectInputFile {
let data = if nbytes > 0 {
self.rt
.block_on(self.store.get_range(&self.path, range))
.map_err(PyDeltaTableError::from_object_store)?
.map_err(PythonError::from)?
} else {
"".into()
};
Expand Down Expand Up @@ -484,8 +484,8 @@ impl ObjectOutputStream {
Err(err) => {
self.rt
.block_on(self.store.abort_multipart(&self.path, &self.multipart_id))
.map_err(PyDeltaTableError::from_object_store)?;
Err(PyDeltaTableError::from_io(err))
.map_err(PythonError::from)?;
Err(PyIOError::new_err(err.to_string()))
}
}
}
Expand Down Expand Up @@ -538,8 +538,8 @@ impl ObjectOutputStream {
Err(err) => {
self.rt
.block_on(self.store.abort_multipart(&self.path, &self.multipart_id))
.map_err(PyDeltaTableError::from_object_store)?;
Err(PyDeltaTableError::from_io(err))
.map_err(PythonError::from)?;
Err(PyIOError::new_err(err.to_string()))
}
}
}
Expand All @@ -550,8 +550,8 @@ impl ObjectOutputStream {
Err(err) => {
self.rt
.block_on(self.store.abort_multipart(&self.path, &self.multipart_id))
.map_err(PyDeltaTableError::from_object_store)?;
Err(PyDeltaTableError::from_io(err))
.map_err(PythonError::from)?;
Err(PyIOError::new_err(err.to_string()))
}
}
}
Expand Down
Loading

0 comments on commit fc61ea7

Please sign in to comment.