From 6ef0d7c2bf665f24fac1c6f878a7ec768281b5de Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Sat, 23 Sep 2023 21:39:20 +0200
Subject: [PATCH 01/11] fix: percent encoding of partition values and paths

---
 python/deltalake/writer.py   | 25 ++++++++++++++++---
 .../test_writer_readable.py  |  2 +-
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index f2754a760d..8decb41f8e 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -17,6 +17,7 @@
     Tuple,
     Union,
 )
+from urllib.parse import unquote, quote

 from deltalake.fs import DeltaStorageHandler

@@ -207,12 +208,15 @@ def write_deltalake(
         partition_schema = pa.schema([schema.field(name) for name in partition_by])
         partitioning = ds.partitioning(partition_schema, flavor="hive")
     else:
+        partition_schema = pa.schema([])
         partitioning = None

     add_actions: List[AddAction] = []

     def visitor(written_file: Any) -> None:
-        path, partition_values = get_partitions_from_path(written_file.path)
+        path, partition_values = get_partitions_from_path(
+            written_file.path, partition_schema=partition_schema
+        )
         stats = get_file_stats_from_metadata(written_file.metadata)

         # PyArrow added support for written_file.size in 9.0.0
@@ -225,7 +229,7 @@ def visitor(written_file: Any) -> None:

         add_actions.append(
             AddAction(
-                path,
+                quote(path),
                 size,
                 partition_values,
                 int(datetime.now().timestamp() * 1000),
@@ -409,7 +413,17 @@ def try_get_deltatable(
         return None


-def get_partitions_from_path(path: str) -> Tuple[str, Dict[str, Optional[str]]]:
+quoted_types = [
+    pa.timestamp("s"),
+    pa.timestamp("ms"),
+    pa.timestamp("us"),
+    pa.timestamp("ns"),
+]
+
+
+def get_partitions_from_path(
+    path: str, partition_schema: pa.Schema
+) -> Tuple[str, Dict[str, Optional[str]]]:
     if path[0] == "/":
         path = path[1:]
     parts = path.split("/")
@@ -422,7 +436,10 @@ def get_partitions_from_path(path: str) -> Tuple[str, Dict[str, Optional[str]]]:
         if value == "__HIVE_DEFAULT_PARTITION__":
             out[key] = None
         else:
-            out[key] = value
+            if partition_schema.field(key).type in quoted_types:
+                out[key] = unquote(value)
+            else:
+                out[key] = value
     return path, out

diff --git a/python/tests/pyspark_integration/test_writer_readable.py b/python/tests/pyspark_integration/test_writer_readable.py
index f637255951..e9d5603191 100644
--- a/python/tests/pyspark_integration/test_writer_readable.py
+++ b/python/tests/pyspark_integration/test_writer_readable.py
@@ -34,7 +34,7 @@ def test_basic_read(sample_data: pa.Table, existing_table: DeltaTable):
 @pytest.mark.pyspark
 @pytest.mark.integration
 def test_partitioned(tmp_path: pathlib.Path, sample_data: pa.Table):
-    partition_cols = ["date32", "utf8"]
+    partition_cols = ["date32", "utf8", "timestamp", "bool"]

     # Add null values to sample data to verify we can read null partitions
     sample_data_with_null = sample_data

From 9fef678bc4931eefc671d9bf6c8d386b17b0c8fe Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Sun, 24 Sep 2023 09:56:02 +0200
Subject: [PATCH 02/11] feat: handle path encoding in serde and encode partition values in file names

---
 .gitignore                          |  1 +
 rust/Cargo.toml                     |  1 +
 rust/examples/basic_operations.rs   | 41 +++++++++++++++++++++++------
 rust/examples/recordbatch-writer.rs |  2 +-
 rust/src/action/mod.rs              | 34 +++++++++---------------
 rust/src/action/serde_path.rs       | 19 +++++++++++++
 rust/src/operations/restore.rs      |  2 +-
 rust/src/operations/writer.rs       |  2 +-
 rust/src/table_state.rs             |  3 +--
 rust/src/writer/utils.rs            | 14 +++++-----
 10 files changed, 77 insertions(+), 42 deletions(-)
 create mode 100644 rust/src/action/serde_path.rs

diff --git a/.gitignore b/.gitignore
index 5fe8f6cf0a..8642b9722a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,4 @@ Cargo.lock
 !/delta-inspect/Cargo.lock
 !/proofs/Cargo.lock
+justfile
\ No newline at end of file
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 90941f8fe8..77aeb6e940 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -80,6 +80,7 @@ parquet2 = { version = "0.17", optional = true }
 percent-encoding = "2"
 tracing = { version = "0.1", optional = true }
 rand = "0.8"
+urlencoding = "2"

 # hdfs
 datafusion-objectstore-hdfs = { version = "0.1.3", default-features = false, features = [
diff --git a/rust/examples/basic_operations.rs b/rust/examples/basic_operations.rs
index 5c1fb46e86..d95aadfb78 100644
--- a/rust/examples/basic_operations.rs
+++ b/rust/examples/basic_operations.rs
@@ -1,6 +1,6 @@
 use arrow::{
-    array::{Int32Array, StringArray},
-    datatypes::{DataType, Field, Schema as ArrowSchema},
+    array::{Int32Array, StringArray, TimestampMicrosecondArray},
+    datatypes::{DataType, Field, Schema as ArrowSchema, TimeUnit},
     record_batch::RecordBatch,
 };
 use deltalake::operations::collect_sendable_stream;
@@ -26,6 +26,12 @@ fn get_table_columns() -> Vec {
             true,
             Default::default(),
         ),
+        SchemaField::new(
+            String::from("timestamp"),
+            SchemaDataType::primitive(String::from("timestamp")),
+            true,
+            Default::default(),
+        ),
     ]
 }

@@ -33,20 +39,38 @@ fn get_table_batches() -> RecordBatch {
     let schema = Arc::new(ArrowSchema::new(vec![
         Field::new("int", DataType::Int32, false),
         Field::new("string", DataType::Utf8, true),
+        Field::new(
+            "timestamp",
+            DataType::Timestamp(TimeUnit::Microsecond, None),
+            true,
+        ),
     ]));

     let int_values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]);
     let str_values = StringArray::from(vec!["A", "B", "A", "B", "A", "A", "A", "B", "B", "A", "A"]);
-
-    RecordBatch::try_new(schema, vec![Arc::new(int_values), Arc::new(str_values)]).unwrap()
+    let ts_values = TimestampMicrosecondArray::from(vec![
+        1000000012, 1000000012, 1000000012, 1000000012, 500012305, 500012305, 500012305, 500012305,
+        500012305, 500012305, 500012305,
+    ]);
+    RecordBatch::try_new(
+        schema,
+        vec![
+            Arc::new(int_values),
+            Arc::new(str_values),
+            Arc::new(ts_values),
+        ],
+    )
+    .unwrap()
 }

 #[tokio::main(flavor = "current_thread")]
 async fn main() -> Result<(), deltalake::errors::DeltaTableError> {
-    // Create a delta operations client pointing at an un-initialized in-memory location.
-    // In a production environment this would be created with "try_new" and point at
-    // a real storage location.
-    let ops = DeltaOps::new_in_memory();
+    // Create a delta operations client pointing at an un-initialized location.
+    let ops = if let Ok(table_uri) = std::env::var("TABLE_URI") {
+        DeltaOps::try_from_uri(table_uri).await?
+    } else {
+        DeltaOps::new_in_memory()
+    };

     // The operations module uses a builder pattern that allows specifying several options
     // on how the command behaves. The builders implement `Into`, so once
@@ -54,6 +78,7 @@ async fn main() -> Result<(), deltalake::errors::DeltaTableError> {
     let table = ops
         .create()
         .with_columns(get_table_columns())
+        .with_partition_columns(["timestamp"])
         .with_table_name("my_table")
         .with_comment("A table to show how delta-rs works")
         .await?;
diff --git a/rust/examples/recordbatch-writer.rs b/rust/examples/recordbatch-writer.rs
index cca9c6e3fc..f08c33952b 100644
--- a/rust/examples/recordbatch-writer.rs
+++ b/rust/examples/recordbatch-writer.rs
@@ -36,7 +36,7 @@ async fn main() -> Result<(), DeltaTableError> {
     })?;
     info!("Using the location of: {:?}", table_uri);

-    let table_path = Path::from(table_uri.as_ref());
+    let table_path = Path::parse(&table_uri)?;

     let maybe_table = deltalake::open_table(&table_path).await;
     let mut table = match maybe_table {
diff --git a/rust/src/action/mod.rs b/rust/src/action/mod.rs
index d411a502cd..105ebdcdab 100644
--- a/rust/src/action/mod.rs
+++ b/rust/src/action/mod.rs
@@ -8,6 +8,7 @@ pub mod checkpoints;
 pub mod parquet2_read;
 #[cfg(feature = "parquet")]
 mod parquet_read;
+mod serde_path;

 #[cfg(feature = "arrow")]
 use arrow_schema::ArrowError;
@@ -15,7 +16,6 @@ use futures::StreamExt;
 use lazy_static::lazy_static;
 use log::*;
 use object_store::{path::Path, Error as ObjectStoreError, ObjectStore};
-use percent_encoding::percent_decode;
 use regex::Regex;
 use serde::{Deserialize, Serialize};
 use serde_json::{Map, Value};
@@ -105,13 +105,6 @@ pub enum ProtocolError {
     },
 }

-fn decode_path(raw_path: &str) -> Result {
-    percent_decode(raw_path.as_bytes())
-        .decode_utf8()
-        .map(|c| c.to_string())
-        .map_err(|e| ProtocolError::InvalidField(format!("Decode path failed for action: {e}")))
-}
-
 /// Struct used to represent minValues and maxValues in add action statistics.
 #[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
 #[serde(untagged)]
@@ -255,6 +248,7 @@ pub struct StatsParsed {
 pub struct AddCDCFile {
     /// A relative path, from the root of the table, or an
     /// absolute path to a CDC file
+    #[serde(with = "serde_path")]
     pub path: String,
     /// The size of this file in bytes
     pub size: i64,
@@ -351,6 +345,7 @@ impl Eq for DeletionVector {}
 #[serde(rename_all = "camelCase")]
 pub struct Add {
     /// A relative path, from the root of the table, to a file that should be added to the table
+    #[serde(with = "serde_path")]
     pub path: String,
     /// The size of this file in bytes
     pub size: i64,
@@ -403,9 +398,11 @@ pub struct Add {
     #[serde(skip_serializing, skip_deserializing)]
     pub stats_parsed: Option,
     /// Map containing metadata about this file
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub tags: Option>>,

-    ///Metadata about deletion vector
+    /// Metadata about deletion vector
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub deletion_vector: Option,
 }

@@ -431,11 +428,6 @@ impl PartialEq for Add {
 impl Eq for Add {}

 impl Add {
-    /// Returns the Add action with path decoded.
-    pub fn path_decoded(self) -> Result {
-        decode_path(&self.path).map(|path| Self { path, ..self })
-    }
-
     /// Get whatever stats are available. Uses (parquet struct) parsed_stats if present falling back to json stats.
     #[cfg(any(feature = "parquet", feature = "parquet2"))]
     pub fn get_stats(&self) -> Result, serde_json::error::Error> {
@@ -569,6 +561,7 @@ impl TryFrom for MetaData {
 #[serde(rename_all = "camelCase")]
 pub struct Remove {
     /// The path of the file that is removed from the table.
+    #[serde(with = "serde_path")]
     pub path: String,
     /// The timestamp when the remove was added to table state.
     pub deletion_timestamp: Option,
@@ -581,12 +574,16 @@ pub struct Remove {
     /// it's still nullable so we keep it as Option<> for compatibly.
     pub extended_file_metadata: Option,
     /// A map from partition column to value for this file.
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub partition_values: Option>>,
     /// Size of this file in bytes
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub size: Option,
     /// Map containing metadata about this file
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub tags: Option>>,
-    ///Metadata about deletion vector
+    /// Metadata about deletion vector
+    #[serde(skip_serializing_if = "Option::is_none")]
     pub deletion_vector: Option,
 }

@@ -617,13 +614,6 @@ impl PartialEq for Remove {
     }
 }

-impl Remove {
-    /// Returns the Remove action with path decoded.
-    pub fn path_decoded(self) -> Result {
-        decode_path(&self.path).map(|path| Self { path, ..self })
-    }
-}
-
 /// Action used by streaming systems to track progress using application-specific versions to
 /// enable idempotency.
 #[derive(Serialize, Deserialize, Debug, Default, Clone)]
diff --git a/rust/src/action/serde_path.rs b/rust/src/action/serde_path.rs
new file mode 100644
index 0000000000..ddc51335e8
--- /dev/null
+++ b/rust/src/action/serde_path.rs
@@ -0,0 +1,19 @@
+use serde::{self, Deserialize, Deserializer, Serialize, Serializer};
+use urlencoding::{decode, encode};
+
+pub fn deserialize<'de, D>(deserializer: D) -> Result
+where
+    D: Deserializer<'de>,
+{
+    let s = String::deserialize(deserializer)?;
+    let decoded = decode(&s).map_err(serde::de::Error::custom)?.into_owned();
+    Ok(decoded)
+}
+
+pub fn serialize(value: &String, serializer: S) -> Result
+where
+    S: Serializer,
+{
+    let decoded = encode(value).into_owned();
+    String::serialize(&decoded, serializer)
+}
diff --git a/rust/src/operations/restore.rs b/rust/src/operations/restore.rs
index a5e51da05b..5800edd96f 100644
--- a/rust/src/operations/restore.rs
+++ b/rust/src/operations/restore.rs
@@ -252,7 +252,7 @@ async fn check_files_available(
     files: &Vec,
 ) -> DeltaResult<()> {
     for file in files {
-        let file_path = Path::from(file.path.clone());
+        let file_path = Path::parse(file.path.clone())?;
         match object_store.head(&file_path).await {
             Ok(_) => {}
             Err(ObjectStoreError::NotFound { .. }) => {
diff --git a/rust/src/operations/writer.rs b/rust/src/operations/writer.rs
index 4fef892bf8..a72b832505 100644
--- a/rust/src/operations/writer.rs
+++ b/rust/src/operations/writer.rs
@@ -247,7 +247,7 @@ impl PartitionWriterConfig {
             .map_err(|err| WriteError::FileName {
                 source: Box::new(err),
             })?;
-        let prefix = Path::from(part_path.as_ref());
+        let prefix = Path::parse(part_path.as_ref())?;
         let writer_properties = writer_properties.unwrap_or_else(|| {
             WriterProperties::builder()
                 .set_created_by(format!("delta-rs version {}", crate_version()))
diff --git a/rust/src/table_state.rs b/rust/src/table_state.rs
index 9be2200d9e..2ac17032d4 100644
--- a/rust/src/table_state.rs
+++ b/rust/src/table_state.rs
@@ -327,12 +327,11 @@ impl DeltaTableState {
                 action::Action::cdc(_v) => {}
                 action::Action::add(v) => {
                     if require_files {
-                        self.files.push(v.path_decoded()?);
+                        self.files.push(v);
                     }
                 }
                 action::Action::remove(v) => {
                     if require_tombstones && require_files {
-                        let v = v.path_decoded()?;
                         self.tombstones.insert(v);
                     }
                 }
diff --git a/rust/src/writer/utils.rs b/rust/src/writer/utils.rs
index a12809916f..c7f01846fb 100644
--- a/rust/src/writer/utils.rs
+++ b/rust/src/writer/utils.rs
@@ -21,6 +21,7 @@ use parquet::basic::Compression;
 use parquet::file::properties::WriterProperties;
 use parquet::schema::types::ColumnPath;
 use serde_json::Value;
+use urlencoding::encode;
 use uuid::Uuid;

 use crate::errors::DeltaResult;
@@ -45,13 +46,12 @@ impl PartitionPath {
             let partition_value = partition_values
                 .get(k)
                 .ok_or_else(|| DeltaWriterError::MissingPartitionColumn(k.to_string()))?;
-
-            let partition_value = partition_value
-                .as_deref()
-                .unwrap_or(NULL_PARTITION_VALUE_DATA_PATH);
-            let part = format!("{k}={partition_value}");
-
-            path_parts.push(part);
+            let partition_value = if let Some(val) = partition_value.as_deref() {
+                encode(val).into_owned()
+            } else {
+                NULL_PARTITION_VALUE_DATA_PATH.to_string()
+            };
+            path_parts.push(format!("{k}={partition_value}"));
         }

         Ok(PartitionPath {

From 18d7a33627b720826f7e1b6555d1141734f38d2e Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Sun, 24 Sep 2023 10:44:01 +0200
Subject: [PATCH 03/11] fix: always unquote partition values extracted from path

---
 python/deltalake/writer.py | 22 +++------------------
 1 file changed, 3 insertions(+), 19 deletions(-)

diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index 8decb41f8e..63befd7619 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -208,15 +208,12 @@ def write_deltalake(
         partition_schema = pa.schema([schema.field(name) for name in partition_by])
         partitioning = ds.partitioning(partition_schema, flavor="hive")
     else:
-        partition_schema = pa.schema([])
         partitioning = None

     add_actions: List[AddAction] = []

     def visitor(written_file: Any) -> None:
-        path, partition_values = get_partitions_from_path(
-            written_file.path, partition_schema=partition_schema
-        )
+        path, partition_values = get_partitions_from_path(written_file.path)
         stats = get_file_stats_from_metadata(written_file.metadata)

         # PyArrow added support for written_file.size in 9.0.0
@@ -413,17 +410,7 @@ def try_get_deltatable(
         return None


-quoted_types = [
-    pa.timestamp("s"),
-    pa.timestamp("ms"),
-    pa.timestamp("us"),
-    pa.timestamp("ns"),
-]
-
-
-def get_partitions_from_path(
-    path: str, partition_schema: pa.Schema
-) -> Tuple[str, Dict[str, Optional[str]]]:
+def get_partitions_from_path(path: str) -> Tuple[str, Dict[str, Optional[str]]]:
     if path[0] == "/":
         path = path[1:]
     parts = path.split("/")
@@ -436,10 +423,7 @@ def get_partitions_from_path(
         if value == "__HIVE_DEFAULT_PARTITION__":
             out[key] = None
         else:
-            if partition_schema.field(key).type in quoted_types:
-                out[key] = unquote(value)
-            else:
-                out[key] = value
+            out[key] = unquote(value)
     return path, out

From 1aa84291fadb986839aa374126ce18c3247503d3 Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Sun, 24 Sep 2023 11:10:46 +0200
Subject: [PATCH 04/11] test: add tests for related issues

---
 python/tests/test_table_read.py | 46 +++++++++++++++++++++++++++++++++
 python/tests/test_writer.py     | 16 ++++++++++++
 2 files changed, 62 insertions(+)

diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py
index db00d96f26..f56c5876cd 100644
--- a/python/tests/test_table_read.py
+++ b/python/tests/test_table_read.py
@@ -648,3 +648,49 @@ def assert_scan_equals(table, predicate, expected):
     assert_num_fragments(table, predicate, 2)
     expected = pa.table({"part": ["a", "a", "b", "b"], "value": [1, 1, None, None]})
     assert_scan_equals(table, predicate, expected)
+
+
+def test_issue_1653_filter_bool_partition(tmp_path: Path):
+    ta = pa.Table.from_pydict(
+        {
+            "bool_col": [True, False, True, False],
+            "int_col": [0, 1, 2, 3],
+            "str_col": ["a", "b", "c", "d"],
+        }
+    )
+    write_deltalake(
+        tmp_path, ta, partition_by=["bool_col", "int_col"], mode="overwrite"
+    )
+    dt = DeltaTable(tmp_path)
+
+    assert (
+        dt.to_pyarrow_table(
+            filters=[
+                ("int_col", "=", 0),
+                ("bool_col", "=", True),
+            ]
+        ).num_rows
+        == 1
+    )
+    assert (
+        len(
+            dt.file_uris(
+                partition_filters=[
+                    ("int_col", "=", 0),
+                    ("bool_col", "=", "true"),
+                ]
+            )
+        )
+        == 1
+    )
+    assert (
+        len(
+            dt.file_uris(
+                partition_filters=[
+                    ("int_col", "=", 0),
+                    ("bool_col", "=", True),
+                ]
+            )
+        )
+        == 1
+    )
diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py
index e45d56539c..ed3ca98ff3 100644
--- a/python/tests/test_writer.py
+++ b/python/tests/test_writer.py
@@ -888,3 +888,19 @@ def comp():
         "a concurrent transaction deleted the same data your transaction deletes"
         in str(exception)
     )
+
+
+def test_issue_1651_roundtrip_timestamp(tmp_path: pathlib.Path):
+    data = pa.table(
+        {
+            "id": pa.array([425], type=pa.int32()),
+            "data": pa.array(["python-module-test-write"]),
+            "t": pa.array([datetime(2023, 9, 15)]),
+        }
+    )
+
+    write_deltalake(table_or_uri=tmp_path, mode="append", data=data, partition_by=["t"])
+    dt = DeltaTable(table_uri=tmp_path)
+    dataset = dt.to_pyarrow_dataset()
+
+    assert dataset.count_rows() == 1

From 2a6492c06e9580705456d66493668d457fa29b9e Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Sun, 24 Sep 2023 11:16:41 +0200
Subject: [PATCH 05/11] fix: consistent serialization of partition values

---
 python/deltalake/_util.py  | 21 +++++++++++++++++++++
 python/deltalake/table.py  |  5 +++--
 python/deltalake/writer.py | 21 ++------------------
 3 files changed, 26 insertions(+), 21 deletions(-)
 create mode 100644 python/deltalake/_util.py

diff --git a/python/deltalake/_util.py b/python/deltalake/_util.py
new file mode 100644
index 0000000000..21fe22d9d7
--- /dev/null
+++ b/python/deltalake/_util.py
@@ -0,0 +1,21 @@
+from typing import Any
+
+from datetime import date, datetime
+
+
+def encode_partition_value(val: Any) -> str:
+    # Rules based on: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
+    if isinstance(val, bool):
+        return str(val).lower()
+    if isinstance(val, str):
+        return val
+    elif isinstance(val, (int, float)):
+        return str(val)
+    elif isinstance(val, date):
+        return val.isoformat()
+    elif isinstance(val, datetime):
+        return val.isoformat(sep=" ")
+    elif isinstance(val, bytes):
+        return val.decode("unicode_escape", "backslashreplace")
+    else:
+        raise ValueError(f"Could not encode partition value for type: {val}")
diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index 367debbf18..4a9faf7920 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -35,6 +35,7 @@
 from .exceptions import DeltaProtocolError
 from .fs import DeltaStorageHandler
 from .schema import Schema
+from ._util import encode_partition_value

 MAX_SUPPORTED_READER_VERSION = 1
 MAX_SUPPORTED_WRITER_VERSION = 2
@@ -620,9 +621,9 @@ def __stringify_partition_values(
         for field, op, value in partition_filters:
             str_value: Union[str, List[str]]
             if isinstance(value, (list, tuple)):
-                str_value = [str(val) for val in value]
+                str_value = [encode_partition_value(val) for val in value]
             else:
-                str_value = str(value)
+                str_value = encode_partition_value(value)
             out.append((field, op, str_value))
         return out

diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index 63befd7619..9f87ec338a 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -20,6 +20,7 @@
 from urllib.parse import unquote, quote

 from deltalake.fs import DeltaStorageHandler
+from ._util import encode_partition_value

 if TYPE_CHECKING:
     import pandas as pd
@@ -263,7 +264,7 @@ def check_data_is_aligned_with_partition_filtering(
     for i in range(partition_values.num_rows):
         # Map will maintain order of partition_columns
         partition_map = {
-            column_name: __encode_partition_value(
+            column_name: encode_partition_value(
                 batch.column(column_name)[i].as_py()
             )
             for column_name in table.metadata().partition_columns
@@ -490,21 +491,3 @@ def iter_groups(metadata: Any) -> Iterator[Any]:
             maximum for maximum in maximums if maximum is not None
         )
     return stats
-
-
-def __encode_partition_value(val: Any) -> str:
-    # Rules based on: https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
-    if isinstance(val, bool):
-        return str(val).lower()
-    if isinstance(val, str):
-        return val
-    elif isinstance(val, (int, float)):
-        return str(val)
-    elif isinstance(val, date):
-        return val.isoformat()
-    elif isinstance(val, datetime):
-        return val.isoformat(sep=" ")
-    elif isinstance(val, bytes):
-        return val.decode("unicode_escape", "backslashreplace")
-    else:
-        raise ValueError(f"Could not encode partition value for type: {val}")

From dda2d9d53b9726a70473cc6e5b8f99788eb43549 Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Sun, 24 Sep 2023 16:00:42 +0200
Subject: [PATCH 06/11] fix: roundtrip special characters

---
 python/deltalake/writer.py     |  2 +-
 .../test_writer_readable.py    | 32 ++++++++
 rust/Cargo.toml                |  1 -
 rust/examples/load_table.rs    | 20 +++++
 rust/src/action/serde_path.rs  | 82 +++++++++++++++++--
 rust/src/writer/utils.rs       | 13 +--
 6 files changed, 136 insertions(+), 14 deletions(-)
 create mode 100644 rust/examples/load_table.rs

diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index 9f87ec338a..fc7978213b 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -227,7 +227,7 @@ def visitor(written_file: Any) -> None:

         add_actions.append(
             AddAction(
-                quote(path),
+                path,
                 size,
                 partition_values,
                 int(datetime.now().timestamp() * 1000),
diff --git a/python/tests/pyspark_integration/test_writer_readable.py b/python/tests/pyspark_integration/test_writer_readable.py
index e9d5603191..ea555074b8 100644
--- a/python/tests/pyspark_integration/test_writer_readable.py
+++ b/python/tests/pyspark_integration/test_writer_readable.py
@@ -12,6 +12,7 @@
     import delta
     import delta.pip_utils
     import delta.tables
+    import pyspark.pandas as ps

     spark = get_spark()
 except ModuleNotFoundError:
@@ -63,3 +64,34 @@ def test_overwrite(
     write_deltalake(path, sample_data, mode="overwrite")
     assert_spark_read_equal(sample_data, path)
+
+
+@pytest.mark.pyspark
+@pytest.mark.integration
+def test_issue_1591_roundtrip_special_characters(tmp_path: pathlib.Path):
+    test_string = r'$%&/()=^"[]#*?.:_-{=}|`<>~/\r\n+'
+    poisoned = "}|`<>~"
+    for char in poisoned:
+        test_string = test_string.replace(char, "")
+
+    data = pa.table(
+        {
+            "string": pa.array([test_string], type=pa.utf8()),
+            "data": pa.array(["python-module-test-write"]),
+        }
+    )
+
+    deltalake_path = tmp_path / "deltalake"
+    write_deltalake(
+        table_or_uri=deltalake_path, mode="append", data=data, partition_by=["string"]
+    )
+
+    loaded = ps.read_delta(str(deltalake_path), index_col=None).to_pandas()
+    assert loaded.shape == data.shape
+
+    spark_path = tmp_path / "spark"
+    spark_df = spark.createDataFrame(data.to_pandas())
+    spark_df.write.format("delta").partitionBy(["string"]).save(str(spark_path))
+
+    loaded = DeltaTable(spark_path).to_pandas()
+    assert loaded.shape == data.shape
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 77aeb6e940..90941f8fe8 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -80,7 +80,6 @@ parquet2 = { version = "0.17", optional = true }
 percent-encoding = "2"
 tracing = { version = "0.1", optional = true }
 rand = "0.8"
-urlencoding = "2"

 # hdfs
 datafusion-objectstore-hdfs = { version = "0.1.3", default-features = false, features = [
diff --git a/rust/examples/load_table.rs b/rust/examples/load_table.rs
new file mode 100644
index 0000000000..18a960eeb0
--- /dev/null
+++ b/rust/examples/load_table.rs
@@ -0,0 +1,20 @@
+use arrow::record_batch::RecordBatch;
+use deltalake::operations::collect_sendable_stream;
+use deltalake::DeltaOps;
+
+#[tokio::main(flavor = "current_thread")]
+async fn main() -> Result<(), deltalake::errors::DeltaTableError> {
+    // Create a delta operations client pointing at an un-initialized location.
+    let ops = if let Ok(table_uri) = std::env::var("TABLE_URI") {
+        DeltaOps::try_from_uri(table_uri).await?
+    } else {
+        DeltaOps::try_from_uri("./rust/tests/data/delta-0.8.0").await?
+    };
+
+    let (_table, stream) = ops.load().await?;
+    let data: Vec = collect_sendable_stream(stream).await?;
+
+    println!("{:?}", data);
+
+    Ok(())
+}
diff --git a/rust/src/action/serde_path.rs b/rust/src/action/serde_path.rs
index ddc51335e8..9868523e81 100644
--- a/rust/src/action/serde_path.rs
+++ b/rust/src/action/serde_path.rs
@@ -1,19 +1,89 @@
+use std::str::Utf8Error;
+
+use percent_encoding::{percent_decode_str, percent_encode, AsciiSet, CONTROLS};
 use serde::{self, Deserialize, Deserializer, Serialize, Serializer};
-use urlencoding::{decode, encode};

 pub fn deserialize<'de, D>(deserializer: D) -> Result
 where
     D: Deserializer<'de>,
 {
     let s = String::deserialize(deserializer)?;
-    let decoded = decode(&s).map_err(serde::de::Error::custom)?.into_owned();
-    Ok(decoded)
+    decode_path(&s).map_err(serde::de::Error::custom)
 }

-pub fn serialize(value: &String, serializer: S) -> Result
+pub fn serialize(value: &str, serializer: S) -> Result
 where
     S: Serializer,
 {
-    let decoded = encode(value).into_owned();
-    String::serialize(&decoded, serializer)
+    let encoded = encode_path(value);
+    String::serialize(&encoded, serializer)
+}
+
+pub const _DELIMITER: &str = "/";
+/// The path delimiter as a single byte
+pub const _DELIMITER_BYTE: u8 = _DELIMITER.as_bytes()[0];
+
+/// Characters we want to encode.
+const INVALID: &AsciiSet = &CONTROLS
+    // The delimiter we are reserving for internal hierarchy
+    // .add(DELIMITER_BYTE)
+    // Characters AWS recommends avoiding for object keys
+    // https://docs.aws.amazon.com/AmazonS3/latest/dev/UsingMetadata.html
+    .add(b'\\')
+    .add(b'{')
+    .add(b'^')
+    .add(b'}')
+    .add(b'%')
+    .add(b'`')
+    .add(b']')
+    .add(b'"')
+    .add(b'>')
+    .add(b'[')
+    // .add(b'~')
+    .add(b'<')
+    .add(b'#')
+    .add(b'|')
+    // Characters Google Cloud Storage recommends avoiding for object names
+    // https://cloud.google.com/storage/docs/naming-objects
+    .add(b'\r')
+    .add(b'\n')
+    .add(b'*')
+    .add(b'?');
+
+fn encode_path(path: &str) -> String {
+    percent_encode(path.as_bytes(), INVALID).to_string()
+}
+
+fn decode_path(path: &str) -> Result {
+    Ok(percent_decode_str(path).decode_utf8()?.to_string())
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_encode_path() {
+        let cases = [
+            (
+                "string=$%25&%2F()%3D%5E%22%5B%5D%23%2A%3F.%3A/part-00023-4b06bc90-0678-4a63-94a2-f09af1adb945.c000.snappy.parquet",
+                "string=$%2525&%252F()%253D%255E%2522%255B%255D%2523%252A%253F.%253A/part-00023-4b06bc90-0678-4a63-94a2-f09af1adb945.c000.snappy.parquet",
+            ),
+            (
+                "string=$%25&%2F()%3D%5E%22<>~%5B%5D%7B}`%23|%2A%3F%2F%5Cr%5Cn.%3A/part-00023-e0a68495-8098-40a6-be5f-b502b111b789.c000.snappy.parquet",
+                "string=$%2525&%252F()%253D%255E%2522%3C%3E~%255B%255D%257B%7D%60%2523%7C%252A%253F%252F%255Cr%255Cn.%253A/part-00023-e0a68495-8098-40a6-be5f-b502b111b789.c000.snappy.parquet"
+            ),
+            (
+                "string=$%25&%2F()%3D%5E%22<>~%5B%5D%7B}`%23|%2A%3F%2F%5Cr%5Cn.%3A_-/part-00023-346b6795-dafa-4948-bda5-ecdf4baa4445.c000.snappy.parquet",
+                "string=$%2525&%252F()%253D%255E%2522%3C%3E~%255B%255D%257B%7D%60%2523%7C%252A%253F%252F%255Cr%255Cn.%253A_-/part-00023-346b6795-dafa-4948-bda5-ecdf4baa4445.c000.snappy.parquet"
+            )
+        ];
+
+        for (raw, expected) in cases {
+            let encoded = encode_path(raw);
+            assert_eq!(encoded, expected);
+            let decoded = decode_path(expected).unwrap();
+            assert_eq!(decoded, raw);
+        }
+    }
+}
diff --git a/rust/src/writer/utils.rs b/rust/src/writer/utils.rs
index c7f01846fb..ca86ca3a64 100644
--- a/rust/src/writer/utils.rs
+++ b/rust/src/writer/utils.rs
@@ -15,13 +15,12 @@ use arrow::datatypes::{
 };
 use arrow::json::ReaderBuilder;
 use arrow::record_batch::*;
-use object_store::path::Path;
+use object_store::path::{Path, PathPart};
 use parking_lot::RwLock;
 use parquet::basic::Compression;
 use parquet::file::properties::WriterProperties;
 use parquet::schema::types::ColumnPath;
 use serde_json::Value;
-use urlencoding::encode;
 use uuid::Uuid;

 use crate::errors::DeltaResult;
@@ -46,12 +45,14 @@ impl PartitionPath {
             let partition_value = partition_values
                 .get(k)
                 .ok_or_else(|| DeltaWriterError::MissingPartitionColumn(k.to_string()))?;
-            let partition_value = if let Some(val) = partition_value.as_deref() {
-                encode(val).into_owned()
+            let path_part = if let Some(val) = partition_value.as_deref() {
+                let part = PathPart::from(val);
+                let encoded = part.as_ref();
+                format!("{k}={encoded}")
             } else {
-                NULL_PARTITION_VALUE_DATA_PATH.to_string()
+                format!("{k}={NULL_PARTITION_VALUE_DATA_PATH}")
             };
-            path_parts.push(format!("{k}={partition_value}"));
+            path_parts.push(path_part);
         }

         Ok(PartitionPath {

From 839d1d73b6c5ae9bfc8af970750cdef02674fde9 Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Sun, 24 Sep 2023 16:05:16 +0200
Subject: [PATCH 07/11] chore: format

---
 python/deltalake/_util.py  | 3 +--
 python/deltalake/table.py  | 2 +-
 python/deltalake/writer.py | 3 ++-
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/python/deltalake/_util.py b/python/deltalake/_util.py
index 21fe22d9d7..32b8a20a1b 100644
--- a/python/deltalake/_util.py
+++ b/python/deltalake/_util.py
@@ -1,6 +1,5 @@
-from typing import Any
-
 from datetime import date, datetime
+from typing import Any


 def encode_partition_value(val: Any) -> str:
diff --git a/python/deltalake/table.py b/python/deltalake/table.py
index 4a9faf7920..9214c60994 100644
--- a/python/deltalake/table.py
+++ b/python/deltalake/table.py
@@ -31,11 +31,11 @@
 import pandas

 from ._internal import RawDeltaTable
+from ._util import encode_partition_value
 from .data_catalog import DataCatalog
 from .exceptions import DeltaProtocolError
 from .fs import DeltaStorageHandler
 from .schema import Schema
-from ._util import encode_partition_value

 MAX_SUPPORTED_READER_VERSION = 1
 MAX_SUPPORTED_WRITER_VERSION = 2
diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py
index fc7978213b..1cce4d3f68 100644
--- a/python/deltalake/writer.py
+++ b/python/deltalake/writer.py
@@ -17,9 +17,10 @@
     Tuple,
     Union,
 )
-from urllib.parse import unquote, quote
+from urllib.parse import unquote

 from deltalake.fs import DeltaStorageHandler
+
 from ._util import encode_partition_value

 if TYPE_CHECKING:

From 4ed5ac0fec098c7d8c6490247923f7f7a5001592 Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Sun, 24 Sep 2023 16:08:41 +0200
Subject: [PATCH 08/11] fix: add feature requirement to load example

---
 rust/Cargo.toml | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index 90941f8fe8..9a20c8e864 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -185,6 +185,10 @@ harness = false
 name = "basic_operations"
 required-features = ["datafusion"]

+[[example]]
+name = "load_table"
+required-features = ["datafusion"]
+
 [[example]]
 name = "recordbatch-writer"
 required-features = ["arrow"]

From 815b075ef629762ea2ac3a3c81b49382583ad0db Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Sun, 24 Sep 2023 17:56:08 +0200
Subject: [PATCH 09/11] test: add timestamp col to partitioned roundtrip tests

---
 python/tests/test_writer.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py
index ed3ca98ff3..e72d0ac8cd 100644
--- a/python/tests/test_writer.py
+++ b/python/tests/test_writer.py
@@ -168,6 +168,7 @@ def test_roundtrip_metadata(tmp_path: pathlib.Path, sample_data: pa.Table):
         "bool",
         "binary",
         "date32",
+        "timestamp",
     ],
 )
 def test_roundtrip_partitioned(

From c02c4c838ca198336b2b5a32786b131130164442 Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Sun, 24 Sep 2023 18:50:54 +0200
Subject: [PATCH 10/11] test: add rust roundtrip test for special characters

---
 rust/src/operations/write.rs | 45 ++++++++++++++++++++++++++++++++++--
 1 file changed, 43 insertions(+), 2 deletions(-)

diff --git a/rust/src/operations/write.rs b/rust/src/operations/write.rs
index 41dfeaae5a..a933700cac 100644
--- a/rust/src/operations/write.rs
+++ b/rust/src/operations/write.rs
@@ -552,7 +552,8 @@ fn cast_record_batch(
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::operations::DeltaOps;
+    use crate::action::SaveMode;
+    use crate::operations::{collect_sendable_stream, DeltaOps};
     use crate::writer::test_utils::datafusion::get_data;
     use crate::writer::test_utils::{
         get_delta_schema, get_delta_schema_with_nested_struct, get_record_batch,
@@ -562,7 +563,7 @@ mod tests {
     use arrow::datatypes::Schema as ArrowSchema;
     use arrow_array::{Int32Array, StringArray, TimestampMicrosecondArray};
     use arrow_schema::{DataType, TimeUnit};
-    use datafusion::assert_batches_sorted_eq;
+    use datafusion::{assert_batches_eq, assert_batches_sorted_eq};
     use serde_json::{json, Value};

     #[tokio::test]
@@ -855,4 +856,44 @@ mod tests {
             &expected
         );
     }
+
+    #[tokio::test]
+    async fn test_special_characters_write_read() {
+        let tmp_dir = tempdir::TempDir::new("test").unwrap();
+        let tmp_path = std::fs::canonicalize(tmp_dir.path()).unwrap();
+
+        let schema = Arc::new(ArrowSchema::new(vec![
+            Field::new("string", DataType::Utf8, true),
+            Field::new("data", DataType::Utf8, true),
+        ]));
+
+        let str_values = StringArray::from(vec![r#"$%&/()=^"[]#*?.:_- {=}|`<>~/\r\n+"#]);
+        let data_values = StringArray::from(vec!["test"]);
+
+        let batch = RecordBatch::try_new(schema, vec![Arc::new(str_values), Arc::new(data_values)])
+            .unwrap();
+
+        let ops = DeltaOps::try_from_uri(tmp_path.as_os_str().to_str().unwrap())
+            .await
+            .unwrap();
+
+        let table = ops
+            .write([batch.clone()])
+            .with_partition_columns(["string"])
+            .await
+            .unwrap();
+
+        let (_table, stream) = DeltaOps(table).load().await.unwrap();
+        let data: Vec = collect_sendable_stream(stream).await.unwrap();
+
+        let expected = vec![
+            "+------+-----------------------------------+",
+            "| data | string |",
+            "+------+-----------------------------------+",
+            r#"| test | $%&/()=^"[]#*?.:_- {=}|`<>~/\r\n+ |"#,
+            "+------+-----------------------------------+",
+        ];
+
+        assert_batches_eq!(&expected, &data);
+    }
 }

From f9842cdc30691be2772917efaaa79b7df6d2a473 Mon Sep 17 00:00:00 2001
From: Robert Pack
Date: Sun, 24 Sep 2023 19:08:17 +0200
Subject: [PATCH 11/11] fix: encode characters illegal on windows

---
 rust/src/operations/write.rs |  5 ++++-
 rust/src/writer/utils.rs     | 31 ++++++++++++++++++++++++++---
 2 files changed, 32 insertions(+), 4 deletions(-)

diff --git a/rust/src/operations/write.rs b/rust/src/operations/write.rs
index a933700cac..ca96134935 100644
--- a/rust/src/operations/write.rs
+++ b/rust/src/operations/write.rs
@@ -877,12 +877,15 @@ mod tests {
             .await
             .unwrap();

-        let table = ops
+        let _table = ops
             .write([batch.clone()])
             .with_partition_columns(["string"])
             .await
             .unwrap();

+        let table = crate::open_table(tmp_path.as_os_str().to_str().unwrap())
+            .await
+            .unwrap();
         let (_table, stream) = DeltaOps(table).load().await.unwrap();
         let data: Vec = collect_sendable_stream(stream).await.unwrap();

diff --git a/rust/src/writer/utils.rs b/rust/src/writer/utils.rs
index ca86ca3a64..cfc089c164 100644
--- a/rust/src/writer/utils.rs
+++ b/rust/src/writer/utils.rs
@@ -15,11 +15,13 @@ use arrow::datatypes::{
 };
 use arrow::json::ReaderBuilder;
 use arrow::record_batch::*;
-use object_store::path::{Path, PathPart};
+use object_store::path::Path;
+use object_store::path::DELIMITER_BYTE;
 use parking_lot::RwLock;
 use parquet::basic::Compression;
 use parquet::file::properties::WriterProperties;
 use parquet::schema::types::ColumnPath;
+use percent_encoding::{percent_encode, AsciiSet, CONTROLS};
 use serde_json::Value;
 use uuid::Uuid;

@@ -46,8 +48,7 @@ impl PartitionPath {
                 .get(k)
                 .ok_or_else(|| DeltaWriterError::MissingPartitionColumn(k.to_string()))?;
             let path_part = if let Some(val) = partition_value.as_deref() {
-                let part = PathPart::from(val);
-                let encoded = part.as_ref();
+                let encoded = percent_encode(val.as_bytes(), INVALID).to_string();
                 format!("{k}={encoded}")
             } else {
                 format!("{k}={NULL_PARTITION_VALUE_DATA_PATH}")
@@ -61,6 +62,30 @@ impl PartitionPath {
     }
 }

+const INVALID: &AsciiSet = &CONTROLS
+    // everything object store needs encoded ...
+    .add(DELIMITER_BYTE)
+    .add(b'\\')
+    .add(b'{')
+    .add(b'^')
+    .add(b'}')
+    .add(b'%')
+    .add(b'`')
+    .add(b']')
+    .add(b'"')
+    .add(b'>')
+    .add(b'[')
+    .add(b'~')
+    .add(b'<')
+    .add(b'#')
+    .add(b'|')
+    .add(b'\r')
+    .add(b'\n')
+    .add(b'*')
+    .add(b'?')
+    //... and some more chars illegal on windows
+    .add(b':');
+
 impl From for String {
     fn from(path: PartitionPath) -> String {
         path.path