Skip to content

Commit

Permalink
fix(rust,python): Fix possibly incorrect order of columns when using …
Browse files Browse the repository at this point in the history
…ipc stream `with_columns` (#14859)
  • Loading branch information
mickvangelderen committed Mar 8, 2024
1 parent 5eeae45 commit 6188cbf
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 117 deletions.
25 changes: 25 additions & 0 deletions crates/polars-core/src/testing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,18 @@ impl PartialEq for DataFrame {
}
}

/// Asserts that two expressions of type [`DataFrame`] are equal according to [`DataFrame::equals`]
/// at runtime. If the expressions are not equal, the program will panic with a message that
/// displays both dataframes.
///
/// Both arguments are only borrowed, so they remain usable after the assertion. A trailing comma
/// after the second argument is accepted.
///
/// # Panics
///
/// Panics when `a.equals(b)` returns `false`.
#[macro_export]
macro_rules! assert_df_eq {
    // The expansion is wrapped in its own block so that the macro is a single expression: it can
    // be used wherever an expression is expected (e.g. a `match` arm) and its `a`/`b` bindings
    // never end up in the caller's scope.
    ($a:expr, $b:expr $(,)?) => {{
        let a: &$crate::frame::DataFrame = &$a;
        let b: &$crate::frame::DataFrame = &$b;
        assert!(a.equals(b), "expected {:?}\nto equal {:?}", a, b);
    }};
}

