Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(rust,python): Fix possibly incorrect order of columns when using ipc stream with_columns #14859

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions crates/polars-core/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,18 @@ impl PartialEq for DataFrame {
}
}

/// Asserts that two expressions of type [`DataFrame`] are equal according to [`DataFrame::equals`]
/// at runtime. If the expression are not equal, the program will panic with a message that displays
/// both dataframes.
#[macro_export]
macro_rules! assert_df_eq {
($a:expr, $b:expr $(,)?) => {
let a: &$crate::frame::DataFrame = &$a;
let b: &$crate::frame::DataFrame = &$b;
assert!(a.equals(b), "expected {:?}\nto equal {:?}", a, b);
};
}

#[cfg(test)]
mod test {
use crate::prelude::*;
Expand Down Expand Up @@ -194,6 +206,19 @@ mod test {
assert!(df1.equals(&df1))
}

#[test]
fn assert_df_eq_passes() {
let df = df!("a" => [1], "b" => [2]).unwrap();
assert_df_eq!(df, df);
drop(df); // Ensure `assert_df_eq!` does not consume its arguments.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does assert_eq take a ref by default?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so, although it is not entirely obvious to me why the implementation of assert_eq is what it is.

}

#[test]
#[should_panic(expected = "to equal")]
fn assert_df_eq_panics() {
assert_df_eq!(df!("a" => [1]).unwrap(), df!("a" => [2]).unwrap(),);
}

#[test]
fn test_df_partialeq() {
let df1 = df!("a" => &[1, 2, 3],
Expand Down
39 changes: 2 additions & 37 deletions crates/polars-io/src/ipc/ipc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,20 +166,14 @@ where
self.projection = Some(prj);
}

let sorted_projection = self.projection.clone().map(|mut proj| {
proj.sort_unstable();
proj
});

let schema = if let Some(projection) = &sorted_projection {
let schema = if let Some(projection) = &self.projection {
apply_projection(&metadata.schema, projection)
} else {
metadata.schema.clone()
};

let include_row_index = self.row_index.is_some();
let ipc_reader =
read::StreamReader::new(&mut self.reader, metadata.clone(), sorted_projection);
mickvangelderen marked this conversation as resolved.
Show resolved Hide resolved
read::StreamReader::new(&mut self.reader, metadata.clone(), self.projection);
finish_reader(
ipc_reader,
rechunk,
Expand All @@ -188,35 +182,6 @@ where
&schema,
self.row_index,
)
.map(|df| fix_column_order(df, self.projection, include_row_index))
}
}

fn fix_column_order(
df: DataFrame,
projection: Option<Vec<usize>>,
include_row_index: bool,
) -> DataFrame {
if let Some(proj) = projection {
let offset = usize::from(include_row_index);
let mut args = (0..proj.len()).zip(proj).collect::<Vec<_>>();
// first el of tuple is argument index
// second el is the projection index
args.sort_unstable_by_key(|tpl| tpl.1);
let cols = df.get_columns();

let iter = args.iter().map(|tpl| cols[tpl.0 + offset].clone());
let cols = if include_row_index {
let mut new_cols = vec![df.get_columns()[0].clone()];
new_cols.extend(iter);
new_cols
} else {
iter.collect()
};

unsafe { DataFrame::new_no_checks(cols) }
} else {
df
}
}

Expand Down
170 changes: 94 additions & 76 deletions crates/polars/tests/it/io/ipc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,95 +2,112 @@
mod test {
use std::io::Cursor;

use polars_core::df;
use polars_core::prelude::*;
use polars_core::{assert_df_eq, df};
use polars_io::ipc::*;
use polars_io::{SerReader, SerWriter};

use crate::io::create_df;

#[test]
fn write_and_read_ipc_stream() {
// Vec<T> : Write + Read
// Cursor<Vec<_>>: Seek
fn create_ipc_stream(mut df: DataFrame) -> Cursor<Vec<u8>> {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let mut df = create_df();

IpcStreamWriter::new(&mut buf)
.finish(&mut df)
.expect("ipc writer");
.expect("failed to write ICP stream");

buf.set_position(0);

let df_read = IpcStreamReader::new(buf).finish().unwrap();
assert!(df.equals(&df_read));
buf
}

#[test]
fn write_and_read_ipc_stream() {
let df = create_df();

let reader = create_ipc_stream(df);

let actual = IpcStreamReader::new(reader).finish().unwrap();

let expected = create_df();
assert_df_eq!(actual, expected);
}

#[test]
fn test_read_ipc_stream_with_projection() {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let mut df = df!("a" => [1, 2, 3], "b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();
let df = df!(
"a" => [1],
"b" => [2],
"c" => [3],
)
.unwrap();

IpcStreamWriter::new(&mut buf)
.finish(&mut df)
.expect("ipc writer");
buf.set_position(0);
let reader = create_ipc_stream(df);

let expected = df!("b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();
let df_read = IpcStreamReader::new(buf)
let actual = IpcStreamReader::new(reader)
.with_projection(Some(vec![1, 2]))
.finish()
.unwrap();
assert_eq!(df_read.shape(), (3, 2));
df_read.equals(&expected);

let expected = df!(
"b" => [2],
"c" => [3],
)
.unwrap();
assert_df_eq!(actual, expected);
}

#[test]
fn test_read_ipc_stream_with_columns() {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let mut df = df!("a" => [1, 2, 3], "b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();
let df = df!(
"a" => [1],
"b" => [2],
"c" => [3],
)
.unwrap();

IpcStreamWriter::new(&mut buf)
.finish(&mut df)
.expect("ipc writer");
buf.set_position(0);
let reader = create_ipc_stream(df);

let expected = df!("b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();
let df_read = IpcStreamReader::new(buf)
let actual = IpcStreamReader::new(reader)
.with_columns(Some(vec!["c".to_string(), "b".to_string()]))
.finish()
.unwrap();
df_read.equals(&expected);

let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let mut df = df![
"a" => ["x", "y", "z"],
"b" => [123, 456, 789],
"c" => [4.5, 10.0, 10.0],
"d" => ["misc", "other", "value"],
]
let expected = df!(
"c" => [3],
"b" => [2],
)
.unwrap();
IpcStreamWriter::new(&mut buf)
.finish(&mut df)
.expect("ipc writer");
buf.set_position(0);
let expected = df![
"a" => ["x", "y", "z"],
"c" => [4.5, 10.0, 10.0],
"d" => ["misc", "other", "value"],
"b" => [123, 456, 789],
assert_df_eq!(actual, expected);
}

#[test]
fn test_read_ipc_stream_with_columns_reorder() {
let df = df![
"a" => [1],
"b" => [2],
"c" => [3],
]
.unwrap();
let df_read = IpcStreamReader::new(buf)

let reader = create_ipc_stream(df);

let actual = IpcStreamReader::new(reader)
.with_columns(Some(vec![
"a".to_string(),
"c".to_string(),
"d".to_string(),
"b".to_string(),
"c".to_string(),
"a".to_string(),
]))
.finish()
.unwrap();
df_read.equals(&expected);

let expected = df![
"b" => [2],
"c" => [3],
"a" => [1],
]
.unwrap();
assert_df_eq!(actual, expected);
}

#[test]
Expand All @@ -101,38 +118,39 @@ mod test {
}

#[test]
fn test_write_with_compression() {
let mut df = create_df();

let compressions = vec![None, Some(IpcCompression::LZ4), Some(IpcCompression::ZSTD)];

for compression in compressions.into_iter() {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
IpcStreamWriter::new(&mut buf)
.with_compression(compression)
.finish(&mut df)
.expect("ipc writer");
buf.set_position(0);

let df_read = IpcStreamReader::new(buf)
.finish()
.unwrap_or_else(|_| panic!("IPC reader: {:?}", compression));
assert!(df.equals(&df_read));
}
fn test_write_with_lz4_compression() {
test_write_with_compression(IpcCompression::LZ4);
}

#[test]
fn test_write_with_zstd_compression() {
test_write_with_compression(IpcCompression::ZSTD);
}

fn test_write_with_compression(compression: IpcCompression) {
let reader = {
let mut writer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
IpcStreamWriter::new(&mut writer)
.with_compression(Some(compression))
.finish(&mut create_df())
.unwrap();
writer.set_position(0);
writer
};

let actual = IpcStreamReader::new(reader).finish().unwrap();
assert_df_eq!(actual, create_df());
}

#[test]
fn write_and_read_ipc_stream_empty_series() {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let chunked_array = Float64Chunked::new("empty", &[0_f64; 0]);
let mut df = DataFrame::new(vec![chunked_array.into_series()]).unwrap();
IpcStreamWriter::new(&mut buf)
.finish(&mut df)
.expect("ipc writer");
fn df() -> DataFrame {
DataFrame::new(vec![Float64Chunked::new("empty", &[0_f64; 0]).into_series()]).unwrap()
}

buf.set_position(0);
let reader = create_ipc_stream(df());

let df_read = IpcStreamReader::new(buf).finish().unwrap();
assert!(df.equals(&df_read));
let actual = IpcStreamReader::new(reader).finish().unwrap();
assert_df_eq!(df(), actual);
}
}
21 changes: 17 additions & 4 deletions py-polars/tests/unit/io/test_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,28 @@ def test_select_columns_from_file(

@pytest.mark.parametrize("stream", [True, False])
def test_select_columns_from_buffer(stream: bool) -> None:
df = pl.DataFrame({"a": [1, 2, 3], "b": [True, False, True], "c": ["a", "b", "c"]})
expected = pl.DataFrame({"b": [True, False, True], "c": ["a", "b", "c"]})
df = pl.DataFrame(
{
"a": [1],
"b": [2],
"c": [3],
}
)

f = io.BytesIO()
write_ipc(df, stream, f)
f.seek(0)

read_df = read_ipc(stream, f, columns=["b", "c"], use_pyarrow=False)
assert_frame_equal(expected, read_df)
actual = read_ipc(stream, f, columns=["b", "c", "a"], use_pyarrow=False)

expected = pl.DataFrame(
{
"b": [2],
"c": [3],
"a": [1],
}
)
assert_frame_equal(expected, actual)


@pytest.mark.parametrize("stream", [True, False])
Expand Down
Loading