From 80518512aad3f001ee8c34a9e695d37b8f7ca304 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 18 Nov 2022 15:31:20 +0800 Subject: [PATCH 1/2] feat: impl De/Serialize for RowGroupMetaData. use thrift for types in parquet_format_safe. --- Cargo.toml | 3 ++ src/metadata/column_chunk_metadata.rs | 49 +++++++++++++++++++++++++++ src/metadata/column_descriptor.rs | 4 +++ src/metadata/column_order.rs | 3 ++ src/metadata/row_metadata.rs | 3 ++ src/metadata/schema_descriptor.rs | 4 +++ src/metadata/sort.rs | 4 +++ src/parquet_bridge.rs | 7 ++++ src/schema/types/basic_type.rs | 4 +++ src/schema/types/converted_type.rs | 4 +++ src/schema/types/parquet_type.rs | 4 +++ src/schema/types/physical_type.rs | 3 ++ 12 files changed, 92 insertions(+) diff --git a/Cargo.toml b/Cargo.toml index c28a1eaac..fae222a33 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,6 +32,8 @@ flate2 = { version = "^1.0", optional = true, default-features = false } lz4 = { version = "1.23.3", optional = true } zstd = { version = "^0.11", optional = true, default-features = false } lz4_flex = { version = "^0.9.2", optional = true } +serde = { version = "^1.0", optional = true } +serde_derive = { version = "^1.0", optional = true } xxhash-rust = { version="0.8.3", optional = true, features = ["xxh64"] } @@ -48,6 +50,7 @@ snappy = ["snap"] gzip = ["flate2/rust_backend"] gzip_zlib_ng = ["flate2/zlib-ng"] bloom_filter = ["xxhash-rust"] +serde_types = ["serde", "serde_derive"] [[bench]] name = "decode_bitpacking" diff --git a/src/metadata/column_chunk_metadata.rs b/src/metadata/column_chunk_metadata.rs index 2233a7ce1..9affc6393 100644 --- a/src/metadata/column_chunk_metadata.rs +++ b/src/metadata/column_chunk_metadata.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "serde_types")] +use std::io::Cursor; use std::sync::Arc; use parquet_format_safe::{ColumnChunk, ColumnMetaData, Encoding}; @@ -7,16 +9,63 @@ use crate::compression::Compression; use crate::error::{Error, Result}; use crate::schema::types::PhysicalType; 
use crate::statistics::{deserialize_statistics, Statistics};
+#[cfg(feature = "serde_types")]
+use parquet_format_safe::thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol};
+#[cfg(feature = "serde_types")]
+use serde::de::Error as DeserializeError;
+#[cfg(feature = "serde_types")]
+use serde::ser::Error as SerializeError;
+#[cfg(feature = "serde_types")]
+use serde::{Deserialize, Deserializer, Serializer};
+#[cfg(feature = "serde_types")]
+use serde_derive::{Deserialize, Serialize};

 /// Metadata for a column chunk.
 // This contains the `ColumnDescriptor` associated with the chunk so that deserializers have
 // access to the descriptor (e.g. physical, converted, logical).
 #[derive(Debug, Clone)]
+#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))]
 pub struct ColumnChunkMetaData {
+    #[cfg_attr(
+        feature = "serde_types",
+        serde(serialize_with = "serialize_column_chunk")
+    )]
+    #[cfg_attr(
+        feature = "serde_types",
+        serde(deserialize_with = "deserialize_column_chunk")
+    )]
     column_chunk: ColumnChunk,
     column_descr: ColumnDescriptor,
 }

