Skip to content

Commit

Permalink
feat: provide send-wrapper to contidionally implement Send for operat…
Browse files Browse the repository at this point in the history
…ors (#4443)

* feat: provide send-wrapper to contidionally implement Send for operators

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* implement IntoSend*

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

* fix typo

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>

---------

Signed-off-by: Ruihang Xia <waynestxia@gmail.com>
  • Loading branch information
waynexia authored Apr 22, 2024
1 parent 5895cd8 commit 525c512
Show file tree
Hide file tree
Showing 3 changed files with 170 additions and 12 deletions.
7 changes: 6 additions & 1 deletion integrations/object_store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,22 @@ repository = "https://github.com/apache/opendal"
rust-version = "1.75"
version = "0.43.0"

[features]
send_wrapper = ["dep:send_wrapper"]

[dependencies]
async-trait = "0.1"
bytes = "1"
futures = "0.3"
futures-util = "0.3"
object_store = "0.9"
opendal = { version = "0.45.1", path = "../../core" }
pin-project = "1.1"
send_wrapper = { version = "0.6", features = ["futures"], optional = true }
tokio = { version = "1", default-features = false }

[dev-dependencies]
opendal = { version = "0.45.1", path = "../../core", features = [
"services-memory",
"services-memory",
] }
tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread"] }
70 changes: 59 additions & 11 deletions integrations/object_store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@
// specific language governing permissions and limitations
// under the License.

mod send_wrapper;

use std::future::IntoFuture;
use std::ops::Range;

use async_trait::async_trait;
use bytes::Bytes;
use futures::stream::BoxStream;
Expand All @@ -36,7 +41,8 @@ use opendal::Entry;
use opendal::Metadata;
use opendal::Metakey;
use opendal::Operator;
use std::ops::Range;
use send_wrapper::IntoSendFuture;
use send_wrapper::IntoSendStream;
use tokio::io::AsyncWrite;

#[derive(Debug)]
Expand All @@ -62,6 +68,7 @@ impl ObjectStore for OpendalStore {
async fn put(&self, location: &Path, bytes: Bytes) -> Result<PutResult> {
self.inner
.write(location.as_ref(), bytes)
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;
Ok(PutResult {
Expand Down Expand Up @@ -118,6 +125,7 @@ impl ObjectStore for OpendalStore {
let meta = self
.inner
.stat(location.as_ref())
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;

Expand All @@ -131,15 +139,17 @@ impl ObjectStore for OpendalStore {
let r = self
.inner
.reader(location.as_ref())
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;

let stream =
r.into_bytes_stream(0..meta.size as u64)
.map_err(|err| object_store::Error::Generic {
store: "IoError",
source: Box::new(err),
});
let stream = r
.into_bytes_stream(0..meta.size as u64)
.into_send()
.map_err(|err| object_store::Error::Generic {
store: "IoError",
source: Box::new(err),
});

Ok(GetResult {
payload: GetResultPayload::Stream(Box::pin(stream)),
Expand All @@ -153,6 +163,8 @@ impl ObjectStore for OpendalStore {
.inner
.read_with(location.as_ref())
.range(range.start as u64..range.end as u64)
.into_future()
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;

Expand All @@ -163,6 +175,7 @@ impl ObjectStore for OpendalStore {
let meta = self
.inner
.stat(location.as_ref())
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;

Expand All @@ -178,6 +191,7 @@ impl ObjectStore for OpendalStore {
async fn delete(&self, location: &Path) -> Result<()> {
self.inner
.delete(location.as_ref())
.into_send()
.await
.map_err(|err| format_object_store_error(err, location.as_ref()))?;

Expand Down Expand Up @@ -207,7 +221,7 @@ impl ObjectStore for OpendalStore {
Ok::<_, object_store::Error>(stream)
};

fut.into_stream().try_flatten().boxed()
fut.into_stream().try_flatten().into_send().boxed()
}

fn list_with_offset(
Expand All @@ -225,25 +239,31 @@ impl ObjectStore for OpendalStore {
.start_after(offset.as_ref())
.metakey(Metakey::ContentLength | Metakey::LastModified)
.recursive(true)
.into_future()
.into_send()
.await
.map_err(|err| format_object_store_error(err, &path))?
.then(try_format_object_meta)
.into_send()
.boxed()
} else {
self.inner
.lister_with(&path)
.metakey(Metakey::ContentLength | Metakey::LastModified)
.recursive(true)
.into_future()
.into_send()
.await
.map_err(|err| format_object_store_error(err, &path))?
.try_filter(move |entry| futures::future::ready(entry.path() > offset.as_ref()))
.then(try_format_object_meta)
.into_send()
.boxed()
};
Ok::<_, object_store::Error>(fut)
};

fut.into_stream().try_flatten().boxed()
fut.into_stream().into_send().try_flatten().boxed()
}

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
Expand All @@ -252,13 +272,16 @@ impl ObjectStore for OpendalStore {
.inner
.lister_with(&path)
.metakey(Metakey::Mode | Metakey::ContentLength | Metakey::LastModified)
.into_future()
.into_send()
.await
.map_err(|err| format_object_store_error(err, &path))?;
.map_err(|err| format_object_store_error(err, &path))?
.into_send();

let mut common_prefixes = Vec::new();
let mut objects = Vec::new();

while let Some(res) = stream.next().await {
while let Some(res) = stream.next().into_send().await {
let entry = res.map_err(|err| format_object_store_error(err, ""))?;
let meta = entry.metadata();

Expand Down Expand Up @@ -341,6 +364,31 @@ async fn try_format_object_meta(res: Result<Entry, opendal::Error>) -> Result<Ob
Ok(format_object_meta(entry.path(), meta))
}

// Make sure `send_wrapper` works as expected
#[cfg(all(feature = "send_wrapper", target_arch = "wasm32"))]
mod assert_send {
use object_store::ObjectStore;

#[allow(dead_code)]
fn assert_send<T: Send>(_: T) {}

#[allow(dead_code)]
fn assertion() {
let op = super::Operator::new(opendal::services::Memory::default())
.unwrap()
.finish();
let store = super::OpendalStore::new(op);
assert_send(store.put(&"test".into(), bytes::Bytes::new()));
assert_send(store.get(&"test".into()));
assert_send(store.get_range(&"test".into(), 0..1));
assert_send(store.head(&"test".into()));
assert_send(store.delete(&"test".into()));
assert_send(store.list(None));
assert_send(store.list_with_offset(None, &"test".into()));
assert_send(store.list_with_delimiter(None));
}
}

#[cfg(test)]
mod tests {
use std::sync::Arc;
Expand Down
105 changes: 105 additions & 0 deletions integrations/object_store/src/send_wrapper.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Conditionally add the `Send` marker trait for the wrapped type.
//! Only take effect when the `send_wrapper` feature is enabled.

use futures::Stream;
#[cfg(feature = "send_wrapper")]
pub use send_wrapper::SendWrapper;

#[cfg(not(feature = "send_wrapper"))]
pub use noop_wrapper::NoopWrapper as SendWrapper;

#[cfg(not(feature = "send_wrapper"))]
mod noop_wrapper {
use std::{
pin::Pin,
task::{Context, Poll},
};

use futures::{Future, Stream};
use pin_project::pin_project;

#[pin_project]
pub struct NoopWrapper<T> {
#[pin]
item: T,
}

impl<T> Future for NoopWrapper<T>
where
T: Future,
{
type Output = T::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
this.item.poll(cx)
}
}

impl<T> Stream for NoopWrapper<T>
where
T: Stream,
{
type Item = T::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
this.item.poll_next(cx)
}
}

impl<T> NoopWrapper<T> {
pub fn new(item: T) -> Self {
Self { item }
}
}
}

pub trait IntoSendFuture {
type Output;

fn into_send(self) -> Self::Output;
}

impl<T> IntoSendFuture for T
where
T: futures::Future,
{
type Output = SendWrapper<T>;
fn into_send(self) -> Self::Output {
SendWrapper::new(self)
}
}

pub trait IntoSendStream {
type Output;

fn into_send(self) -> Self::Output;
}

impl<T> IntoSendStream for T
where
T: Stream,
{
type Output = SendWrapper<T>;
fn into_send(self) -> Self::Output {
SendWrapper::new(self)
}
}

0 comments on commit 525c512

Please sign in to comment.