diff --git a/crates/core/src/delta_datafusion/mod.rs b/crates/core/src/delta_datafusion/mod.rs
index dc742b2f2d..8d64f85fb2 100644
--- a/crates/core/src/delta_datafusion/mod.rs
+++ b/crates/core/src/delta_datafusion/mod.rs
@@ -76,9 +76,7 @@ use url::Url;
 use crate::delta_datafusion::expr::parse_predicate_expression;
 use crate::delta_datafusion::schema_adapter::DeltaSchemaAdapterFactory;
 use crate::errors::{DeltaResult, DeltaTableError};
-use crate::kernel::{
-    Add, DataCheck, EagerSnapshot, Invariant, LogicalFile, Snapshot, StructTypeExt,
-};
+use crate::kernel::{Add, DataCheck, EagerSnapshot, Invariant, Snapshot, StructTypeExt};
 use crate::logstore::LogStoreRef;
 use crate::table::builder::ensure_table_uri;
 use crate::table::state::DeltaTableState;
diff --git a/crates/core/src/operations/merge/mod.rs b/crates/core/src/operations/merge/mod.rs
index b081b387ca..7f87d30d35 100644
--- a/crates/core/src/operations/merge/mod.rs
+++ b/crates/core/src/operations/merge/mod.rs
@@ -3099,7 +3099,6 @@ mod tests {
         use crate::kernel::Protocol;
         use crate::operations::merge::Action;
 
-        let _ = pretty_env_logger::try_init();
         let schema = get_delta_schema();
 
         let actions = vec![Action::Protocol(Protocol::new(1, 4))];
@@ -3194,7 +3193,6 @@ mod tests {
         use crate::kernel::Protocol;
         use crate::operations::merge::Action;
 
-        let _ = pretty_env_logger::try_init();
         let schema = get_delta_schema();
 
         let actions = vec![Action::Protocol(Protocol::new(1, 4))];
diff --git a/crates/core/src/operations/optimize.rs b/crates/core/src/operations/optimize.rs
index e00fd6451e..cf096d56d1 100644
--- a/crates/core/src/operations/optimize.rs
+++ b/crates/core/src/operations/optimize.rs
@@ -39,7 +39,8 @@ use parquet::basic::{Compression, ZstdLevel};
 use parquet::errors::ParquetError;
 use parquet::file::properties::WriterProperties;
 use serde::{de::Error as DeError, Deserialize, Deserializer, Serialize, Serializer};
-use tracing::debug;
+use tracing::*;
+use url::Url;
 
 use super::transaction::PROTOCOL;
 use super::writer::{PartitionWriter, PartitionWriterConfig};
@@ -137,6 +138,7 @@ impl fmt::Display for MetricDetails {
     }
 }
 
+#[derive(Debug)]
 /// Metrics for a single partition
 pub struct PartialMetrics {
     /// Number of optimized files added
@@ -345,6 +347,7 @@ impl From<OptimizeInput> for DeltaOperation {
     }
 }
 
+/// Generate an appropriate remove action for the optimization task
 fn create_remove(
     path: &str,
     partitions: &IndexMap,
@@ -606,12 +609,26 @@ impl MergePlan {
         use datafusion_expr::expr::ScalarFunction;
         use datafusion_expr::{Expr, ScalarUDF};
 
-        let locations = files
+        // This code is ... not ideal. Essentially `read_parquet` expects Strings that it will then
+        // parse as URLs and then pass back to the object store (x_x). This can cause problems when
+        // paths in object storage have special characters like spaces, etc.
+        //
+        // This [str::replace] is kind of a hack to address that.
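+        // Escaping any literal '%' as '%25' up front should mean that once `read_parquet`
+        // has parsed the string as a URL and percent-decoded it again, the object store is
+        // handed back the original path unchanged.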
+ // + // This [str::replace] i kind of a hack to address + // + let locations: Vec = files .iter() - .map(|file| format!("delta-rs:///{}", file.location)) - .collect_vec(); + .map(|om| { + format!( + "delta-rs:///{}", + str::replace(om.location.as_ref(), "%", "%25") + ) + }) + .collect(); + debug!("Reading z-order with locations are: {locations:?}"); + let df = context .ctx + // TODO: should read options have the partition columns .read_parquet(locations, ParquetReadOptions::default()) .await?; @@ -712,6 +729,7 @@ impl MergePlan { bins.len() <= num_cpus::get(), )); + debug!("Starting zorder with the columns: {zorder_columns:?} {bins:?}"); #[cfg(feature = "datafusion")] let exec_context = Arc::new(zorder::ZOrderExecContext::new( zorder_columns, @@ -719,6 +737,7 @@ impl MergePlan { max_spill_size, )?); let task_parameters = self.task_parameters.clone(); + let log_store = log_store.clone(); futures::stream::iter(bins) .map(move |(_, (partition, files))| { @@ -891,9 +910,7 @@ impl MergeBin { self.size_bytes += meta.size as i64; self.files.push(meta); } -} -impl MergeBin { fn iter(&self) -> impl Iterator { self.files.iter() } @@ -1036,6 +1053,7 @@ fn build_zorder_plan( .or_insert_with(|| (partition_values, MergeBin::new())) .1 .add(object_meta); + error!("partition_files inside the zorder plan: {partition_files:?}"); } let operation = OptimizeOperations::ZOrder(zorder_columns, partition_files); @@ -1229,7 +1247,6 @@ pub(super) mod zorder { let runtime = Arc::new(RuntimeEnv::new(config)?); runtime.register_object_store(&Url::parse("delta-rs://").unwrap(), object_store); - use url::Url; let ctx = SessionContext::new_with_config_rt(SessionConfig::default(), runtime); ctx.register_udf(ScalarUDF::from(datafusion::ZOrderUDF)); Ok(Self { columns, ctx }) @@ -1269,6 +1286,7 @@ pub(super) mod zorder { fn zorder_key_datafusion( columns: &[ColumnarValue], ) -> Result { + debug!("zorder_key_datafusion: {columns:#?}"); let length = columns .iter() .map(|col| match col { @@ -1423,6 +1441,94 @@ pub(super) mod zorder { .await; assert!(res.is_ok()); } + + /// Issue + #[tokio::test] + async fn test_zorder_space_in_partition_value() { + use arrow_schema::Schema as ArrowSchema; + let _ = pretty_env_logger::try_init(); + let schema = Arc::new(ArrowSchema::new(vec![ + Field::new("modified", DataType::Utf8, true), + Field::new("country", DataType::Utf8, true), + Field::new("value", DataType::Int32, true), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(arrow::array::StringArray::from(vec![ + "2021-02-01", + "2021-02-01", + "2021-02-02", + "2021-02-02", + ])), + Arc::new(arrow::array::StringArray::from(vec![ + "Germany", + "China", + "Canada", + "Dominican Republic", + ])), + Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])), + //Arc::new(arrow::array::StringArray::from(vec!["Dominican Republic"])), + //Arc::new(arrow::array::Int32Array::from(vec![100])), + ], + ) + .unwrap(); + // write some data + let table = crate::DeltaOps::new_in_memory() + .write(vec![batch.clone()]) + .with_partition_columns(vec!["country"]) + .with_save_mode(crate::protocol::SaveMode::Overwrite) + .await + .unwrap(); + + let res = crate::DeltaOps(table) + .optimize() + .with_type(OptimizeType::ZOrder(vec!["modified".into()])) + .await; + assert!(res.is_ok(), "Failed to optimize: {res:#?}"); + } + + #[tokio::test] + async fn test_zorder_space_in_partition_value_garbage() { + use arrow_schema::Schema as ArrowSchema; + let _ = pretty_env_logger::try_init(); + let schema = 
+        #[tokio::test]
+        async fn test_zorder_space_in_partition_value_garbage() {
+            use arrow_schema::Schema as ArrowSchema;
+            let _ = pretty_env_logger::try_init();
+            let schema = Arc::new(ArrowSchema::new(vec![
+                Field::new("modified", DataType::Utf8, true),
+                Field::new("country", DataType::Utf8, true),
+                Field::new("value", DataType::Int32, true),
+            ]));
+
+            let batch = RecordBatch::try_new(
+                schema.clone(),
+                vec![
+                    Arc::new(arrow::array::StringArray::from(vec![
+                        "2021-02-01",
+                        "2021-02-01",
+                        "2021-02-02",
+                        "2021-02-02",
+                    ])),
+                    Arc::new(arrow::array::StringArray::from(vec![
+                        "Germany", "China", "Canada", "USA$$!",
+                    ])),
+                    Arc::new(arrow::array::Int32Array::from(vec![1, 10, 20, 100])),
+                ],
+            )
+            .unwrap();
+            // write some data
+            let table = crate::DeltaOps::new_in_memory()
+                .write(vec![batch.clone()])
+                .with_partition_columns(vec!["country"])
+                .with_save_mode(crate::protocol::SaveMode::Overwrite)
+                .await
+                .unwrap();
+
+            let res = crate::DeltaOps(table)
+                .optimize()
+                .with_type(OptimizeType::ZOrder(vec!["modified".into()]))
+                .await;
+            assert!(res.is_ok(), "Failed to optimize: {res:#?}");
+        }
     }
 }
diff --git a/python/tests/test_optimize.py b/python/tests/test_optimize.py
index 2c9685e116..3bce0ad35f 100644
--- a/python/tests/test_optimize.py
+++ b/python/tests/test_optimize.py
@@ -4,6 +4,13 @@
 import pyarrow as pa
 import pytest
 
+try:
+    import pandas as pd
+except ModuleNotFoundError:
+    _has_pandas = False
+else:
+    _has_pandas = True
+
 from deltalake import DeltaTable, write_deltalake
 from deltalake.table import CommitProperties
 
@@ -132,3 +139,31 @@ def test_optimize_schema_evolved_table(
     assert dt.to_pyarrow_table().sort_by([("foo", "ascending")]) == data.sort_by(
         [("foo", "ascending")]
     )
+
+
+@pytest.mark.pandas
+def test_zorder_with_space_partition(tmp_path: pathlib.Path):
+    df = pd.DataFrame(
+        {
+            "user": ["James", "Anna", "Sara", "Martin"],
+            "country": ["United States", "Canada", "Costa Rica", "South Africa"],
+            "age": [34, 23, 45, 26],
+        }
+    )
+
+    write_deltalake(
+        table_or_uri=tmp_path,
+        data=df,
+        mode="overwrite",
+        partition_by=["country"],
+    )
+
+    test_table = DeltaTable(tmp_path)
+
+    # retrieve by partition works fine
+    partitioned_df = test_table.to_pandas(
+        partitions=[("country", "=", "United States")],
+    )
+    print(partitioned_df)
+
+    test_table.optimize.z_order(columns=["user"])