diff --git a/src/diskio/immediate.rs b/src/diskio/immediate.rs index 24d30f5e9c..90a395c459 100644 --- a/src/diskio/immediate.rs +++ b/src/diskio/immediate.rs @@ -136,6 +136,11 @@ impl Executor for ImmediateUnpacker { fn buffer_available(&self, _len: usize) -> bool { true } + + #[cfg(test)] + fn buffer_used(&self) -> usize { + 0 + } } /// The non-shared state for writing a file incrementally diff --git a/src/diskio/mod.rs b/src/diskio/mod.rs index ccd5b1cd7e..dbf1da1ff8 100644 --- a/src/diskio/mod.rs +++ b/src/diskio/mod.rs @@ -328,6 +328,10 @@ pub(crate) trait Executor { /// Query the memory budget to see if a particular size buffer is available fn buffer_available(&self, len: usize) -> bool; + + #[cfg(test)] + /// Query the memory budget to see how much of the buffer pool is in use + fn buffer_used(&self) -> usize; } /// Trivial single threaded IO to be used from executors. @@ -418,9 +422,11 @@ pub(crate) fn write_file_incremental, F: Fn(usize)>( let len = contents.len(); // Length 0 vector is used for clean EOF signalling. if len == 0 { + trace_scoped!("EOF_chunk", "name": path_display, "len": len); + drop(contents); + chunk_complete_callback(len); break; - } - { + } else { trace_scoped!("write_segment", "name": path_display, "len": len); f.write_all(&contents)?; drop(contents); diff --git a/src/diskio/test.rs b/src/diskio/test.rs index f8f4570d66..13882201e0 100644 --- a/src/diskio/test.rs +++ b/src/diskio/test.rs @@ -48,13 +48,13 @@ fn test_incremental_file(io_threads: &str) -> Result<()> { } } // sending a zero length chunk closes the file - let mut chunk = io_executor.get_buffer(0); + let mut chunk = io_executor.get_buffer(super::IO_CHUNK_SIZE); chunk = chunk.finished(); sender(chunk); loop { for work in io_executor.completed().collect::>() { match work { - super::CompletedIo::Chunk(_) => unreachable!(), + super::CompletedIo::Chunk(_) => {} super::CompletedIo::Item(_) => { file_finished = true; } @@ -69,6 +69,8 @@ fn test_incremental_file(io_threads: &str) -> Result<()> { // no more work should be outstanding unreachable!(); } + + assert_eq!(io_executor.buffer_used(), 0); Ok(()) })?; // We should be able to read back the file diff --git a/src/diskio/threaded.rs b/src/diskio/threaded.rs index 429d0f2b1d..c7c8cc4386 100644 --- a/src/diskio/threaded.rs +++ b/src/diskio/threaded.rs @@ -350,6 +350,11 @@ impl<'a> Executor for Threaded<'a> { let total_used = Threaded::ram_highwater(&self.vec_pools); total_used + size < self.ram_budget } + + #[cfg(test)] + fn buffer_used(&self) -> usize { + self.vec_pools.iter().map(|(_, p)| *p.in_use.borrow()).sum() + } } impl<'a> Drop for Threaded<'a> {