Skip to content

Commit

Permalink
c
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion committed Sep 18, 2024
1 parent 77b1e42 commit 36e18df
Show file tree
Hide file tree
Showing 6 changed files with 222 additions and 149 deletions.
48 changes: 28 additions & 20 deletions crates/polars-io/src/parquet/read/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ pub use super::read_impl::BatchedParquetReader;
use super::read_impl::{compute_row_group_range, read_parquet, FetchRowGroupsFromMmapReader};
#[cfg(feature = "cloud")]
use super::utils::materialize_empty_df;
use super::utils::projected_arrow_schema_to_projection_indices;
#[cfg(feature = "cloud")]
use crate::cloud::CloudOptions;
use crate::mmap::MmapBytesReader;
Expand Down Expand Up @@ -80,20 +81,23 @@ impl<R: MmapBytesReader> ParquetReader<R> {
self
}

/// Ensure the schema of the file matches the given schema. Calling this
/// after setting the projection will ensure only the projected indices
/// are checked.
pub fn check_schema(mut self, schema: &ArrowSchema) -> PolarsResult<Self> {
let self_schema = self.schema()?;
let self_schema = self_schema.as_ref();
/// Checks that the file contains all the columns in `projected_arrow_schema` with the same
/// dtype, and sets the projection indices.
pub fn with_projected_arrow_schema(
mut self,
first_schema: &ArrowSchema,
projected_arrow_schema: Option<&ArrowSchema>,
) -> PolarsResult<Self> {
let schema = self.schema()?;

if let Some(projection) = self.projection.as_deref() {
ensure_matching_schema(
&schema.try_project_indices(projection)?,
&self_schema.try_project_indices(projection)?,
if let Some(projected_arrow_schema) = projected_arrow_schema {
self.projection = projected_arrow_schema_to_projection_indices(
schema.as_ref(),
projected_arrow_schema,
)?;
} else {
ensure_matching_schema(schema, self_schema)?;
self.projection =
projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
}

Ok(self)
Expand Down Expand Up @@ -288,17 +292,21 @@ impl ParquetAsyncReader {
})
}

pub async fn check_schema(mut self, schema: &ArrowSchema) -> PolarsResult<Self> {
let self_schema = self.schema().await?;
let self_schema = self_schema.as_ref();

if let Some(projection) = self.projection.as_deref() {
ensure_matching_schema(
&schema.try_project_indices(projection)?,
&self_schema.try_project_indices(projection)?,
pub async fn with_projected_arrow_schema(
mut self,
first_schema: &ArrowSchema,
projected_arrow_schema: Option<&ArrowSchema>,
) -> PolarsResult<Self> {
let schema = self.schema().await?;

if let Some(projected_arrow_schema) = projected_arrow_schema {
self.projection = projected_arrow_schema_to_projection_indices(
schema.as_ref(),
projected_arrow_schema,
)?;
} else {
ensure_matching_schema(schema, self_schema)?;
self.projection =
projected_arrow_schema_to_projection_indices(schema.as_ref(), first_schema)?;
}

Ok(self)
Expand Down
33 changes: 32 additions & 1 deletion crates/polars-io/src/parquet/read/utils.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::borrow::Cow;

use polars_core::prelude::{ArrowSchema, DataFrame, Series, IDX_DTYPE};
use polars_core::prelude::{ArrowSchema, DataFrame, DataType, Series, IDX_DTYPE};
use polars_error::{polars_bail, PolarsResult};

use crate::hive::materialize_hive_partitions;
use crate::utils::apply_projection;
Expand Down Expand Up @@ -28,3 +29,33 @@ pub fn materialize_empty_df(

df
}

pub(super) fn projected_arrow_schema_to_projection_indices(
schema: &ArrowSchema,
projected_arrow_schema: &ArrowSchema,
) -> PolarsResult<Option<Vec<usize>>> {
let mut projection_indices = Vec::with_capacity(projected_arrow_schema.len());
let mut is_full_ordered_projection = projected_arrow_schema.len() == schema.len();

for (i, field) in projected_arrow_schema.iter_values().enumerate() {
let dtype = {
let Some((idx, _, field)) = schema.get_full(&field.name) else {
polars_bail!(SchemaMismatch: "did not find column in file: {}", field.name)
};

projection_indices.push(idx);
is_full_ordered_projection &= idx == i;

DataType::from_arrow(&field.dtype, true)
};
let expected_dtype = DataType::from_arrow(&field.dtype, true);

if dtype.clone() != expected_dtype {
polars_bail!(SchemaMismatch: "data type mismatch for column {}: found: {}, expected: {}",
&field.name, dtype, expected_dtype
)
}
}

Ok((!is_full_ordered_projection).then_some(projection_indices))
}
28 changes: 0 additions & 28 deletions crates/polars-mem-engine/src/executors/scan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,34 +30,6 @@ pub(crate) use self::python_scan::*;
use super::*;
use crate::prelude::*;

#[cfg(any(feature = "ipc", feature = "parquet"))]
type Projection = Option<Vec<usize>>;
#[cfg(any(feature = "ipc", feature = "parquet"))]
type Predicate = Option<Arc<dyn PhysicalIoExpr>>;

#[cfg(any(feature = "ipc", feature = "parquet"))]
fn prepare_scan_args(
predicate: Option<Arc<dyn PhysicalExpr>>,
with_columns: &mut Option<Arc<[PlSmallStr]>>,
schema: &mut SchemaRef,
has_row_index: bool,
hive_partitions: Option<&[Series]>,
) -> (Projection, Predicate) {
let with_columns = mem::take(with_columns);
let schema = mem::take(schema);

let projection = materialize_projection(
with_columns.as_deref(),
&schema,
hive_partitions,
has_row_index,
);

let predicate = predicate.map(phys_expr_to_io_expr);

(projection, predicate)
}

/// Producer of an in memory DataFrame
pub struct DataFrameExec {
pub(crate) df: Arc<DataFrame>,
Expand Down
112 changes: 52 additions & 60 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ impl ParquetExec {
// Modified if we have a negative slice
let mut first_source = 0;

let first_schema = self.file_info.reader_schema.clone().unwrap().unwrap_left();

let projected_arrow_schema = {
if let Some(with_columns) = self.file_options.with_columns.as_deref() {
Some(Arc::new(first_schema.try_project(with_columns)?))
} else {
None
}
};
let predicate = self.predicate.clone().map(phys_expr_to_io_expr);

// (offset, end)
let (slice_offset, slice_end) = if let Some(slice) = self.file_options.slice {
if slice.0 >= 0 {
Expand Down Expand Up @@ -150,14 +161,6 @@ impl ParquetExec {
.as_ref()
.map(|x| x[i].materialize_partition_columns());

let (projection, predicate) = prepare_scan_args(
self.predicate.clone(),
&mut self.file_options.with_columns.clone(),
&mut self.file_info.schema.clone(),
base_row_index.is_some(),
hive_partitions.as_deref(),
);

let memslice = source.to_memslice()?;

let mut reader = ParquetReader::new(std::io::Cursor::new(memslice));
Expand All @@ -181,9 +184,7 @@ impl ParquetExec {
.map(|x| (x.clone(), Arc::from(source.to_include_path_name()))),
);

reader
.num_rows()
.map(|num_rows| (reader, num_rows, predicate, projection))
reader.num_rows().map(|num_rows| (reader, num_rows))
});

// We do this in parallel because wide tables can take a long time deserializing metadata.
Expand All @@ -192,7 +193,7 @@ impl ParquetExec {
let current_offset_ref = &mut current_offset;
let row_statistics = readers_and_metadata
.iter()
.map(|(_, num_rows, _, _)| {
.map(|(_, num_rows)| {
let cum_rows = *current_offset_ref;
(
cum_rows,
Expand All @@ -205,31 +206,24 @@ impl ParquetExec {
readers_and_metadata
.into_par_iter()
.zip(row_statistics.into_par_iter())
.map(
|((reader, _, predicate, projection), (cumulative_read, slice))| {
let row_index = base_row_index.as_ref().map(|rc| RowIndex {
name: rc.name.clone(),
offset: rc.offset + cumulative_read as IdxSize,
});

let df = reader
.with_slice(Some(slice))
.with_row_index(row_index)
.with_predicate(predicate.clone())
.with_projection(projection.clone())
.check_schema(
self.file_info
.reader_schema
.clone()
.unwrap()
.unwrap_left()
.as_ref(),
)?
.finish()?;

Ok(df)
},
)
.map(|((reader, _), (cumulative_read, slice))| {
let row_index = base_row_index.as_ref().map(|rc| RowIndex {
name: rc.name.clone(),
offset: rc.offset + cumulative_read as IdxSize,
});

let df = reader
.with_slice(Some(slice))
.with_row_index(row_index)
.with_predicate(predicate.clone())
.with_projected_arrow_schema(
first_schema.as_ref(),
projected_arrow_schema.as_deref(),
)?
.finish()?;

Ok(df)
})
.collect::<PolarsResult<Vec<_>>>()
})?;

Expand Down Expand Up @@ -261,6 +255,17 @@ impl ParquetExec {
eprintln!("POLARS PREFETCH_SIZE: {}", batch_size)
}

let first_schema = self.file_info.reader_schema.clone().unwrap().unwrap_left();

let projected_arrow_schema = {
if let Some(with_columns) = self.file_options.with_columns.as_deref() {
Some(Arc::new(first_schema.try_project(with_columns)?))
} else {
None
}
};
let predicate = self.predicate.clone().map(phys_expr_to_io_expr);

// Modified if we have a negative slice
let mut first_file_idx = 0;

Expand Down Expand Up @@ -384,12 +389,12 @@ impl ParquetExec {
.collect::<Vec<_>>();

// Now read the actual data.
let file_info = &self.file_info;
let file_options = &self.file_options;
let use_statistics = self.options.use_statistics;
let predicate = &self.predicate;
let base_row_index_ref = &base_row_index;
let include_file_paths = self.file_options.include_file_paths.as_ref();
let first_schema = first_schema.clone();
let projected_arrow_schema = projected_arrow_schema.clone();
let predicate = predicate.clone();

if verbose {
eprintln!("reading of {}/{} file...", processed, paths.len());
Expand All @@ -399,40 +404,27 @@ impl ParquetExec {
.into_iter()
.enumerate()
.map(|(i, (_, reader))| {
let first_schema = first_schema.clone();
let projected_arrow_schema = projected_arrow_schema.clone();
let predicate = predicate.clone();
let (cumulative_read, slice) = row_statistics[i];
let hive_partitions = hive_parts
.as_ref()
.map(|x| x[i].materialize_partition_columns());

let schema = self
.file_info
.reader_schema
.as_ref()
.unwrap()
.as_ref()
.unwrap_left()
.clone();

async move {
let file_info = file_info.clone();
let row_index = base_row_index_ref.as_ref().map(|rc| RowIndex {
name: rc.name.clone(),
offset: rc.offset + cumulative_read as IdxSize,
});

let (projection, predicate) = prepare_scan_args(
predicate.clone(),
&mut file_options.with_columns.clone(),
&mut file_info.schema.clone(),
row_index.is_some(),
hive_partitions.as_deref(),
);

let df = reader
.with_slice(Some(slice))
.with_row_index(row_index)
.with_projection(projection)
.check_schema(schema.as_ref())
.with_projected_arrow_schema(
first_schema.as_ref(),
projected_arrow_schema.as_deref(),
)
.await?
.use_statistics(use_statistics)
.with_predicate(predicate)
Expand Down
Loading

0 comments on commit 36e18df

Please sign in to comment.