+/// `serialize_with` helper for the `column_chunk` field of [`ColumnChunkMetaData`].
+///
+/// Writes `column_chunk` through a thrift compact output protocol into an in-memory
+/// buffer and hands the resulting bytes to `serializer`.
+///
+/// # Errors
+/// A failure of `write_to_out_protocol` is converted with `S::Error::custom`.
+#[cfg(feature = "serde_types")]
+fn serialize_column_chunk<S>(
+    column_chunk: &ColumnChunk,
+    serializer: S,
+) -> std::result::Result<S::Ok, S::Error>
+where
+    S: Serializer,
+{
+    let mut buf: Vec<u8> = Vec::new();
+    // The cursor must wrap the `Vec` itself: a `Cursor<&mut Vec<u8>>` grows as it is
+    // written, whereas a cursor over `&mut buf[..]` of an empty vector is a zero-length
+    // slice with no room for any output.
+    let cursor = Cursor::new(&mut buf);
+    let mut protocol = TCompactOutputProtocol::new(cursor);
+    column_chunk
+        .write_to_out_protocol(&mut protocol)
+        .map_err(S::Error::custom)?;
+    serializer.serialize_bytes(&buf)
+}
+
+/// `deserialize_with` helper for the `column_chunk` field of [`ColumnChunkMetaData`].
+///
+/// Collects the field as a `Vec<u8>` and decodes a `ColumnChunk` from those bytes
+/// through a thrift compact input protocol.
+///
+/// # Errors
+/// A failure of `read_from_in_protocol` is converted with `D::Error::custom`.
+// NOTE(review): the bytes are written with `serialize_bytes` but read back as a
+// `Vec<u8>` sequence; some serde formats may encode the two differently — confirm
+// against the formats actually used.
+#[cfg(feature = "serde_types")]
+fn deserialize_column_chunk<'de, D>(deserializer: D) -> std::result::Result<ColumnChunk, D::Error>
+where
+    D: Deserializer<'de>,
+{
+    let buf = Vec::<u8>::deserialize(deserializer)?;
+    let mut cursor = Cursor::new(&buf[..]);
+    // NOTE(review): `usize::MAX` presumably lifts the protocol's size limit — TODO confirm.
+    let mut protocol = TCompactInputProtocol::new(&mut cursor, usize::MAX);
+    ColumnChunk::read_from_in_protocol(&mut protocol).map_err(D::Error::custom)
+}
+
 // Represents common operations for a column chunk.
