Skip to content

Commit

Permalink
Cleanup reading page index (#4149) (#4090) (#4151)
Browse files Browse the repository at this point in the history
* Cleanup reading page index (#4149) (#4090)

* Review feedback

* Add test

* Review feedback
  • Loading branch information
tustvold authored Apr 28, 2023
1 parent 521fdb9 commit 1434d1f
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 163 deletions.
124 changes: 51 additions & 73 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,18 +78,16 @@
use std::collections::VecDeque;
use std::fmt::Formatter;

use std::io::{Cursor, SeekFrom};
use std::io::SeekFrom;
use std::ops::Range;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::format::{OffsetIndex, PageLocation};
use bytes::{Buf, Bytes};
use futures::future::{BoxFuture, FutureExt};
use futures::ready;
use futures::stream::Stream;
use thrift::protocol::{TCompactInputProtocol, TSerializable};

use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};

Expand All @@ -109,9 +107,13 @@ use crate::column::page::{PageIterator, PageReader};
use crate::errors::{ParquetError, Result};
use crate::file::footer::{decode_footer, decode_metadata};
use crate::file::metadata::{ParquetMetaData, RowGroupMetaData};
use crate::file::page_index::index::Index;
use crate::file::page_index::index_reader::{
acc_range, decode_column_index, decode_offset_index,
};
use crate::file::reader::{ChunkReader, Length, SerializedPageReader};
use crate::format::PageLocation;

use crate::file::page_index::index_reader;
use crate::file::FOOTER_SIZE;

use crate::schema::types::{ColumnDescPtr, SchemaDescPtr};
Expand All @@ -121,6 +123,7 @@ pub use metadata::*;

#[cfg(feature = "object_store")]
mod store;

#[cfg(feature = "object_store")]
pub use store::*;

Expand Down Expand Up @@ -240,78 +243,53 @@ impl<T: AsyncFileReader + Send + 'static> ArrowReaderBuilder<AsyncReader<T>> {
&& metadata.column_index().is_none()
&& metadata.offset_index().is_none()
{
let mut fetch_ranges = vec![];
let mut index_lengths: Vec<Vec<usize>> = vec![];

for rg in metadata.row_groups() {
let (loc_offset, loc_length) =
index_reader::get_location_offset_and_total_length(rg.columns())?;

let (idx_offset, idx_lengths) =
index_reader::get_index_offset_and_lengths(rg.columns())?;
let idx_length = idx_lengths.iter().sum::<usize>();

// If index data is missing, return without any indexes
if loc_length == 0 || idx_length == 0 {
return Self::new_builder(AsyncReader(input), metadata, options);
}

fetch_ranges.push(loc_offset as usize..loc_offset as usize + loc_length);
fetch_ranges.push(idx_offset as usize..idx_offset as usize + idx_length);
index_lengths.push(idx_lengths);
}

let mut chunks = input.get_byte_ranges(fetch_ranges).await?.into_iter();
let mut index_lengths = index_lengths.into_iter();

let mut row_groups = metadata.row_groups().to_vec();

let mut columns_indexes = vec![];
let mut offset_indexes = vec![];

for rg in row_groups.iter_mut() {
let columns = rg.columns();

let location_data = chunks.next().unwrap();
let mut cursor = Cursor::new(location_data);
let mut offset_index = vec![];

for _ in 0..columns.len() {
let mut prot = TCompactInputProtocol::new(&mut cursor);
let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
offset_index.push(offset.page_locations);
let fetch = metadata.row_groups().iter().flat_map(|r| r.columns()).fold(
None,
|a, c| {
let a = acc_range(a, c.column_index_range());
acc_range(a, c.offset_index_range())
},
);

if let Some(fetch) = fetch {
let bytes = input.get_bytes(fetch.clone()).await?;
let get = |r: Range<usize>| {
&bytes[(r.start - fetch.start)..(r.end - fetch.start)]
};

let mut offset_index = Vec::with_capacity(metadata.num_row_groups());
let mut column_index = Vec::with_capacity(metadata.num_row_groups());
for rg in metadata.row_groups() {
let columns = rg.columns();
let mut rg_offset_index = Vec::with_capacity(columns.len());
let mut rg_column_index = Vec::with_capacity(columns.len());

for chunk in rg.columns() {
let t = chunk.column_type();
let c = match chunk.column_index_range() {
Some(range) => decode_column_index(get(range), t)?,
None => Index::NONE,
};

let o = match chunk.offset_index_range() {
Some(range) => decode_offset_index(get(range))?,
None => return Err(general_err!("missing offset index")),
};

rg_column_index.push(c);
rg_offset_index.push(o);
}
offset_index.push(rg_offset_index);
column_index.push(rg_column_index);
}

offset_indexes.push(offset_index);

let index_data = chunks.next().unwrap();
let index_lengths = index_lengths.next().unwrap();

let mut start = 0;
let data = index_lengths.into_iter().map(|length| {
let r = index_data.slice(start..start + length);
start += length;
r
});

let indexes = rg
.columns()
.iter()
.zip(data)
.map(|(column, data)| {
let column_type = column.column_type();
index_reader::deserialize_column_index(&data, column_type)
})
.collect::<Result<Vec<_>>>()?;
columns_indexes.push(indexes);
metadata = Arc::new(ParquetMetaData::new_with_page_index(
metadata.file_metadata().clone(),
metadata.row_groups().to_vec(),
Some(column_index),
Some(offset_index),
));
}

metadata = Arc::new(ParquetMetaData::new_with_page_index(
metadata.file_metadata().clone(),
row_groups,
Some(columns_indexes),
Some(offset_indexes),
));
}

Self::new_builder(AsyncReader(input), metadata, options)
Expand Down
15 changes: 15 additions & 0 deletions parquet/src/file/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
//! [`ColumnChunkMetaData`](struct.ColumnChunkMetaData.html) has information about column
//! chunk (primitive leaf column), including encoding/compression, number of values, etc.

use std::ops::Range;
use std::sync::Arc;

use crate::format::{
Expand Down Expand Up @@ -565,6 +566,13 @@ impl ColumnChunkMetaData {
self.column_index_length
}

/// Returns the range for the offset index if any
pub(crate) fn column_index_range(&self) -> Option<Range<usize>> {
let offset = usize::try_from(self.column_index_offset?).ok()?;
let length = usize::try_from(self.column_index_length?).ok()?;
Some(offset..(offset + length))
}

/// Returns the offset for the offset index.
pub fn offset_index_offset(&self) -> Option<i64> {
self.offset_index_offset
Expand All @@ -575,6 +583,13 @@ impl ColumnChunkMetaData {
self.offset_index_length
}

/// Returns the range for the offset index if any
pub(crate) fn offset_index_range(&self) -> Option<Range<usize>> {
let offset = usize::try_from(self.offset_index_offset?).ok()?;
let length = usize::try_from(self.offset_index_length?).ok()?;
Some(offset..(offset + length))
}

/// Method to convert from Thrift.
pub fn from_thrift(column_descr: ColumnDescPtr, cc: ColumnChunk) -> Result<Self> {
if cc.meta_data.is_none() {
Expand Down
137 changes: 47 additions & 90 deletions parquet/src/file/page_index/index_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,23 @@ use crate::file::metadata::ColumnChunkMetaData;
use crate::file::page_index::index::{Index, NativeIndex};
use crate::file::reader::ChunkReader;
use crate::format::{ColumnIndex, OffsetIndex, PageLocation};
use std::io::{Cursor, Read};
use std::io::Cursor;
use std::ops::Range;
use thrift::protocol::{TCompactInputProtocol, TSerializable};

/// Computes the covering range of two optional ranges
///
/// For example `acc_range(Some(7..9), Some(1..3)) = Some(1..9)`
pub(crate) fn acc_range(
a: Option<Range<usize>>,
b: Option<Range<usize>>,
) -> Option<Range<usize>> {
match (a, b) {
(Some(a), Some(b)) => Some(a.start.min(b.start)..a.end.max(b.end)),
(None, x) | (x, None) => x,
}
}

/// Reads per-column [`Index`] for all columns of a row group by
/// decoding [`ColumnIndex`] .
///
Expand All @@ -42,31 +56,23 @@ pub fn read_columns_indexes<R: ChunkReader>(
reader: &R,
chunks: &[ColumnChunkMetaData],
) -> Result<Vec<Index>, ParquetError> {
let (offset, lengths) = get_index_offset_and_lengths(chunks)?;
let length = lengths.iter().sum::<usize>();

if length == 0 {
return Ok(vec![Index::NONE; chunks.len()]);
}
let fetch = chunks
.iter()
.fold(None, |range, c| acc_range(range, c.column_index_range()));

//read all need data into buffer
let mut reader = reader.get_read(offset, length)?;
let mut data = vec![0; length];
reader.read_exact(&mut data)?;
let fetch = match fetch {
Some(r) => r,
None => return Ok(vec![Index::NONE; chunks.len()]),
};

let mut start = 0;
let data = lengths.into_iter().map(|length| {
let r = &data[start..start + length];
start += length;
r
});
let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];

chunks
.iter()
.zip(data)
.map(|(chunk, data)| {
let column_type = chunk.column_type();
deserialize_column_index(data, column_type)
.map(|c| match c.column_index_range() {
Some(r) => decode_column_index(get(r), c.column_type()),
None => Ok(Index::NONE),
})
.collect()
}
Expand All @@ -86,88 +92,39 @@ pub fn read_pages_locations<R: ChunkReader>(
reader: &R,
chunks: &[ColumnChunkMetaData],
) -> Result<Vec<Vec<PageLocation>>, ParquetError> {
let (offset, total_length) = get_location_offset_and_total_length(chunks)?;

if total_length == 0 {
return Ok(vec![]);
}

//read all need data into buffer
let mut reader = reader.get_read(offset, total_length)?;
let mut data = vec![0; total_length];
reader.read_exact(&mut data)?;

let mut d = Cursor::new(data);
let mut result = vec![];

for _ in 0..chunks.len() {
let mut prot = TCompactInputProtocol::new(&mut d);
let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
result.push(offset.page_locations);
}
Ok(result)
}
let fetch = chunks
.iter()
.fold(None, |range, c| acc_range(range, c.offset_index_range()));

//Get File offsets of every ColumnChunk's page_index
//If there are invalid offset return a zero offset with empty lengths.
pub(crate) fn get_index_offset_and_lengths(
chunks: &[ColumnChunkMetaData],
) -> Result<(u64, Vec<usize>), ParquetError> {
let first_col_metadata = if let Some(chunk) = chunks.first() {
chunk
} else {
return Ok((0, vec![]));
let fetch = match fetch {
Some(r) => r,
None => return Ok(vec![]),
};

let offset: u64 = if let Some(offset) = first_col_metadata.column_index_offset() {
offset.try_into().unwrap()
} else {
return Ok((0, vec![]));
};
let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];

let lengths = chunks
chunks
.iter()
.map(|x| x.column_index_length())
.map(|maybe_length| {
let index_length = maybe_length.unwrap_or(0);
Ok(index_length.try_into().unwrap())
.map(|c| match c.offset_index_range() {
Some(r) => decode_offset_index(get(r)),
None => Err(general_err!("missing offset index")),
})
.collect::<Result<Vec<_>, ParquetError>>()?;

Ok((offset, lengths))
.collect()
}

//Get File offset of ColumnChunk's pages_locations
//If there are invalid offset return a zero offset with zero length.
pub(crate) fn get_location_offset_and_total_length(
chunks: &[ColumnChunkMetaData],
) -> Result<(u64, usize), ParquetError> {
let metadata = if let Some(chunk) = chunks.first() {
chunk
} else {
return Ok((0, 0));
};

let offset: u64 = if let Some(offset) = metadata.offset_index_offset() {
offset.try_into().unwrap()
} else {
return Ok((0, 0));
};

let total_length = chunks
.iter()
.map(|x| x.offset_index_length().unwrap())
.sum::<i32>() as usize;
Ok((offset, total_length))
pub(crate) fn decode_offset_index(
data: &[u8],
) -> Result<Vec<PageLocation>, ParquetError> {
let mut prot = TCompactInputProtocol::new(data);
let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
Ok(offset.page_locations)
}

pub(crate) fn deserialize_column_index(
pub(crate) fn decode_column_index(
data: &[u8],
column_type: Type,
) -> Result<Index, ParquetError> {
if data.is_empty() {
return Ok(Index::NONE);
}
let mut d = Cursor::new(data);
let mut prot = TCompactInputProtocol::new(&mut d);

Expand Down
Loading

0 comments on commit 1434d1f

Please sign in to comment.