diff --git a/.gitignore b/.gitignore
index 5fe8f6cf0a..8642b9722a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,4 @@
 Cargo.lock
 !/delta-inspect/Cargo.lock
 !/proofs/Cargo.lock
+justfile
\ No newline at end of file
diff --git a/python/deltalake/_util.py b/python/deltalake/_util.py
new file mode 100644
index 0000000000..32b8a20a1b
--- /dev/null
+++ b/python/deltalake/_util.py
@@ -0,0 +1,20 @@
+from datetime import date, datetime
+from typing import Any
+
+
+def encode_partition_value(val: Any) -> str:
+    # Rules based on: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
+    if isinstance(val, bool):
+        return str(val).lower()
+    if isinstance(val, str):
+        return val
+    elif isinstance(val, (int, float)):
+        return str(val)
+    elif isinstance(val, date):
+        return val.isoformat()
+    elif isinstance(val, datetime):
+        return val.isoformat(sep=" ")
+    elif isinstance(val, bytes):
+        return val.decode("unicode_escape", "backslashreplace")
+    else:
+        raise ValueError(f"Could not encode partition value for type: {val}")
diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index af1cf090da..7f24145dca 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -31,6 +31,7 @@
     import pandas
 
 from ._internal import RawDeltaTable
+from ._util import encode_partition_value
 from .data_catalog import DataCatalog
 from .exceptions import DeltaProtocolError
 from .fs import DeltaStorageHandler
@@ -625,9 +626,9 @@ def __stringify_partition_values(
     for field, op, value in partition_filters:
         str_value: Union[str, List[str]]
         if isinstance(value, (list, tuple)):
-            str_value = [str(val) for val in value]
+            str_value = [encode_partition_value(val) for val in value]
        else:
-            str_value = str(value)
+            str_value = encode_partition_value(value)
        out.append((field, op, str_value))
    return out
 
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index f2754a760d..1cce4d3f68 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -17,9 +17,12 @@
     Tuple,
     Union,
 )
+from urllib.parse import unquote
 
 from deltalake.fs import DeltaStorageHandler
 
+from ._util import encode_partition_value
+
 if TYPE_CHECKING:
     import pandas as pd
 
