Skip to content

Commit

Permalink
remove BufVectoredIo impl
Browse files Browse the repository at this point in the history
  • Loading branch information
George-Miao committed Apr 25, 2024
1 parent 6005a26 commit 03eea73
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 87 deletions.
1 change: 1 addition & 0 deletions core/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

106 changes: 53 additions & 53 deletions core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,15 @@ default = ["reqwest/rustls-tls", "services-memory"]
#
# You should never enable this feature unless you are developing opendal.
tests = [
"dep:rand",
"dep:sha2",
"dep:dotenvy",
"layers-blocking",
"services-azblob",
"services-fs",
"services-http",
"services-memory",
"services-s3",
"dep:rand",
"dep:sha2",
"dep:dotenvy",
"layers-blocking",
"services-azblob",
"services-fs",
"services-http",
"services-memory",
"services-s3",
]

# Enable path cache.
Expand Down Expand Up @@ -94,30 +94,30 @@ layers-dtrace = ["dep:probe"]
services-alluxio = []
services-atomicserver = ["dep:atomic_lib"]
services-azblob = [
"dep:sha2",
"dep:reqsign",
"reqsign?/services-azblob",
"reqsign?/reqwest_request",
"dep:sha2",
"dep:reqsign",
"reqsign?/services-azblob",
"reqsign?/reqwest_request",
]
services-azdls = [
"dep:reqsign",
"reqsign?/services-azblob",
"reqsign?/reqwest_request",
"dep:reqsign",
"reqsign?/services-azblob",
"reqsign?/reqwest_request",
]
services-azfile = [
"dep:reqsign",
"reqsign?/services-azblob",
"reqsign?/reqwest_request",
"dep:reqsign",
"reqsign?/services-azblob",
"reqsign?/reqwest_request",
]
services-b2 = []
services-cacache = ["dep:cacache"]
services-chainsafe = []
services-cloudflare-kv = []
services-compfs = ["dep:compio"]
services-cos = [
"dep:reqsign",
"reqsign?/services-tencent",
"reqsign?/reqwest_request",
"dep:reqsign",
"reqsign?/services-tencent",
"reqsign?/reqwest_request",
]
services-d1 = []
services-dashmap = ["dep:dashmap"]
Expand All @@ -128,9 +128,9 @@ services-foundationdb = ["dep:foundationdb"]
services-fs = ["tokio/fs"]
services-ftp = ["dep:suppaftp", "dep:bb8", "dep:async-tls"]
services-gcs = [
"dep:reqsign",
"reqsign?/services-google",
"reqsign?/reqwest_request",
"dep:reqsign",
"reqsign?/services-google",
"reqsign?/reqwest_request",
]
services-gdrive = ["internal-path-cache"]
services-ghac = []
Expand All @@ -152,15 +152,15 @@ services-moka = ["dep:moka"]
services-mongodb = ["dep:mongodb"]
services-mysql = ["dep:mysql_async"]
services-obs = [
"dep:reqsign",
"reqsign?/services-huaweicloud",
"reqsign?/reqwest_request",
"dep:reqsign",
"reqsign?/services-huaweicloud",
"reqsign?/reqwest_request",
]
services-onedrive = []
services-oss = [
"dep:reqsign",
"reqsign?/services-aliyun",
"reqsign?/reqwest_request",
"dep:reqsign",
"reqsign?/services-aliyun",
"reqsign?/reqwest_request",
]
services-pcloud = []
services-persy = ["dep:persy", "internal-tokio-rt"]
Expand All @@ -170,9 +170,9 @@ services-redis = ["dep:redis", "redis?/tokio-rustls-comp"]
services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"]
services-rocksdb = ["dep:rocksdb", "internal-tokio-rt"]
services-s3 = [
"dep:reqsign",
"reqsign?/services-aws",
"reqsign?/reqwest_request",
"dep:reqsign",
"reqsign?/services-aws",
"reqsign?/reqwest_request",
]
services-seafile = []
services-sftp = ["dep:openssh", "dep:openssh-sftp-client", "dep:bb8"]
Expand Down Expand Up @@ -219,8 +219,8 @@ backon = "0.4.3"
base64 = "0.22"
bytes = "1.6"
chrono = { version = "0.4.28", default-features = false, features = [
"clock",
"std",
"clock",
"std",
] }
flagset = "0.4"
futures = { version = "0.3", default-features = false, features = ["std"] }
Expand All @@ -232,7 +232,7 @@ once_cell = "1"
percent-encoding = "2"
quick-xml = { version = "0.31", features = ["serialize", "overlapped-lists"] }
reqwest = { version = "0.12.2", features = [
"stream",
"stream",
], default-features = false }
serde = { version = "1", features = ["derive"] }
serde_json = "1"
Expand Down Expand Up @@ -263,16 +263,16 @@ bb8-postgres = { version = "0.8.1", optional = true }
tokio-postgres = { version = "0.7.8", optional = true }
# for services-cacache
cacache = { version = "13.0", default-features = false, features = [
"tokio-runtime",
"mmap",
"tokio-runtime",
"mmap",
], optional = true }
# for services-dashmap
dashmap = { version = "5.4", optional = true }
# for services-etcd
etcd-client = { version = "0.12", optional = true, features = ["tls"] }
# for services-foundationdb
foundationdb = { version = "0.8.0", features = [
"embedded-fdb-include",
"embedded-fdb-include",
], optional = true }
# for services-hdfs
hdrs = { version = "0.3.2", optional = true, features = ["async_file"] }
Expand All @@ -288,23 +288,23 @@ moka = { version = "0.12", optional = true, features = ["future", "sync"] }
mongodb = { version = "2.8.1", optional = true, features = ["tokio-runtime"] }
# for services-mysql
mysql_async = { version = "0.32.2", default-features = false, features = [
"default-rustls",
"default-rustls",
], optional = true }
# for services-sftp
openssh = { version = "0.10.0", optional = true }
openssh-sftp-client = { version = "0.14.0", optional = true, features = [
"openssh",
"tracing",
"openssh",
"tracing",
] }
# for services-persy
persy = { version = "1.4.6", optional = true }
# for services-redb
redb = { version = "1.1.0", optional = true }
# for services-redis
redis = { version = "0.23.1", features = [
"cluster-async",
"tokio-comp",
"connection-manager",
"cluster-async",
"tokio-comp",
"connection-manager",
], optional = true }
# for services-rocksdb
rocksdb = { version = "0.21.0", default-features = false, optional = true }
Expand All @@ -314,9 +314,9 @@ rusqlite = { version = "0.29.0", optional = true, features = ["bundled"] }
sled = { version = "0.34.7", optional = true }
# for services-ftp
suppaftp = { version = "5.3.1", default-features = false, features = [
"async-secure",
"rustls",
"async-rustls",
"async-secure",
"rustls",
"async-rustls",
], optional = true }
# for services-tikv
tikv-client = { version = "0.3.0", optional = true, default-features = false }
Expand All @@ -325,7 +325,7 @@ hdfs-native = { version = "0.6.0", optional = true }
# for services-surrealdb
surrealdb = { version = "1.3.0", optional = true, features = ["protocol-http"] }
# for services-compfs
compio = { version = "0.10.0", optional = true, features = ["runtime"] }
compio = { version = "0.10.0", optional = true, features = ["runtime", "bytes", "polling"] }

