From 24a461ccb0b7ec858ea2d392a3f4082e7afaa8d1 Mon Sep 17 00:00:00 2001
From: Mateusz Gienieczko
Date: Wed, 24 Jul 2024 14:23:21 +0200
Subject: [PATCH] added a flush method to IPC writers

While the writers expose `get_ref` and `get_mut` to access the
underlying `io::Write` writer, there is an internal layer of a
`BufWriter` that is not accessible. Because of that, there is no way to
ensure that all messages written thus far to the `StreamWriter` or
`FileWriter` have actually been passed to the underlying writer.

Here we expose a `flush` method that flushes the internal buffer and the
underlying writer.

See #6099 for the discussion.
---
 arrow-ipc/src/writer.rs | 63 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 63 insertions(+)

diff --git a/arrow-ipc/src/writer.rs b/arrow-ipc/src/writer.rs
index d0a78ca2702e..5a8adb31b038 100644
--- a/arrow-ipc/src/writer.rs
+++ b/arrow-ipc/src/writer.rs
@@ -982,6 +982,14 @@ impl FileWriter {
         self.writer.get_mut()
     }
 
+    /// Flush the underlying writer.
+    ///
+    /// Both the BufWriter and the underlying writer are flushed.
+    pub fn flush(&mut self) -> Result<(), ArrowError> {
+        self.writer.flush()?;
+        Ok(())
+    }
+
     /// Unwraps the BufWriter housed in FileWriter.writer, returning the underlying
     /// writer
     ///
@@ -1097,6 +1105,14 @@ impl StreamWriter {
         self.writer.get_mut()
     }
 
+    /// Flush the underlying writer.
+    ///
+    /// Both the BufWriter and the underlying writer are flushed.
+    pub fn flush(&mut self) -> Result<(), ArrowError> {
+        self.writer.flush()?;
+        Ok(())
+    }
+
     /// Unwraps the BufWriter housed in StreamWriter.writer, returning the underlying
     /// writer
     ///
@@ -2615,4 +2631,51 @@ mod tests {
              offset from expected alignment of 16 by 8"
         );
     }
+
+    #[test]
+    fn test_flush() {
+        // We write a schema which is small enough to fit into a buffer and not get flushed,
+        // and then force the write with .flush().
+        let num_cols = 2;
+        let mut fields = Vec::new();
+        let options = IpcWriteOptions::try_new(8, false, MetadataVersion::V5).unwrap();
+        for i in 0..num_cols {
+            let field = Field::new(&format!("col_{}", i), DataType::Decimal128(38, 10), true);
+            fields.push(field);
+        }
+        let schema = Schema::new(fields);
+        let inner_stream_writer = BufWriter::with_capacity(1024, Vec::new());
+        let inner_file_writer = BufWriter::with_capacity(1024, Vec::new());
+        let mut stream_writer =
+            StreamWriter::try_new_with_options(inner_stream_writer, &schema, options.clone())
+                .unwrap();
+        let mut file_writer =
+            FileWriter::try_new_with_options(inner_file_writer, &schema, options).unwrap();
+
+        let stream_bytes_written_on_new = stream_writer.get_ref().get_ref().len();
+        let file_bytes_written_on_new = file_writer.get_ref().get_ref().len();
+        stream_writer.flush().unwrap();
+        file_writer.flush().unwrap();
+        let stream_bytes_written_on_flush = stream_writer.get_ref().get_ref().len();
+        let file_bytes_written_on_flush = file_writer.get_ref().get_ref().len();
+        let stream_out = stream_writer.into_inner().unwrap().into_inner().unwrap();
+        // Finishing a stream writes the continuation bytes in MetadataVersion::V5 (4 bytes)
+        // and then a length of 0 (4 bytes) for a total of 8 bytes.
+        // Everything before that should have been flushed in the .flush() call.
+        let expected_stream_flushed_bytes = stream_out.len() - 8;
+        // A file write is the same as the stream write except for the leading magic string
+        // ARROW1 plus padding, which is 8 bytes.
+        let expected_file_flushed_bytes = expected_stream_flushed_bytes + 8;
+
+        assert!(
+            stream_bytes_written_on_new < stream_bytes_written_on_flush,
+            "this test makes no sense if flush is not actually required"
+        );
+        assert!(
+            file_bytes_written_on_new < file_bytes_written_on_flush,
+            "this test makes no sense if flush is not actually required"
+        );
+        assert_eq!(stream_bytes_written_on_flush, expected_stream_flushed_bytes);
+        assert_eq!(file_bytes_written_on_flush, expected_file_flushed_bytes);
+    }
 }