From f5a91dd47050361628898d57b3bcce59f5f1689e Mon Sep 17 00:00:00 2001 From: NKID00 Date: Wed, 4 Sep 2024 11:01:48 +0800 Subject: [PATCH] fix: handle async cancel during file open --- core/src/services/monoiofs/core.rs | 4 ++-- core/src/services/monoiofs/reader.rs | 17 +++++++++-------- core/src/services/monoiofs/writer.rs | 13 +++++++------ 3 files changed, 18 insertions(+), 16 deletions(-) diff --git a/core/src/services/monoiofs/core.rs b/core/src/services/monoiofs/core.rs index 30d14adc987..0a3d18e8a22 100644 --- a/core/src/services/monoiofs/core.rs +++ b/core/src/services/monoiofs/core.rs @@ -145,7 +145,7 @@ impl MonoiofsCore { pub async fn spawn(&self, f: F) where F: FnOnce() -> Fut + 'static + Send, - Fut: Future, + Fut: Future + 'static, T: 'static, { let result = self @@ -153,7 +153,7 @@ impl MonoiofsCore { .send_async(Box::new(move || { // task will be spawned on current thread, task panic // will cause current worker thread panic - monoio::spawn(async move { f().await }); + monoio::spawn(f()); })) .await; self.unwrap(result); diff --git a/core/src/services/monoiofs/reader.rs b/core/src/services/monoiofs/reader.rs index 6e4e3079068..d86df4d575a 100644 --- a/core/src/services/monoiofs/reader.rs +++ b/core/src/services/monoiofs/reader.rs @@ -68,15 +68,16 @@ impl MonoiofsReader { // worker thread let file = match result { Ok(file) => { - open_result_tx - .send(Ok(())) - .expect("send result from worker thread should success"); + let Ok(()) = open_result_tx.send(Ok(())) else { + // MonoiofsReader::new is cancelled, exit worker task + return; + }; file } Err(e) => { - open_result_tx - .send(Err(new_std_io_error(e))) - .expect("send result from worker thread should success"); + // discard the result if send failed due to MonoiofsReader::new + // cancelled since we are going to exit anyway + let _ = open_result_tx.send(Err(new_std_io_error(e))); return; } }; @@ -89,8 +90,8 @@ impl MonoiofsReader { match req { ReaderRequest::Read { pos, buf, tx } => { let (result, buf) = file.read_at(buf, pos).await; - // buf.len() will be set to n by monoio if read successfully, - // so n is dropped + // buf.len() will be set to n by monoio if read + // successfully, so n is dropped let result = result.map(move |_| buf).map_err(new_std_io_error); // discard the result if send failed due to // MonoiofsReader::read cancelled diff --git a/core/src/services/monoiofs/writer.rs b/core/src/services/monoiofs/writer.rs index 6af3d485cfd..eb85ebc3950 100644 --- a/core/src/services/monoiofs/writer.rs +++ b/core/src/services/monoiofs/writer.rs @@ -72,15 +72,16 @@ impl MonoiofsWriter { // worker thread let file = match result { Ok(file) => { - open_result_tx - .send(Ok(())) - .expect("send result from worker thread should success"); + let Ok(()) = open_result_tx.send(Ok(())) else { + // MonoiofsWriter::new is cancelled, exit worker task + return; + }; file } Err(e) => { - open_result_tx - .send(Err(new_std_io_error(e))) - .expect("send result from worker thread should success"); + // discard the result if send failed due to MonoiofsWriter::new + // cancelled since we are going to exit anyway + let _ = open_result_tx.send(Err(new_std_io_error(e))); return; } };