From 5c76c6a730299fc0e31748b2a58e2ed0b54fd446 Mon Sep 17 00:00:00 2001 From: Raphael Taylor-Davies Date: Thu, 17 Aug 2023 13:35:21 +0100 Subject: [PATCH] Support Field ID in ArrowWriter (#4702) --- parquet/src/arrow/mod.rs | 7 ++ parquet/src/arrow/schema/mod.rs | 200 ++++++++++++++++++++++++-------- 2 files changed, 156 insertions(+), 51 deletions(-) diff --git a/parquet/src/arrow/mod.rs b/parquet/src/arrow/mod.rs index aad4925c7c70..8cca79b40e93 100644 --- a/parquet/src/arrow/mod.rs +++ b/parquet/src/arrow/mod.rs @@ -130,6 +130,13 @@ pub use self::schema::{ /// Schema metadata key used to store serialized Arrow IPC schema pub const ARROW_SCHEMA_META_KEY: &str = "ARROW:schema"; +/// The value of this metadata key, if present on [`Field::metadata`], will be used +/// to populate [`BasicTypeInfo::id`] +/// +/// [`Field::metadata`]: arrow_schema::Field::metadata +/// [`BasicTypeInfo::id`]: crate::schema::types::BasicTypeInfo::id +pub const PARQUET_FIELD_ID_META_KEY: &str = "PARQUET:field_id"; + /// A [`ProjectionMask`] identifies a set of columns within a potentially nested schema to project /// /// In particular, a [`ProjectionMask`] can be constructed from a list of leaf column indices diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index bcfc2f884cac..3f1994d10829 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -295,14 +295,17 @@ fn arrow_to_parquet_type(field: &Field) -> Result { } else { Repetition::REQUIRED }; + let id = field_id(field); // create type from field match field.data_type() { DataType::Null => Type::primitive_type_builder(name, PhysicalType::INT32) .with_logical_type(Some(LogicalType::Unknown)) .with_repetition(repetition) + .with_id(id) .build(), DataType::Boolean => Type::primitive_type_builder(name, PhysicalType::BOOLEAN) .with_repetition(repetition) + .with_id(id) .build(), DataType::Int8 => Type::primitive_type_builder(name, PhysicalType::INT32) 
.with_logical_type(Some(LogicalType::Integer { @@ -310,6 +313,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result { is_signed: true, })) .with_repetition(repetition) + .with_id(id) .build(), DataType::Int16 => Type::primitive_type_builder(name, PhysicalType::INT32) .with_logical_type(Some(LogicalType::Integer { @@ -317,12 +321,15 @@ fn arrow_to_parquet_type(field: &Field) -> Result { is_signed: true, })) .with_repetition(repetition) + .with_id(id) .build(), DataType::Int32 => Type::primitive_type_builder(name, PhysicalType::INT32) .with_repetition(repetition) + .with_id(id) .build(), DataType::Int64 => Type::primitive_type_builder(name, PhysicalType::INT64) .with_repetition(repetition) + .with_id(id) .build(), DataType::UInt8 => Type::primitive_type_builder(name, PhysicalType::INT32) .with_logical_type(Some(LogicalType::Integer { @@ -330,6 +337,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result { is_signed: false, })) .with_repetition(repetition) + .with_id(id) .build(), DataType::UInt16 => Type::primitive_type_builder(name, PhysicalType::INT32) .with_logical_type(Some(LogicalType::Integer { @@ -337,6 +345,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result { is_signed: false, })) .with_repetition(repetition) + .with_id(id) .build(), DataType::UInt32 => Type::primitive_type_builder(name, PhysicalType::INT32) .with_logical_type(Some(LogicalType::Integer { @@ -344,6 +353,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result { is_signed: false, })) .with_repetition(repetition) + .with_id(id) .build(), DataType::UInt64 => Type::primitive_type_builder(name, PhysicalType::INT64) .with_logical_type(Some(LogicalType::Integer { @@ -351,18 +361,22 @@ fn arrow_to_parquet_type(field: &Field) -> Result { is_signed: false, })) .with_repetition(repetition) + .with_id(id) .build(), DataType::Float16 => Err(arrow_err!("Float16 arrays not supported")), DataType::Float32 => Type::primitive_type_builder(name, PhysicalType::FLOAT) .with_repetition(repetition) + .with_id(id) 
.build(), DataType::Float64 => Type::primitive_type_builder(name, PhysicalType::DOUBLE) .with_repetition(repetition) + .with_id(id) .build(), DataType::Timestamp(TimeUnit::Second, _) => { // Cannot represent seconds in LogicalType Type::primitive_type_builder(name, PhysicalType::INT64) .with_repetition(repetition) + .with_id(id) .build() } DataType::Timestamp(time_unit, tz) => { @@ -384,21 +398,25 @@ fn arrow_to_parquet_type(field: &Field) -> Result { }, })) .with_repetition(repetition) + .with_id(id) .build() } DataType::Date32 => Type::primitive_type_builder(name, PhysicalType::INT32) .with_logical_type(Some(LogicalType::Date)) .with_repetition(repetition) + .with_id(id) .build(), // date64 is cast to date32 (#1666) DataType::Date64 => Type::primitive_type_builder(name, PhysicalType::INT32) .with_logical_type(Some(LogicalType::Date)) .with_repetition(repetition) + .with_id(id) .build(), DataType::Time32(TimeUnit::Second) => { // Cannot represent seconds in LogicalType Type::primitive_type_builder(name, PhysicalType::INT32) .with_repetition(repetition) + .with_id(id) .build() } DataType::Time32(unit) => Type::primitive_type_builder(name, PhysicalType::INT32) @@ -410,6 +428,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result { }, })) .with_repetition(repetition) + .with_id(id) .build(), DataType::Time64(unit) => Type::primitive_type_builder(name, PhysicalType::INT64) .with_logical_type(Some(LogicalType::Time { @@ -421,6 +440,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result { }, })) .with_repetition(repetition) + .with_id(id) .build(), DataType::Duration(_) => { Err(arrow_err!("Converting Duration to parquet not supported",)) @@ -429,17 +449,20 @@ fn arrow_to_parquet_type(field: &Field) -> Result { Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY) .with_converted_type(ConvertedType::INTERVAL) .with_repetition(repetition) + .with_id(id) .with_length(12) .build() } DataType::Binary | DataType::LargeBinary => { 
Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY) .with_repetition(repetition) + .with_id(id) .build() } DataType::FixedSizeBinary(length) => { Type::primitive_type_builder(name, PhysicalType::FIXED_LEN_BYTE_ARRAY) .with_repetition(repetition) + .with_id(id) .with_length(*length) .build() } @@ -459,6 +482,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result { }; Type::primitive_type_builder(name, physical_type) .with_repetition(repetition) + .with_id(id) .with_length(length) .with_logical_type(Some(LogicalType::Decimal { scale: *scale as i32, @@ -472,6 +496,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result { Type::primitive_type_builder(name, PhysicalType::BYTE_ARRAY) .with_logical_type(Some(LogicalType::String)) .with_repetition(repetition) + .with_id(id) .build() } DataType::List(f) | DataType::FixedSizeList(f, _) | DataType::LargeList(f) => { @@ -484,6 +509,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result { )]) .with_logical_type(Some(LogicalType::List)) .with_repetition(repetition) + .with_id(id) .build() } DataType::Struct(fields) => { @@ -500,6 +526,7 @@ fn arrow_to_parquet_type(field: &Field) -> Result { Type::group_type_builder(name) .with_fields(fields) .with_repetition(repetition) + .with_id(id) .build() } DataType::Map(field, _) => { @@ -508,22 +535,15 @@ fn arrow_to_parquet_type(field: &Field) -> Result { .with_fields(vec![Arc::new( Type::group_type_builder(field.name()) .with_fields(vec![ - Arc::new(arrow_to_parquet_type(&Field::new( - struct_fields[0].name(), - struct_fields[0].data_type().clone(), - false, - ))?), - Arc::new(arrow_to_parquet_type(&Field::new( - struct_fields[1].name(), - struct_fields[1].data_type().clone(), - struct_fields[1].is_nullable(), - ))?), + Arc::new(arrow_to_parquet_type(&struct_fields[0])?), + Arc::new(arrow_to_parquet_type(&struct_fields[1])?), ]) .with_repetition(Repetition::REPEATED) .build()?, )]) .with_logical_type(Some(LogicalType::Map)) .with_repetition(repetition) + .with_id(id) .build() } 
else { Err(arrow_err!( @@ -543,6 +563,11 @@ fn arrow_to_parquet_type(field: &Field) -> Result { } } +fn field_id(field: &Field) -> Option<i32> { + let value = field.metadata().get(super::PARQUET_FIELD_ID_META_KEY)?; + value.parse().ok() // Fail quietly if not a valid integer +} + #[cfg(test)] mod tests { use super::*; @@ -551,6 +576,7 @@ mod tests { use arrow::datatypes::{DataType, Field, IntervalUnit, TimeUnit}; + use crate::arrow::PARQUET_FIELD_ID_META_KEY; use crate::file::metadata::KeyValue; use crate::{ arrow::{arrow_reader::ParquetRecordBatchReaderBuilder, ArrowWriter}, @@ -1555,17 +1581,18 @@ mod tests { #[test] fn test_arrow_schema_roundtrip() -> Result<()> { - // This tests the roundtrip of an Arrow schema - // Fields that are commented out fail roundtrip tests or are unsupported by the writer - let metadata: HashMap<String, String> = - [("Key".to_string(), "Value".to_string())] - .iter() - .cloned() - .collect(); + let meta = |a: &[(&str, &str)]| -> HashMap<String, String> { + a.iter() + .map(|(a, b)| (a.to_string(), b.to_string())) + .collect() + }; let schema = Schema::new_with_metadata( vec![ - Field::new("c1", DataType::Utf8, false), + Field::new("c1", DataType::Utf8, false).with_metadata(meta(&[ + ("Key", "Foo"), + (PARQUET_FIELD_ID_META_KEY, "2"), + ])), Field::new("c2", DataType::Binary, false), Field::new("c3", DataType::FixedSizeBinary(3), false), Field::new("c4", DataType::Boolean, false), @@ -1598,24 +1625,40 @@ mod tests { Field::new("c20", DataType::Interval(IntervalUnit::YearMonth), false), Field::new_list( "c21", - Field::new("list", DataType::Boolean, true), + Field::new("item", DataType::Boolean, true).with_metadata(meta(&[ + ("Key", "Bar"), + (PARQUET_FIELD_ID_META_KEY, "5"), + ])), + false, + ) + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "4")])), + Field::new( + "c22", + DataType::FixedSizeList( + Arc::new(Field::new("item", DataType::Boolean, true)), + 5, + ), + false, + ), + Field::new_list( + "c23", + Field::new_large_list( + "inner", + Field::new( + "item", 
DataType::Struct( + vec![ + Field::new("a", DataType::Int16, true), + Field::new("b", DataType::Float64, false), + ] + .into(), + ), + false, + ), + true, + ), false, ), - // Field::new( - // "c22", - // DataType::FixedSizeList(Box::new(DataType::Boolean), 5), - // false, - // ), - // Field::new( - // "c23", - // DataType::List(Box::new(DataType::LargeList(Box::new( - // DataType::Struct(vec![ - // Field::new("a", DataType::Int16, true), - // Field::new("b", DataType::Float64, false), - // ]), - // )))), - // true, - // ), Field::new( "c24", DataType::Struct(Fields::from(vec![ @@ -1626,6 +1669,7 @@ mod tests { ), Field::new("c25", DataType::Interval(IntervalUnit::YearMonth), true), Field::new("c26", DataType::Interval(IntervalUnit::DayTime), true), + // Duration types not supported // Field::new("c27", DataType::Duration(TimeUnit::Second), false), // Field::new("c28", DataType::Duration(TimeUnit::Millisecond), false), // Field::new("c29", DataType::Duration(TimeUnit::Microsecond), false), @@ -1639,19 +1683,29 @@ mod tests { true, 123, true, - ), + ) + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "6")])), Field::new("c32", DataType::LargeBinary, true), Field::new("c33", DataType::LargeUtf8, true), - // Field::new( - // "c34", - // DataType::LargeList(Box::new(DataType::List(Box::new( - // DataType::Struct(vec![ - // Field::new("a", DataType::Int16, true), - // Field::new("b", DataType::Float64, true), - // ]), - // )))), - // true, - // ), + Field::new_large_list( + "c34", + Field::new_list( + "inner", + Field::new( + "item", + DataType::Struct( + vec![ + Field::new("a", DataType::Int16, true), + Field::new("b", DataType::Float64, true), + ] + .into(), + ), + true, + ), + true, + ), + true, + ), Field::new("c35", DataType::Null, true), Field::new("c36", DataType::Decimal128(2, 1), false), Field::new("c37", DataType::Decimal256(50, 20), false), @@ -1671,29 +1725,34 @@ mod tests { Field::new_map( "c40", "my_entries", - Field::new("my_key", DataType::Utf8, false), 
+ Field::new("my_key", DataType::Utf8, false) + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "8")])), Field::new_list( "my_value", - Field::new("item", DataType::Utf8, true), + Field::new("item", DataType::Utf8, true) + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "10")])), true, - ), + ) + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "9")])), false, // fails to roundtrip keys_sorted true, - ), + ) + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "7")])), Field::new_map( "c41", "my_entries", Field::new("my_key", DataType::Utf8, false), Field::new_list( "my_value", - Field::new("item", DataType::Utf8, true), + Field::new("item", DataType::Utf8, true) + .with_metadata(meta(&[(PARQUET_FIELD_ID_META_KEY, "11")])), true, ), false, // fails to roundtrip keys_sorted false, ), ], - metadata, + meta(&[("Key", "Value")]), ); // write to an empty parquet file so that schema is serialized @@ -1707,9 +1766,48 @@ mod tests { // read file back let arrow_reader = ParquetRecordBatchReaderBuilder::try_new(file).unwrap(); + + // Check arrow schema let read_schema = arrow_reader.schema(); assert_eq!(&schema, read_schema.as_ref()); + // Walk schema finding field IDs + let mut stack = Vec::with_capacity(10); + let mut out = Vec::with_capacity(10); + + let root = arrow_reader.parquet_schema().root_schema_ptr(); + stack.push((root.name().to_string(), root)); + + while let Some((p, t)) = stack.pop() { + if t.is_group() { + for f in t.get_fields() { + stack.push((format!("{p}.{}", f.name()), f.clone())) + } + } + + let info = t.get_basic_info(); + if info.has_id() { + out.push(format!("{p} -> {}", info.id())) + } + } + out.sort_unstable(); + let out: Vec<_> = out.iter().map(|x| x.as_str()).collect(); + + assert_eq!( + &out, + &[ + "arrow_schema.c1 -> 2", + "arrow_schema.c21 -> 4", + "arrow_schema.c21.list.item -> 5", + "arrow_schema.c31 -> 6", + "arrow_schema.c40 -> 7", + "arrow_schema.c40.my_entries.my_key -> 8", + "arrow_schema.c40.my_entries.my_value -> 9", + 
"arrow_schema.c40.my_entries.my_value.list.item -> 10", + "arrow_schema.c41.my_entries.my_value.list.item -> 11", + ] + ); + Ok(()) }