diff --git a/crates/polars-core/src/testing.rs b/crates/polars-core/src/testing.rs index 82003da6f0c2..99c28a617b2b 100644 --- a/crates/polars-core/src/testing.rs +++ b/crates/polars-core/src/testing.rs @@ -164,6 +164,18 @@ impl PartialEq for DataFrame { } } +/// Asserts that two expressions of type [`DataFrame`] are equal according to [`DataFrame::equals`] +/// at runtime. If the expression are not equal, the program will panic with a message that displays +/// both dataframes. +#[macro_export] +macro_rules! assert_df_eq { + ($a:expr, $b:expr $(,)?) => { + let a: &$crate::frame::DataFrame = &$a; + let b: &$crate::frame::DataFrame = &$b; + assert!(a.equals(b), "expected {:?}\nto equal {:?}", a, b); + }; +} + #[cfg(test)] mod test { use crate::prelude::*; @@ -194,6 +206,19 @@ mod test { assert!(df1.equals(&df1)) } + #[test] + fn assert_df_eq_passes() { + let df = df!("a" => [1], "b" => [2]).unwrap(); + assert_df_eq!(df, df); + drop(df); // Ensure `assert_df_eq!` does not consume its arguments. + } + + #[test] + #[should_panic(expected = "to equal")] + fn assert_df_eq_panics() { + assert_df_eq!(df!("a" => [1]).unwrap(), df!("a" => [2]).unwrap(),); + } + #[test] fn test_df_partialeq() { let df1 = df!("a" => &[1, 2, 3], diff --git a/crates/polars-io/src/ipc/ipc_stream.rs b/crates/polars-io/src/ipc/ipc_stream.rs index 0e1842712a13..e748f670ad3b 100644 --- a/crates/polars-io/src/ipc/ipc_stream.rs +++ b/crates/polars-io/src/ipc/ipc_stream.rs @@ -166,20 +166,14 @@ where self.projection = Some(prj); } - let sorted_projection = self.projection.clone().map(|mut proj| { - proj.sort_unstable(); - proj - }); - - let schema = if let Some(projection) = &sorted_projection { + let schema = if let Some(projection) = &self.projection { apply_projection(&metadata.schema, projection) } else { metadata.schema.clone() }; - let include_row_index = self.row_index.is_some(); let ipc_reader = - read::StreamReader::new(&mut self.reader, metadata.clone(), sorted_projection); + read::StreamReader::new(&mut self.reader, metadata.clone(), self.projection); finish_reader( ipc_reader, rechunk, @@ -188,35 +182,6 @@ where &schema, self.row_index, ) - .map(|df| fix_column_order(df, self.projection, include_row_index)) - } -} - -fn fix_column_order( - df: DataFrame, - projection: Option>, - include_row_index: bool, -) -> DataFrame { - if let Some(proj) = projection { - let offset = usize::from(include_row_index); - let mut args = (0..proj.len()).zip(proj).collect::>(); - // first el of tuple is argument index - // second el is the projection index - args.sort_unstable_by_key(|tpl| tpl.1); - let cols = df.get_columns(); - - let iter = args.iter().map(|tpl| cols[tpl.0 + offset].clone()); - let cols = if include_row_index { - let mut new_cols = vec![df.get_columns()[0].clone()]; - new_cols.extend(iter); - new_cols - } else { - iter.collect() - }; - - unsafe { DataFrame::new_no_checks(cols) } - } else { - df } } diff --git a/crates/polars/tests/it/io/ipc_stream.rs b/crates/polars/tests/it/io/ipc_stream.rs index 2010db6b543c..18d67990cb53 100644 --- a/crates/polars/tests/it/io/ipc_stream.rs +++ b/crates/polars/tests/it/io/ipc_stream.rs @@ -2,95 +2,112 @@ mod test { use std::io::Cursor; - use polars_core::df; use polars_core::prelude::*; + use polars_core::{assert_df_eq, df}; use polars_io::ipc::*; use polars_io::{SerReader, SerWriter}; use crate::io::create_df; - #[test] - fn write_and_read_ipc_stream() { - // Vec : Write + Read - // Cursor>: Seek + fn create_ipc_stream(mut df: DataFrame) -> Cursor> { let mut buf: Cursor> = Cursor::new(Vec::new()); - let mut df = create_df(); IpcStreamWriter::new(&mut buf) .finish(&mut df) - .expect("ipc writer"); + .expect("failed to write ICP stream"); buf.set_position(0); - let df_read = IpcStreamReader::new(buf).finish().unwrap(); - assert!(df.equals(&df_read)); + buf + } + + #[test] + fn write_and_read_ipc_stream() { + let df = create_df(); + + let reader = create_ipc_stream(df); + + let actual = IpcStreamReader::new(reader).finish().unwrap(); + + let expected = create_df(); + assert_df_eq!(actual, expected); } #[test] fn test_read_ipc_stream_with_projection() { - let mut buf: Cursor> = Cursor::new(Vec::new()); - let mut df = df!("a" => [1, 2, 3], "b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap(); + let df = df!( + "a" => [1], + "b" => [2], + "c" => [3], + ) + .unwrap(); - IpcStreamWriter::new(&mut buf) - .finish(&mut df) - .expect("ipc writer"); - buf.set_position(0); + let reader = create_ipc_stream(df); - let expected = df!("b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap(); - let df_read = IpcStreamReader::new(buf) + let actual = IpcStreamReader::new(reader) .with_projection(Some(vec![1, 2])) .finish() .unwrap(); - assert_eq!(df_read.shape(), (3, 2)); - df_read.equals(&expected); + + let expected = df!( + "b" => [2], + "c" => [3], + ) + .unwrap(); + assert_df_eq!(actual, expected); } #[test] fn test_read_ipc_stream_with_columns() { - let mut buf: Cursor> = Cursor::new(Vec::new()); - let mut df = df!("a" => [1, 2, 3], "b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap(); + let df = df!( + "a" => [1], + "b" => [2], + "c" => [3], + ) + .unwrap(); - IpcStreamWriter::new(&mut buf) - .finish(&mut df) - .expect("ipc writer"); - buf.set_position(0); + let reader = create_ipc_stream(df); - let expected = df!("b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap(); - let df_read = IpcStreamReader::new(buf) + let actual = IpcStreamReader::new(reader) .with_columns(Some(vec!["c".to_string(), "b".to_string()])) .finish() .unwrap(); - df_read.equals(&expected); - let mut buf: Cursor> = Cursor::new(Vec::new()); - let mut df = df![ - "a" => ["x", "y", "z"], - "b" => [123, 456, 789], - "c" => [4.5, 10.0, 10.0], - "d" => ["misc", "other", "value"], - ] + let expected = df!( + "c" => [3], + "b" => [2], + ) .unwrap(); - IpcStreamWriter::new(&mut buf) - .finish(&mut df) - .expect("ipc writer"); - buf.set_position(0); - let expected = df![ - "a" => ["x", "y", "z"], - "c" => [4.5, 10.0, 10.0], - "d" => ["misc", "other", "value"], - "b" => [123, 456, 789], + assert_df_eq!(actual, expected); + } + + #[test] + fn test_read_ipc_stream_with_columns_reorder() { + let df = df![ + "a" => [1], + "b" => [2], + "c" => [3], ] .unwrap(); - let df_read = IpcStreamReader::new(buf) + + let reader = create_ipc_stream(df); + + let actual = IpcStreamReader::new(reader) .with_columns(Some(vec![ - "a".to_string(), - "c".to_string(), - "d".to_string(), "b".to_string(), + "c".to_string(), + "a".to_string(), ])) .finish() .unwrap(); - df_read.equals(&expected); + + let expected = df![ + "b" => [2], + "c" => [3], + "a" => [1], + ] + .unwrap(); + assert_df_eq!(actual, expected); } #[test] @@ -101,38 +118,39 @@ mod test { } #[test] - fn test_write_with_compression() { - let mut df = create_df(); - - let compressions = vec![None, Some(IpcCompression::LZ4), Some(IpcCompression::ZSTD)]; - - for compression in compressions.into_iter() { - let mut buf: Cursor> = Cursor::new(Vec::new()); - IpcStreamWriter::new(&mut buf) - .with_compression(compression) - .finish(&mut df) - .expect("ipc writer"); - buf.set_position(0); - - let df_read = IpcStreamReader::new(buf) - .finish() - .unwrap_or_else(|_| panic!("IPC reader: {:?}", compression)); - assert!(df.equals(&df_read)); - } + fn test_write_with_lz4_compression() { + test_write_with_compression(IpcCompression::LZ4); + } + + #[test] + fn test_write_with_zstd_compression() { + test_write_with_compression(IpcCompression::ZSTD); + } + + fn test_write_with_compression(compression: IpcCompression) { + let reader = { + let mut writer: Cursor> = Cursor::new(Vec::new()); + IpcStreamWriter::new(&mut writer) + .with_compression(Some(compression)) + .finish(&mut create_df()) + .unwrap(); + writer.set_position(0); + writer + }; + + let actual = IpcStreamReader::new(reader).finish().unwrap(); + assert_df_eq!(actual, create_df()); } #[test] fn write_and_read_ipc_stream_empty_series() { - let mut buf: Cursor> = Cursor::new(Vec::new()); - let chunked_array = Float64Chunked::new("empty", &[0_f64; 0]); - let mut df = DataFrame::new(vec![chunked_array.into_series()]).unwrap(); - IpcStreamWriter::new(&mut buf) - .finish(&mut df) - .expect("ipc writer"); + fn df() -> DataFrame { + DataFrame::new(vec![Float64Chunked::new("empty", &[0_f64; 0]).into_series()]).unwrap() + } - buf.set_position(0); + let reader = create_ipc_stream(df()); - let df_read = IpcStreamReader::new(buf).finish().unwrap(); - assert!(df.equals(&df_read)); + let actual = IpcStreamReader::new(reader).finish().unwrap(); + assert_df_eq!(df(), actual); } } diff --git a/py-polars/tests/unit/io/test_ipc.py b/py-polars/tests/unit/io/test_ipc.py index 41b47c135ec8..8c03e3ee3136 100644 --- a/py-polars/tests/unit/io/test_ipc.py +++ b/py-polars/tests/unit/io/test_ipc.py @@ -85,15 +85,28 @@ def test_select_columns_from_file( @pytest.mark.parametrize("stream", [True, False]) def test_select_columns_from_buffer(stream: bool) -> None: - df = pl.DataFrame({"a": [1, 2, 3], "b": [True, False, True], "c": ["a", "b", "c"]}) - expected = pl.DataFrame({"b": [True, False, True], "c": ["a", "b", "c"]}) + df = pl.DataFrame( + { + "a": [1], + "b": [2], + "c": [3], + } + ) f = io.BytesIO() write_ipc(df, stream, f) f.seek(0) - read_df = read_ipc(stream, f, columns=["b", "c"], use_pyarrow=False) - assert_frame_equal(expected, read_df) + actual = read_ipc(stream, f, columns=["b", "c", "a"], use_pyarrow=False) + + expected = pl.DataFrame( + { + "b": [2], + "c": [3], + "a": [1], + } + ) + assert_frame_equal(expected, actual) @pytest.mark.parametrize("stream", [True, False])