Modify partition_values field type in Add/Remove actions. #354

Merged · 5 commits · Aug 8, 2021
2 changes: 1 addition & 1 deletion python/src/lib.rs
@@ -56,7 +56,7 @@ struct RawDeltaTableMetaData {
#[pyo3(get)]
created_time: deltalake::DeltaDataTypeTimestamp,
#[pyo3(get)]
-    configuration: HashMap<String, String>,
+    configuration: HashMap<String, Option<String>>,
}

#[pymethods]
124 changes: 68 additions & 56 deletions rust/src/action.rs
@@ -24,8 +24,8 @@ pub enum ActionError {
Generic(String),
}

-fn populate_hashmap_from_parquet_map(
-    map: &mut HashMap<String, String>,
+fn populate_hashmap_with_option_from_parquet_map(
+    map: &mut HashMap<String, Option<String>>,
pmap: &parquet::record::Map,
) -> Result<(), &'static str> {
let keys = pmap.get_keys();
@@ -36,12 +36,12 @@ fn populate_hashmap_from_parquet_map(
.map_err(|_| "key for HashMap in parquet has to be a string")?
.clone(),
)
-        .or_insert(
+        .or_insert(Some(
values
.get_string(j)
.map_err(|_| "value for HashMap in parquet has to be a string")?
.clone(),
-        );
+        ));
}

Ok(())
@@ -150,7 +150,7 @@ pub struct Add {
/// The size of this file in bytes
pub size: DeltaDataTypeLong,
/// A map from partition column to value for this file
-    pub partition_values: HashMap<String, String>,
+    pub partition_values: HashMap<String, Option<String>>,
Member commented:

It occurred to me that the problem you found has a much bigger impact than just the partition_values field; it also applies to Add.tags, Remove.tags, MetaData.format.options, and MetaData.configuration.

Let me know if you prefer to address them or only focus on partition_values in this PR. If you prefer the latter, I can file an issue to track the fix for the other fields.

Contributor Author @zijie0 commented on Aug 7, 2021:

Hey QP, I went through the spec doc but didn't find situations where those structs could also contain null values in the map. Is that because Map[String, String] allows null values in Scala, so we also need to make the change to HashMap<String, Option<String>>?

Member replied:

That's correct. Based on the discussion in the dev list, all data types are annotated using Scala syntax, so all Map[String, String] fields should map to HashMap<String, Option<String>> in Rust.
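As a hedged illustration of that mapping (the AddStub struct below is a made-up, minimal stand-in, not the crate's actual Add type): with Option<String> values, serde accepts the null partition values that Spark writes for __HIVE_DEFAULT_PARTITION__ files, which a plain HashMap<String, String> field rejects at deserialization time.

use std::collections::HashMap;

use serde::Deserialize;

// Hypothetical minimal stand-in for the Add action, reduced to two fields.
#[derive(Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
struct AddStub {
    path: String,
    partition_values: HashMap<String, Option<String>>,
}

fn main() {
    // A null partition value, as written for a __HIVE_DEFAULT_PARTITION__ file.
    let json = r#"{"path":"k=__HIVE_DEFAULT_PARTITION__/part-00001.parquet","partitionValues":{"k":null}}"#;
    let add: AddStub = serde_json::from_str(json).unwrap();
    assert_eq!(add.partition_values["k"], None);
    // With partition_values: HashMap<String, String>, the same input would fail
    // with "invalid type: null, expected a string".
}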

Member added:

BTW, I am perfectly fine with us addressing these other fields in another PR if you want to keep the scope of this PR small. The fix for partition_values itself is already a big improvement :)

Contributor Author replied:

Let me take a look at this :)

/// Partition values stored in raw parquet struct format. In this struct, the column names
/// correspond to the partition columns and the values are stored in their corresponding data
/// type. This is a required field when the table is partitioned and the table property
@@ -178,7 +178,7 @@ pub struct Add {
#[serde(skip_serializing, skip_deserializing)]
pub stats_parsed: Option<parquet::record::Row>,
/// Map containing metadata about this file
-    pub tags: Option<HashMap<String, String>>,
+    pub tags: Option<HashMap<String, Option<String>>>,
}

impl Add {
@@ -214,13 +214,16 @@ impl Add {
let parquetMap = record
.get_map(i)
.map_err(|_| gen_action_type_error("add", "partitionValues", "map"))?;
-                    populate_hashmap_from_parquet_map(&mut re.partition_values, parquetMap)
-                        .map_err(|estr| {
-                            ActionError::InvalidField(format!(
-                                "Invalid partitionValues for add action: {}",
-                                estr,
-                            ))
-                        })?;
+                    populate_hashmap_with_option_from_parquet_map(
+                        &mut re.partition_values,
+                        parquetMap,
+                    )
+                    .map_err(|estr| {
+                        ActionError::InvalidField(format!(
+                            "Invalid partitionValues for add action: {}",
+                            estr,
+                        ))
+                    })?;
}
"partitionValues_parsed" => {
re.partition_values_parsed = Some(
@@ -235,12 +238,13 @@ impl Add {
"tags" => match record.get_map(i) {
Ok(tags_map) => {
let mut tags = HashMap::new();
-                        populate_hashmap_from_parquet_map(&mut tags, tags_map).map_err(|estr| {
-                            ActionError::InvalidField(format!(
-                                "Invalid tags for add action: {}",
-                                estr,
-                            ))
-                        })?;
+                        populate_hashmap_with_option_from_parquet_map(&mut tags, tags_map)
+                            .map_err(|estr| {
+                                ActionError::InvalidField(format!(
+                                    "Invalid tags for add action: {}",
+                                    estr,
+                                ))
+                            })?;
re.tags = Some(tags);
}
_ => {
@@ -302,7 +306,7 @@ impl Add {
}
"minValues" => match record.get_group(i) {
Ok(row) => {
-                            for (name, field) in row.get_column_iter() {
+                            for (name, field) in row.get_column_iter() {
stats.min_values.insert(name.clone(), field.clone());
}
}
@@ -312,7 +316,7 @@ impl Add {
}
"maxValues" => match record.get_group(i) {
Ok(row) => {
-                            for (name, field) in row.get_column_iter() {
+                            for (name, field) in row.get_column_iter() {
stats.max_values.insert(name.clone(), field.clone());
}
}
@@ -322,7 +326,7 @@ impl Add {
}
"nullCount" => match record.get_group(i) {
Ok(row) => {
-                            for (i, (name, _)) in row.get_column_iter().enumerate() {
+                            for (i, (name, _)) in row.get_column_iter().enumerate() {
match row.get_long(i) {
Ok(v) => {
stats.null_count.insert(name.clone(), v);
@@ -344,7 +348,6 @@ impl Add {
record,
);
}
-
}
}

@@ -359,12 +362,12 @@ pub struct Format {
/// Name of the encoding for files in this table.
provider: String,
/// A map containing configuration options for the format.
-    options: Option<HashMap<String, String>>,
+    options: Option<HashMap<String, Option<String>>>,
}

impl Format {
/// Allows creation of a new action::Format
-    pub fn new(provider: String, options: Option<HashMap<String, String>>) -> Self {
+    pub fn new(provider: String, options: Option<HashMap<String, Option<String>>>) -> Self {
Self { provider, options }
}

@@ -404,7 +407,7 @@ pub struct MetaData {
/// The time when this metadata action is created, in milliseconds since the Unix epoch
pub created_time: DeltaDataTypeTimestamp,
/// A map containing configuration options for the table
-    pub configuration: HashMap<String, String>,
+    pub configuration: HashMap<String, Option<String>>,
}

impl MetaData {
@@ -463,13 +466,16 @@ impl MetaData {
let configuration_map = record
.get_map(i)
.map_err(|_| gen_action_type_error("metaData", "configuration", "map"))?;
-                    populate_hashmap_from_parquet_map(&mut re.configuration, configuration_map)
-                        .map_err(|estr| {
-                            ActionError::InvalidField(format!(
-                                "Invalid configuration for metaData action: {}",
-                                estr,
-                            ))
-                        })?;
+                    populate_hashmap_with_option_from_parquet_map(
+                        &mut re.configuration,
+                        configuration_map,
+                    )
+                    .map_err(|estr| {
+                        ActionError::InvalidField(format!(
+                            "Invalid configuration for metaData action: {}",
+                            estr,
+                        ))
+                    })?;
}
"format" => {
let format_record = record
@@ -485,14 +491,16 @@ impl MetaData {
match record.get_map(1) {
Ok(options_map) => {
let mut options = HashMap::new();
-                        populate_hashmap_from_parquet_map(&mut options, options_map).map_err(
-                            |estr| {
-                                ActionError::InvalidField(format!(
-                                    "Invalid format.options for metaData action: {}",
-                                    estr,
-                                ))
-                            },
-                        )?;
+                        populate_hashmap_with_option_from_parquet_map(
+                            &mut options,
+                            options_map,
+                        )
+                        .map_err(|estr| {
+                            ActionError::InvalidField(format!(
+                                "Invalid format.options for metaData action: {}",
+                                estr,
+                            ))
+                        })?;
re.format.options = Some(options);
}
_ => {
@@ -535,11 +543,11 @@ pub struct Remove {
/// When true the fields partitionValues, size, and tags are present
pub extended_file_metadata: Option<bool>,
/// A map from partition column to value for this file.
-    pub partition_values: Option<HashMap<String, String>>,
+    pub partition_values: Option<HashMap<String, Option<String>>>,
/// Size of this file in bytes
pub size: Option<DeltaDataTypeLong>,
/// Map containing metadata about this file
-    pub tags: Option<HashMap<String, String>>,
+    pub tags: Option<HashMap<String, Option<String>>>,
}

impl Remove {
@@ -577,26 +585,30 @@ impl Remove {
gen_action_type_error("remove", "partitionValues", "map")
})?;
let mut partitionValues = HashMap::new();
-                        populate_hashmap_from_parquet_map(&mut partitionValues, parquetMap)
-                            .map_err(|estr| {
-                                ActionError::InvalidField(format!(
-                                    "Invalid partitionValues for remove action: {}",
-                                    estr,
-                                ))
-                            })?;
+                        populate_hashmap_with_option_from_parquet_map(
+                            &mut partitionValues,
+                            parquetMap,
+                        )
+                        .map_err(|estr| {
+                            ActionError::InvalidField(format!(
+                                "Invalid partitionValues for remove action: {}",
+                                estr,
+                            ))
+                        })?;
re.partition_values = Some(partitionValues);
}
_ => re.partition_values = None,
},
"tags" => match record.get_map(i) {
Ok(tags_map) => {
let mut tags = HashMap::new();
-                        populate_hashmap_from_parquet_map(&mut tags, tags_map).map_err(|estr| {
-                            ActionError::InvalidField(format!(
-                                "Invalid tags for remove action: {}",
-                                estr,
-                            ))
-                        })?;
+                        populate_hashmap_with_option_from_parquet_map(&mut tags, tags_map)
+                            .map_err(|estr| {
+                                ActionError::InvalidField(format!(
+                                    "Invalid tags for remove action: {}",
+                                    estr,
+                                ))
+                            })?;
re.tags = Some(tags);
}
_ => {
36 changes: 26 additions & 10 deletions rust/src/checkpoints.rs
@@ -261,7 +261,7 @@ fn checkpoint_add_from_state(

for (field_name, data_type) in partition_col_data_types.iter() {
if let Some(string_value) = add.partition_values.get(*field_name) {
-            let v = typed_partition_value_from_string(string_value, data_type)?;
+            let v = typed_partition_value_from_option_string(string_value, data_type)?;

partition_values_parsed.insert(field_name.to_string(), v);
}
@@ -324,6 +324,22 @@ fn typed_partition_value_from_string(
}
}

+fn typed_partition_value_from_option_string(
+    string_value: &Option<String>,
+    data_type: &SchemaDataType,
+) -> Result<Value, CheckPointWriterError> {
+    match string_value {
+        Some(s) => {
+            if s.is_empty() {
+                Ok(Value::Null) // empty string should be deserialized as null
+            } else {
+                typed_partition_value_from_string(s, data_type)
+            }
+        }
+        None => Ok(Value::Null),
+    }
+}
+
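For completeness, a hedged sketch of the two null paths this helper covers (illustrative assertions only, not part of this PR's test changes):

    assert_eq!(
        Value::Null,
        typed_partition_value_from_option_string(
            &None,
            &SchemaDataType::primitive("integer".to_string()),
        )
        .unwrap()
    );
    assert_eq!(
        Value::Null,
        typed_partition_value_from_option_string(
            &Some("".to_string()),
            &SchemaDataType::primitive("string".to_string()),
        )
        .unwrap()
    );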
type SchemaPath = Vec<String>;

fn collect_stats_conversions(
@@ -403,29 +419,29 @@ mod tests {
let string_value: Value = "Hello World!".into();
assert_eq!(
string_value,
-            typed_partition_value_from_string(
-                &"Hello World!".to_string(),
-                &SchemaDataType::primitive("string".to_string())
+            typed_partition_value_from_option_string(
+                &Some("Hello World!".to_string()),
+                &SchemaDataType::primitive("string".to_string()),
)
.unwrap()
);

let bool_value: Value = true.into();
assert_eq!(
bool_value,
-            typed_partition_value_from_string(
-                &"true".to_string(),
-                &SchemaDataType::primitive("boolean".to_string())
+            typed_partition_value_from_option_string(
+                &Some("true".to_string()),
+                &SchemaDataType::primitive("boolean".to_string()),
)
.unwrap()
);

let number_value: Value = 42.into();
assert_eq!(
number_value,
-            typed_partition_value_from_string(
-                &"42".to_string(),
-                &SchemaDataType::primitive("integer".to_string())
+            typed_partition_value_from_option_string(
+                &Some("42".to_string()),
+                &SchemaDataType::primitive("integer".to_string()),
)
.unwrap()
);
10 changes: 5 additions & 5 deletions rust/src/delta.rs
@@ -204,7 +204,7 @@ pub struct DeltaTableMetaData {
/// The time when this metadata action is created, in milliseconds since the Unix epoch
pub created_time: DeltaDataTypeTimestamp,
/// table properties
-    pub configuration: HashMap<String, String>,
+    pub configuration: HashMap<String, Option<String>>,
}

impl DeltaTableMetaData {
@@ -215,7 +215,7 @@ impl DeltaTableMetaData {
format: Option<action::Format>,
schema: Schema,
partition_columns: Vec<String>,
-        configuration: HashMap<String, String>,
+        configuration: HashMap<String, Option<String>>,
) -> Self {
// Reference implementation uses uuid v4 to create GUID:
// https://github.com/delta-io/delta/blob/master/core/src/main/scala/org/apache/spark/sql/delta/actions/actions.scala#L350
@@ -232,7 +232,7 @@ impl DeltaTableMetaData {
}

/// Return the configurations of the DeltaTableMetaData; could be empty
-    pub fn get_configuration(&self) -> &HashMap<String, String> {
+    pub fn get_configuration(&self) -> &HashMap<String, Option<String>> {
&self.configuration
}
}
@@ -1012,7 +1012,7 @@ impl DeltaTable {
}

/// Return the tables configurations that are encapsulated in the DeltaTableStates currentMetaData field
-    pub fn get_configurations(&self) -> Result<&HashMap<String, String>, DeltaTableError> {
+    pub fn get_configurations(&self) -> Result<&HashMap<String, Option<String>>, DeltaTableError> {
Ok(self
.state
.current_metadata
@@ -1305,7 +1305,7 @@ impl<'a> DeltaTransaction<'a> {
let mut partition_values = HashMap::new();
if let Some(partitions) = &partitions {
for (key, value) in partitions {
-                partition_values.insert(key.clone(), value.clone());
+                partition_values.insert(key.clone(), Some(value.clone()));
}
}

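A hedged sketch of what this hunk does at the API boundary (the helper name below is hypothetical; the real code inlines the loop as shown above): caller-supplied partition maps stay HashMap<String, String>, and each value is wrapped in Some when the Add action is built.

use std::collections::HashMap;

// Hypothetical helper mirroring the loop above: user-supplied partition values
// are always concrete strings, so they are wrapped in Some at the action boundary.
fn wrap_partition_values(
    partitions: &HashMap<String, String>,
) -> HashMap<String, Option<String>> {
    partitions
        .iter()
        .map(|(key, value)| (key.clone(), Some(value.clone())))
        .collect()
}

Only values read back from existing logs, such as the __HIVE_DEFAULT_PARTITION__ entry in the new log fixture later in this diff, can be None.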
2 changes: 1 addition & 1 deletion rust/tests/checkpoint_writer_test.rs
@@ -18,7 +18,7 @@ async fn write_simple_checkpoint() {
cleanup_checkpoint_files(log_path.as_path());

// Load the delta table at version 5
-    let mut table = deltalake::open_table_with_version(table_location, 5)
+    let table = deltalake::open_table_with_version(table_location, 5)
.await
.unwrap();

@@ -0,0 +1,5 @@
+{"commitInfo":{"timestamp":1627990388127,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[\"k\"]"},"isBlindAppend":false,"operationMetrics":{"numFiles":"2","numOutputBytes":"920","numOutputRows":"2"}}}
+{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
+{"metaData":{"id":"4c831cf5-cd89-40af-9828-c1de38c23b2a","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"k\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"v\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["k"],"configuration":{},"createdTime":1627990373830}}
+{"add":{"path":"k=A/part-00000-b1f1dbbb-70bc-4970-893f-9bb772bf246e.c000.snappy.parquet","partitionValues":{"k":"A"},"size":460,"modificationTime":1627990384000,"dataChange":true}}
+{"add":{"path":"k=__HIVE_DEFAULT_PARTITION__/part-00001-8474ac85-360b-4f58-b3ea-23990c71b932.c000.snappy.parquet","partitionValues":{"k":null},"size":460,"modificationTime":1627990384000,"dataChange":true}}
Binary file not shown.
Binary file not shown.