From bac72209cff0e68c4f5c28bb42b2b0b60ac705a1 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Mon, 27 May 2024 22:44:31 +0800 Subject: [PATCH] feat: Add executor in OpXxx and Operator Signed-off-by: Xuanwo --- core/src/lib.rs | 2 +- core/src/raw/ops.rs | 53 ++++++++++++++++++++- core/src/types/execute/api.rs | 11 ++++- core/src/types/execute/executor.rs | 22 +++++++-- core/src/types/operator/operator.rs | 37 ++++++++++++-- core/src/types/operator/operator_futures.rs | 15 ++++++ 6 files changed, 128 insertions(+), 12 deletions(-) diff --git a/core/src/lib.rs b/core/src/lib.rs index 46e408f51dd..2705f042eef 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -153,7 +153,7 @@ mod tests { /// unexpected struct/enum size change. #[test] fn assert_size() { - assert_eq!(24, size_of::()); + assert_eq!(40, size_of::()); assert_eq!(256, size_of::()); assert_eq!(232, size_of::()); assert_eq!(1, size_of::()); diff --git a/core/src/raw/ops.rs b/core/src/raw/ops.rs index 2c4b4949319..e22b25f31ed 100644 --- a/core/src/raw/ops.rs +++ b/core/src/raw/ops.rs @@ -306,6 +306,7 @@ pub struct OpRead { override_cache_control: Option, override_content_disposition: Option, version: Option, + executor: Option, } impl OpRead { @@ -380,6 +381,31 @@ impl OpRead { pub fn version(&self) -> Option<&str> { self.version.as_deref() } + + /// Set the executor of the option + pub fn with_executor(mut self, executor: Executor) -> Self { + self.executor = Some(executor); + self + } + + /// Merge given executor into option. + /// + /// If executor has already been set, this will do nothing. + /// Otherwise, this will set the given executor. + pub(crate) fn merge_executor(self, executor: Option) -> Self { + if self.executor.is_some() { + return self; + } + if let Some(exec) = executor { + return self.with_executor(exec); + } + self + } + + /// Get executor from option + pub fn executor(&self) -> Option<&Executor> { + self.executor.as_ref() + } } /// Args for reader operation. @@ -550,10 +576,10 @@ pub struct OpWrite { append: bool, chunk: Option, concurrent: usize, - content_type: Option, content_disposition: Option, cache_control: Option, + executor: Option, } impl OpWrite { @@ -647,6 +673,31 @@ impl OpWrite { self.concurrent = concurrent; self } + + /// Get the executor from option + pub fn executor(&self) -> Option<&Executor> { + self.executor.as_ref() + } + + /// Set the executor of the option + pub fn with_executor(mut self, executor: Executor) -> Self { + self.executor = Some(executor); + self + } + + /// Merge given executor into option. + /// + /// If executor has already been set, this will do nothing. + /// Otherwise, this will set the given executor. + pub(crate) fn merge_executor(self, executor: Option) -> Self { + if self.executor.is_some() { + return self; + } + if let Some(exec) = executor { + return self.with_executor(exec); + } + self + } } /// Args for `copy` operation. diff --git a/core/src/types/execute/api.rs b/core/src/types/execute/api.rs index d9c4b926f87..8307d87aae4 100644 --- a/core/src/types/execute/api.rs +++ b/core/src/types/execute/api.rs @@ -24,7 +24,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; /// Execute trait is used to execute task in background. -pub trait Execute: 'static { +pub trait Execute: Send + Sync + 'static { /// Execute async task in background. /// /// # Behavior @@ -34,6 +34,15 @@ pub trait Execute: 'static { fn execute(&self, f: BoxedStaticFuture<()>) -> Result<()>; } +impl Execute for () { + fn execute(&self, _: BoxedStaticFuture<()>) -> Result<()> { + Err(Error::new( + ErrorKind::Unexpected, + "no executor has been set", + )) + } +} + /// Task is generated by Executor that represents an executing task. /// /// Users can fetch the results by calling `poll` or `.await` on this task. diff --git a/core/src/types/execute/executor.rs b/core/src/types/execute/executor.rs index 4d1be299425..d516e38aa84 100644 --- a/core/src/types/execute/executor.rs +++ b/core/src/types/execute/executor.rs @@ -19,6 +19,7 @@ use super::*; use crate::raw::MaybeSend; use crate::*; use futures::FutureExt; +use std::fmt::{Debug, Formatter}; use std::future::Future; use std::sync::Arc; @@ -29,11 +30,17 @@ use std::sync::Arc; /// /// Executor will run futures in background and return a [`Task`] as handle to the future. Users /// can call `task.await` to wait for the future to complete or drop the `Task` to cancel it. +#[derive(Clone)] pub struct Executor { executor: Arc, } -#[cfg(feature = "executors-tokio")] +impl Debug for Executor { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "Executor") + } +} + impl Default for Executor { fn default() -> Self { Self::new() @@ -43,10 +50,17 @@ impl Default for Executor { impl Executor { /// Create a default executor. /// - /// The default executor is enabled by feature flags. - #[cfg(feature = "executors-tokio")] + /// The default executor is enabled by feature flags. If no feature flags enabled, the default + /// executor will always return error if users try to perform concurrent tasks. pub fn new() -> Self { - Self::with(executors::TokioExecutor::default()) + #[cfg(feature = "executors-tokio")] + { + Self::with(executors::TokioExecutor::default()) + } + #[cfg(not(feature = "executors-tokio"))] + { + Self::with(()) + } } /// Create a new executor with given execute impl. diff --git a/core/src/types/operator/operator.rs b/core/src/types/operator/operator.rs index ece483ab33a..4685bb64e24 100644 --- a/core/src/types/operator/operator.rs +++ b/core/src/types/operator/operator.rs @@ -67,6 +67,8 @@ pub struct Operator { // limit is usually the maximum size of data that operator will handle in one operation limit: usize, + /// The default executor that used to run futures in background. + default_executor: Option, } /// # Operator basic API. @@ -81,7 +83,11 @@ impl Operator { .full_capability() .batch_max_operations .unwrap_or(1000); - Self { accessor, limit } + Self { + accessor, + limit, + default_executor: None, + } } pub(super) fn into_inner(self) -> Accessor { @@ -103,6 +109,18 @@ impl Operator { op } + /// Get the default executor. + pub fn default_executor(&self) -> Option { + self.default_executor.clone() + } + + /// Specify the default executor. + pub fn with_default_executor(&self, executor: Executor) -> Self { + let mut op = self.clone(); + op.default_executor = Some(executor); + op + } + /// Get information of underlying accessor. /// /// # Examples @@ -537,7 +555,10 @@ impl Operator { OperatorFuture::new( self.inner().clone(), path, - (OpRead::default(), OpReader::default()), + ( + OpRead::default().merge_executor(self.default_executor.clone()), + OpReader::default(), + ), |inner, path, (args, options)| async move { if !validate_path(&path, EntryMode::FILE) { return Err( @@ -647,7 +668,10 @@ impl Operator { OperatorFuture::new( self.inner().clone(), path, - (OpRead::default(), OpReader::default()), + ( + OpRead::default().merge_executor(self.default_executor.clone()), + OpReader::default(), + ), |inner, path, (args, options)| async move { if !validate_path(&path, EntryMode::FILE) { return Err( @@ -1052,7 +1076,7 @@ impl Operator { OperatorFuture::new( self.inner().clone(), path, - OpWrite::default(), + OpWrite::default().merge_executor(self.default_executor.clone()), |inner, path, args| async move { if !validate_path(&path, EntryMode::FILE) { return Err( @@ -1196,7 +1220,10 @@ impl Operator { OperatorFuture::new( self.inner().clone(), path, - (OpWrite::default(), bs), + ( + OpWrite::default().merge_executor(self.default_executor.clone()), + bs, + ), |inner, path, (args, bs)| async move { if !validate_path(&path, EntryMode::FILE) { return Err( diff --git a/core/src/types/operator/operator_futures.rs b/core/src/types/operator/operator_futures.rs index 7d1b723c1e9..832cae01945 100644 --- a/core/src/types/operator/operator_futures.rs +++ b/core/src/types/operator/operator_futures.rs @@ -217,6 +217,11 @@ impl>> FutureRead { self.map(|(args, op_reader)| (args.with_version(v), op_reader)) } + /// Set the executor for this operation. + pub fn executor(self, executor: Executor) -> Self { + self.map(|(args, op_reader)| (args.with_executor(executor), op_reader)) + } + /// Set the range header for this operation. pub fn range(self, range: impl RangeBounds) -> Self { self.map(|(args, op_reader)| (args, op_reader.with_range(range.into()))) @@ -312,6 +317,11 @@ impl>> FutureWrite { pub fn content_disposition(self, v: &str) -> Self { self.map(|(args, bs)| (args.with_content_disposition(v), bs)) } + + /// Set the executor for this operation. + pub fn executor(self, executor: Executor) -> Self { + self.map(|(args, bs)| (args.with_executor(executor), bs)) + } } /// Future that generated by [`Operator::writer_with`]. @@ -369,6 +379,11 @@ impl>> FutureWriter { pub fn content_disposition(self, v: &str) -> Self { self.map(|args| args.with_content_disposition(v)) } + + /// Set the executor for this operation. + pub fn executor(self, executor: Executor) -> Self { + self.map(|args| args.with_executor(executor)) + } } /// Future that generated by [`Operator::delete_with`].