Skip to content

Commit

Permalink
feat: compare timestamp partition values as timestamps instead of str…
Browse files Browse the repository at this point in the history
…ings (delta-io#1895)

    Delta protocol specifies 2 possible formats for timestamp partitions:
    {year}-{month}-{day} {hour}:{minute}:{second} or {year}-{month}-{day}
    {hour}:{minute}:{second}.{microsecond}

    However, string comparison of partition filter value and partition
    values was performed, which rendered timestamps like 2020-12-31
    23:59:59.000000 and 2020-12-31 23:59:59 as different.

    This change uses timestamp comparison instead of string comparison.

    Co-authored-by: Igor Borodin <igborodi@microsoft.com>
  • Loading branch information
natinimni committed Jan 31, 2024
1 parent 0a344cf commit 36d7249
Show file tree
Hide file tree
Showing 4 changed files with 353 additions and 3 deletions.
38 changes: 36 additions & 2 deletions rust/src/schema/partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
use std::convert::TryFrom;

use super::SchemaDataType;
use chrono::{NaiveDateTime, ParseResult};

use crate::errors::DeltaTableError;
use std::cmp::Ordering;
use std::collections::HashMap;
Expand Down Expand Up @@ -37,6 +39,13 @@ pub struct PartitionFilter {
pub value: PartitionValue,
}

fn parse_timestamp(timestamp_str: &str) -> ParseResult<NaiveDateTime> {
// Timestamp format as per https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
let format = "%Y-%m-%d %H:%M:%S%.f";

NaiveDateTime::parse_from_str(timestamp_str, format)
}

fn compare_typed_value(
partition_value: &str,
filter_value: &str,
Expand All @@ -58,6 +67,13 @@ fn compare_typed_value(
}
_ => None,
},
"timestamp" => match parse_timestamp(filter_value) {
Ok(parsed_filter_value) => {
let parsed_partition_value = parse_timestamp(partition_value).unwrap();
parsed_partition_value.partial_cmp(&parsed_filter_value)
}
_ => None,
},
_ => partition_value.partial_cmp(filter_value),
},
_ => partition_value.partial_cmp(filter_value),
Expand All @@ -77,8 +93,26 @@ impl PartitionFilter {
}

match &self.value {
PartitionValue::Equal(value) => value == &partition.value,
PartitionValue::NotEqual(value) => value != &partition.value,
PartitionValue::Equal(value) => {
match data_type {
SchemaDataType::primitive(type_name) if type_name == "timestamp" => {
compare_typed_value(&partition.value, value, data_type)
.map(|x| x.is_eq())
.unwrap_or(false)
},
_ => value == &partition.value
}
}
PartitionValue::NotEqual(value) => {
match data_type {
SchemaDataType::primitive(type_name) if type_name == "timestamp" => {
compare_typed_value(&partition.value, value, data_type)
.map(|x| !x.is_eq())
.unwrap_or(false)
},
_ => value != &partition.value
}
}
PartitionValue::GreaterThan(value) => {
compare_typed_value(&partition.value, value, data_type)
.map(|x| x.is_gt())
Expand Down
299 changes: 299 additions & 0 deletions rust/src/schema/partitions.rs.bak
Original file line number Diff line number Diff line change
@@ -0,0 +1,299 @@
//! Delta Table partition handling logic.

use std::convert::TryFrom;

use super::SchemaDataType;
use chrono::{NaiveDateTime, ParseResult};

use crate::errors::DeltaTableError;
use std::cmp::Ordering;
use std::collections::HashMap;

/// A Enum used for selecting the partition value operation when filtering a DeltaTable partition.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum PartitionValue {
/// The partition value with the equal operator
Equal(String),
/// The partition value with the not equal operator
NotEqual(String),
/// The partition value with the greater than operator
GreaterThan(String),
/// The partition value with the greater than or equal operator
GreaterThanOrEqual(String),
/// The partition value with the less than operator
LessThan(String),
/// The partition value with the less than or equal operator
LessThanOrEqual(String),
/// The partition values with the in operator
In(Vec<String>),
/// The partition values with the not in operator
NotIn(Vec<String>),
}

/// A Struct used for filtering a DeltaTable partition by key and value.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PartitionFilter {
/// The key of the PartitionFilter
pub key: String,
/// The value of the PartitionFilter
pub value: PartitionValue,
}

fn parse_timestamp(timestamp_str: &str) -> ParseResult<NaiveDateTime> {
// Timestamp format as per https://github.com/delta-io/delta/blob/master/PROTOCOL.md#partition-value-serialization
let format = "%Y-%m-%d %H:%M:%S%.f";

NaiveDateTime::parse_from_str(timestamp_str, format)
}

