Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add executor in OpXxx and Operator #4649

Merged
merged 1 commit into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ mod tests {
/// unexpected struct/enum size change.
#[test]
fn assert_size() {
assert_eq!(24, size_of::<Operator>());
assert_eq!(40, size_of::<Operator>());
assert_eq!(256, size_of::<Entry>());
assert_eq!(232, size_of::<Metadata>());
assert_eq!(1, size_of::<EntryMode>());
Expand Down
53 changes: 52 additions & 1 deletion core/src/raw/ops.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ pub struct OpRead {
override_cache_control: Option<String>,
override_content_disposition: Option<String>,
version: Option<String>,
executor: Option<Executor>,
}

impl OpRead {
Expand Down Expand Up @@ -380,6 +381,31 @@ impl OpRead {
pub fn version(&self) -> Option<&str> {
self.version.as_deref()
}

/// Set the executor of the option
pub fn with_executor(mut self, executor: Executor) -> Self {
self.executor = Some(executor);
self
}

/// Merge given executor into option.
///
/// If executor has already been set, this will do nothing.
/// Otherwise, this will set the given executor.
pub(crate) fn merge_executor(self, executor: Option<Executor>) -> Self {
if self.executor.is_some() {
return self;
}
if let Some(exec) = executor {
return self.with_executor(exec);
}
self
}

/// Get executor from option
pub fn executor(&self) -> Option<&Executor> {
self.executor.as_ref()
}
}

/// Args for reader operation.
Expand Down Expand Up @@ -550,10 +576,10 @@ pub struct OpWrite {
append: bool,
chunk: Option<usize>,
concurrent: usize,

content_type: Option<String>,
content_disposition: Option<String>,
cache_control: Option<String>,
executor: Option<Executor>,
}

impl OpWrite {
Expand Down Expand Up @@ -647,6 +673,31 @@ impl OpWrite {
self.concurrent = concurrent;
self
}

/// Get the executor from option
pub fn executor(&self) -> Option<&Executor> {
self.executor.as_ref()
}

/// Set the executor of the option
pub fn with_executor(mut self, executor: Executor) -> Self {
self.executor = Some(executor);
self
}

/// Merge given executor into option.
///
/// If executor has already been set, this will do nothing.
/// Otherwise, this will set the given executor.
pub(crate) fn merge_executor(self, executor: Option<Executor>) -> Self {
if self.executor.is_some() {
return self;
}
if let Some(exec) = executor {
return self.with_executor(exec);
}
self
}
}

/// Args for `copy` operation.
Expand Down
11 changes: 10 additions & 1 deletion core/src/types/execute/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::pin::Pin;
use std::task::{Context, Poll};

/// Execute trait is used to execute task in background.
pub trait Execute: 'static {
pub trait Execute: Send + Sync + 'static {
/// Execute async task in background.
///
/// # Behavior
Expand All @@ -34,6 +34,15 @@ pub trait Execute: 'static {
fn execute(&self, f: BoxedStaticFuture<()>) -> Result<()>;
}

impl Execute for () {
fn execute(&self, _: BoxedStaticFuture<()>) -> Result<()> {
Err(Error::new(
ErrorKind::Unexpected,
"no executor has been set",
))
}
}

/// Task is generated by Executor that represents an executing task.
///
/// Users can fetch the results by calling `poll` or `.await` on this task.
Expand Down
22 changes: 18 additions & 4 deletions core/src/types/execute/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use super::*;
use crate::raw::MaybeSend;
use crate::*;
use futures::FutureExt;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::sync::Arc;

Expand All @@ -29,11 +30,17 @@ use std::sync::Arc;
///
/// Executor will run futures in background and return a [`Task`] as handle to the future. Users
/// can call `task.await` to wait for the future to complete or drop the `Task` to cancel it.
#[derive(Clone)]
pub struct Executor {
executor: Arc<dyn Execute>,
}

#[cfg(feature = "executors-tokio")]
impl Debug for Executor {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "Executor")
}
}

