Skip to content

Commit

Permalink
Improved typing to reduce clones and use of unwraps (jorgecarleitao#106)
Browse files Browse the repository at this point in the history
  • Loading branch information
jorgecarleitao authored and dantengsky committed Apr 1, 2022
1 parent b6c952f commit 4aed8f4
Show file tree
Hide file tree
Showing 32 changed files with 285 additions and 450 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,4 @@ target
Cargo.lock
.idea
venv
fixtures/
45 changes: 19 additions & 26 deletions examples/read_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,35 +4,29 @@ use parquet2::indexes;

// ANCHOR: deserialize
use parquet2::encoding::Encoding;
use parquet2::metadata::ColumnDescriptor;
use parquet2::page::{split_buffer, DataPage};
use parquet2::schema::types::ParquetType;

fn deserialize(page: &DataPage, descriptor: &ColumnDescriptor) {
let (_rep_levels, _def_levels, _values_buffer) = split_buffer(page, descriptor);

if let ParquetType::PrimitiveType {
physical_type,
converted_type,
logical_type,
..
} = descriptor.type_()
{
// map the types to your physical typing system (e.g. this usually adds
// casting, tz conversions, int96 to timestamp)
} else {
// column chunks are always primitive types
unreachable!()
}

// finally, decode and deserialize.
match (&page.encoding(), page.dictionary_page()) {
(Encoding::PlainDictionary | Encoding::RleDictionary, Some(_dict_page)) => {
use parquet2::schema::types::PhysicalType;

fn deserialize(page: &DataPage) {
// split the data buffer in repetition levels, definition levels and values
let (_rep_levels, _def_levels, _values_buffer) = split_buffer(page);

// decode and deserialize.
match (
page.descriptor.primitive_type.physical_type,
page.encoding(),
page.dictionary_page(),
) {
(
PhysicalType::Int32,
Encoding::PlainDictionary | Encoding::RleDictionary,
Some(_dict_page),
) => {
// plain encoded page with a dictionary
// _dict_page can be downcasted based on the descriptor's physical type
todo!()
}
(Encoding::Plain, None) => {
(PhysicalType::Int32, Encoding::Plain, None) => {
// plain encoded page
todo!()
}
Expand Down Expand Up @@ -81,7 +75,6 @@ fn main() -> Result<()> {
// ANCHOR: statistics
if let Some(maybe_stats) = column_metadata.statistics() {
let stats = maybe_stats?;
use parquet2::schema::types::PhysicalType;
use parquet2::statistics::PrimitiveStatistics;
match stats.physical_type() {
PhysicalType::Int32 => {
Expand Down Expand Up @@ -138,7 +131,7 @@ fn main() -> Result<()> {
let page = maybe_page?;
let page = parquet2::read::decompress(page, &mut decompress_buffer)?;

let _array = deserialize(&page, column_metadata.descriptor());
let _array = deserialize(&page);
}
// ANCHOR_END: decompress

Expand Down
14 changes: 7 additions & 7 deletions parquet-tools/src/lib/meta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,32 +98,32 @@ where
fn parquet_type_str(parquet_type: &ParquetType) -> String {
match parquet_type {
ParquetType::PrimitiveType {
basic_info,
field_info,
logical_type,
converted_type,
physical_type,
} => {
format!(
"{:27} {:?} {:?} P:{:?} L:{:?} C:{:?}",
basic_info.name(),
basic_info.repetition(),
basic_info.id(),
field_info.name(),
field_info.repetition(),
field_info.id(),
physical_type,
logical_type,
converted_type,
)
}
ParquetType::GroupType {
basic_info,
field_info,
logical_type,
converted_type,
..
} => {
format!(
"{:27} {:?} {:?} L:{:?} C:{:?}",
":",
basic_info.repetition(),
basic_info.id(),
field_info.repetition(),
field_info.id(),
logical_type,
converted_type,
)
Expand Down
9 changes: 3 additions & 6 deletions src/metadata/column_chunk_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use parquet_format_async_temp::{ColumnChunk, ColumnMetaData, Encoding};

use super::column_descriptor::ColumnDescriptor;
use crate::error::Result;
use crate::schema::types::{ParquetType, PhysicalType};
use crate::schema::types::PhysicalType;
use crate::statistics::{deserialize_statistics, Statistics};
use crate::{compression::Compression, schema::types::Type};

Expand Down Expand Up @@ -70,18 +70,15 @@ impl ColumnChunkMetaData {
/// The [`ColumnDescriptor`] for this column. This descriptor contains the physical and logical type
/// of the pages.
pub fn physical_type(&self) -> PhysicalType {
match self.descriptor().type_() {
ParquetType::PrimitiveType { physical_type, .. } => *physical_type,
_ => unreachable!(),
}
self.column_descr.descriptor.primitive_type.physical_type
}

/// Decodes the raw statistics into a statistics
pub fn statistics(&self) -> Option<Result<Arc<dyn Statistics>>> {
self.column_metadata()
.statistics
.as_ref()
.map(|x| deserialize_statistics(x, self.descriptor().clone()))
.map(|x| deserialize_statistics(x, self.column_descr.descriptor.primitive_type.clone()))
}

/// Total number of values in this column chunk.
Expand Down
75 changes: 20 additions & 55 deletions src/metadata/column_descriptor.rs
Original file line number Diff line number Diff line change
@@ -1,78 +1,43 @@
use crate::schema::types::{ParquetType, PhysicalType};
use crate::schema::types::{ParquetType, PrimitiveType};

/// Type- and nesting-level metadata for a single leaf (primitive) column.
///
/// Bundles the column's [`PrimitiveType`] with the maximum definition and
/// repetition levels, which are used to re-assemble nested data.
/// Derives `Eq` and `Hash` so descriptors can be compared and used as map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct Descriptor {
    /// The [`PrimitiveType`] of this column
    pub primitive_type: PrimitiveType,

    /// The maximum definition level
    pub max_def_level: i16,

    /// The maximum repetition level
    pub max_rep_level: i16,
}

/// A descriptor for leaf-level primitive columns.
/// This encapsulates information such as definition and repetition levels and is used to
/// re-assemble nested data.
#[derive(Debug, PartialEq, Clone)]
pub struct ColumnDescriptor {
// The "leaf" primitive type of this column
primitive_type: ParquetType,

// The maximum definition level for this column
max_def_level: i16,

// The maximum repetition level for this column
max_rep_level: i16,
    // The descriptor of this column's leaf.
    pub descriptor: Descriptor,

// The path of this column. For instance, "a.b.c.d".
path_in_schema: Vec<String>,
pub path_in_schema: Vec<String>,

base_type: ParquetType,
/// The [`ParquetType`] this descriptor is a leaf of
pub base_type: ParquetType,
}

impl ColumnDescriptor {
/// Creates new descriptor for leaf-level column.
pub fn new(
primitive_type: ParquetType,
max_def_level: i16,
max_rep_level: i16,
descriptor: Descriptor,
path_in_schema: Vec<String>,
base_type: ParquetType,
) -> Self {
Self {
primitive_type,
max_def_level,
max_rep_level,
descriptor,
path_in_schema,
base_type,
}
}

/// Returns maximum definition level for this column.
pub fn max_def_level(&self) -> i16 {
self.max_def_level
}

/// Returns maximum repetition level for this column.
pub fn max_rep_level(&self) -> i16 {
self.max_rep_level
}

pub fn path_in_schema(&self) -> &[String] {
&self.path_in_schema
}

pub fn base_type(&self) -> &ParquetType {
&self.base_type
}

/// Returns self type [`ParquetType`] for this leaf column.
pub fn type_(&self) -> &ParquetType {
&self.primitive_type
}

/// Returns self type [`PhysicalType`] for this leaf column.
/// # Panic
/// This function panics if the corresponding [`ParquetType`] is not a primitive type
pub fn physical_type(&self) -> &PhysicalType {
match &self.primitive_type {
ParquetType::PrimitiveType { physical_type, .. } => physical_type,
_ => unreachable!(""),
}
}

/// Returns column name.
pub fn name(&self) -> &str {
self.primitive_type.name()
}
}
2 changes: 1 addition & 1 deletion src/metadata/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ mod schema_descriptor;
mod sort;

pub use column_chunk_metadata::ColumnChunkMetaData;
pub use column_descriptor::ColumnDescriptor;
pub use column_descriptor::{ColumnDescriptor, Descriptor};
pub use column_order::ColumnOrder;
pub use file_metadata::{FileMetaData, KeyValue};
pub use row_metadata::RowGroupMetaData;
Expand Down
22 changes: 12 additions & 10 deletions src/metadata/schema_descriptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ use crate::{
error::ParquetError,
schema::{io_message::from_message, types::ParquetType, Repetition},
};
use crate::{error::Result, schema::types::BasicTypeInfo};
use crate::{error::Result, schema::types::FieldInfo};

use super::column_descriptor::ColumnDescriptor;
use super::column_descriptor::{ColumnDescriptor, Descriptor};

/// A schema descriptor. This encapsulates the top-level schemas for all the columns,
/// as well as all descriptors for all the primitive columns.
Expand Down Expand Up @@ -63,7 +63,7 @@ impl SchemaDescriptor {

pub(crate) fn into_thrift(self) -> Result<Vec<SchemaElement>> {
ParquetType::GroupType {
basic_info: BasicTypeInfo::new(self.name, Repetition::Optional, None, true),
field_info: FieldInfo::new(self.name, Repetition::Optional, None, true),
logical_type: None,
converted_type: None,
fields: self.fields,
Expand All @@ -74,8 +74,8 @@ impl SchemaDescriptor {
fn try_from_type(type_: ParquetType) -> Result<Self> {
match type_ {
ParquetType::GroupType {
basic_info, fields, ..
} => Ok(Self::new(basic_info.name().to_string(), fields)),
field_info, fields, ..
} => Ok(Self::new(field_info.name, fields)),
_ => Err(ParquetError::OutOfSpec(
"The parquet schema MUST be a group type".to_string(),
)),
Expand All @@ -102,7 +102,7 @@ fn build_tree<'a>(
path_so_far: &mut Vec<&'a str>,
) {
path_so_far.push(tp.name());
match *tp.get_basic_info().repetition() {
match tp.get_field_info().repetition {
Repetition::Optional => {
max_def_level += 1;
}
Expand All @@ -114,12 +114,14 @@ fn build_tree<'a>(
}

match tp {
ParquetType::PrimitiveType { .. } => {
ParquetType::PrimitiveType(p) => {
let path_in_schema = path_so_far.iter().copied().map(String::from).collect();
leaves.push(ColumnDescriptor::new(
tp.clone(),
max_def_level,
max_rep_level,
Descriptor {
primitive_type: p.clone(),
max_def_level,
max_rep_level,
},
path_in_schema,
base_tp.clone(),
));
Expand Down
Loading

0 comments on commit 4aed8f4

Please sign in to comment.