fn compare_typed_value(
partition_value: &str,
filter_value: &str,
data_type: &SchemaDataType,
) -> Option<Ordering> {
match data_type {
SchemaDataType::primitive(primitive_type) => match primitive_type.as_str() {
"long" | "integer" | "short" | "byte" => match filter_value.parse::<i64>() {
Ok(parsed_filter_value) => {
let parsed_partition_value = partition_value.parse::<i64>().unwrap();
parsed_partition_value.partial_cmp(&parsed_filter_value)
}
_ => None,
},
"float" | "double" => match filter_value.parse::<f64>() {
Ok(parsed_filter_value) => {
let parsed_partition_value = partition_value.parse::<f64>().unwrap();
parsed_partition_value.partial_cmp(&parsed_filter_value)
}
_ => None,
},
"timestamp" => match parse_timestamp(filter_value) {
Ok(parsed_filter_value) => {
let parsed_partition_value = parse_timestamp(partition_value).unwrap();
parsed_partition_value.partial_cmp(&parsed_filter_value)
}
_ => None,
},
_ => partition_value.partial_cmp(filter_value),
},
_ => partition_value.partial_cmp(filter_value),
}
}

/// Partition filters methods for filtering the DeltaTable partitions.
impl PartitionFilter {
/// Indicates if a DeltaTable partition matches with the partition filter by key and value.
pub fn match_partition(
&self,
partition: &DeltaTablePartition,
data_type: &SchemaDataType,
) -> bool {
if self.key != partition.key {
return false;
}

match &self.value {
PartitionValue::Equal(value) => {
match data_type {
SchemaDataType::primitive(type_name) if type_name == "timestamp" => {
compare_typed_value(&partition.value, value, data_type)
.map(|x| x.is_eq())
.unwrap_or(false)
},
_ => {
value == &partition.value
}
}
PartitionValue::NotEqual(value) => {
if let SchemaDataType::primitive(type_name) = data_type && type_name == "timestamp" {
compare_typed_value(&partition.value, value, data_type)
.map(|x| !x.is_eq())
.unwrap_or(false)
} else {
value != &partition.value
}
}
PartitionValue::GreaterThan(value) => {
compare_typed_value(&partition.value, value, data_type)
.map(|x| x.is_gt())
.unwrap_or(false)
}
PartitionValue::GreaterThanOrEqual(value) => {
compare_typed_value(&partition.value, value, data_type)
.map(|x| x.is_ge())
.unwrap_or(false)
}
PartitionValue::LessThan(value) => {
compare_typed_value(&partition.value, value, data_type)
.map(|x| x.is_lt())
.unwrap_or(false)
}
PartitionValue::LessThanOrEqual(value) => {
compare_typed_value(&partition.value, value, data_type)
.map(|x| x.is_le())
.unwrap_or(false)
}
PartitionValue::In(value) => value.contains(&partition.value),
PartitionValue::NotIn(value) => !value.contains(&partition.value),
}
}

/// Indicates if one of the DeltaTable partition among the list
/// matches with the partition filter.
pub fn match_partitions(
&self,
partitions: &[DeltaTablePartition],
partition_col_data_types: &HashMap<&str, &SchemaDataType>,
) -> bool {
let data_type = partition_col_data_types
.get(self.key.as_str())
.unwrap()
.to_owned();
partitions
.iter()
.any(|partition| self.match_partition(partition, data_type))
}
}

/// Create a PartitionFilter from a filter Tuple with the structure (key, operation, value).
impl TryFrom<(&str, &str, &str)> for PartitionFilter {
type Error = DeltaTableError;

/// Try to create a PartitionFilter from a Tuple of (key, operation, value).
/// Returns a DeltaTableError in case of a malformed filter.
fn try_from(filter: (&str, &str, &str)) -> Result<Self, DeltaTableError> {
match filter {
(key, "=", value) if !key.is_empty() => Ok(PartitionFilter {
key: key.to_owned(),
value: PartitionValue::Equal(value.to_owned()),
}),
(key, "!=", value) if !key.is_empty() => Ok(PartitionFilter {
key: key.to_owned(),
value: PartitionValue::NotEqual(value.to_owned()),
}),
(key, ">", value) if !key.is_empty() => Ok(PartitionFilter {
key: key.to_owned(),
value: PartitionValue::GreaterThan(value.to_owned()),
}),
(key, ">=", value) if !key.is_empty() => Ok(PartitionFilter {
key: key.to_owned(),
value: PartitionValue::GreaterThanOrEqual(value.to_owned()),
}),
(key, "<", value) if !key.is_empty() => Ok(PartitionFilter {
key: key.to_owned(),
value: PartitionValue::LessThan(value.to_owned()),
}),
(key, "<=", value) if !key.is_empty() => Ok(PartitionFilter {
key: key.to_owned(),
value: PartitionValue::LessThanOrEqual(value.to_owned()),
}),
(_, _, _) => Err(DeltaTableError::InvalidPartitionFilter {
partition_filter: format!("{filter:?}"),
}),
}
}
}

/// Create a PartitionFilter from a filter Tuple with the structure (key, operation, list(value)).
impl TryFrom<(&str, &str, &[&str])> for PartitionFilter {
type Error = DeltaTableError;

/// Try to create a PartitionFilter from a Tuple of (key, operation, list(value)).
/// Returns a DeltaTableError in case of a malformed filter.
fn try_from(filter: (&str, &str, &[&str])) -> Result<Self, DeltaTableError> {
match filter {
(key, "in", value) if !key.is_empty() => Ok(PartitionFilter {
key: key.to_owned(),
value: PartitionValue::In(value.iter().map(|x| x.to_string()).collect()),
}),
(key, "not in", value) if !key.is_empty() => Ok(PartitionFilter {
key: key.to_owned(),
value: PartitionValue::NotIn(value.iter().map(|x| x.to_string()).collect()),
}),
(_, _, _) => Err(DeltaTableError::InvalidPartitionFilter {
partition_filter: format!("{filter:?}"),
}),
}
}
}

/// A Struct DeltaTablePartition used to represent a partition of a DeltaTable.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DeltaTablePartition {
/// The key of the DeltaTable partition.
pub key: String,
/// The value of the DeltaTable partition.
pub value: String,
}