@@ -262,7 +265,7 @@ def check_data_is_aligned_with_partition_filtering(
         for i in range(partition_values.num_rows):
             # Map will maintain order of partition_columns
             partition_map = {
-                column_name: __encode_partition_value(
+                column_name: encode_partition_value(
                     batch.column(column_name)[i].as_py()
                 )
                 for column_name in table.metadata().partition_columns
@@ -422,7 +425,7 @@ def get_partitions_from_path(path: str) -> Tuple[str, Dict[str, Optional[str]]]:
         if value == "__HIVE_DEFAULT_PARTITION__":
             out[key] = None
         else:
-            out[key] = value
+            out[key] = unquote(value)
     return path, out
 
 
@@ -489,21 +492,3 @@ def iter_groups(metadata: Any) -> Iterator[Any]:
             maximum for maximum in maximums if maximum is not None
         )
         return stats
-
-
-def __encode_partition_value(val: Any) -> str:
-    # Rules based on: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
-    if isinstance(val, bool):
-        return str(val).lower()
-    if isinstance(val, str):
-        return val
-    elif isinstance(val, (int, float)):
-        return str(val)
-    elif isinstance(val, date):
-        return val.isoformat()
-    elif isinstance(val, datetime):
-        return val.isoformat(sep=" ")
-    elif isinstance(val, bytes):
-        return val.decode("unicode_escape", "backslashreplace")
-    else:
-        raise ValueError(f"Could not encode partition value for type: {val}")
diff --git a/python/tests/pyspark_integration/test_writer_readable.py b/python/tests/pyspark_integration/test_writer_readable.py
index f637255951..ea555074b8 100644
--- a/python/tests/pyspark_integration/test_writer_readable.py
+++ b/python/tests/pyspark_integration/test_writer_readable.py
@@ -12,6 +12,7 @@
     import delta
     import delta.pip_utils
     import delta.tables
+    import pyspark.pandas as ps
 
     spark = get_spark()
 except ModuleNotFoundError:
@@ -34,7 +35,7 @@ def test_basic_read(sample_data: pa.Table, existing_table: DeltaTable):
 @pytest.mark.pyspark
 @pytest.mark.integration
 def test_partitioned(tmp_path: pathlib.Path, sample_data: pa.Table):
-    partition_cols = ["date32", "utf8"]
+    partition_cols = ["date32", "utf8", "timestamp", "bool"]
 
     # Add null values to sample data to verify we can read null partitions
     sample_data_with_null = sample_data
@@ -63,3 +64,34 @@ def test_overwrite(
     write_deltalake(path, sample_data, mode="overwrite")
 
     assert_spark_read_equal(sample_data, path)
+
+
+@pytest.mark.pyspark
+@pytest.mark.integration
+def test_issue_1591_roundtrip_special_characters(tmp_path: pathlib.Path):
+    test_string = r'$%&/()=^"[]#*?.:_-{=}|`<>~/\r\n+'
+    poisoned = "}|`<>~"
+    for char in poisoned:
+        test_string = test_string.replace(char, "")
+
+    data = pa.table(
+        {
+            "string": pa.array([test_string], type=pa.utf8()),
+            "data": pa.array(["python-module-test-write"]),
+        }
+    )
+
+    deltalake_path = tmp_path / "deltalake"
+    write_deltalake(
+        table_or_uri=deltalake_path, mode="append", data=data, partition_by=["string"]
+    )
+
+    loaded = ps.read_delta(str(deltalake_path), index_col=None).to_pandas()
+    assert loaded.shape == data.shape
+
+    spark_path = tmp_path / "spark"
+    spark_df = spark.createDataFrame(data.to_pandas())
+    spark_df.write.format("delta").partitionBy(["string"]).save(str(spark_path))
+
+    loaded = DeltaTable(spark_path).to_pandas()
+    assert loaded.shape == data.shape
diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index db00d96f26..f56c5876cd 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -648,3 +648,49 @@ def assert_scan_equals(table, predicate, expected):
     assert_num_fragments(table, predicate, 2)
     expected = pa.table({"part": ["a", "a", "b", "b"], "value": [1, 1, None, None]})
     assert_scan_equals(table, predicate, expected)
+
+
+def test_issue_1653_filter_bool_partition(tmp_path: Path):
+    ta = pa.Table.from_pydict(
+        {
+            "bool_col": [True, False, True, False],
+            "int_col": [0, 1, 2, 3],
+            "str_col": ["a", "b", "c", "d"],
+        }
+    )
+    write_deltalake(
+        tmp_path, ta, partition_by=["bool_col", "int_col"], mode="overwrite"
+    )
+    dt = DeltaTable(tmp_path)
+
+    assert (
+        dt.to_pyarrow_table(
+            filters=[
+                ("int_col", "=", 0),
+                ("bool_col", "=", True),
+            ]
+        ).num_rows
+        == 1
+    )
+    assert (
+        len(
+            dt.file_uris(
+                partition_filters=[
+                    ("int_col", "=", 0),
+                    ("bool_col", "=", "true"),
+                ]
+            )
+        )
+        == 1
+    )
+    assert (
+        len(
+            dt.file_uris(
+                partition_filters=[
+                    ("int_col", "=", 0),
+                    ("bool_col", "=", True),
+                ]
+            )
+        )
+        == 1
+    )
diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py
index e45d56539c..e72d0ac8cd 100644
--- a/python/tests/test_writer.py
+++ b/python/tests/test_writer.py
@@ -168,6 +168,7 @@ def test_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table):
         "bool",
         "binary",
         "date32",
+        "timestamp",
     ],
 )
 def test_roundtrip_partitioned(
@@ -888,3 +889,19 @@ def comp():
         "a concurrent transaction deleted the same data your transaction deletes"
         in str(exception)
     )
