From 1bfc124c01dfb10017246b8090bed710342eb308 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 9 Apr 2024 13:57:57 +0800 Subject: [PATCH 1/3] feat: provide send-wrapper to conditionally implement Send for operators Signed-off-by: Ruihang Xia --- integrations/object_store/Cargo.toml | 9 +- integrations/object_store/src/lib.rs | 99 ++++++++++--------- integrations/object_store/src/send_wrapper.rs | 72 ++++++++++++++ 3 files changed, 133 insertions(+), 47 deletions(-) create mode 100644 integrations/object_store/src/send_wrapper.rs diff --git a/integrations/object_store/Cargo.toml b/integrations/object_store/Cargo.toml index 1eb27b7b8e5..027f238d27b 100644 --- a/integrations/object_store/Cargo.toml +++ b/integrations/object_store/Cargo.toml @@ -27,6 +27,9 @@ repository = "https://github.com/apache/opendal" rust-version = "1.75" version = "0.43.0" +[features] +send_wrapper = ["dep:send_wrapper"] + [dependencies] async-trait = "0.1" bytes = "1" @@ -34,8 +37,12 @@ futures = "0.3" futures-util = "0.3" object_store = "0.9" opendal = { version = "0.45.1", path = "../../core" } +pin-project = "1.1" +send_wrapper = { version = "0.6", features = ["futures"], optional = true } tokio = { version = "1", default-features = false } [dev-dependencies] tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread"] } -opendal = { version = "0.45.1", path = "../../core", features = ["services-memory"] } +opendal = { version = "0.45.1", path = "../../core", features = [ + "services-memory", +] } diff --git a/integrations/object_store/src/lib.rs b/integrations/object_store/src/lib.rs index dd523cc9192..6daa922e337 100644 --- a/integrations/object_store/src/lib.rs +++ b/integrations/object_store/src/lib.rs @@ -15,6 +15,10 @@ // specific language governing permissions and limitations // under the License. 
+mod send_wrapper; + +use std::ops::Range; + use async_trait::async_trait; use bytes::Bytes; use futures::stream::BoxStream; @@ -36,7 +40,7 @@ use opendal::Entry; use opendal::Metadata; use opendal::Metakey; use opendal::Operator; -use std::ops::Range; +use send_wrapper::SendWrapper; use tokio::io::AsyncWrite; #[derive(Debug)] @@ -60,8 +64,7 @@ impl std::fmt::Display for OpendalStore { #[async_trait] impl ObjectStore for OpendalStore { async fn put(&self, location: &Path, bytes: Bytes) -> Result { - self.inner - .write(location.as_ref(), bytes) + SendWrapper::new(self.inner.write(location.as_ref(), bytes)) .await .map_err(|err| format_object_store_error(err, location.as_ref()))?; Ok(PutResult { @@ -115,9 +118,7 @@ impl ObjectStore for OpendalStore { } async fn get(&self, location: &Path) -> Result { - let meta = self - .inner - .stat(location.as_ref()) + let meta = SendWrapper::new(self.inner.stat(location.as_ref())) .await .map_err(|err| format_object_store_error(err, location.as_ref()))?; @@ -128,9 +129,7 @@ impl ObjectStore for OpendalStore { e_tag: meta.etag().map(|x| x.to_string()), version: meta.version().map(|x| x.to_string()), }; - let r = self - .inner - .reader(location.as_ref()) + let r = SendWrapper::new(self.inner.reader(location.as_ref())) .await .map_err(|err| format_object_store_error(err, location.as_ref()))?; @@ -149,20 +148,20 @@ impl ObjectStore for OpendalStore { } async fn get_range(&self, location: &Path, range: Range) -> Result { - let bs = self - .inner - .read_with(location.as_ref()) - .range(range.start as u64..range.end as u64) - .await - .map_err(|err| format_object_store_error(err, location.as_ref()))?; + let bs = SendWrapper::new(async { + self.inner + .read_with(location.as_ref()) + .range(range.start as u64..range.end as u64) + .await + }) + .await + .map_err(|err| format_object_store_error(err, location.as_ref()))?; Ok(Bytes::from(bs)) } async fn head(&self, location: &Path) -> Result { - let meta = self - .inner - 
.stat(location.as_ref()) + let meta = SendWrapper::new(self.inner.stat(location.as_ref())) .await .map_err(|err| format_object_store_error(err, location.as_ref()))?; @@ -176,8 +175,7 @@ impl ObjectStore for OpendalStore { } async fn delete(&self, location: &Path) -> Result<()> { - self.inner - .delete(location.as_ref()) + SendWrapper::new(self.inner.delete(location.as_ref())) .await .map_err(|err| format_object_store_error(err, location.as_ref()))?; @@ -207,7 +205,7 @@ impl ObjectStore for OpendalStore { Ok::<_, object_store::Error>(stream) }; - fut.into_stream().try_flatten().boxed() + SendWrapper::new(fut.into_stream().try_flatten()).boxed() } fn list_with_offset( @@ -220,40 +218,49 @@ impl ObjectStore for OpendalStore { let fut = async move { let fut = if self.inner.info().full_capability().list_with_start_after { - self.inner - .lister_with(&path) - .start_after(offset.as_ref()) - .metakey(Metakey::ContentLength | Metakey::LastModified) - .recursive(true) - .await - .map_err(|err| format_object_store_error(err, &path))? - .then(try_format_object_meta) - .boxed() + SendWrapper::new( + self.inner + .lister_with(&path) + .start_after(offset.as_ref()) + .metakey(Metakey::ContentLength | Metakey::LastModified) + .recursive(true) + .await + .map_err(|err| format_object_store_error(err, &path))? + .then(try_format_object_meta), + ) + .boxed() } else { - self.inner - .lister_with(&path) - .metakey(Metakey::ContentLength | Metakey::LastModified) - .recursive(true) - .await - .map_err(|err| format_object_store_error(err, &path))? - .try_filter(move |entry| futures::future::ready(entry.path() > offset.as_ref())) - .then(try_format_object_meta) - .boxed() + SendWrapper::new( + self.inner + .lister_with(&path) + .metakey(Metakey::ContentLength | Metakey::LastModified) + .recursive(true) + .await + .map_err(|err| format_object_store_error(err, &path))? 
+ .try_filter(move |entry| { + futures::future::ready(entry.path() > offset.as_ref()) + }) + .then(try_format_object_meta), + ) + .boxed() }; Ok::<_, object_store::Error>(fut) }; - fut.into_stream().try_flatten().boxed() + SendWrapper::new(fut.into_stream().try_flatten()).boxed() } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { let path = prefix.map_or("".into(), |x| format!("{}/", x)); - let mut stream = self - .inner - .lister_with(&path) - .metakey(Metakey::Mode | Metakey::ContentLength | Metakey::LastModified) - .await - .map_err(|err| format_object_store_error(err, &path))?; + let stream = SendWrapper::new(async { + self.inner + .lister_with(&path) + .metakey(Metakey::Mode | Metakey::ContentLength | Metakey::LastModified) + .await + }) + .await + .map_err(|err| format_object_store_error(err, &path))?; + let mut stream = SendWrapper::new(stream); let mut common_prefixes = Vec::new(); let mut objects = Vec::new(); diff --git a/integrations/object_store/src/send_wrapper.rs b/integrations/object_store/src/send_wrapper.rs new file mode 100644 index 00000000000..25c1518cffa --- /dev/null +++ b/integrations/object_store/src/send_wrapper.rs @@ -0,0 +1,72 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Conditionally add the `Send` marker trait for the wrapped type. +//! Only take effect when the `send_wrapper` feature is enabled. + +#[cfg(feature = "send_wrapper")] +pub use send_wrapper::SendWrapper; + +#[cfg(not(feature = "send_wrapper"))] +pub use noop_wrapper::NoopWrapper as SendWrapper; + +#[cfg(not(feature = "send_wrapper"))] +mod noop_wrapper { + use std::{ + pin::Pin, + task::{Context, Poll}, + }; + + use futures::{Future, Stream}; + use pin_project::pin_project; + + #[pin_project] + pub struct NoopWrapper { + #[pin] + item: T, + } + + impl Future for NoopWrapper + where + T: Future, + { + type Output = T::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + this.item.poll(cx) + } + } + + impl Stream for NoopWrapper + where + T: Stream, + { + type Item = T::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + this.item.poll_next(cx) + } + } + + impl NoopWrapper { + pub fn new(item: T) -> Self { + Self { item } + } + } +} From 41024aaf9f9d66e5328bc89cd7359862941bca6b Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 21 Apr 2024 21:05:00 +0800 Subject: [PATCH 2/3] implement IntoSend* Signed-off-by: Ruihang Xia --- integrations/object_store/src/lib.rs | 141 +++++++++++------- integrations/object_store/src/send_wrapper.rs | 33 ++++ 2 files changed, 124 insertions(+), 50 deletions(-) diff --git a/integrations/object_store/src/lib.rs b/integrations/object_store/src/lib.rs index 6daa922e337..0697be878b5 100644 --- a/integrations/object_store/src/lib.rs +++ b/integrations/object_store/src/lib.rs @@ -17,6 +17,7 @@ mod send_wrapper; +use std::future::IntoFuture; use std::ops::Range; use async_trait::async_trait; @@ -40,7 +41,8 @@ use opendal::Entry; use opendal::Metadata; use opendal::Metakey; use opendal::Operator; -use 
send_wrapper::SendWrapper; +use send_wrapper::IntoSendFuture; +use send_wrapper::IntoSendStream; use tokio::io::AsyncWrite; #[derive(Debug)] @@ -64,7 +66,9 @@ impl std::fmt::Display for OpendalStore { #[async_trait] impl ObjectStore for OpendalStore { async fn put(&self, location: &Path, bytes: Bytes) -> Result { - SendWrapper::new(self.inner.write(location.as_ref(), bytes)) + self.inner + .write(location.as_ref(), bytes) + .into_send() .await .map_err(|err| format_object_store_error(err, location.as_ref()))?; Ok(PutResult { @@ -118,7 +122,10 @@ impl ObjectStore for OpendalStore { } async fn get(&self, location: &Path) -> Result { - let meta = SendWrapper::new(self.inner.stat(location.as_ref())) + let meta = self + .inner + .stat(location.as_ref()) + .into_send() .await .map_err(|err| format_object_store_error(err, location.as_ref()))?; @@ -129,12 +136,16 @@ impl ObjectStore for OpendalStore { e_tag: meta.etag().map(|x| x.to_string()), version: meta.version().map(|x| x.to_string()), }; - let r = SendWrapper::new(self.inner.reader(location.as_ref())) + let r = self + .inner + .reader(location.as_ref()) + .into_send() .await .map_err(|err| format_object_store_error(err, location.as_ref()))?; let stream = r .into_futures_bytes_stream(0..meta.size as u64) + .into_send() .map_err(|err| object_store::Error::Generic { store: "IoError", source: Box::new(err), @@ -148,20 +159,23 @@ impl ObjectStore for OpendalStore { } async fn get_range(&self, location: &Path, range: Range) -> Result { - let bs = SendWrapper::new(async { - self.inner - .read_with(location.as_ref()) - .range(range.start as u64..range.end as u64) - .await - }) - .await - .map_err(|err| format_object_store_error(err, location.as_ref()))?; + let bs = self + .inner + .read_with(location.as_ref()) + .range(range.start as u64..range.end as u64) + .into_future() + .into_send() + .await + .map_err(|err| format_object_store_error(err, location.as_ref()))?; Ok(Bytes::from(bs)) } async fn head(&self, location: &Path) 
-> Result { - let meta = SendWrapper::new(self.inner.stat(location.as_ref())) + let meta = self + .inner + .stat(location.as_ref()) + .into_send() .await .map_err(|err| format_object_store_error(err, location.as_ref()))?; @@ -175,7 +189,9 @@ impl ObjectStore for OpendalStore { } async fn delete(&self, location: &Path) -> Result<()> { - SendWrapper::new(self.inner.delete(location.as_ref())) + self.inner + .delete(location.as_ref()) + .into_send() .await .map_err(|err| format_object_store_error(err, location.as_ref()))?; @@ -205,7 +221,7 @@ impl ObjectStore for OpendalStore { Ok::<_, object_store::Error>(stream) }; - SendWrapper::new(fut.into_stream().try_flatten()).boxed() + fut.into_stream().try_flatten().into_send().boxed() } fn list_with_offset( @@ -218,54 +234,54 @@ impl ObjectStore for OpendalStore { let fut = async move { let fut = if self.inner.info().full_capability().list_with_start_after { - SendWrapper::new( - self.inner - .lister_with(&path) - .start_after(offset.as_ref()) - .metakey(Metakey::ContentLength | Metakey::LastModified) - .recursive(true) - .await - .map_err(|err| format_object_store_error(err, &path))? - .then(try_format_object_meta), - ) - .boxed() + self.inner + .lister_with(&path) + .start_after(offset.as_ref()) + .metakey(Metakey::ContentLength | Metakey::LastModified) + .recursive(true) + .into_future() + .into_send() + .await + .map_err(|err| format_object_store_error(err, &path))? + .then(try_format_object_meta) + .into_send() + .boxed() } else { - SendWrapper::new( - self.inner - .lister_with(&path) - .metakey(Metakey::ContentLength | Metakey::LastModified) - .recursive(true) - .await - .map_err(|err| format_object_store_error(err, &path))? 
- .try_filter(move |entry| { - futures::future::ready(entry.path() > offset.as_ref()) - }) - .then(try_format_object_meta), - ) - .boxed() + self.inner + .lister_with(&path) + .metakey(Metakey::ContentLength | Metakey::LastModified) + .recursive(true) + .into_future() + .into_send() + .await + .map_err(|err| format_object_store_error(err, &path))? + .try_filter(move |entry| futures::future::ready(entry.path() > offset.as_ref())) + .then(try_format_object_meta) + .into_send() + .boxed() }; Ok::<_, object_store::Error>(fut) }; - SendWrapper::new(fut.into_stream().try_flatten()).boxed() + fut.into_stream().into_send().try_flatten().boxed() } async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result { let path = prefix.map_or("".into(), |x| format!("{}/", x)); - let stream = SendWrapper::new(async { - self.inner - .lister_with(&path) - .metakey(Metakey::Mode | Metakey::ContentLength | Metakey::LastModified) - .await - }) - .await - .map_err(|err| format_object_store_error(err, &path))?; - let mut stream = SendWrapper::new(stream); + let mut stream = self + .inner + .lister_with(&path) + .metakey(Metakey::Mode | Metakey::ContentLength | Metakey::LastModified) + .into_future() + .into_send() + .await + .map_err(|err| format_object_store_error(err, &path))? 
+ .into_send(); let mut common_prefixes = Vec::new(); let mut objects = Vec::new(); - while let Some(res) = stream.next().await { + while let Some(res) = stream.next().into_send().await { let entry = res.map_err(|err| format_object_store_error(err, ""))?; let meta = entry.metadata(); @@ -348,6 +364,31 @@ async fn try_format_object_meta(res: Result) -> Result(_: T) {} + + #[allow(dead_code)] + fn assertion() { + let op = super::Operator::new(opendal::services::Memory::default()) + .unwrap() + .finish(); + let store = super::OpendalStore::new(op); + assert_send(store.put(&"test".into(), bytes::Bytes::new())); + assert_send(store.get(&"test".into())); + assert_send(store.get_range(&"test".into(), 0..1)); + assert_send(store.head(&"test".into())); + assert_send(store.delete(&"test".into())); + assert_send(store.list(None)); + assert_send(store.list_with_offset(None, &"test".into())); + assert_send(store.list_with_delimiter(None)); + } +} + #[cfg(test)] mod tests { use std::sync::Arc; diff --git a/integrations/object_store/src/send_wrapper.rs b/integrations/object_store/src/send_wrapper.rs index 25c1518cffa..34eb52690ef 100644 --- a/integrations/object_store/src/send_wrapper.rs +++ b/integrations/object_store/src/send_wrapper.rs @@ -18,6 +18,7 @@ //! Conditionally add the `Send` marker trait for the wrapped type. //! Only take effect when the `send_wrapper` feature is enabled. 
+use futures::Stream; #[cfg(feature = "send_wrapper")] pub use send_wrapper::SendWrapper; @@ -70,3 +71,35 @@ mod noop_wrapper { } } } + +pub trait IntoSendFuture { + type Output; + + fn into_send(self) -> Self::Output; +} + +impl IntoSendFuture for T +where + T: futures::Future, +{ + type Output = SendWrapper; + fn into_send(self) -> Self::Output { + SendWrapper::new(self) + } +} + +pub trait IntoSendStream { + type Output; + + fn into_send(self) -> Self::Output; +} + +impl IntoSendStream for T +where + T: Stream, +{ + type Output = SendWrapper; + fn into_send(self) -> Self::Output { + SendWrapper::new(self) + } +} From 18909cac74471cdfc271367212e8b124598e507f Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Sun, 21 Apr 2024 21:07:57 +0800 Subject: [PATCH 3/3] fix typo Signed-off-by: Ruihang Xia --- integrations/object_store/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/object_store/src/lib.rs b/integrations/object_store/src/lib.rs index 70b936452a8..63b778a043c 100644 --- a/integrations/object_store/src/lib.rs +++ b/integrations/object_store/src/lib.rs @@ -364,7 +364,7 @@ async fn try_format_object_meta(res: Result) -> Result