/// Create a DeltaTable partition from a HivePartition string.
///
/// A HivePartition string is represented by a "key=value" format.
///
/// ```rust
/// use deltalake::DeltaTablePartition;
///
/// let hive_part = "ds=2023-01-01";
/// let partition = DeltaTablePartition::try_from(hive_part).unwrap();
/// assert_eq!("ds", partition.key);
/// assert_eq!("2023-01-01", partition.value);
/// ```
impl TryFrom<&str> for DeltaTablePartition {
type Error = DeltaTableError;

/// Try to create a DeltaTable partition from a HivePartition string.
/// Returns a DeltaTableError if the string is not in the form of a HivePartition.
fn try_from(partition: &str) -> Result<Self, DeltaTableError> {
let partition_splitted: Vec<&str> = partition.split('=').collect();
match partition_splitted {
partition_splitted if partition_splitted.len() == 2 => Ok(DeltaTablePartition {
key: partition_splitted[0].to_owned(),
value: partition_splitted[1].to_owned(),
}),
_ => Err(DeltaTableError::PartitionError {
partition: partition.to_string(),
}),
}
}
}

impl DeltaTablePartition {
/// Try to create a DeltaTable partition from a partition value kv pair.
///
/// ```rust
/// use deltalake::DeltaTablePartition;
///
/// let value = ("ds", &Some("2023-01-01".to_string()));
/// let null_default = "1979-01-01";
/// let partition = DeltaTablePartition::from_partition_value(value, null_default);
///
/// assert_eq!("ds", partition.key);
/// assert_eq!("2023-01-01", partition.value);
/// ```
pub fn from_partition_value(
partition_value: (&str, &Option<String>),
default_for_null: &str,
) -> Self {
let (k, v) = partition_value;
let v = match v {
Some(s) => s,
None => default_for_null,
};
DeltaTablePartition {
key: k.to_owned(),
value: v.to_owned(),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn tryfrom_invalid() {
let buf = "this-is-not-a-partition";
let partition = DeltaTablePartition::try_from(buf);
assert!(partition.is_err());
}
}
2 changes: 1 addition & 1 deletion rust/tests/read_delta_log_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ async fn test_read_liquid_table() -> DeltaResult<()> {

#[tokio::test]
async fn test_read_table_features() -> DeltaResult<()> {
let mut _table = deltalake_core::open_table("./tests/data/simple_table_features").await?;
let mut _table = deltalake::open_table("./tests/data/simple_table_features").await?;
let rf = _table.get_reader_features();
let wf = _table.get_writer_features();

Expand Down
17 changes: 17 additions & 0 deletions rust/tests/read_delta_partitions_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,23 @@ fn test_match_partition() {
assert!(partition_year_2020_filter.match_partition(&partition_2020, &string_type));
assert!(!partition_year_2020_filter.match_partition(&partition_2019, &string_type));
assert!(!partition_month_12_filter.match_partition(&partition_2019, &string_type));

let partition_2020_12_31_23_59_59 = deltalake::DeltaTablePartition {
key: "time".to_string(),
value: "2020-12-31 23:59:59".to_string(),
};

let partition_time_2020_12_31_23_59_59_filter = deltalake::PartitionFilter {
key: "time".to_string(),
value: deltalake::PartitionValue::Equal("2020-12-31 23:59:59.000000".to_string()),
};

assert!(partition_time_2020_12_31_23_59_59_filter.match_partition(
&partition_2020_12_31_23_59_59,
&SchemaDataType::primitive(String::from("timestamp"))
));
assert!(!partition_time_2020_12_31_23_59_59_filter
.match_partition(&partition_2020_12_31_23_59_59, &string_type));
}

#[test]
Expand Down

0 comments on commit 36d7249

Please sign in to comment.