added a flush method to IPC writers #6108

Merged 1 commit on Jul 25, 2024
63 changes: 63 additions & 0 deletions arrow-ipc/src/writer.rs
@@ -982,6 +982,14 @@ impl<W: Write> FileWriter<W> {
        self.writer.get_mut()
    }

    /// Flush the underlying writer.
    ///
    /// Both the BufWriter and the underlying writer are flushed.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the BufWriter housed in FileWriter.writer, returning the underlying
    /// writer
    ///
@@ -1097,6 +1105,14 @@ impl<W: Write> StreamWriter<W> {
        self.writer.get_mut()
    }

    /// Flush the underlying writer.
    ///
    /// Both the BufWriter and the underlying writer are flushed.
    pub fn flush(&mut self) -> Result<(), ArrowError> {
        self.writer.flush()?;
        Ok(())
    }

    /// Unwraps the BufWriter housed in StreamWriter.writer, returning the underlying
    /// writer
    ///
@@ -2615,4 +2631,51 @@ mod tests {
offset from expected alignment of 16 by 8"
        );
    }

    #[test]
    fn test_flush() {
        // We write a schema which is small enough to fit into a buffer and not get flushed,
        // and then force the write with .flush().
        let num_cols = 2;
        let mut fields = Vec::new();
        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
        for i in 0..num_cols {
            let field = Field::new(&format!("col_{}", i), DataType::Decimal128(38, 10), true);
            fields.push(field);
        }
        let schema = Schema::new(fields);
        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
        let mut stream_writer =
            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
                .unwrap();
        let mut file_writer =
            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();

        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
        stream_writer.flush().unwrap();
        file_writer.flush().unwrap();
        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
        // Finishing a stream writes the continuation bytes in MetadataVersion::V5 (4 bytes)
        // and then a length of 0 (4 bytes) for a total of 8 bytes.
        // Everything before that should have been flushed in the .flush() call.
        let expected_stream_flushed_bytes = stream_out.len() - 8;
        // A file write is the same as the stream write except for the leading magic string
        // ARROW1 plus padding, which is 8 bytes.
        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;

        assert!(
            stream_bytes_written_on_new < stream_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert!(
            file_bytes_written_on_new < file_bytes_written_on_flush,
            "this test makes no sense if flush is not actually required"
        );
        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
    }
}
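
The byte accounting in `test_flush` relies on how `std::io::BufWriter` behaves: a write smaller than the buffer capacity stays in the buffer until `flush()` is called, and anything written afterwards only reaches the inner writer on the next flush or on `into_inner()`. Below is a minimal std-only sketch of that behavior. It does not use arrow-ipc at all; `flush_accounting` and `END_OF_STREAM` are hypothetical names introduced for illustration, and the 8-byte trailer simply mirrors the continuation bytes plus zero length described in the test comments.

```rust
use std::io::{BufWriter, Write};

/// Stand-in for the 8 bytes the test comments describe as written on finish:
/// a 4-byte continuation marker followed by a 4-byte length of 0.
/// (Illustrative constant, not taken from arrow-ipc.)
const END_OF_STREAM: [u8; 8] = [0xFF, 0xFF, 0xFF, 0xFF, 0, 0, 0, 0];

/// Writes `payload` through a 1024-byte BufWriter over a Vec and returns
/// (inner length before flush, inner length after flush, final length after
/// the trailer is written and the BufWriter is unwrapped).
fn flush_accounting(payload: &[u8]) -> (usize, usize, usize) {
    let mut writer = BufWriter::with_capacity(1024, Vec::new());

    // The payload is smaller than the buffer, so it stays buffered.
    writer.write_all(payload).unwrap();
    let before_flush = writer.get_ref().len();

    // flush() pushes the buffered bytes into the inner Vec.
    writer.flush().unwrap();
    let after_flush = writer.get_ref().len();

    // The trailer is buffered again; into_inner() flushes it out.
    writer.write_all(&END_OF_STREAM).unwrap();
    let out = writer.into_inner().unwrap();

    (before_flush, after_flush, out.len())
}

fn main() {
    let payload = b"schema bytes";
    let (before, after, total) = flush_accounting(payload);

    assert_eq!(before, 0);
    assert_eq!(after, payload.len());
    // Same relation the test checks: flushed bytes == final length - 8.
    assert_eq!(after, total - END_OF_STREAM.len());
}
```

The same reasoning explains why the test asserts `bytes_written_on_new < bytes_written_on_flush`: if the schema had exceeded the 1024-byte capacity, part of it would have reached the inner `Vec` before `flush()`, and the test would no longer show that the flush was needed.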