#[cfg(test)]
mod test {
use crate::prelude::*;
Expand Down Expand Up @@ -194,6 +206,19 @@ mod test {
assert!(df1.equals(&df1))
}

/// Equal frames must pass the assertion, and the macro must only borrow its operands.
#[test]
fn assert_df_eq_passes() {
    let frame = df!("a" => [1], "b" => [2]).unwrap();
    assert_df_eq!(frame, frame);
    // Moving `frame` here only compiles if `assert_df_eq!` did not consume its arguments.
    drop(frame);
}

/// Frames with different values must make the assertion panic with its "to equal" message.
#[test]
#[should_panic(expected = "to equal")]
fn assert_df_eq_panics() {
    let left = df!("a" => [1]).unwrap();
    let right = df!("a" => [2]).unwrap();
    // The trailing comma is deliberate: it exercises the optional-comma arm of the macro.
    assert_df_eq!(left, right,);
}

#[test]
fn test_df_partialeq() {
let df1 = df!("a" => &[1, 2, 3],
Expand Down
39 changes: 2 additions & 37 deletions crates/polars-io/src/ipc/ipc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,20 +166,14 @@ where
self.projection = Some(prj);
}

let sorted_projection = self.projection.clone().map(|mut proj| {
proj.sort_unstable();
proj
});

let schema = if let Some(projection) = &sorted_projection {
let schema = if let Some(projection) = &self.projection {
apply_projection(&metadata.schema, projection)
} else {
metadata.schema.clone()
};

let include_row_index = self.row_index.is_some();
let ipc_reader =
read::StreamReader::new(&mut self.reader, metadata.clone(), sorted_projection);
read::StreamReader::new(&mut self.reader, metadata.clone(), self.projection);
finish_reader(
ipc_reader,
rechunk,
Expand All @@ -188,35 +182,6 @@ where
&schema,
self.row_index,
)
.map(|df| fix_column_order(df, self.projection, include_row_index))
}
}

/// Reorders the columns of `df` so that they follow the order requested in `projection`.
///
/// The caller hands a *sorted* copy of the projection to the stream reader, so the data columns
/// of `df` are expected to arrive in ascending projection-index order. This function undoes that
/// sort: output position `i` receives the column that was read for `projection[i]`.
///
/// * `projection` - projection indices in the order the caller requested them. `None` returns
///   `df` unchanged.
/// * `include_row_index` - whether `df` starts with a row-index column; if so, that column stays
///   at position 0 and only the columns after it are reordered.
///
/// # Panics
///
/// Panics if `df` holds fewer columns than the projection (plus the row-index column, if any).
fn fix_column_order(
    df: DataFrame,
    projection: Option<Vec<usize>>,
    include_row_index: bool,
) -> DataFrame {
    let proj = match projection {
        Some(proj) => proj,
        None => return df,
    };

    // Data columns start after the optional row-index column.
    let offset = usize::from(include_row_index);

    // `by_index[k]` is the requested position whose projection index is the k-th smallest, i.e.
    // the requested position that the k-th data column of `df` belongs to.
    let mut by_index: Vec<usize> = (0..proj.len()).collect();
    by_index.sort_unstable_by_key(|&requested_pos| proj[requested_pos]);

    // Invert that permutation: `source[i]` is the data column to place at requested position `i`.
    // Using `by_index` directly (as was done before) applies the sort a second time instead of
    // undoing it, which is only correct when the permutation happens to be its own inverse.
    let mut source = vec![0usize; proj.len()];
    for (sorted_pos, &requested_pos) in by_index.iter().enumerate() {
        source[requested_pos] = sorted_pos;
    }

    let cols = df.get_columns();
    let reordered: Vec<_> = cols[..offset]
        .iter()
        .chain(source.iter().map(|&sorted_pos| &cols[sorted_pos + offset]))
        .cloned()
        .collect();

    // SAFETY: `source` is a permutation, so `reordered` holds clones of columns of the already
    // constructed `df`, each exactly once and merely in a different order. Whatever invariants
    // `new_no_checks` skips validating (presumably equal lengths and unique names; confirm
    // against its documentation) therefore still hold.
    unsafe { DataFrame::new_no_checks(reordered) }
}

Expand Down
170 changes: 94 additions & 76 deletions crates/polars/tests/it/io/ipc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,95 +2,112 @@
mod test {
use std::io::Cursor;

use polars_core::df;
use polars_core::prelude::*;
use polars_core::{assert_df_eq, df};
use polars_io::ipc::*;
use polars_io::{SerReader, SerWriter};

use crate::io::create_df;

#[test]
fn write_and_read_ipc_stream() {
// Vec<T> : Write + Read
// Cursor<Vec<_>>: Seek
fn create_ipc_stream(mut df: DataFrame) -> Cursor<Vec<u8>> {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let mut df = create_df();

IpcStreamWriter::new(&mut buf)
.finish(&mut df)
.expect("ipc writer");
.expect("failed to write IPC stream");

buf.set_position(0);

let df_read = IpcStreamReader::new(buf).finish().unwrap();
assert!(df.equals(&df_read));
buf
}

/// Round-trips the `create_df()` frame through an IPC stream with default reader options and
/// checks that the frame read back equals a freshly created one.
#[test]
fn write_and_read_ipc_stream() {
    let stream = create_ipc_stream(create_df());

    let actual = IpcStreamReader::new(stream).finish().unwrap();

    assert_df_eq!(actual, create_df());
}

#[test]
fn test_read_ipc_stream_with_projection() {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let mut df = df!("a" => [1, 2, 3], "b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();
let df = df!(
"a" => [1],
"b" => [2],
"c" => [3],
)
.unwrap();

IpcStreamWriter::new(&mut buf)
.finish(&mut df)
.expect("ipc writer");
buf.set_position(0);
let reader = create_ipc_stream(df);

let expected = df!("b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();
let df_read = IpcStreamReader::new(buf)
let actual = IpcStreamReader::new(reader)
.with_projection(Some(vec![1, 2]))
.finish()
.unwrap();
assert_eq!(df_read.shape(), (3, 2));
df_read.equals(&expected);

let expected = df!(
"b" => [2],
"c" => [3],
)
.unwrap();
assert_df_eq!(actual, expected);
}

#[test]
fn test_read_ipc_stream_with_columns() {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let mut df = df!("a" => [1, 2, 3], "b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();
let df = df!(
"a" => [1],
"b" => [2],
"c" => [3],
)
.unwrap();

IpcStreamWriter::new(&mut buf)
.finish(&mut df)
.expect("ipc writer");
buf.set_position(0);
let reader = create_ipc_stream(df);

let expected = df!("b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();
let df_read = IpcStreamReader::new(buf)
let actual = IpcStreamReader::new(reader)
.with_columns(Some(vec!["c".to_string(), "b".to_string()]))
.finish()
.unwrap();
df_read.equals(&expected);

let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let mut df = df![
"a" => ["x", "y", "z"],
"b" => [123, 456, 789],
"c" => [4.5, 10.0, 10.0],
"d" => ["misc", "other", "value"],
]
let expected = df!(
"c" => [3],
"b" => [2],
)
.unwrap();
IpcStreamWriter::new(&mut buf)
.finish(&mut df)
.expect("ipc writer");
buf.set_position(0);
let expected = df![
"a" => ["x", "y", "z"],
"c" => [4.5, 10.0, 10.0],
"d" => ["misc", "other", "value"],
"b" => [123, 456, 789],
assert_df_eq!(actual, expected);
}

#[test]
fn test_read_ipc_stream_with_columns_reorder() {
let df = df![
"a" => [1],
"b" => [2],
"c" => [3],
]
.unwrap();
let df_read = IpcStreamReader::new(buf)

let reader = create_ipc_stream(df);

let actual = IpcStreamReader::new(reader)
.with_columns(Some(vec![
"a".to_string(),
"c".to_string(),
"d".to_string(),
"b".to_string(),
"c".to_string(),
"a".to_string(),
]))
.finish()
.unwrap();
df_read.equals(&expected);

let expected = df![
"b" => [2],
"c" => [3],
"a" => [1],
]
.unwrap();
assert_df_eq!(actual, expected);
}

#[test]
Expand All @@ -101,38 +118,39 @@ mod test {
}

#[test]
fn test_write_with_compression() {
let mut df = create_df();

let compressions = vec![None, Some(IpcCompression::LZ4), Some(IpcCompression::ZSTD)];

for compression in compressions.into_iter() {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
IpcStreamWriter::new(&mut buf)
.with_compression(compression)
.finish(&mut df)
.expect("ipc writer");
buf.set_position(0);

let df_read = IpcStreamReader::new(buf)
.finish()
.unwrap_or_else(|_| panic!("IPC reader: {:?}", compression));
assert!(df.equals(&df_read));
}
fn test_write_with_lz4_compression() {
test_write_with_compression(IpcCompression::LZ4);
}

/// Runs the shared compression round-trip check (`test_write_with_compression`) with
/// `IpcCompression::ZSTD`.
#[test]
fn test_write_with_zstd_compression() {
test_write_with_compression(IpcCompression::ZSTD);
}

/// Writes the `create_df()` frame to an in-memory IPC stream using `compression`, reads it back
/// and asserts that the result equals a freshly created frame.
fn test_write_with_compression(compression: IpcCompression) {
    let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
    let mut input = create_df();
    IpcStreamWriter::new(&mut buf)
        .with_compression(Some(compression))
        .finish(&mut input)
        .unwrap();
    // Rewind so the reader starts at the first written byte.
    buf.set_position(0);

    let actual = IpcStreamReader::new(buf).finish().unwrap();
    let expected = create_df();
    assert_df_eq!(actual, expected);
}

#[test]
fn write_and_read_ipc_stream_empty_series() {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let chunked_array = Float64Chunked::new("empty", &[0_f64; 0]);
let mut df = DataFrame::new(vec![chunked_array.into_series()]).unwrap();
IpcStreamWriter::new(&mut buf)
.finish(&mut df)
.expect("ipc writer");
fn df() -> DataFrame {
DataFrame::new(vec![Float64Chunked::new("empty", &[0_f64; 0]).into_series()]).unwrap()
}

buf.set_position(0);
let reader = create_ipc_stream(df());

let df_read = IpcStreamReader::new(buf).finish().unwrap();
assert!(df.equals(&df_read));
let actual = IpcStreamReader::new(reader).finish().unwrap();
assert_df_eq!(df(), actual);
}
}
21 changes: 17 additions & 4 deletions py-polars/tests/unit/io/test_ipc.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,28 @@ def test_select_columns_from_file(

@pytest.mark.parametrize("stream", [True, False])
def test_select_columns_from_buffer(stream: bool) -> None:
df = pl.DataFrame({"a": [1, 2, 3], "b": [True, False, True], "c": ["a", "b", "c"]})
expected = pl.DataFrame({"b": [True, False, True], "c": ["a", "b", "c"]})
df = pl.DataFrame(
{
"a": [1],
"b": [2],
"c": [3],
}
)

f = io.BytesIO()
write_ipc(df, stream, f)
f.seek(0)

read_df = read_ipc(stream, f, columns=["b", "c"], use_pyarrow=False)
assert_frame_equal(expected, read_df)
actual = read_ipc(stream, f, columns=["b", "c", "a"], use_pyarrow=False)

expected = pl.DataFrame(
{
"b": [2],
"c": [3],
"a": [1],
}
)
assert_frame_equal(expected, actual)


@pytest.mark.parametrize("stream", [True, False])
Expand Down

0 comments on commit 6188cbf

Please sign in to comment.