+
+
+def test_issue_1651_roundtrip_timestamp(tmp_path: pathlib.Path):
+    data = pa.table(
+        {
+            "id": pa.array([425], type=pa.int32()),
+            "data": pa.array(["python-module-test-write"]),
+            "t": pa.array([datetime(2023, 9, 15)]),
+        }
+    )
+
+    write_deltalake(table_or_uri=tmp_path, mode="append", data=data, partition_by=["t"])
+    dt = DeltaTable(table_uri=tmp_path)
+    dataset = dt.to_pyarrow_dataset()
+
+    assert dataset.count_rows() == 1
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index c801e11526..caf3e7b295 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -185,6 +185,10 @@ harness = false
 name = "basic_operations"
 required-features = ["datafusion"]
 
+[[example]]
+name = "load_table"
+required-features = ["datafusion"]
+
 [[example]]
 name = "recordbatch-writer"
 required-features = ["arrow"]
diff --git a/rust/examples/basic_operations.rs b/rust/examples/basic_operations.rs
index 5c1fb46e86..d95aadfb78 100644
--- a/rust/examples/basic_operations.rs
+++ b/rust/examples/basic_operations.rs
@@ -1,6 +1,6 @@
 use arrow::{
-    array::{Int32Array, StringArray},
-    datatypes::{DataType, Field, Schema as ArrowSchema},
+    array::{Int32Array, StringArray, TimestampMicrosecondArray},
+    datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit},
     record_batch::RecordBatch,
 };
 use deltalake::operations::collect_sendable_stream;
@@ -26,6 +26,12 @@ fn get_table_columns() -> Vec<SchemaField> {
             true,
             Default::default(),
         ),
+        SchemaField::new(
+            String::from("timestamp"),
+            SchemaDataType::primitive(String::from("timestamp")),
+            true,
+            Default::default(),
+        ),
     ]
 }
 
