Skip to content

Commit

Permalink
fix: just make pyarrow 12 the max (#1603)
Browse files Browse the repository at this point in the history
# Description
This is a temporary fix until I have more time to finish #1602.

# Related Issue(s)
<!---
For example:

- closes #106
--->

# Documentation

<!---
Share links to useful documentation
--->

---------

Co-authored-by: R. Tyler Croy <rtyler@brokenco.de>
  • Loading branch information
wjones127 and rtyler authored Aug 28, 2023
1 parent 0f85033 commit 02b2b88
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 26 deletions.
15 changes: 8 additions & 7 deletions python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,13 @@ def write_deltalake(
if filesystem is not None:
raise NotImplementedError("Filesystem support is not yet implemented. #570")

__enforce_append_only(table=table, configuration=configuration, mode=mode)
if table is not None:
storage_options = table._storage_options or {}
storage_options.update(storage_options or {})

if filesystem is None:
if table is not None:
storage_options = table._storage_options or {}
storage_options.update(storage_options or {})
filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options))

filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri, storage_options))
__enforce_append_only(table=table, configuration=configuration, mode=mode)

if isinstance(partition_by, str):
partition_by = [partition_by]
Expand Down Expand Up @@ -219,8 +218,10 @@ def visitor(written_file: Any) -> None:
# PyArrow added support for written_file.size in 9.0.0
if PYARROW_MAJOR_VERSION >= 9:
size = written_file.size
else:
elif filesystem is not None:
size = filesystem.get_file_info([path])[0].size
else:
size = 0

