diff --git a/integration-tests/src/read/binary.rs b/integration-tests/src/read/binary.rs index e62dda6e0..c17fe62d1 100644 --- a/integration-tests/src/read/binary.rs +++ b/integration-tests/src/read/binary.rs @@ -2,7 +2,7 @@ use parquet::{ encoding::{bitpacking, plain_byte_array, uleb128, Encoding}, error::Result, metadata::ColumnDescriptor, - page::{BinaryPageDict, DataPage, DataPageHeader, DataPageHeaderExt}, + page::{split_buffer, BinaryPageDict, DataPage, DataPageHeader, DataPageHeaderExt}, read::levels, }; @@ -70,25 +70,21 @@ pub fn page_dict_to_vec( descriptor: &ColumnDescriptor, ) -> Result>>> { assert_eq!(descriptor.max_rep_level(), 0); - match page.header() { - DataPageHeader::V1(header) => match (&page.encoding(), &page.dictionary_page()) { - (Encoding::PlainDictionary, Some(dict)) => { - let (_, def_levels, values) = - levels::split_buffer_v1(page.buffer(), false, descriptor.max_def_level() > 0); - Ok(read_dict_buffer( - def_levels, - values, - page.num_values() as u32, - dict.as_any().downcast_ref().unwrap(), - ( - &header.definition_level_encoding(), - descriptor.max_def_level(), - ), - )) - } - (_, None) => todo!("Dictionary-encoded page requires a dictionary"), - _ => todo!(), - }, + + let (_, def_levels, values) = split_buffer(page, descriptor); + + match (&page.encoding(), &page.dictionary_page()) { + (Encoding::PlainDictionary, Some(dict)) => Ok(read_dict_buffer( + def_levels, + values, + page.num_values() as u32, + dict.as_any().downcast_ref().unwrap(), + ( + &page.definition_level_encoding(), + descriptor.max_def_level(), + ), + )), + (_, None) => todo!("Dictionary-encoded page requires a dictionary"), _ => todo!(), } } @@ -130,23 +126,19 @@ fn read_buffer( pub fn page_to_vec(page: &DataPage, descriptor: &ColumnDescriptor) -> Result>>> { assert_eq!(descriptor.max_rep_level(), 0); - match page.header() { - DataPageHeader::V1(header) => match (&page.encoding(), &page.dictionary_page()) { - (Encoding::Plain, None) => { - let (_, def_levels, values) = - levels::split_buffer_v1(page.buffer(), false, descriptor.max_def_level() > 0); - Ok(read_buffer( - def_levels, - values, - page.num_values() as u32, - ( - &header.definition_level_encoding(), - descriptor.max_def_level(), - ), - )) - } - _ => todo!(), - }, + + let (_, def_levels, values) = split_buffer(page, descriptor); + + match (&page.encoding(), &page.dictionary_page()) { + (Encoding::Plain, None) => Ok(read_buffer( + def_levels, + values, + page.num_values() as u32, + ( + &page.definition_level_encoding(), + descriptor.max_def_level(), + ), + )), _ => todo!(), } } diff --git a/integration-tests/src/read/primitive.rs b/integration-tests/src/read/primitive.rs index 9fbe7f3dd..bea808dfd 100644 --- a/integration-tests/src/read/primitive.rs +++ b/integration-tests/src/read/primitive.rs @@ -6,8 +6,8 @@ use parquet::{ encoding::{bitpacking, uleb128, Encoding}, error::{ParquetError, Result}, metadata::ColumnDescriptor, - page::{DataPage, DataPageHeader, DataPageHeaderExt, PrimitivePageDict}, - read::levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder}, + page::{split_buffer, DataPage, DataPageHeader, DataPageHeaderExt, PrimitivePageDict}, + read::levels::{get_bit_width, RLEDecoder}, types::NativeType, }; @@ -111,44 +111,20 @@ pub fn page_dict_to_vec( descriptor: &ColumnDescriptor, ) -> Result>> { assert_eq!(descriptor.max_rep_level(), 0); - match page.header() { - DataPageHeader::V1(header) => match (page.encoding(), page.dictionary_page()) { - (Encoding::PlainDictionary, Some(dict)) => { - let (_, def_levels, values) = - split_buffer_v1(page.buffer(), false, descriptor.max_def_level() > 0); - Ok(read_dict_buffer::( - def_levels, - values, - page.num_values() as u32, - dict.as_any().downcast_ref().unwrap(), - ( - &header.definition_level_encoding(), - descriptor.max_def_level(), - ), - )) - } - (_, None) => Err(ParquetError::OutOfSpec( - "A dictionary-encoded page MUST be preceeded by a dictionary page".to_string(), - )), - _ => todo!(), - }, - DataPageHeader::V2(header) => match (&header.encoding(), &page.dictionary_page()) { - (Encoding::RleDictionary, Some(dict)) | (Encoding::PlainDictionary, Some(dict)) => { - let (_, def_levels, values) = split_buffer_v2( - page.buffer(), - header.repetition_levels_byte_length as usize, - header.definition_levels_byte_length as usize, - ); - Ok(read_dict_buffer::( - def_levels, - values, - page.num_values() as u32, - dict.as_any().downcast_ref().unwrap(), - (&Encoding::Rle, descriptor.max_def_level()), - )) - } - _ => todo!(), - }, + + let (_, def_levels, values) = split_buffer(page, descriptor); + + match (&page.encoding(), &page.dictionary_page()) { + (Encoding::RleDictionary, Some(dict)) | (Encoding::PlainDictionary, Some(dict)) => { + Ok(read_dict_buffer::( + def_levels, + values, + page.num_values() as u32, + dict.as_any().downcast_ref().unwrap(), + (&Encoding::Rle, descriptor.max_def_level()), + )) + } + _ => todo!(), } } @@ -157,38 +133,19 @@ pub fn page_to_vec( descriptor: &ColumnDescriptor, ) -> Result>> { assert_eq!(descriptor.max_rep_level(), 0); - match page.header() { - DataPageHeader::V1(header) => match (&header.encoding(), &page.dictionary_page()) { - (Encoding::Plain, None) => { - let (_, def_levels, values) = - split_buffer_v1(page.buffer(), false, descriptor.max_def_level() > 0); - Ok(read_buffer::( - def_levels, - values, - page.num_values() as u32, - ( - &header.definition_level_encoding(), - descriptor.max_def_level(), - ), - )) - } - _ => todo!(), - }, - DataPageHeader::V2(header) => match (&header.encoding(), &page.dictionary_page()) { - (Encoding::Plain, None) => { - let (_, def_levels, values) = split_buffer_v2( - page.buffer(), - header.repetition_levels_byte_length as usize, - header.definition_levels_byte_length as usize, - ); - Ok(read_buffer::( - def_levels, - values, - page.num_values() as u32, - (&Encoding::Rle, descriptor.max_def_level()), - )) - } - _ => todo!(), - }, + + let (_, def_levels, values) = split_buffer(page, descriptor); + + match (&page.encoding(), &page.dictionary_page()) { + (Encoding::Plain, None) => Ok(read_buffer::( + def_levels, + values, + page.num_values() as u32, + ( + &page.definition_level_encoding(), + descriptor.max_def_level(), + ), + )), + _ => todo!(), } } diff --git a/integration-tests/src/read/primitive_nested.rs b/integration-tests/src/read/primitive_nested.rs index aeed46aa0..180cb5ad6 100644 --- a/integration-tests/src/read/primitive_nested.rs +++ b/integration-tests/src/read/primitive_nested.rs @@ -6,8 +6,8 @@ use parquet::{ encoding::{bitpacking, uleb128, Encoding}, error::{ParquetError, Result}, metadata::ColumnDescriptor, - page::{DataPage, DataPageHeader, DataPageHeaderExt, PrimitivePageDict}, - read::levels::{get_bit_width, split_buffer_v1, RLEDecoder}, + page::{split_buffer, DataPage, DataPageHeader, DataPageHeaderExt, PrimitivePageDict}, + read::levels::{get_bit_width, RLEDecoder}, types::NativeType, }; @@ -142,28 +142,24 @@ pub fn page_to_array( page: &DataPage, descriptor: &ColumnDescriptor, ) -> Result { - match page.header() { - DataPageHeader::V1(header) => match (&page.encoding(), &page.dictionary_page()) { - (Encoding::Plain, None) => { - let (rep_levels, def_levels, values) = split_buffer_v1(page.buffer(), true, true); - Ok(read_array::( - rep_levels, - def_levels, - values, - page.num_values() as u32, - ( - &header.repetition_level_encoding(), - descriptor.max_rep_level(), - ), - ( - &header.definition_level_encoding(), - descriptor.max_def_level(), - ), - )) - } - _ => todo!(), - }, - DataPageHeader::V2(_) => todo!(), + let (rep_levels, def_levels, values) = split_buffer(page, descriptor); + + match (&page.encoding(), &page.dictionary_page()) { + (Encoding::Plain, None) => Ok(read_array::( + rep_levels, + def_levels, + values, + page.num_values() as u32, + ( + &page.repetition_level_encoding(), + descriptor.max_rep_level(), + ), + ( + &page.definition_level_encoding(), + descriptor.max_def_level(), + ), + )), + _ => todo!(), } } @@ -203,31 +199,28 @@ pub fn page_dict_to_array( descriptor: &ColumnDescriptor, ) -> Result { assert_eq!(descriptor.max_rep_level(), 1); - match page.header() { - DataPageHeader::V1(header) => match (page.encoding(), &page.dictionary_page()) { - (Encoding::PlainDictionary, Some(dict)) => { - let (rep_levels, def_levels, values) = split_buffer_v1(page.buffer(), true, true); - Ok(read_dict_array::( - rep_levels, - def_levels, - values, - page.num_values() as u32, - dict.as_any().downcast_ref().unwrap(), - ( - &header.repetition_level_encoding(), - descriptor.max_rep_level(), - ), - ( - &header.definition_level_encoding(), - descriptor.max_def_level(), - ), - )) - } - (_, None) => Err(ParquetError::OutOfSpec( - "A dictionary-encoded page MUST be preceeded by a dictionary page".to_string(), - )), - _ => todo!(), - }, - DataPageHeader::V2(_) => todo!(), + + let (rep_levels, def_levels, values) = split_buffer(page, descriptor); + + match (page.encoding(), &page.dictionary_page()) { + (Encoding::PlainDictionary, Some(dict)) => Ok(read_dict_array::( + rep_levels, + def_levels, + values, + page.num_values() as u32, + dict.as_any().downcast_ref().unwrap(), + ( + &page.repetition_level_encoding(), + descriptor.max_rep_level(), + ), + ( + &page.definition_level_encoding(), + descriptor.max_def_level(), + ), + )), + (_, None) => Err(ParquetError::OutOfSpec( + "A dictionary-encoded page MUST be preceeded by a dictionary page".to_string(), + )), + _ => todo!(), } } diff --git a/src/page/mod.rs b/src/page/mod.rs index 38984dbad..77558df8a 100644 --- a/src/page/mod.rs +++ b/src/page/mod.rs @@ -1,7 +1,6 @@ mod page_dict; pub use page_dict::*; -use std::convert::TryInto; use std::sync::Arc; pub use parquet_format_async_temp::{ @@ -11,7 +10,7 @@ pub use parquet_format_async_temp::{ pub use crate::parquet_bridge::{DataPageHeaderExt, PageType}; use crate::compression::Compression; -use crate::encoding::Encoding; +use crate::encoding::{get_length, Encoding}; use crate::error::Result; use crate::metadata::ColumnDescriptor; @@ -145,8 +144,22 @@ impl DataPage { pub fn encoding(&self) -> Encoding { match &self.header { - DataPageHeader::V1(d) => d.encoding.try_into().unwrap(), - DataPageHeader::V2(d) => d.encoding.try_into().unwrap(), + DataPageHeader::V1(d) => d.encoding(), + DataPageHeader::V2(d) => d.encoding(), + } + } + + pub fn definition_level_encoding(&self) -> Encoding { + match &self.header { + DataPageHeader::V1(d) => d.definition_level_encoding(), + DataPageHeader::V2(_) => Encoding::Rle, + } + } + + pub fn repetition_level_encoding(&self) -> Encoding { + match &self.header { + DataPageHeader::V1(d) => d.repetition_level_encoding(), + DataPageHeader::V2(_) => Encoding::Rle, } } @@ -185,5 +198,64 @@ pub enum CompressedPage { Dict(CompressedDictPage), } -// read: CompressedPage -> Page -// write: Page -> CompressedPage +/// Splits the page buffer into 3 slices corresponding to (encoded rep levels, encoded def levels, encoded values) for v1 pages. +#[inline] +pub fn split_buffer_v1(buffer: &[u8], has_rep: bool, has_def: bool) -> (&[u8], &[u8], &[u8]) { + let (rep, buffer) = if has_rep { + let level_buffer_length = get_length(buffer) as usize; + ( + &buffer[4..4 + level_buffer_length], + &buffer[4 + level_buffer_length..], + ) + } else { + (&[] as &[u8], buffer) + }; + + let (def, buffer) = if has_def { + let level_buffer_length = get_length(buffer) as usize; + ( + &buffer[4..4 + level_buffer_length], + &buffer[4 + level_buffer_length..], + ) + } else { + (&[] as &[u8], buffer) + }; + + (rep, def, buffer) +} + +/// Splits the page buffer into 3 slices corresponding to (encoded rep levels, encoded def levels, encoded values) for v2 pages. +pub fn split_buffer_v2( + buffer: &[u8], + rep_level_buffer_length: usize, + def_level_buffer_length: usize, +) -> (&[u8], &[u8], &[u8]) { + ( + &buffer[..rep_level_buffer_length], + &buffer[rep_level_buffer_length..rep_level_buffer_length + def_level_buffer_length], + &buffer[rep_level_buffer_length + def_level_buffer_length..], + ) +} + +/// Splits the page buffer into 3 slices corresponding to (encoded rep levels, encoded def levels, encoded values). +pub fn split_buffer<'a>( + page: &'a DataPage, + descriptor: &ColumnDescriptor, +) -> (&'a [u8], &'a [u8], &'a [u8]) { + match page.header() { + DataPageHeader::V1(_) => split_buffer_v1( + page.buffer(), + descriptor.max_rep_level() > 0, + descriptor.max_def_level() > 0, + ), + DataPageHeader::V2(header) => { + let def_level_buffer_length = header.definition_levels_byte_length as usize; + let rep_level_buffer_length = header.repetition_levels_byte_length as usize; + split_buffer_v2( + page.buffer(), + rep_level_buffer_length, + def_level_buffer_length, + ) + } + } +} diff --git a/src/read/compression.rs b/src/read/compression.rs index 42802697b..541a1af33 100644 --- a/src/read/compression.rs +++ b/src/read/compression.rs @@ -66,6 +66,8 @@ pub fn decompress_buffer( } /// Decompresses the page, using `buffer` for decompression. +/// If `page.buffer.len() == 0`, there was no decompression and the buffer was moved. +/// Else, decompression took place. pub fn decompress( mut compressed_page: CompressedDataPage, buffer: &mut Vec, diff --git a/src/read/levels.rs b/src/read/levels.rs index b081c65bf..0789ec082 100644 --- a/src/read/levels.rs +++ b/src/read/levels.rs @@ -1,4 +1,4 @@ -use crate::encoding::{bitpacking, get_length, hybrid_rle}; +use crate::encoding::{bitpacking, hybrid_rle}; /// Returns the number of bits needed to store the given maximum definition or repetition level. #[inline] @@ -102,45 +102,6 @@ impl<'a> Iterator for RLEDecoder<'a> { impl<'a> ExactSizeIterator for RLEDecoder<'a> {} -/// returns slices corresponding to (rep, def, values) for v1 pages -#[inline] -pub fn split_buffer_v1(buffer: &[u8], has_rep: bool, has_def: bool) -> (&[u8], &[u8], &[u8]) { - let (rep, buffer) = if has_rep { - let level_buffer_length = get_length(buffer) as usize; - ( - &buffer[4..4 + level_buffer_length], - &buffer[4 + level_buffer_length..], - ) - } else { - (&[] as &[u8], buffer) - }; - - let (def, buffer) = if has_def { - let level_buffer_length = get_length(buffer) as usize; - ( - &buffer[4..4 + level_buffer_length], - &buffer[4 + level_buffer_length..], - ) - } else { - (&[] as &[u8], buffer) - }; - - (rep, def, buffer) -} - -/// returns slices corresponding to (rep, def, values) for v2 pages -pub fn split_buffer_v2( - buffer: &[u8], - rep_level_buffer_length: usize, - def_level_buffer_length: usize, -) -> (&[u8], &[u8], &[u8]) { - ( - &buffer[..rep_level_buffer_length], - &buffer[rep_level_buffer_length..rep_level_buffer_length + def_level_buffer_length], - &buffer[rep_level_buffer_length + def_level_buffer_length..], - ) -} - #[cfg(test)] mod tests { use super::get_bit_width; @@ -161,4 +122,4 @@ mod tests { assert_eq!(8, get_bit_width(255)); assert_eq!(9, get_bit_width(256)); } -} \ No newline at end of file +}