# Layers
# for layers-async-backtrace
Expand Down Expand Up @@ -360,14 +360,14 @@ dotenvy = "0.15"
libtest-mimic = "0.6"
minitrace = { version = "0.6", features = ["enable"] }
opentelemetry = { version = "0.21", default-features = false, features = [
"trace",
"trace",
] }
pretty_assertions = "1"
rand = "0.8"
sha2 = "0.10"
size = "0.4"
tokio = { version = "1.27", features = ["fs", "macros", "rt-multi-thread"] }
tracing-subscriber = { version = "0.3", features = [
"env-filter",
"tracing-log",
"env-filter",
"tracing-log",
] }
101 changes: 67 additions & 34 deletions core/src/services/compfs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,10 @@
// specific language governing permissions and limitations
// under the License.

use bytes::Bytes;
use compio::buf::{IntoInner, IoBuf, IoVectoredBuf, OwnedIter, OwnedIterator};
use compio::buf::IoBuf;

use crate::Buffer;

#[derive(Debug, Clone)]
struct BufferOwnedIter {
buf: Buffer,
current: Bytes,
}

unsafe impl IoBuf for Buffer {
fn as_buf_ptr(&self) -> *const u8 {
self.current().as_ptr()
Expand All @@ -41,37 +34,77 @@ unsafe impl IoBuf for Buffer {
}
}

impl IoVectoredBuf for Buffer {
fn as_dyn_bufs(&self) -> impl Iterator<Item = &dyn IoBuf> {
self
}
// TODO: impl IoVectoredBuf for Buffer
// impl IoVectoredBuf for Buffer {
// fn as_dyn_bufs(&self) -> impl Iterator<Item = &dyn IoBuf> {}
//
// fn owned_iter(self) -> Result<OwnedIter<impl OwnedIterator<Inner = Self>>, Self> {
// Ok(OwnedIter::new(BufferIter {
// current: self.current(),
// buf: self,
// }))
// }
// }

fn owned_iter(self) -> Result<OwnedIter<impl OwnedIterator<Inner = Self>>, Self> {
Ok(OwnedIter::new(BufferOwnedIter {
current: self.current(),
buf: self,
}))
}
}
// #[derive(Debug, Clone)]
// struct BufferIter {
// buf: Buffer,
// current: Bytes,
// }

impl IntoInner for BufferOwnedIter {
type Inner = Buffer;
// impl IntoInner for BufferIter {
// type Inner = Buffer;
//
// fn into_inner(self) -> Self::Inner {
// self.buf
// }
// }

fn into_inner(self) -> Self::Inner {
self.buf
}
}
// impl OwnedIterator for BufferIter {
// fn next(mut self) -> Result<Self, Self::Inner> {
// let Some(current) = self.buf.next() else {
// return Err(self.buf);
// };
// self.current = current;
// Ok(self)
// }
//
// fn current(&self) -> &dyn IoBuf {
// &self.current
// }
// }

#[cfg(test)]
mod tests {
use bytes::{Buf, Bytes};
use rand::{Rng, thread_rng};

use super::*;

impl OwnedIterator for BufferOwnedIter {
fn next(mut self) -> Result<Self, Self::Inner> {
let Some(current) = self.buf.next() else {
return Err(self.buf);
};
self.current = current;
Ok(self)
fn setup_buffer() -> (Buffer, usize, Bytes) {
let mut rng = thread_rng();

let bs = (0..100)
.map(|_| {
let len = rng.gen_range(1..100);
let mut buf = vec![0; len];
rng.fill(&mut buf[..]);
Bytes::from(buf)
})
.collect::<Vec<_>>();

let total_size = bs.iter().map(|b| b.len()).sum::<usize>();
let total_content = bs.iter().flatten().copied().collect::<Bytes>();
let buf = Buffer::from(bs);

(buf, total_size, total_content)
}

fn current(&self) -> &dyn IoBuf {
&self.current
#[test]
fn test_io_buf() {
let (buf, _len, _bytes) = setup_buffer();
let slice = IoBuf::as_slice(&buf);

assert_eq!(slice, buf.current().chunk())
}
}

0 comments on commit 03eea73

Please sign in to comment.