add_actions.append(
AddAction(
Expand Down
2 changes: 1 addition & 1 deletion python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ classifiers = [
"Programming Language :: Python :: 3 :: Only"
]
dependencies = [
"pyarrow>=8",
"pyarrow>=8,<=12",
'typing-extensions;python_version<"3.8"',
]

Expand Down
2 changes: 1 addition & 1 deletion python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ impl RawDeltaTable {
.into_iter()
.map(|part| PyFrozenSet::new(py, part.iter()))
.collect::<Result<_, PyErr>>()?;
PyFrozenSet::new(py, active_partitions.into_iter())
PyFrozenSet::new(py, active_partitions)
}

fn create_write_transaction(
Expand Down
9 changes: 7 additions & 2 deletions python/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import pathlib
import subprocess
import time
from datetime import date, datetime, timedelta
from decimal import Decimal
from time import sleep
Expand All @@ -11,11 +12,15 @@
from deltalake import DeltaTable, write_deltalake


def wait_till_host_is_available(host: str, timeout_sec: int = 30):
def wait_till_host_is_available(host: str, timeout_sec: int = 0.5):
spacing = 2
start = time.monotonic()
while True:
if time.monotonic() - start > timeout_sec:
raise TimeoutError(f"Host {host} is not available after {timeout_sec} sec")

try:
subprocess.run(["curl", host], timeout=500, check=True)
subprocess.run(["curl", host], timeout=timeout_sec * 1000, check=True)
except Exception:
pass
else:
Expand Down
2 changes: 1 addition & 1 deletion rust/src/action/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ pub async fn cleanup_expired_logs_for(
) -> Result<i32, ProtocolError> {
lazy_static! {
static ref DELTA_LOG_REGEX: Regex =
Regex::new(r#"_delta_log/(\d{20})\.(json|checkpoint).*$"#).unwrap();
Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint).*$").unwrap();
}

let mut deleted_log_num = 0;
Expand Down
4 changes: 2 additions & 2 deletions rust/src/action/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -881,9 +881,9 @@ pub(crate) async fn find_latest_check_point_for_version(
) -> Result<Option<CheckPoint>, ProtocolError> {
lazy_static! {
static ref CHECKPOINT_REGEX: Regex =
Regex::new(r#"^_delta_log/(\d{20})\.checkpoint\.parquet$"#).unwrap();
Regex::new(r"^_delta_log/(\d{20})\.checkpoint\.parquet$").unwrap();
static ref CHECKPOINT_PARTS_REGEX: Regex =
Regex::new(r#"^_delta_log/(\d{20})\.checkpoint\.\d{10}\.(\d{10})\.parquet$"#).unwrap();
Regex::new(r"^_delta_log/(\d{20})\.checkpoint\.\d{10}\.(\d{10})\.parquet$").unwrap();
}

let mut cp: Option<CheckPoint> = None;
Expand Down
2 changes: 1 addition & 1 deletion rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ impl DeltaTable {
// TODO check if regex matches against path
lazy_static! {
static ref DELTA_LOG_REGEX: Regex =
Regex::new(r#"^_delta_log/(\d{20})\.(json|checkpoint)*$"#).unwrap();
Regex::new(r"^_delta_log/(\d{20})\.(json|checkpoint)*$").unwrap();
}

let mut current_delta_log_ver = i64::MAX;
Expand Down
4 changes: 2 additions & 2 deletions rust/src/delta_arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,7 +658,7 @@ mod tests {
delta_log_schema_for_table(table_schema.clone(), partition_columns.as_slice(), false);

// verify top-level schema contains all expected fields and they are named correctly.
let expected_fields = vec!["metaData", "protocol", "txn", "remove", "add"];
let expected_fields = ["metaData", "protocol", "txn", "remove", "add"];
for f in log_schema.fields().iter() {
assert!(expected_fields.contains(&f.name().as_str()));
}
Expand Down Expand Up @@ -771,7 +771,7 @@ mod tests {
})
.collect();
assert_eq!(7, remove_fields.len());
let expected_fields = vec![
let expected_fields = [
"path",
"deletionTimestamp",
"dataChange",
Expand Down
9 changes: 4 additions & 5 deletions rust/src/delta_datafusion.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ impl DeltaTableState {
let stats = self
.files()
.iter()
.fold(
Some(Statistics {
.try_fold(
Statistics {
num_rows: Some(0),
total_byte_size: Some(0),
column_statistics: Some(vec![
Expand All @@ -120,9 +120,8 @@ impl DeltaTableState {
self.schema().unwrap().get_fields().len()
]),
is_exact: true,
}),
},
|acc, action| {
let acc = acc?;
let new_stats = action
.get_stats()
.unwrap_or_else(|_| Some(action::Stats::default()))?;
Expand Down Expand Up @@ -468,7 +467,7 @@ impl TableProvider for DeltaTable {
self.get_state()
.files()
.iter()
.zip(files_to_prune.into_iter())
.zip(files_to_prune)
.filter_map(
|(action, keep)| {
if keep {
Expand Down
4 changes: 2 additions & 2 deletions rust/src/operations/transaction/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ impl DeltaTableState {
Ok(Either::Left(
self.files()
.iter()
.zip(pruning_predicate.prune(self)?.into_iter())
.zip(pruning_predicate.prune(self)?)
.filter_map(
|(action, keep_file)| {
if keep_file {
Expand Down Expand Up @@ -234,7 +234,7 @@ impl<'a> AddContainer<'a> {
Ok(self
.inner
.iter()
.zip(pruning_predicate.prune(self)?.into_iter())
.zip(pruning_predicate.prune(self)?)
.filter_map(
|(action, keep_file)| {
if keep_file {
Expand Down
25 changes: 23 additions & 2 deletions rust/src/storage/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,58 @@ const STORE_NAME: &str = "DeltaLocalObjectStore";
/// Error raised by the local file system object store
#[derive(thiserror::Error, Debug)]
#[allow(dead_code)]
pub(self) enum LocalFileSystemError {
pub enum LocalFileSystemError {
/// Object exists already at path
#[error("Object exists already at path: {} ({:?})", path, source)]
AlreadyExists {
/// Path of the already existing file
path: String,
/// Originating error
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},

/// Object not found at the given path
#[error("Object not found at path: {} ({:?})", path, source)]
NotFound {
/// Provided path which does not exist
path: String,
/// Originating error
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},

/// Invalid argument sent to OS call
#[error("Invalid argument in OS call for path: {} ({:?})", path, source)]
InvalidArgument { path: String, source: errno::Errno },
InvalidArgument {
/// Provided path
path: String,
/// Originating error
source: errno::Errno,
},

/// Path contained an interior NUL byte and could not be passed to an FFI call
#[error("Null error in FFI for path: {} ({:?})", path, source)]
NullError {
/// Given path
path: String,
/// Originating error
source: std::ffi::NulError,
},

/// Generic catch-all error for this store
#[error("Generic error in store: {} ({:?})", store, source)]
Generic {
/// String name of the object store
store: &'static str,
/// Originating error
source: Box<dyn std::error::Error + Send + Sync + 'static>,
},

/// Errors from the Tokio runtime
#[error("Error executing async task for path: {} ({:?})", path, source)]
Tokio {
/// Path the failed async task was operating on
path: String,
/// Originating error
source: tokio::task::JoinError,
},
}
Expand Down

0 comments on commit 02b2b88

Please sign in to comment.