@@ -33,20 +39,38 @@ fn get_table_batches() -> RecordBatch {
     let schema = Arc::new(ArrowSchema::new(vec![
         Field::new("int", DataType::Int32, false),
         Field::new("string", DataType::Utf8, true),
+        Field::new(
+            "timestamp",
+            DataType::Timestamp(TimeUnit::Microsecond, None),
+            true,
+        ),
     ]));
 
     let int_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
     let str_values = StringArray::from(vec!["A", "B", "A", "B", "A", "A", "A", "B", "B", "A", "A"]);
-
-    RecordBatch::try_new(schema, vec![Arc::new(int_values), Arc::new(str_values)]).unwrap()
+    let ts_values = TimestampMicrosecondArray::from(vec![
+        1000000012, 1000000012, 1000000012, 1000000012, 500012305, 500012305, 500012305, 500012305,
+        500012305, 500012305, 500012305,
+    ]);
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(int_values),
+            Arc::new(str_values),
+            Arc::new(ts_values),
+        ],
+    )
+    .unwrap()
 }
 
 #[tokio::main(flavor = "current_thread")]
 async fn main() -> Result<(), deltalake::errors::DeltaTableError> {
-    // Create a delta operations client pointing at an un-initialized in-memory location.
-    // In a production environment this would be created with "try_new" and point at
-    // a real storage location.
-    let ops = DeltaOps::new_in_memory();
+    // Create a delta operations client pointing at an un-initialized location.
+    let ops = if let Ok(table_uri) = std::env::var("TABLE_URI") {
+        DeltaOps::try_from_uri(table_uri).await?
+    } else {
+        DeltaOps::new_in_memory()
+    };
 
     // The operations module uses a builder pattern that allows specifying several options
     // on how the command behaves. The builders implement `Into<Future>`, so once
@@ -54,6 +78,7 @@ async fn main() -> Result<(), deltalake::errors::DeltaTableError> {
     let table = ops
         .create()
         .with_columns(get_table_columns())
+        .with_partition_columns(["timestamp"])
         .with_table_name("my_table")
         .with_comment("A table to show how delta-rs works")
         .await?;
diff --git a/rust/examples/load_table.rs b/rust/examples/load_table.rs
new file mode 100644
index 0000000000..18a960eeb0
--- /dev/null
+++ b/rust/examples/load_table.rs
@@ -0,0 +1,20 @@
+use arrow::record_batch::RecordBatch;
+use deltalake::operations::collect_sendable_stream;
+use deltalake::DeltaOps;
+
+#[tokio::main(flavor = "current_thread")]
+async fn main() -> Result<(), deltalake::errors::DeltaTableError> {
+    // Create a delta operations client pointing at an un-initialized location.
+    let ops = if let Ok(table_uri) = std::env::var("TABLE_URI") {
+        DeltaOps::try_from_uri(table_uri).await?
+    } else {
+        DeltaOps::try_from_uri("./rust/tests/data/delta-0.8.0").await?
+    };
+
+    let (_table, stream) = ops.load().await?;
+    let data: Vec<RecordBatch> = collect_sendable_stream(stream).await?;
+
+    println!("{:?}", data);
+
+    Ok(())
+}
diff --git a/rust/examples/recordbatch-writer.rs b/rust/examples/recordbatch-writer.rs
index 8bafa3b2ac..42e81ba1ac 100644
--- a/rust/examples/recordbatch-writer.rs
+++ b/rust/examples/recordbatch-writer.rs
@@ -36,7 +36,7 @@ async fn main() -> Result<(), DeltaTableError> {
     })?;
     info!("Using the location of: {:?}", table_uri);
 
-    let table_path = Path::from(table_uri.as_ref());
+    let table_path = Path::parse(&table_uri)?;
 
     let maybe_table = deltalake::open_table(&table_path).await;
     let mut table = match maybe_table {
diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs
index d411a502cd..105ebdcdab 100644
--- a/rust/src/action/mod.rs
+++ b/rust/src/action/mod.rs
@@ -8,6 +8,7 @@ pub mod checkpoints;
 pub mod parquet2_read;
 #[cfg(feature = "parquet")]
 mod parquet_read;
+mod serde_path;
 
 #[cfg(feature = "arrow")]
 use arrow_schema::ArrowError;
@@ -15,7 +16,6 @@ use futures::StreamExt;
 use lazy_static::lazy_static;
 use log::*;
 use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
-use percent_encoding::percent_decode;
 use regex::Regex;
 use serde::{Deserialize, Serialize};
 use serde_json::{Map, Value};
@@ -105,13 +105,6 @@ pub enum ProtocolError {
     },
 }
 
-fn decode_path(raw_path: &str) -> Result<String, ProtocolError> {
-    percent_decode(raw_path.as_bytes())
-        .decode_utf8()
-        .map(|c| c.to_string())
-        .map_err(|e| ProtocolError::InvalidField(format!("Decode path failed for action: {e}")))
-}
-
 /// Struct used to represent minValues and maxValues in add action statistics.
 #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
 #[serde(untagged)]
@@ -255,6 +248,7 @@ pub struct StatsParsed {
 pub struct AddCDCFile {
     /// A relative path, from the root of the table, or an
     /// absolute path to a CDC file
+    #[serde(with = "serde_path")]
    pub path: String,
    /// The size of this file in bytes
    pub size: i64,
@@ -351,6 +345,7 @@ impl Eq for DeletionVector {}
 #[serde(rename_all = "camelCase")]
 pub struct Add {
     /// A relative path, from the root of the table, to a file that should be added to the table
+    #[serde(with = "serde_path")]
     pub path: String,
     /// The size of this file in bytes
     pub size: i64,
@@ -403,9 +398,11 @@ pub struct Add {
     #[serde(skip_serializing, skip_deserializing)]
     pub stats_parsed: Option<parquet::record::Row>,
     /// Map containing metadata about this file
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub tags: Option<HashMap<String, Option<String>>>,
 
-    ///Metadata about deletion vector
+    /// Metadata about deletion vector
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub deletion_vector: Option<DeletionVector>,
 }
 
@@ -431,11 +428,6 @@ impl PartialEq for Add {
 impl Eq for Add {}
 
 impl Add {
-    /// Returns the Add action with path decoded.
-    pub fn path_decoded(self) -> Result<Self, ProtocolError> {
-        decode_path(&self.path).map(|path| Self { path, ..self })
-    }
-
     /// Get whatever stats are available. Uses (parquet struct) parsed_stats if present falling back to json stats.
     #[cfg(any(feature = "parquet", feature = "parquet2"))]
     pub fn get_stats(&self) -> Result<Option<Stats>, serde_json::error::Error> {
@@ -569,6 +561,7 @@ impl TryFrom<DeltaTableMetaData> for MetaData {
 #[serde(rename_all = "camelCase")]
 pub struct Remove {
     /// The path of the file that is removed from the table.
+    #[serde(with = "serde_path")]
     pub path: String,
     /// The timestamp when the remove was added to table state.
     pub deletion_timestamp: Option<i64>,
@@ -581,12 +574,16 @@ pub struct Remove {
     /// it's still nullable so we keep it as Option<> for compatibly.
     pub extended_file_metadata: Option<bool>,
     /// A map from partition column to value for this file.
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub partition_values: Option<HashMap<String, Option<String>>>,
     /// Size of this file in bytes
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub size: Option<i64>,
     /// Map containing metadata about this file
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub tags: Option<HashMap<String, Option<String>>>,
 
-    ///Metadata about deletion vector
+    /// Metadata about deletion vector
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub deletion_vector: Option<DeletionVector>,
 }
@@ -617,13 +614,6 @@ impl PartialEq for Remove {
     }
 }
 
-impl Remove {
-    /// Returns the Remove action with path decoded.
-    pub fn path_decoded(self) -> Result<Self, ProtocolError> {
-        decode_path(&self.path).map(|path| Self { path, ..self })
-    }
-}
-
 /// Action used by streaming systems to track progress using application-specific versions to
 /// enable idempotency.
 #[derive(Serialize, Deserialize, Debug, Default, Clone)]
diff --git a/rust/src/action/serde_path.rs b/rust/src/action/serde_path.rs
new file mode 100644
index 0000000000..9868523e81
--- /dev/null
+++ b/rust/src/action/serde_path.rs
@@ -0,0 +1,89 @@
+use std::str::Utf8Error;
+
+use percent_encoding::{percent_decode_str, percent_encode, AsciiSet, CONTROLS};
+use serde::{self, Deserialize, Deserializer, Serialize, Serializer};
+
+pub fn deserialize<'de, D>(deserializer: D) -> Result<String, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let s = String::deserialize(deserializer)?;
+    decode_path(&s).map_err(serde::de::Error::custom)
+}
+
+pub fn serialize<S>(value: &str, serializer: S) -> Result<S::Ok, S::Error>
+where
+    S: Serializer,
+{
+    let encoded = encode_path(value);
+    String::serialize(&encoded, serializer)
+}
+
+pub const _DELIMITER: &str = "/";
+/// The path delimiter as a single byte
+pub const _DELIMITER_BYTE: u8 = _DELIMITER.as_bytes()[0];
+
+/// Characters we want to encode.
+const INVALID: &AsciiSet = &CONTROLS
+    // The delimiter we are reserving for internal hierarchy
+    // .add(DELIMITER_BYTE)
+    // Characters AWS recommends avoiding for object keys
+    // https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
+    .add(b'\\')
+    .add(b'{')
+    .add(b'^')
+    .add(b'}')
+    .add(b'%')
+    .add(b'`')
+    .add(b']')
+    .add(b'"')
+    .add(b'>')
+    .add(b'[')
+    // .add(b'~')
+    .add(b'<')
+    .add(b'#')
+    .add(b'|')
+    // Characters Google Cloud Storage recommends avoiding for object names
+    // https://cloud.google.com/storage/docs/naming-objects
+    .add(b'\r')
+    .add(b'\n')
+    .add(b'*')
+    .add(b'?');
+
+fn encode_path(path: &str) -> String {
+    percent_encode(path.as_bytes(), INVALID).to_string()
+}
+
+fn decode_path(path: &str) -> Result<String, Utf8Error> {
+    Ok(percent_decode_str(path).decode_utf8()?.to_string())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_encode_path() {
+        let cases = [
+            (
+                "string=$%25&%2F()%3D%5E%22%5B%5D%23%2A%3F.%3A/part-00023-4b06bc90-0678-4a63-94a2-f09af1adb945.c000.snappy.parquet",
+                "string=$%2525&%252F()%253D%255E%2522%255B%255D%2523%252A%253F.%253A/part-00023-4b06bc90-0678-4a63-94a2-f09af1adb945.c000.snappy.parquet",
+            ),
+            (
+                "string=$%25&%2F()%3D%5E%22<>~%5B%5D%7B}`%23|%2A%3F%2F%5Cr%5Cn.%3A/part-00023-e0a68495-8098-40a6-be5f-b502b111b789.c000.snappy.parquet",
+                "string=$%2525&%252F()%253D%255E%2522%3C%3E~%255B%255D%257B%7D%60%2523%7C%252A%253F%252F%255Cr%255Cn.%253A/part-00023-e0a68495-8098-40a6-be5f-b502b111b789.c000.snappy.parquet"
+            ),
+            (
+                "string=$%25&%2F()%3D%5E%22<>~%5B%5D%7B}`%23|%2A%3F%2F%5Cr%5Cn.%3A_-/part-00023-346b6795-dafa-4948-bda5-ecdf4baa4445.c000.snappy.parquet",
+                "string=$%2525&%252F()%253D%255E%2522%3C%3E~%255B%255D%257B%7D%60%2523%7C%252A%253F%252F%255Cr%255Cn.%253A_-/part-00023-346b6795-dafa-4948-bda5-ecdf4baa4445.c000.snappy.parquet"
+            )
+        ];
+
+        for (raw, expected) in cases {
+            let encoded = encode_path(raw);
+            assert_eq!(encoded, expected);
+            let decoded = decode_path(expected).unwrap();
+            assert_eq!(decoded, raw);
+        }
+    }
+}
diff --git a/rust/src/operations/restore.rs b/rust/src/operations/restore.rs
index a5e51da05b..5800edd96f 100644
--- a/rust/src/operations/restore.rs
+++ b/rust/src/operations/restore.rs
@@ -252,7 +252,7 @@ async fn check_files_available(
     files: &Vec<Add>,
 ) -> DeltaResult<()> {
     for file in files {
-        let file_path = Path::from(file.path.clone());
+        let file_path = Path::parse(file.path.clone())?;
         match object_store.head(&file_path).await {
             Ok(_) => {}
             Err(ObjectStoreError::NotFound { .. }) => {
diff --git a/rust/src/operations/write.rs b/rust/src/operations/write.rs
index 41dfeaae5a..ca96134935 100644
--- a/rust/src/operations/write.rs
+++ b/rust/src/operations/write.rs
@@ -552,7 +552,8 @@ fn cast_record_batch(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::operations::DeltaOps;
+    use crate::action::SaveMode;
+    use crate::operations::{collect_sendable_stream, DeltaOps};
     use crate::writer::test_utils::datafusion::get_data;
     use crate::writer::test_utils::{
         get_delta_schema, get_delta_schema_with_nested_struct, get_record_batch,
@@ -562,7 +563,7 @@
     use arrow::datatypes::Schema as ArrowSchema;
     use arrow_array::{Int32Array, StringArray, TimestampMicrosecondArray};
     use arrow_schema::{DataType, TimeUnit};
-    use datafusion::assert_batches_sorted_eq;
+    use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
     use serde_json::{json, Value};
 
     #[tokio::test]
@@ -855,4 +856,47 @@
             &expected
         );
     }
+
+    #[tokio::test]
+    async fn test_special_characters_write_read() {
+        let tmp_dir = tempdir::TempDir::new("test").unwrap();
+        let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();
+
+        let schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("string", DataType::Utf8, true),
+            Field::new("data", DataType::Utf8, true),
+        ]));
+
+        let str_values = StringArray::from(vec![r#"$%&/()=^"[]#*?.:_- {=}|`<>~/\r\n+"#]);
+        let data_values = StringArray::from(vec!["test"]);
+
+        let batch = RecordBatch::try_new(schema, vec![Arc::new(str_values), Arc::new(data_values)])
+            .unwrap();
+
+        let ops = DeltaOps::try_from_uri(tmp_path.as_os_str().to_str().unwrap())
+            .await
+            .unwrap();
+
+        let _table = ops
+            .write([batch.clone()])
+            .with_partition_columns(["string"])
+            .await
+            .unwrap();
+
+        let table = crate::open_table(tmp_path.as_os_str().to_str().unwrap())
+            .await
+            .unwrap();
+        let (_table, stream) = DeltaOps(table).load().await.unwrap();
+        let data: Vec<RecordBatch> = collect_sendable_stream(stream).await.unwrap();
+
+        let expected = vec![
+            "+------+-----------------------------------+",
+            "| data | string                            |",
+            "+------+-----------------------------------+",
+            r#"| test | $%&/()=^"[]#*?.:_- {=}|`<>~/\r\n+ |"#,
+            "+------+-----------------------------------+",
+        ];
+
+        assert_batches_eq!(&expected, &data);
+    }
 }
diff --git a/rust/src/operations/writer.rs b/rust/src/operations/writer.rs
index 4fef892bf8..a72b832505 100644
--- a/rust/src/operations/writer.rs
+++ b/rust/src/operations/writer.rs
@@ -247,7 +247,7 @@ impl PartitionWriterConfig {
             .map_err(|err| WriteError::FileName {
                 source: Box::new(err),
             })?;
-        let prefix = Path::from(part_path.as_ref());
+        let prefix = Path::parse(part_path.as_ref())?;
         let writer_properties = writer_properties.unwrap_or_else(|| {
             WriterProperties::builder()
                 .set_created_by(format!("delta-rs version {}", crate_version()))
diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs
index 9be2200d9e..2ac17032d4 100644
--- a/rust/src/table_state.rs
+++ b/rust/src/table_state.rs
@@ -327,12 +327,11 @@ impl DeltaTableState {
             action::Action::cdc(_v) => {}
             action::Action::add(v) => {
                 if require_files {
-                    self.files.push(v.path_decoded()?);
+                    self.files.push(v);
                 }
             }
             action::Action::remove(v) => {
                 if require_tombstones && require_files {
-                    let v = v.path_decoded()?;
                     self.tombstones.insert(v);
                 }
             }
diff --git a/rust/src/writer/utils.rs b/rust/src/writer/utils.rs
index a12809916f..cfc089c164 100644
--- a/rust/src/writer/utils.rs
+++ b/rust/src/writer/utils.rs
@@ -16,10 +16,12 @@ use arrow::datatypes::{
 use arrow::json::ReaderBuilder;
 use arrow::record_batch::*;
 use object_store::path::Path;
+use object_store::path::DELIMITER_BYTE;
 use parking_lot::RwLock;
 use parquet::basic::Compression;
 use parquet::file::properties::WriterProperties;
 use parquet::schema::types::ColumnPath;
+use percent_encoding::{percent_encode, AsciiSet, CONTROLS};
 use serde_json::Value;
 use uuid::Uuid;
 
@@ -45,13 +47,13 @@ impl PartitionPath {
             let partition_value = partition_values
                 .get(k)
                 .ok_or_else(|| DeltaWriterError::MissingPartitionColumn(k.to_string()))?;
-
-            let partition_value = partition_value
-                .as_deref()
-                .unwrap_or(NULL_PARTITION_VALUE_DATA_PATH);
-            let part = format!("{k}={partition_value}");
-
-            path_parts.push(part);
+            let path_part = if let Some(val) = partition_value.as_deref() {
+                let encoded = percent_encode(val.as_bytes(), INVALID).to_string();
+                format!("{k}={encoded}")
+            } else {
+                format!("{k}={NULL_PARTITION_VALUE_DATA_PATH}")
+            };
+            path_parts.push(path_part);
         }
 
         Ok(PartitionPath {
@@ -60,6 +62,30 @@ impl PartitionPath {
     }
 }
 
+const INVALID: &AsciiSet = &CONTROLS
+    // everything object store needs encoded ...
+    .add(DELIMITER_BYTE)
+    .add(b'\\')
+    .add(b'{')
+    .add(b'^')
+    .add(b'}')
+    .add(b'%')
+    .add(b'`')
+    .add(b']')
+    .add(b'"')
+    .add(b'>')
+    .add(b'[')
+    .add(b'~')
+    .add(b'<')
+    .add(b'#')
+    .add(b'|')
+    .add(b'\r')
+    .add(b'\n')
+    .add(b'*')
+    .add(b'?')
+    //... and some more chars illegal on windows
+    .add(b':');
+
 impl From<PartitionPath> for String {
     fn from(path: PartitionPath) -> String {
         path.path