Skip to content

Commit

Permalink
Simplified API.
Browse files (browse the repository at this point in the history)
  • Loading branch information
jorgecarleitao committed Aug 25, 2021
1 parent 7661cd4 commit 0cae8e8
Show file tree
Hide file tree
Showing 6 changed files with 184 additions and 207 deletions.
66 changes: 29 additions & 37 deletions integration-tests/src/read/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use parquet::{
encoding::{bitpacking, plain_byte_array, uleb128, Encoding},
error::Result,
metadata::ColumnDescriptor,
page::{BinaryPageDict, DataPage, DataPageHeader, DataPageHeaderExt},
page::{split_buffer, BinaryPageDict, DataPage, DataPageHeader, DataPageHeaderExt},
read::levels,
};

Expand Down Expand Up @@ -70,25 +70,21 @@ pub fn page_dict_to_vec(
descriptor: &ColumnDescriptor,
) -> Result<Vec<Option<Vec<u8>>>> {
assert_eq!(descriptor.max_rep_level(), 0);
match page.header() {
DataPageHeader::V1(header) => match (&page.encoding(), &page.dictionary_page()) {
(Encoding::PlainDictionary, Some(dict)) => {
let (_, def_levels, values) =
levels::split_buffer_v1(page.buffer(), false, descriptor.max_def_level() > 0);
Ok(read_dict_buffer(
def_levels,
values,
page.num_values() as u32,
dict.as_any().downcast_ref().unwrap(),
(
&header.definition_level_encoding(),
descriptor.max_def_level(),
),
))
}
(_, None) => todo!("Dictionary-encoded page requires a dictionary"),
_ => todo!(),
},

let (_, def_levels, values) = split_buffer(page, descriptor);

match (&page.encoding(), &page.dictionary_page()) {
(Encoding::PlainDictionary, Some(dict)) => Ok(read_dict_buffer(
def_levels,
values,
page.num_values() as u32,
dict.as_any().downcast_ref().unwrap(),
(
&page.definition_level_encoding(),
descriptor.max_def_level(),
),
)),
(_, None) => todo!("Dictionary-encoded page requires a dictionary"),
_ => todo!(),
}
}
Expand Down Expand Up @@ -130,23 +126,19 @@ fn read_buffer(

pub fn page_to_vec(page: &DataPage, descriptor: &ColumnDescriptor) -> Result<Vec<Option<Vec<u8>>>> {
assert_eq!(descriptor.max_rep_level(), 0);
match page.header() {
DataPageHeader::V1(header) => match (&page.encoding(), &page.dictionary_page()) {
(Encoding::Plain, None) => {
let (_, def_levels, values) =
levels::split_buffer_v1(page.buffer(), false, descriptor.max_def_level() > 0);
Ok(read_buffer(
def_levels,
values,
page.num_values() as u32,
(
&header.definition_level_encoding(),
descriptor.max_def_level(),
),
))
}
_ => todo!(),
},

let (_, def_levels, values) = split_buffer(page, descriptor);

match (&page.encoding(), &page.dictionary_page()) {
(Encoding::Plain, None) => Ok(read_buffer(
def_levels,
values,
page.num_values() as u32,
(
&page.definition_level_encoding(),
descriptor.max_def_level(),
),
)),
_ => todo!(),
}
}
103 changes: 30 additions & 73 deletions integration-tests/src/read/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use parquet::{
encoding::{bitpacking, uleb128, Encoding},
error::{ParquetError, Result},
metadata::ColumnDescriptor,
page::{DataPage, DataPageHeader, DataPageHeaderExt, PrimitivePageDict},
read::levels::{get_bit_width, split_buffer_v1, split_buffer_v2, RLEDecoder},
page::{split_buffer, DataPage, DataPageHeader, DataPageHeaderExt, PrimitivePageDict},
read::levels::{get_bit_width, RLEDecoder},
types::NativeType,
};

Expand Down Expand Up @@ -111,44 +111,20 @@ pub fn page_dict_to_vec<T: NativeType>(
descriptor: &ColumnDescriptor,
) -> Result<Vec<Option<T>>> {
assert_eq!(descriptor.max_rep_level(), 0);
match page.header() {
DataPageHeader::V1(header) => match (page.encoding(), page.dictionary_page()) {
(Encoding::PlainDictionary, Some(dict)) => {
let (_, def_levels, values) =
split_buffer_v1(page.buffer(), false, descriptor.max_def_level() > 0);
Ok(read_dict_buffer::<T>(
def_levels,
values,
page.num_values() as u32,
dict.as_any().downcast_ref().unwrap(),
(
&header.definition_level_encoding(),
descriptor.max_def_level(),
),
))
}
(_, None) => Err(ParquetError::OutOfSpec(
"A dictionary-encoded page MUST be preceeded by a dictionary page".to_string(),
)),
_ => todo!(),
},
DataPageHeader::V2(header) => match (&header.encoding(), &page.dictionary_page()) {
(Encoding::RleDictionary, Some(dict)) | (Encoding::PlainDictionary, Some(dict)) => {
let (_, def_levels, values) = split_buffer_v2(
page.buffer(),
header.repetition_levels_byte_length as usize,
header.definition_levels_byte_length as usize,
);
Ok(read_dict_buffer::<T>(
def_levels,
values,
page.num_values() as u32,
dict.as_any().downcast_ref().unwrap(),
(&Encoding::Rle, descriptor.max_def_level()),
))
}
_ => todo!(),
},

let (_, def_levels, values) = split_buffer(page, descriptor);

match (&page.encoding(), &page.dictionary_page()) {
(Encoding::RleDictionary, Some(dict)) | (Encoding::PlainDictionary, Some(dict)) => {
Ok(read_dict_buffer::<T>(
def_levels,
values,
page.num_values() as u32,
dict.as_any().downcast_ref().unwrap(),
(&Encoding::Rle, descriptor.max_def_level()),
))
}
_ => todo!(),
}
}

Expand All @@ -157,38 +133,19 @@ pub fn page_to_vec<T: NativeType>(
descriptor: &ColumnDescriptor,
) -> Result<Vec<Option<T>>> {
assert_eq!(descriptor.max_rep_level(), 0);
match page.header() {
DataPageHeader::V1(header) => match (&header.encoding(), &page.dictionary_page()) {
(Encoding::Plain, None) => {
let (_, def_levels, values) =
split_buffer_v1(page.buffer(), false, descriptor.max_def_level() > 0);
Ok(read_buffer::<T>(
def_levels,
values,
page.num_values() as u32,
(
&header.definition_level_encoding(),
descriptor.max_def_level(),
),
))
}
_ => todo!(),
},
DataPageHeader::V2(header) => match (&header.encoding(), &page.dictionary_page()) {
(Encoding::Plain, None) => {
let (_, def_levels, values) = split_buffer_v2(
page.buffer(),
header.repetition_levels_byte_length as usize,
header.definition_levels_byte_length as usize,
);
Ok(read_buffer::<T>(
def_levels,
values,
page.num_values() as u32,
(&Encoding::Rle, descriptor.max_def_level()),
))
}
_ => todo!(),
},

let (_, def_levels, values) = split_buffer(page, descriptor);

match (&page.encoding(), &page.dictionary_page()) {
(Encoding::Plain, None) => Ok(read_buffer::<T>(
def_levels,
values,
page.num_values() as u32,
(
&page.definition_level_encoding(),
descriptor.max_def_level(),
),
)),
_ => todo!(),
}
}
93 changes: 43 additions & 50 deletions integration-tests/src/read/primitive_nested.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ use parquet::{
encoding::{bitpacking, uleb128, Encoding},
error::{ParquetError, Result},
metadata::ColumnDescriptor,
page::{DataPage, DataPageHeader, DataPageHeaderExt, PrimitivePageDict},
read::levels::{get_bit_width, split_buffer_v1, RLEDecoder},
page::{split_buffer, DataPage, DataPageHeader, DataPageHeaderExt, PrimitivePageDict},
read::levels::{get_bit_width, RLEDecoder},
types::NativeType,
};

Expand Down Expand Up @@ -142,28 +142,24 @@ pub fn page_to_array<T: NativeType>(
page: &DataPage,
descriptor: &ColumnDescriptor,
) -> Result<Array> {
match page.header() {
DataPageHeader::V1(header) => match (&page.encoding(), &page.dictionary_page()) {
(Encoding::Plain, None) => {
let (rep_levels, def_levels, values) = split_buffer_v1(page.buffer(), true, true);
Ok(read_array::<T>(
rep_levels,
def_levels,
values,
page.num_values() as u32,
(
&header.repetition_level_encoding(),
descriptor.max_rep_level(),
),
(
&header.definition_level_encoding(),
descriptor.max_def_level(),
),
))
}
_ => todo!(),
},
DataPageHeader::V2(_) => todo!(),
let (rep_levels, def_levels, values) = split_buffer(page, descriptor);

match (&page.encoding(), &page.dictionary_page()) {
(Encoding::Plain, None) => Ok(read_array::<T>(
rep_levels,
def_levels,
values,
page.num_values() as u32,
(
&page.repetition_level_encoding(),
descriptor.max_rep_level(),
),
(
&page.definition_level_encoding(),
descriptor.max_def_level(),
),
)),
_ => todo!(),
}
}

Expand Down Expand Up @@ -203,31 +199,28 @@ pub fn page_dict_to_array<T: NativeType>(
descriptor: &ColumnDescriptor,
) -> Result<Array> {
assert_eq!(descriptor.max_rep_level(), 1);
match page.header() {
DataPageHeader::V1(header) => match (page.encoding(), &page.dictionary_page()) {
(Encoding::PlainDictionary, Some(dict)) => {
let (rep_levels, def_levels, values) = split_buffer_v1(page.buffer(), true, true);
Ok(read_dict_array::<T>(
rep_levels,
def_levels,
values,
page.num_values() as u32,
dict.as_any().downcast_ref().unwrap(),
(
&header.repetition_level_encoding(),
descriptor.max_rep_level(),
),
(
&header.definition_level_encoding(),
descriptor.max_def_level(),
),
))
}
(_, None) => Err(ParquetError::OutOfSpec(
"A dictionary-encoded page MUST be preceeded by a dictionary page".to_string(),
)),
_ => todo!(),
},
DataPageHeader::V2(_) => todo!(),

let (rep_levels, def_levels, values) = split_buffer(page, descriptor);

match (page.encoding(), &page.dictionary_page()) {
(Encoding::PlainDictionary, Some(dict)) => Ok(read_dict_array::<T>(
rep_levels,
def_levels,
values,
page.num_values() as u32,
dict.as_any().downcast_ref().unwrap(),
(
&page.repetition_level_encoding(),
descriptor.max_rep_level(),
),
(
&page.definition_level_encoding(),
descriptor.max_def_level(),
),
)),
(_, None) => Err(ParquetError::OutOfSpec(
"A dictionary-encoded page MUST be preceeded by a dictionary page".to_string(),
)),
_ => todo!(),
}
}
Loading

0 comments on commit 0cae8e8

Please sign in to comment.