Skip to content

Commit

Permalink
Refactor rust ipc_stream tests
Browse files Browse the repository at this point in the history
  • Loading branch information
mickvangelderen committed Mar 6, 2024
1 parent ef08a86 commit 698a276
Showing 1 changed file with 101 additions and 75 deletions.
176 changes: 101 additions & 75 deletions crates/polars/tests/it/io/ipc_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,88 +9,113 @@ mod test {

use crate::io::create_df;

#[test]
fn write_and_read_ipc_stream() {
// Vec<T> : Write + Read
// Cursor<Vec<_>>: Seek
fn create_ipc_stream(mut df: DataFrame) -> Cursor<Vec<u8>> {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let mut df = create_df();

IpcStreamWriter::new(&mut buf)
.finish(&mut df)
.expect("ipc writer");
.expect("failed to write ICP stream");

buf.set_position(0);

let df_read = IpcStreamReader::new(buf).finish().unwrap();
assert!(df.equals(&df_read));
buf
}

macro_rules! assert_df_eq {
($a:expr, $b:expr) => {
let a: &polars_core::frame::DataFrame = &$a;
let b: &polars_core::frame::DataFrame = &$b;
assert!(a.equals(b), "expected {:?}\nto equal {:?}", a, b);
};
}

#[test]
fn write_and_read_ipc_stream() {
let df = create_df();

let reader = create_ipc_stream(df);

let actual = IpcStreamReader::new(reader).finish().unwrap();

let expected = create_df();
assert_df_eq!(actual, expected);
}

#[test]
fn test_read_ipc_stream_with_projection() {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let mut df = df!("a" => [1, 2, 3], "b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();
let df = df!(
"a" => [1],
"b" => [2],
"c" => [3],
)
.unwrap();

IpcStreamWriter::new(&mut buf)
.finish(&mut df)
.expect("ipc writer");
buf.set_position(0);
let reader = create_ipc_stream(df);

let expected = df!("b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();
let df_read = IpcStreamReader::new(buf)
let actual = IpcStreamReader::new(reader)
.with_projection(Some(vec![1, 2]))
.finish()
.unwrap();
assert_eq!(df_read.shape(), (3, 2));
assert!(df_read.equals(&expected));

let expected = df!(
"b" => [2],
"c" => [3],
)
.unwrap();
assert_df_eq!(actual, expected);
}

#[test]
fn test_read_ipc_stream_with_columns() {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let mut df = df!("a" => [1, 2, 3], "b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();
let df = df!(
"a" => [1],
"b" => [2],
"c" => [3],
)
.unwrap();

IpcStreamWriter::new(&mut buf)
.finish(&mut df)
.expect("ipc writer");
buf.set_position(0);
let reader = create_ipc_stream(df);

let expected = df!("b" => [2, 3, 4], "c" => [3, 4, 5]).unwrap();
let df_read = IpcStreamReader::new(buf)
let actual = IpcStreamReader::new(reader)
.with_columns(Some(vec!["c".to_string(), "b".to_string()]))
.finish()
.unwrap();
assert!(df_read.equals(&expected));

let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let mut df = df![
"a" => ["x", "y", "z"],
"b" => [123, 456, 789],
"c" => [4.5, 10.0, 10.0],
"d" => ["misc", "other", "value"],
]
let expected = df!(
"c" => [3],
"b" => [2],
)
.unwrap();
IpcStreamWriter::new(&mut buf)
.finish(&mut df)
.expect("ipc writer");
buf.set_position(0);
let expected = df![
"a" => ["x", "y", "z"],
"c" => [4.5, 10.0, 10.0],
"d" => ["misc", "other", "value"],
"b" => [123, 456, 789],
assert_df_eq!(actual, expected);
}

#[test]
fn test_read_ipc_stream_with_columns_reorder() {
let df = df![
"a" => [1],
"b" => [2],
"c" => [3],
]
.unwrap();
let df_read = IpcStreamReader::new(buf)

let reader = create_ipc_stream(df);

let actual = IpcStreamReader::new(reader)
.with_columns(Some(vec![
"a".to_string(),
"c".to_string(),
"d".to_string(),
"b".to_string(),
"c".to_string(),
"a".to_string(),
]))
.finish()
.unwrap();
assert!(df_read.equals(&expected));

let expected = df![
"b" => [2],
"c" => [3],
"a" => [1],
]
.unwrap();
assert_df_eq!(actual, expected);
}

#[test]
Expand All @@ -101,38 +126,39 @@ mod test {
}

#[test]
fn test_write_with_compression() {
let mut df = create_df();

let compressions = vec![None, Some(IpcCompression::LZ4), Some(IpcCompression::ZSTD)];

for compression in compressions.into_iter() {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
IpcStreamWriter::new(&mut buf)
.with_compression(compression)
.finish(&mut df)
.expect("ipc writer");
buf.set_position(0);

let df_read = IpcStreamReader::new(buf)
.finish()
.unwrap_or_else(|_| panic!("IPC reader: {:?}", compression));
assert!(df.equals(&df_read));
}
fn test_write_with_lz4_compression() {
test_write_with_compression(IpcCompression::LZ4);
}

#[test]
fn test_write_with_zstd_compression() {
test_write_with_compression(IpcCompression::ZSTD);
}

fn test_write_with_compression(compression: IpcCompression) {
let reader = {
let mut writer: Cursor<Vec<u8>> = Cursor::new(Vec::new());
IpcStreamWriter::new(&mut writer)
.with_compression(Some(compression))
.finish(&mut create_df())
.unwrap();
writer.set_position(0);
writer
};

let actual = IpcStreamReader::new(reader).finish().unwrap();
assert_df_eq!(actual, create_df());
}

#[test]
fn write_and_read_ipc_stream_empty_series() {
let mut buf: Cursor<Vec<u8>> = Cursor::new(Vec::new());
let chunked_array = Float64Chunked::new("empty", &[0_f64; 0]);
let mut df = DataFrame::new(vec![chunked_array.into_series()]).unwrap();
IpcStreamWriter::new(&mut buf)
.finish(&mut df)
.expect("ipc writer");
fn df() -> DataFrame {
DataFrame::new(vec![Float64Chunked::new("empty", &[0_f64; 0]).into_series()]).unwrap()
}

buf.set_position(0);
let reader = create_ipc_stream(df());

let df_read = IpcStreamReader::new(buf).finish().unwrap();
assert!(df.equals(&df_read));
let actual = IpcStreamReader::new(reader).finish().unwrap();
assert_df_eq!(df(), actual);
}
}

0 comments on commit 698a276

Please sign in to comment.