impl ColumnChunkMetaData { /// Returns a new [`ColumnChunkMetaData`] diff --git a/src/metadata/column_descriptor.rs b/src/metadata/column_descriptor.rs index a321b1154..f5594b42e 100644 --- a/src/metadata/column_descriptor.rs +++ b/src/metadata/column_descriptor.rs @@ -1,8 +1,11 @@ use crate::schema::types::{ParquetType, PrimitiveType}; +#[cfg(feature = "serde_types")] +use serde_derive::{Deserialize, Serialize}; /// A descriptor of a parquet column. It contains the necessary information to deserialize /// a parquet column. #[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))] pub struct Descriptor { /// The [`PrimitiveType`] of this column pub primitive_type: PrimitiveType, @@ -18,6 +21,7 @@ pub struct Descriptor { /// This encapsulates information such as definition and repetition levels and is used to /// re-assemble nested data. #[derive(Debug, PartialEq, Clone)] +#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))] pub struct ColumnDescriptor { /// The descriptor this columns' leaf. pub descriptor: Descriptor, diff --git a/src/metadata/column_order.rs b/src/metadata/column_order.rs index 8e8d3f96c..1766ffab1 100644 --- a/src/metadata/column_order.rs +++ b/src/metadata/column_order.rs @@ -1,4 +1,6 @@ use super::sort::SortOrder; +#[cfg(feature = "serde_types")] +use serde_derive::{Deserialize, Serialize}; /// Column order that specifies what method was used to aggregate min/max values for /// statistics. @@ -6,6 +8,7 @@ use super::sort::SortOrder; /// If column order is undefined, then it is the legacy behaviour and all values should /// be compared as signed values/bytes. #[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))] pub enum ColumnOrder { /// Column uses the order defined by its logical or physical type /// (if there is no logical type), parquet-format 2.4.0+. 
diff --git a/src/metadata/row_metadata.rs b/src/metadata/row_metadata.rs index 0cb23e0c5..c86545a6d 100644 --- a/src/metadata/row_metadata.rs +++ b/src/metadata/row_metadata.rs @@ -5,9 +5,12 @@ use crate::{ error::{Error, Result}, write::ColumnOffsetsMetadata, }; +#[cfg(feature = "serde_types")] +use serde_derive::{Deserialize, Serialize}; /// Metadata for a row group. #[derive(Debug, Clone)] +#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))] pub struct RowGroupMetaData { columns: Vec, num_rows: usize, diff --git a/src/metadata/schema_descriptor.rs b/src/metadata/schema_descriptor.rs index ce156b199..528502889 100644 --- a/src/metadata/schema_descriptor.rs +++ b/src/metadata/schema_descriptor.rs @@ -8,9 +8,13 @@ use crate::{error::Result, schema::types::FieldInfo}; use super::column_descriptor::{ColumnDescriptor, Descriptor}; +#[cfg(feature = "serde_types")] +use serde_derive::{Deserialize, Serialize}; + /// A schema descriptor. This encapsulates the top-level schemas for all the columns, /// as well as all descriptors for all the primitive columns. #[derive(Debug, Clone)] +#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))] pub struct SchemaDescriptor { name: String, // The top-level schema (the "message" type). diff --git a/src/metadata/sort.rs b/src/metadata/sort.rs index f81400e8b..28dc8f61d 100644 --- a/src/metadata/sort.rs +++ b/src/metadata/sort.rs @@ -2,6 +2,9 @@ use crate::schema::types::{ IntegerType, PhysicalType, PrimitiveConvertedType, PrimitiveLogicalType, }; +#[cfg(feature = "serde_types")] +use serde_derive::{Deserialize, Serialize}; + /// Sort order for page and column statistics. 
/// /// Types are associated with sort orders and column stats are aggregated using a sort @@ -11,6 +14,7 @@ use crate::schema::types::{ /// See reference in /// #[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))] pub enum SortOrder { /// Signed (either value or legacy byte-wise) comparison. Signed, diff --git a/src/parquet_bridge.rs b/src/parquet_bridge.rs index 9bb454ffe..b9a42bd23 100644 --- a/src/parquet_bridge.rs +++ b/src/parquet_bridge.rs @@ -9,9 +9,12 @@ use super::thrift_format::{ }; use crate::error::Error; +#[cfg(feature = "serde_types")] +use serde_derive::{Deserialize, Serialize}; /// The repetition of a parquet field #[derive(Debug, Eq, PartialEq, Hash, Clone, Copy)] +#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))] pub enum Repetition { /// When the field has no null values Required, @@ -433,6 +436,7 @@ impl DataPageHeaderExt for DataPageHeaderV2 { } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))] pub enum TimeUnit { Milliseconds, Microseconds, @@ -461,6 +465,7 @@ impl From for ParquetTimeUnit { /// Enum of all valid logical integer types #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))] pub enum IntegerType { Int8, Int16, @@ -473,6 +478,7 @@ pub enum IntegerType { } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))] pub enum PrimitiveLogicalType { String, Enum, @@ -494,6 +500,7 @@ pub enum PrimitiveLogicalType { } #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))] pub enum GroupLogicalType { Map, List, diff --git a/src/schema/types/basic_type.rs b/src/schema/types/basic_type.rs index c96dd7150..9923d6ea2 100644 --- a/src/schema/types/basic_type.rs +++ b/src/schema/types/basic_type.rs @@ 
-1,7 +1,11 @@ use super::super::Repetition; +#[cfg(feature = "serde_types")] +use serde_derive::{Deserialize, Serialize}; + /// Common type information. #[derive(Clone, Debug, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))] pub struct FieldInfo { /// The field name pub name: String, diff --git a/src/schema/types/converted_type.rs b/src/schema/types/converted_type.rs index 494ab4c0b..96fc70d0b 100644 --- a/src/schema/types/converted_type.rs +++ b/src/schema/types/converted_type.rs @@ -1,7 +1,10 @@ use crate::error::Error; use parquet_format_safe::ConvertedType; +#[cfg(feature = "serde_types")] +use serde_derive::{Deserialize, Serialize}; #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))] pub enum PrimitiveConvertedType { Utf8, /// an enum is converted into a binary field @@ -89,6 +92,7 @@ pub enum PrimitiveConvertedType { } #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))] pub enum GroupConvertedType { /// a map is converted as an optional field containing a repeated key/value pair Map, diff --git a/src/schema/types/parquet_type.rs b/src/schema/types/parquet_type.rs index e50ba7c67..511d0f1b6 100644 --- a/src/schema/types/parquet_type.rs +++ b/src/schema/types/parquet_type.rs @@ -7,9 +7,12 @@ use super::{ spec, FieldInfo, GroupConvertedType, GroupLogicalType, PhysicalType, PrimitiveConvertedType, PrimitiveLogicalType, }; +#[cfg(feature = "serde_types")] +use serde_derive::{Deserialize, Serialize}; /// The complete description of a parquet column #[derive(Debug, Clone, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))] pub struct PrimitiveType { /// The fields' generic information pub field_info: FieldInfo, @@ -41,6 +44,7 @@ impl PrimitiveType { /// Representation of a Parquet type describing primitive and nested fields, /// including the 
top-level schema of the parquet file. #[derive(Clone, Debug, PartialEq)] +#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))] pub enum ParquetType { PrimitiveType(PrimitiveType), GroupType { diff --git a/src/schema/types/physical_type.rs b/src/schema/types/physical_type.rs index 4b44cc495..fec5ad64e 100644 --- a/src/schema/types/physical_type.rs +++ b/src/schema/types/physical_type.rs @@ -1,9 +1,12 @@ use parquet_format_safe::Type; use crate::error::Error; +#[cfg(feature = "serde_types")] +use serde_derive::{Deserialize, Serialize}; /// The set of all physical types representable in Parquet #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +#[cfg_attr(feature = "serde_types", derive(Deserialize, Serialize))] pub enum PhysicalType { Boolean, Int32, From b1678c93049e335bcc588556770cd86443774c82 Mon Sep 17 00:00:00 2001 From: Yang Xiufeng Date: Fri, 18 Nov 2022 20:27:56 +0800 Subject: [PATCH 2/2] refactor: grouping imports added for feature serde_types. --- src/metadata/column_chunk_metadata.rs | 22 ++++++++++++---------- 1 file changed, 12 insertions(+), 10 deletions(-) diff --git a/src/metadata/column_chunk_metadata.rs b/src/metadata/column_chunk_metadata.rs index 9affc6393..1899c2596 100644 --- a/src/metadata/column_chunk_metadata.rs +++ b/src/metadata/column_chunk_metadata.rs @@ -1,5 +1,3 @@ -#[cfg(feature = "serde_types")] -use std::io::Cursor; use std::sync::Arc; use parquet_format_safe::{ColumnChunk, ColumnMetaData, Encoding}; @@ -9,16 +7,20 @@ use crate::compression::Compression; use crate::error::{Error, Result}; use crate::schema::types::PhysicalType; use crate::statistics::{deserialize_statistics, Statistics}; + #[cfg(feature = "serde_types")] -use parquet_format_safe::thrift::protocol::{TCompactInputProtocol, TCompactOutputProtocol}; -#[cfg(feature = "serde_types")] -use serde::de::Error as DeserializeError; -#[cfg(feature = "serde_types")] -use serde::ser::Error as SerializeError; -#[cfg(feature = "serde_types")] -use 
serde::{Deserialize, Deserializer, Serializer}; +mod serde_types { + pub use parquet_format_safe::thrift::protocol::{ + TCompactInputProtocol, TCompactOutputProtocol, + }; + pub use serde::de::Error as DeserializeError; + pub use serde::ser::Error as SerializeError; + pub use serde::{Deserialize, Deserializer, Serializer}; + pub use serde_derive::{Deserialize, Serialize}; + pub use std::io::Cursor; +} #[cfg(feature = "serde_types")] -use serde_derive::{Deserialize, Serialize}; +use serde_types::*; /// Metadata for a column chunk. // This contains the `ColumnDescriptor` associated with the chunk so that deserializers have