fix: Incorrect height from slicing after projecting only the file path column #20817

Merged · 7 commits · Jan 21, 2025
20 changes: 10 additions & 10 deletions crates/polars-io/src/ipc/ipc_file.rs
@@ -253,16 +253,16 @@ impl<R: MmapBytesReader> SerReader<R> for IpcReader<R> {
         // In case only hive columns are projected, the df would be empty, but we need the row count
         // of the file in order to project the correct number of rows for the hive columns.
         let (mut df, row_count) = (|| {
-            if self
-                .projection
-                .as_ref()
-                .map(|x| x.is_empty())
-                .unwrap_or(false)
-            {
-                return PolarsResult::Ok((
-                    Default::default(),
-                    get_row_count(&mut self.reader)? as usize,
-                ));
+            if self.projection.as_ref().is_some_and(|x| x.is_empty()) {
+                let row_count = if let Some(v) = self.n_rows {
+                    v
+                } else {
+                    get_row_count(&mut self.reader)? as usize
+                };
+                let mut df = DataFrame::empty();
+                unsafe { df.set_height(row_count) };
+
+                return PolarsResult::Ok((df, row_count));
             }

             if self.memory_map.is_some() && self.reader.to_file().is_some() {
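
The IPC reader now honors n_rows when the projection is empty instead of always taking the full file row count, and gives the otherwise-empty DataFrame an explicit height. A user-level sketch of the behavior this fixes; the file name data.ipc is illustrative, not from the PR:

import polars as pl

pl.DataFrame({"x": [1, 2, 3, 4, 5]}).write_ipc("data.ipc")

# Projecting only the path column reads zero columns from the file, so
# the frame's height must come from the reader's row count, clamped by
# the slice pushed down from head(3).
out = (
    pl.scan_ipc("data.ipc", include_file_paths="path")
    .select("path")
    .head(3)
    .collect()
)
assert out.height == 3  # previously this could come back as the full 5 rows
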
64 changes: 32 additions & 32 deletions crates/polars-io/src/parquet/read/read_impl.rs
@@ -1022,7 +1022,7 @@ pub struct BatchedParquetReader {
     chunk_size: usize,
     use_statistics: bool,
     hive_partition_columns: Option<Arc<[Series]>>,
-    include_file_path: Option<StringChunked>,
+    include_file_path: Option<Column>,
     /// Has returned at least one materialized frame.
     has_returned: bool,
 }

@@ -1079,8 +1079,16 @@ impl BatchedParquetReader {
             chunk_size,
             use_statistics,
             hive_partition_columns: hive_partition_columns.map(Arc::from),
-            include_file_path: include_file_path
-                .map(|(col, path)| StringChunked::full(col, &path, 1)),
+            include_file_path: include_file_path.map(|(col, path)| {
+                Column::new_scalar(
+                    col,
+                    Scalar::new(
+                        DataType::String,
+                        AnyValue::StringOwned(path.as_ref().into()),
+                    ),
+                    1,
+                )
+            }),
             has_returned: false,
         })
     }
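
The file path is now carried as a one-row scalar Column and later broadcast to each batch's height with new_from_index, rather than as a StringChunked. A rough user-level analogue in Python (illustrative frame and path), relying on the fact that a scalar literal broadcasts to the frame's height:

import polars as pl

df = pl.DataFrame({"x": [1, 2, 3]})
# The scalar broadcasts to df.height(), mirroring how the one-row
# file-path column is expanded per batch.
out = df.with_columns(pl.lit("data.parquet").alias("path"))
assert out["path"].to_list() == ["data.parquet"] * 3
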
@@ -1128,6 +1136,8 @@ impl BatchedParquetReader {
                 .fetch_row_groups(row_group_range.clone())
                 .await?;

+            let prev_rows_read = self.rows_read;
+
             let mut dfs = {
                 // Spawn the decoding and decompression of the bytes on a rayon task.
                 // This will ensure we don't block the async thread.

@@ -1170,36 +1180,26 @@ impl BatchedParquetReader {
                 dfs
             };

-            if let Some(ca) = self.include_file_path.as_mut() {
-                let mut max_len = 0;
-
-                if self.projection.is_empty() {
-                    max_len = self.metadata.num_rows;
-                } else {
-                    for df in &dfs {
-                        max_len = std::cmp::max(max_len, df.height());
-                    }
-                }
-
-                // Re-use the same ChunkedArray
-                if ca.len() < max_len {
-                    *ca = ca.new_from_index(0, max_len);
-                }
-
-                for df in &mut dfs {
-                    unsafe {
-                        df.with_column_unchecked(
-                            ca.slice(
-                                0,
-                                if !self.projection.is_empty() {
-                                    df.height()
-                                } else {
-                                    self.metadata.num_rows
-                                },
-                            )
-                            .into_column(),
-                        )
-                    };
-                }
-            }
+            if let Some(column) = self.include_file_path.as_ref() {
+                if dfs.first().is_some_and(|x| x.width() > 0) {
+                    for df in &mut dfs {
+                        unsafe { df.with_column_unchecked(column.new_from_index(0, df.height())) };
+                    }
+                } else {
+                    let (offset, len) = self.slice;
+                    let end = offset + len;
+
+                    debug_assert_eq!(dfs.len(), 1);
+                    dfs.get_mut(0).unwrap().insert_column(
+                        0,
+                        column.new_from_index(
+                            0,
+                            (self.rows_read.min(end.try_into().unwrap_or(IdxSize::MAX))
+                                - prev_rows_read)
+                                .try_into()
+                                .unwrap(),
+                        ),
+                    )?;
+                }
+            }
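
When no file columns are materialized (the decoded frames have width 0), the height of the file-path column is now derived from the reader's progress clamped to the slice end: rows_read.min(end) - prev_rows_read. A plain-Python sketch of that arithmetic, not the polars API:

def file_path_column_height(
    prev_rows_read: int, rows_read: int, slice_offset: int, slice_len: int
) -> int:
    # Clamp cumulative progress to the exclusive slice end, then keep
    # only the rows contributed by this batch.
    end = slice_offset + slice_len
    return min(rows_read, end) - prev_rows_read

# One batch covering a 10-row file, sliced to the first 3 rows:
assert file_path_column_height(0, 10, 0, 3) == 3
# A later batch entirely past the slice end contributes nothing:
assert file_path_column_height(3, 10, 0, 3) == 0
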
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/read/reader.rs
@@ -249,7 +249,7 @@ impl<R: MmapBytesReader> SerReader<R> for ParquetReader<R> {
     fn finish(mut self) -> PolarsResult<DataFrame> {
         let schema = self.schema()?;
         let metadata = self.get_metadata()?.clone();
-        let n_rows = metadata.num_rows;
+        let n_rows = metadata.num_rows.min(self.slice.0 + self.slice.1);

         if let Some(cols) = &self.columns {
             self.projection = Some(columns_to_projection(cols, schema.as_ref())?);
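
The one-line change above clamps the materialized row count to the end of the requested slice (self.slice is an (offset, len) pair). The same clamp as a plain-Python sketch:

def clamped_n_rows(num_rows_in_file: int, offset: int, length: int) -> int:
    return min(num_rows_in_file, offset + length)

assert clamped_n_rows(1_000, 0, 3) == 3  # head(3) materializes at most 3 rows
assert clamped_n_rows(2, 0, 3) == 2      # a slice cannot exceed the file
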
27 changes: 17 additions & 10 deletions crates/polars-mem-engine/src/executors/multi_file_scan.rs
@@ -3,9 +3,7 @@ use std::borrow::Cow;
 use hive::HivePartitions;
 use polars_core::config;
 use polars_core::frame::column::ScalarColumn;
-use polars_core::utils::{
-    accumulate_dataframes_vertical, accumulate_dataframes_vertical_unchecked,
-};
+use polars_core::utils::accumulate_dataframes_vertical_unchecked;
 use polars_io::predicates::BatchStats;
 use polars_io::RowIndex;

@@ -327,10 +325,21 @@ impl MultiScanExec {
         let final_per_source_schema = &self.file_info.schema;
         let file_output_schema = if let Some(file_with_columns) = file_with_columns.as_ref() {
-            Arc::new(final_per_source_schema.try_project(file_with_columns.as_ref())?)
+            let mut schema = final_per_source_schema.try_project(file_with_columns.as_ref())?;
+
+            if let Some(v) = include_file_paths.clone() {
+                schema.extend([(v, DataType::String)]);
+            }
+
+            Arc::new(schema)
         } else {
             final_per_source_schema.clone()
         };
+
+        if slice.is_some_and(|x| x.1 == 0) {
+            return Ok(DataFrame::empty_with_schema(final_per_source_schema));
+        }
+
         let mut missing_columns = Vec::new();

         let verbose = config::verbose();

@@ -548,13 +557,11 @@ impl MultiScanExec {
             dfs.push(df);
         }

-        let out = if cfg!(debug_assertions) {
-            accumulate_dataframes_vertical(dfs)?
+        if dfs.is_empty() {
+            Ok(DataFrame::empty_with_schema(final_per_source_schema))
         } else {
-            accumulate_dataframes_vertical_unchecked(dfs)
-        };
-
-        Ok(out)
+            Ok(accumulate_dataframes_vertical_unchecked(dfs))
+        }
     }
 }
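
Two fixes land here: the per-file output schema now includes the include_file_paths column, and a zero-length slice short-circuits to an empty frame that still carries the full scan schema. A user-level sketch, with an illustrative data.parquet:

import polars as pl

pl.DataFrame({"x": [1, 2, 3]}).write_parquet("data.parquet")

out = (
    pl.scan_parquet("data.parquet", include_file_paths="path")
    .head(0)  # pushes down a zero-length slice
    .collect()
)
assert out.height == 0
assert out.columns == ["x", "path"]  # schema survives the empty result
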
@@ -427,6 +427,35 @@ impl ProjectionPushDown {
                     file_options.include_file_paths.as_deref(),
                 );

+                if let Some(projection) = file_options.with_columns.as_mut() {
+                    if projection.is_empty() {
+                        match &scan_type {
+                            #[cfg(feature = "parquet")]
+                            FileScan::Parquet { .. } => {},
+                            #[cfg(feature = "ipc")]
+                            FileScan::Ipc { .. } => {},
+                            // Other scan types do not yet support projection of e.g. only the row index or file path
+                            // column - ensure at least 1 column is projected from the file.
+                            _ => {
+                                *projection = match &file_info.reader_schema {
+                                    Some(Either::Left(s)) => s.iter_names().next(),
+                                    Some(Either::Right(s)) => s.iter_names().next(),
+                                    None => None,
+                                }
+                                .into_iter()
+                                .cloned()
+                                .collect();
+
+                                // TODO: Don't know why this works without needing to remove it
+                                // later.
+                                acc_projections.push(ColumnNode(
+                                    expr_arena.add(AExpr::Column(projection[0].clone())),
+                                ));
+                            },
+                        }
+                    }
+                }
+
                 output_schema = if let Some(ref with_columns) = file_options.with_columns {
                     let mut schema = update_scan_schema(
                         &acc_projections,
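
For scan types other than Parquet and IPC, an empty file projection is replaced with the first column of the reader schema, so those readers always have at least one real column from which to derive the row count. A plain-Python sketch of the fallback, not the optimizer's actual types:

def ensure_nonempty_projection(
    projection: list[str], reader_schema: list[str]
) -> list[str]:
    # Parquet/IPC can read zero columns and still report a height;
    # other formats need at least one projected column.
    if projection:
        return projection
    return reader_schema[:1]  # first schema column, or [] if no schema

assert ensure_nonempty_projection([], ["a", "b"]) == ["a"]
assert ensure_nonempty_projection(["b"], ["a", "b"]) == ["b"]
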
23 changes: 15 additions & 8 deletions py-polars/tests/unit/io/test_scan.py
@@ -654,7 +654,7 @@ def test_scan_nonexistent_path(format: str) -> None:
     "streaming",
     [True, False],
 )
-def test_scan_include_file_name(
+def test_scan_include_file_paths(
     tmp_path: Path,
     scan_func: Callable[..., pl.LazyFrame],
     write_func: Callable[[pl.DataFrame, Path], None],
@@ -684,13 +684,20 @@ def test_scan_include_file_name(
     lf: pl.LazyFrame = f(tmp_path, include_file_paths="path")
     assert_frame_equal(lf.collect(streaming=streaming), df)

-    # TODO: Support this with CSV
-    if scan_func not in [pl.scan_csv, pl.scan_ndjson]:
-        # Test projecting only the path column
-        assert_frame_equal(
-            lf.select("path").collect(streaming=streaming),
-            df.select("path"),
-        )
+    # Test projecting only the path column
+    q = lf.select("path")
+    assert q.collect_schema() == {"path": pl.String}
+    assert_frame_equal(
+        q.collect(streaming=streaming),
+        df.select("path"),
+    )
+
+    q = q.select("path").head(3)
+    assert q.collect_schema() == {"path": pl.String}
+    assert_frame_equal(
+        q.collect(streaming=streaming),
+        df.select("path").head(3),
+    )

     # Test predicates
     for predicate in [pl.col("path") != pl.col("x"), pl.col("path") != ""]:
Binary file added stream_test.arrow