impl Default for Executor {
fn default() -> Self {
Self::new()
Expand All @@ -43,10 +50,17 @@ impl Default for Executor {
impl Executor {
/// Create a default executor.
///
/// The default executor is enabled by feature flags.
#[cfg(feature = "executors-tokio")]
/// The default executor is enabled by feature flags. If no feature flags enabled, the default
/// executor will always return error if users try to perform concurrent tasks.
pub fn new() -> Self {
Self::with(executors::TokioExecutor::default())
#[cfg(feature = "executors-tokio")]
{
Self::with(executors::TokioExecutor::default())
}
#[cfg(not(feature = "executors-tokio"))]
{
Self::with(())
}
}

/// Create a new executor with given execute impl.
Expand Down
37 changes: 32 additions & 5 deletions core/src/types/operator/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ pub struct Operator {

// limit is usually the maximum size of data that operator will handle in one operation
limit: usize,
/// The default executor that used to run futures in background.
default_executor: Option<Executor>,
}

/// # Operator basic API.
Expand All @@ -81,7 +83,11 @@ impl Operator {
.full_capability()
.batch_max_operations
.unwrap_or(1000);
Self { accessor, limit }
Self {
accessor,
limit,
default_executor: None,
}
}

pub(super) fn into_inner(self) -> Accessor {
Expand All @@ -103,6 +109,18 @@ impl Operator {
op
}

/// Get the default executor.
pub fn default_executor(&self) -> Option<Executor> {
self.default_executor.clone()
}

/// Specify the default executor.
pub fn with_default_executor(&self, executor: Executor) -> Self {
let mut op = self.clone();
op.default_executor = Some(executor);
op
}

/// Get information of underlying accessor.
///
/// # Examples
Expand Down Expand Up @@ -537,7 +555,10 @@ impl Operator {
OperatorFuture::new(
self.inner().clone(),
path,
(OpRead::default(), OpReader::default()),
(
OpRead::default().merge_executor(self.default_executor.clone()),
OpReader::default(),
),
|inner, path, (args, options)| async move {
if !validate_path(&path, EntryMode::FILE) {
return Err(
Expand Down Expand Up @@ -647,7 +668,10 @@ impl Operator {
OperatorFuture::new(
self.inner().clone(),
path,
(OpRead::default(), OpReader::default()),
(
OpRead::default().merge_executor(self.default_executor.clone()),
OpReader::default(),
),
|inner, path, (args, options)| async move {
if !validate_path(&path, EntryMode::FILE) {
return Err(
Expand Down Expand Up @@ -1052,7 +1076,7 @@ impl Operator {
OperatorFuture::new(
self.inner().clone(),
path,
OpWrite::default(),
OpWrite::default().merge_executor(self.default_executor.clone()),
|inner, path, args| async move {
if !validate_path(&path, EntryMode::FILE) {
return Err(
Expand Down Expand Up @@ -1196,7 +1220,10 @@ impl Operator {
OperatorFuture::new(
self.inner().clone(),
path,
(OpWrite::default(), bs),
(
OpWrite::default().merge_executor(self.default_executor.clone()),
bs,
),
|inner, path, (args, bs)| async move {
if !validate_path(&path, EntryMode::FILE) {
return Err(
Expand Down
15 changes: 15 additions & 0 deletions core/src/types/operator/operator_futures.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ impl<F: Future<Output = Result<Buffer>>> FutureRead<F> {
self.map(|(args, op_reader)| (args.with_version(v), op_reader))
}

/// Set the executor for this operation.
pub fn executor(self, executor: Executor) -> Self {
self.map(|(args, op_reader)| (args.with_executor(executor), op_reader))
}

/// Set the range header for this operation.
pub fn range(self, range: impl RangeBounds<u64>) -> Self {
self.map(|(args, op_reader)| (args, op_reader.with_range(range.into())))
Expand Down Expand Up @@ -312,6 +317,11 @@ impl<F: Future<Output = Result<()>>> FutureWrite<F> {
pub fn content_disposition(self, v: &str) -> Self {
self.map(|(args, bs)| (args.with_content_disposition(v), bs))
}

/// Set the executor for this operation.
pub fn executor(self, executor: Executor) -> Self {
self.map(|(args, bs)| (args.with_executor(executor), bs))
}
}

/// Future that generated by [`Operator::writer_with`].
Expand Down Expand Up @@ -369,6 +379,11 @@ impl<F: Future<Output = Result<Writer>>> FutureWriter<F> {
pub fn content_disposition(self, v: &str) -> Self {
self.map(|args| args.with_content_disposition(v))
}

/// Set the executor for this operation.
pub fn executor(self, executor: Executor) -> Self {
self.map(|args| args.with_executor(executor))
}
}

/// Future that generated by [`Operator::delete_with`].
Expand Down
Loading