Skip to content

Commit

Permalink
feat: Add executor in OpXxx and Operator (#4649)
Browse files Browse the repository at this point in the history
Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo committed May 27, 2024
1 parent bfe9e44 commit dd7f180
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 12 deletions.
2 changes: 1 addition & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ mod tests {
/// unexpected struct/enum size change.
#[test]
fn assert_size() {
assert_eq!(24, size_of::<Operator>());
assert_eq!(40, size_of::<Operator>());
assert_eq!(256, size_of::<Entry>());
assert_eq!(232, size_of::<Metadata>());
assert_eq!(1, size_of::<EntryMode>());
Expand Down
53 changes: 52 additions & 1 deletion core/src/raw/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ pub struct OpRead {
override_cache_control: Option<String>,
override_content_disposition: Option<String>,
version: Option<String>,
executor: Option<Executor>,
}

impl OpRead {
Expand Down Expand Up @@ -380,6 +381,31 @@ impl OpRead {
pub fn version(&self) -> Option<&str> {
self.version.as_deref()
}

/// Set the executor of the option
pub fn with_executor(mut self, executor: Executor) -> Self {
self.executor = Some(executor);
self
}

/// Merge given executor into option.
///
/// If executor has already been set, this will do nothing.
/// Otherwise, this will set the given executor.
pub(crate) fn merge_executor(self, executor: Option<Executor>) -> Self {
if self.executor.is_some() {
return self;
}
if let Some(exec) = executor {
return self.with_executor(exec);
}
self
}

/// Get executor from option
pub fn executor(&self) -> Option<&Executor> {
self.executor.as_ref()
}
}

/// Args for reader operation.
Expand Down Expand Up @@ -550,10 +576,10 @@ pub struct OpWrite {
append: bool,
chunk: Option<usize>,
concurrent: usize,

content_type: Option<String>,
content_disposition: Option<String>,
cache_control: Option<String>,
executor: Option<Executor>,
}

impl OpWrite {
Expand Down Expand Up @@ -647,6 +673,31 @@ impl OpWrite {
self.concurrent = concurrent;
self
}

/// Get the executor from option
pub fn executor(&self) -> Option<&Executor> {
self.executor.as_ref()
}

/// Set the executor of the option
pub fn with_executor(mut self, executor: Executor) -> Self {
self.executor = Some(executor);
self
}

/// Merge given executor into option.
///
/// If executor has already been set, this will do nothing.
/// Otherwise, this will set the given executor.
pub(crate) fn merge_executor(self, executor: Option<Executor>) -> Self {
if self.executor.is_some() {
return self;
}
if let Some(exec) = executor {
return self.with_executor(exec);
}
self
}
}

/// Args for `copy` operation.
Expand Down
11 changes: 10 additions & 1 deletion core/src/types/execute/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};

/// Execute trait is used to execute task in background.
pub trait Execute: 'static {
pub trait Execute: Send + Sync + 'static {
/// Execute async task in background.
///
/// # Behavior
Expand All @@ -34,6 +34,15 @@ pub trait Execute: 'static {
fn execute(&self, f: BoxedStaticFuture<()>) -> Result<()>;
}

impl Execute for () {
fn execute(&self, _: BoxedStaticFuture<()>) -> Result<()> {
Err(Error::new(
ErrorKind::Unexpected,
"no executor has been set",
))
}
}

/// Task is generated by Executor that represents an executing task.
///
/// Users can fetch the results by calling `poll` or `.await` on this task.
Expand Down
22 changes: 18 additions & 4 deletions core/src/types/execute/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use super::*;
use crate::raw::MaybeSend;
use crate::*;
use futures::FutureExt;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::sync::Arc;

Expand All @@ -29,11 +30,17 @@ use std::sync::Arc;
///
/// Executor will run futures in background and return a [`Task`] as handle to the future. Users
/// can call `task.await` to wait for the future to complete or drop the `Task` to cancel it.
#[derive(Clone)]
pub struct Executor {
executor: Arc<dyn Execute>,
}

#[cfg(feature = "executors-tokio")]
impl Debug for Executor {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Executor")
}
}

impl Default for Executor {
fn default() -> Self {
Self::new()
Expand All @@ -43,10 +50,17 @@ impl Default for Executor {
impl Executor {
/// Create a default executor.
///
/// The default executor is enabled by feature flags.
#[cfg(feature = "executors-tokio")]
/// The default executor is enabled by feature flags. If no feature flags enabled, the default
/// executor will always return error if users try to perform concurrent tasks.
pub fn new() -> Self {
Self::with(executors::TokioExecutor::default())
#[cfg(feature = "executors-tokio")]
{
Self::with(executors::TokioExecutor::default())
}
#[cfg(not(feature = "executors-tokio"))]
{
Self::with(())
}
}

/// Create a new executor with given execute impl.
Expand Down
37 changes: 32 additions & 5 deletions core/src/types/operator/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ pub struct Operator {

// limit is usually the maximum size of data that operator will handle in one operation
limit: usize,
/// The default executor that used to run futures in background.
default_executor: Option<Executor>,
}

/// # Operator basic API.
Expand All @@ -81,7 +83,11 @@ impl Operator {
.full_capability()
.batch_max_operations
.unwrap_or(1000);
Self { accessor, limit }
Self {
accessor,
limit,
default_executor: None,
}
}

pub(super) fn into_inner(self) -> Accessor {
Expand All @@ -103,6 +109,18 @@ impl Operator {
op
}

/// Get the default executor.
pub fn default_executor(&self) -> Option<Executor> {
self.default_executor.clone()
}

/// Specify the default executor.
pub fn with_default_executor(&self, executor: Executor) -> Self {
let mut op = self.clone();
op.default_executor = Some(executor);
op
}

/// Get information of underlying accessor.
///
/// # Examples
Expand Down Expand Up @@ -537,7 +555,10 @@ impl Operator {
OperatorFuture::new(
self.inner().clone(),
path,
(OpRead::default(), OpReader::default()),
(
OpRead::default().merge_executor(self.default_executor.clone()),
OpReader::default(),
),
|inner, path, (args, options)| async move {
if !validate_path(&path, EntryMode::FILE) {
return Err(
Expand Down Expand Up @@ -647,7 +668,10 @@ impl Operator {
OperatorFuture::new(
self.inner().clone(),
path,
(OpRead::default(), OpReader::default()),
(
OpRead::default().merge_executor(self.default_executor.clone()),
OpReader::default(),
),
|inner, path, (args, options)| async move {
if !validate_path(&path, EntryMode::FILE) {
return Err(
Expand Down Expand Up @@ -1052,7 +1076,7 @@ impl Operator {
OperatorFuture::new(
self.inner().clone(),
path,
OpWrite::default(),
OpWrite::default().merge_executor(self.default_executor.clone()),
|inner, path, args| async move {
if !validate_path(&path, EntryMode::FILE) {
return Err(
Expand Down Expand Up @@ -1196,7 +1220,10 @@ impl Operator {
OperatorFuture::new(
self.inner().clone(),
path,
(OpWrite::default(), bs),
(
OpWrite::default().merge_executor(self.default_executor.clone()),
bs,
),
|inner, path, (args, bs)| async move {
if !validate_path(&path, EntryMode::FILE) {
return Err(
Expand Down
15 changes: 15 additions & 0 deletions core/src/types/operator/operator_futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ impl<F: Future<Output = Result<Buffer>>> FutureRead<F> {
self.map(|(args, op_reader)| (args.with_version(v), op_reader))
}

/// Set the executor for this operation.
pub fn executor(self, executor: Executor) -> Self {
self.map(|(args, op_reader)| (args.with_executor(executor), op_reader))
}

/// Set the range header for this operation.
pub fn range(self, range: impl RangeBounds<u64>) -> Self {
self.map(|(args, op_reader)| (args, op_reader.with_range(range.into())))
Expand Down Expand Up @@ -312,6 +317,11 @@ impl<F: Future<Output = Result<()>>> FutureWrite<F> {
pub fn content_disposition(self, v: &str) -> Self {
self.map(|(args, bs)| (args.with_content_disposition(v), bs))
}

/// Set the executor for this operation.
pub fn executor(self, executor: Executor) -> Self {
self.map(|(args, bs)| (args.with_executor(executor), bs))
}
}

/// Future that generated by [`Operator::writer_with`].
Expand Down Expand Up @@ -369,6 +379,11 @@ impl<F: Future<Output = Result<Writer>>> FutureWriter<F> {
pub fn content_disposition(self, v: &str) -> Self {
self.map(|args| args.with_content_disposition(v))
}

/// Set the executor for this operation.
pub fn executor(self, executor: Executor) -> Self {
self.map(|args| args.with_executor(executor))
}
}

/// Future that generated by [`Operator::delete_with`].
Expand Down

0 comments on commit dd7f180

Please sign in to comment.