
fix: Fix duplicate column output and panic for include_file_paths #18255

Merged: 2 commits, Aug 19, 2024
4 changes: 2 additions & 2 deletions crates/polars-core/src/chunked_array/ops/mod.rs
@@ -461,7 +461,7 @@ pub trait ChunkFilter<T: PolarsDataType> {
 /// Create a new ChunkedArray filled with values at that index.
 pub trait ChunkExpandAtIndex<T: PolarsDataType> {
     /// Create a new ChunkedArray filled with values at that index.
-    fn new_from_index(&self, length: usize, index: usize) -> ChunkedArray<T>;
+    fn new_from_index(&self, index: usize, length: usize) -> ChunkedArray<T>;
Collaborator (Author): index and length were flipped on the trait definition; see the sketch after this hunk.

 }

 macro_rules! impl_chunk_expand {
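
The comment above points at a subtle hazard: both parameters are `usize`, so a trait declaration whose parameter order disagrees with the convention used by the impls and call sites swaps the arguments' meaning without any compiler diagnostic. A minimal sketch of the failure class, using a hypothetical free function rather than the polars trait:

```rust
// Hypothetical sketch, not the polars API: two same-typed parameters give the
// compiler nothing to check, so a flipped declaration silently swaps meanings.
fn new_from_index(values: &[i32], index: usize, length: usize) -> Vec<i32> {
    // Repeat the value at `index`, `length` times.
    vec![values[index]; length]
}

fn main() {
    let values = [10, 20, 30];

    // Argument order per the fixed declaration: (index, length).
    let expanded = new_from_index(&values, 0, 100);
    assert_eq!(expanded.len(), 100);
    assert!(expanded.iter().all(|&v| v == 10));

    // A caller following the old, flipped declaration would write (100, 0)
    // here and hit an out-of-bounds panic at runtime, with no compile error.
    println!("expanded to {} rows", expanded.len());
}
```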
@@ -536,7 +536,7 @@ impl ChunkExpandAtIndex<ListType> for ListChunked {

 #[cfg(feature = "dtype-struct")]
 impl ChunkExpandAtIndex<StructType> for StructChunked {
-    fn new_from_index(&self, length: usize, index: usize) -> ChunkedArray<StructType> {
+    fn new_from_index(&self, index: usize, length: usize) -> ChunkedArray<StructType> {
         let (chunk_idx, idx) = self.index_to_chunked_index(index);
         let chunk = self.downcast_chunks().get(chunk_idx).unwrap();
         let chunk = if chunk.is_null(idx) {
11 changes: 9 additions & 2 deletions crates/polars-core/src/frame/mod.rs
@@ -1171,8 +1171,15 @@ impl DataFrame {
 /// # Safety
 /// The caller must ensure `column.len() == self.height()`.
 pub unsafe fn with_column_unchecked(&mut self, column: Series) -> &mut Self {
-    self.get_columns_mut().push(column);
-    self
+    #[cfg(debug_assertions)]
+    {
+        return self.with_column(column).unwrap();
+    }
+    #[cfg(not(debug_assertions))]
+    {
+        self.get_columns_mut().push(column);
+        self
+    }
 }

Collaborator (Author): drive-by - enable checks here in debug mode to prevent similar mistakes; see the sketch after this hunk.

fn add_column_by_schema(&mut self, s: Series, schema: &Schema) -> PolarsResult<()> {
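
A minimal sketch of that drive-by pattern, assuming a toy frame type rather than the real `DataFrame`: the `unsafe` fast path skips validation in release builds, while debug builds enforce the documented safety contract so violations fail loudly during development instead of corrupting the frame.

```rust
// Hypothetical sketch of a debug-checked unsafe fast path; not the polars source.
struct MiniFrame {
    columns: Vec<Vec<i64>>,
    height: usize,
}

impl MiniFrame {
    /// # Safety
    /// The caller must ensure `column.len() == self.height`.
    unsafe fn push_column_unchecked(&mut self, column: Vec<i64>) {
        // Debug builds: validate the safety contract and fail loudly.
        #[cfg(debug_assertions)]
        {
            assert_eq!(column.len(), self.height, "column length mismatch");
        }
        // Release builds: trust the caller and skip the check.
        self.columns.push(column);
    }
}

fn main() {
    let mut df = MiniFrame { columns: Vec::new(), height: 3 };
    // The contract holds here, so the call is sound in both build modes.
    unsafe { df.push_column_unchecked(vec![1, 2, 3]) };
    println!("{} column(s) of height {}", df.columns.len(), df.height);
}
```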
@@ -149,7 +149,7 @@ impl SeriesTrait for SeriesWrap<StructChunked> {
     }

     fn new_from_index(&self, _index: usize, _length: usize) -> Series {
-        self.0.new_from_index(_length, _index).into_series()
+        self.0.new_from_index(_index, _length).into_series()
     }

     fn cast(&self, dtype: &DataType, cast_options: CastOptions) -> PolarsResult<Series> {
2 changes: 1 addition & 1 deletion crates/polars-io/src/parquet/read/read_impl.rs
@@ -1022,7 +1022,7 @@ impl BatchedParquetReader {

         // Re-use the same ChunkedArray
         if ca.len() < max_len {
-            *ca = ca.new_from_index(max_len, 0);
+            *ca = ca.new_from_index(0, max_len);
         }

         for df in &mut dfs {
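
A minimal sketch of the reuse pattern in this hunk, with a plain `Vec` standing in for the `ChunkedArray`: the broadcast column is regrown only when a batch is taller than anything seen so far, and each batch then takes a slice of exactly the height it needs.

```rust
// Hypothetical sketch of the grow-on-demand broadcast column; not the polars source.
fn main() {
    let file_path = "file.parquet";
    let mut cache: Vec<&str> = Vec::new();

    for batch_height in [4usize, 2, 8, 3] {
        if cache.len() < batch_height {
            // Counterpart of `ca.new_from_index(0, max_len)`: repeat the value
            // at index 0 until the cache covers the tallest batch seen so far.
            cache = vec![file_path; batch_height];
        }
        // Counterpart of slicing the cached column down to the batch height.
        let column = &cache[..batch_height];
        println!("batch of {batch_height} rows -> {} path entries", column.len());
    }
}
```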
19 changes: 2 additions & 17 deletions crates/polars-mem-engine/src/executors/scan/parquet.rs
@@ -187,15 +187,14 @@ impl ParquetExec {
         readers_and_metadata
             .into_par_iter()
             .zip(row_statistics.into_par_iter())
-            .enumerate()
             .map(
-                |(i, ((reader, _, predicate, projection), (cumulative_read, slice)))| {
+                |((reader, _, predicate, projection), (cumulative_read, slice))| {
                     let row_index = base_row_index.as_ref().map(|rc| RowIndex {
                         name: rc.name.clone(),
                         offset: rc.offset + cumulative_read as IdxSize,
                     });

-                    let mut df = reader
+                    let df = reader
                         .with_slice(Some(slice))
                         .with_row_index(row_index)
                         .with_predicate(predicate.clone())
@@ -210,20 +209,6 @@ impl ParquetExec {
                     )?
                     .finish()?;

Collaborator (Author): this was a duplicate of a very old implementation approach that didn't push include_file_paths into the actual reader; see the sketch after this hunk.

-                    if let Some(col) = &self.file_options.include_file_paths {
-                        let path = paths[i].to_str().unwrap();
-                        unsafe {
-                            df.with_column_unchecked(
-                                StringChunked::full(
-                                    col,
-                                    path,
-                                    std::cmp::max(df.height(), slice.1),
-                                )
-                                .into_series(),
-                            )
-                        };
-                    }
-
                     Ok(df)
                 },
             )
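
A minimal sketch of the failure mode this removal fixes, assuming a toy frame type rather than the polars API: once the reader itself materializes the file-path column, appending it again without a duplicate-name check leaves two columns named `path` in a single frame, which is exactly the duplicate-output bug in the title.

```rust
// Hypothetical sketch of the duplicate-column bug; not the polars source.
use std::collections::HashSet;

struct NamedFrame {
    names: Vec<String>,
}

impl NamedFrame {
    // Mirrors the stale post-processing step: push with no duplicate-name check.
    fn push_unchecked(&mut self, name: &str) {
        self.names.push(name.to_string());
    }

    fn has_duplicate_names(&self) -> bool {
        let mut seen = HashSet::new();
        self.names.iter().any(|n| !seen.insert(n.as_str()))
    }
}

fn main() {
    // The reader already honored include_file_paths, so `path` is present...
    let mut df = NamedFrame { names: vec!["x".into(), "path".into()] };
    // ...and the removed block then appended it a second time.
    df.push_unchecked("path");
    assert!(df.has_duplicate_names());
    println!("columns: {:?}", df.names);
}
```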
2 changes: 1 addition & 1 deletion crates/polars-pipe/src/executors/sources/csv.rs
@@ -211,7 +211,7 @@ impl Source for CsvSource {

         if let Some(ca) = &mut self.include_file_path {
             if ca.len() < max_height {
-                *ca = ca.new_from_index(max_height, 0);
+                *ca = ca.new_from_index(0, max_height);
             };

             for data_chunk in &mut out {
12 changes: 4 additions & 8 deletions py-polars/tests/unit/io/test_scan.py
@@ -640,18 +640,14 @@ def test_scan_include_file_name(
     streaming: bool,
 ) -> None:
     tmp_path.mkdir(exist_ok=True)
-    paths: list[Path] = []
     dfs: list[pl.DataFrame] = []

     for x in ["1", "2"]:
-        paths.append(Path(f"{tmp_path}/{x}.bin").absolute())
-        dfs.append(pl.DataFrame({"x": x}))
-        write_func(dfs[-1], paths[-1])
-
-    df = pl.concat(dfs).with_columns(
-        pl.Series("path", map(str, paths), dtype=pl.String)
-    )
+        path = Path(f"{tmp_path}/{x}.bin").absolute()
+        dfs.append(pl.DataFrame({"x": 10 * [x]}).with_columns(path=pl.lit(str(path))))
+        write_func(dfs[-1].drop("path"), path)
+
+    df = pl.concat(dfs)
     assert df.columns == ["x", "path"]

     with pytest.raises(
Expand Down