Skip to content

Commit

Permalink
refactor(core): Change Read's behavior to ensure it reads the exact size of data (#4634)
Browse files Browse the repository at this point in the history

* rename limit to size

Signed-off-by: Xuanwo <github@xuanwo.io>

* Add buffer size check

Signed-off-by: Xuanwo <github@xuanwo.io>

* Polish comments

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix comments

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix blocking test

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix binding c

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix bindings

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix cpp

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix java test

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix fuzz test

Signed-off-by: Xuanwo <github@xuanwo.io>

* Format code

Signed-off-by: Xuanwo <github@xuanwo.io>

* Fix licenses

Signed-off-by: Xuanwo <github@xuanwo.io>

---------

Signed-off-by: Xuanwo <github@xuanwo.io>
  • Loading branch information
Xuanwo authored May 21, 2024
1 parent 3393895 commit 7d50e3c
Show file tree
Hide file tree
Showing 77 changed files with 417 additions and 447 deletions.
8 changes: 8 additions & 0 deletions bindings/c/include/opendal.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,14 @@ typedef enum opendal_code {
* The given file paths are same.
*/
OPENDAL_IS_SAME_FILE,
/**
* The condition of this operation is not matched.
*/
OPENDAL_CONDITION_NOT_MATCH,
/**
* The range of the content is not satisfied.
*/
OPENDAL_RANGE_NOT_SATISFIED,
} opendal_code;

/**
Expand Down
6 changes: 6 additions & 0 deletions bindings/c/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ pub enum opendal_code {
OPENDAL_RATE_LIMITED,
/// The given file paths are same.
OPENDAL_IS_SAME_FILE,
/// The condition of this operation is not matched.
OPENDAL_CONDITION_NOT_MATCH,
/// The range of the content is not satisfied.
OPENDAL_RANGE_NOT_SATISFIED,
}

impl From<core::ErrorKind> for opendal_code {
Expand All @@ -60,6 +64,8 @@ impl From<core::ErrorKind> for opendal_code {
core::ErrorKind::AlreadyExists => opendal_code::OPENDAL_ALREADY_EXISTS,
core::ErrorKind::RateLimited => opendal_code::OPENDAL_RATE_LIMITED,
core::ErrorKind::IsSameFile => opendal_code::OPENDAL_IS_SAME_FILE,
core::ErrorKind::ConditionNotMatch => opendal_code::OPENDAL_CONDITION_NOT_MATCH,
core::ErrorKind::RangeNotSatisfied => opendal_code::OPENDAL_RANGE_NOT_SATISFIED,
// if this is triggered, check the [`core`] crate and add a
// new error code accordingly
_ => panic!("The newly added ErrorKind in core crate is not handled in C bindings"),
Expand Down
8 changes: 4 additions & 4 deletions bindings/c/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,8 +369,8 @@ pub unsafe extern "C" fn opendal_operator_reader(
let op = (*op).as_ref();

let path = unsafe { std::ffi::CStr::from_ptr(path).to_str().unwrap() };
let meta = match op.stat(path) {
Ok(meta) => meta,
let reader = match op.reader(path) {
Ok(reader) => reader,
Err(err) => {
return opendal_result_operator_reader {
reader: std::ptr::null_mut(),
Expand All @@ -379,9 +379,9 @@ pub unsafe extern "C" fn opendal_operator_reader(
}
};

match op.reader(path) {
match reader.into_std_read(..) {
Ok(reader) => opendal_result_operator_reader {
reader: Box::into_raw(Box::new(opendal_reader::new(reader, meta.content_length()))),
reader: Box::into_raw(Box::new(opendal_reader::new(reader))),
error: std::ptr::null_mut(),
},
Err(e) => opendal_result_operator_reader {
Expand Down
4 changes: 2 additions & 2 deletions bindings/c/src/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ pub struct opendal_reader {
}

impl opendal_reader {
pub(crate) fn new(reader: core::BlockingReader, size: u64) -> Self {
pub(crate) fn new(reader: core::StdReader) -> Self {
Self {
inner: Box::into_raw(Box::new(reader.into_std_read(0..size))),
inner: Box::into_raw(Box::new(reader)),
}
}

Expand Down
4 changes: 3 additions & 1 deletion bindings/cpp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@ impl Operator {
fn reader(&self, path: &str) -> Result<Box<Reader>> {
let meta = self.0.stat(path)?;
Ok(Box::new(Reader(
self.0.reader(path)?.into_std_read(0..meta.content_length()),
self.0
.reader(path)?
.into_std_read(0..meta.content_length())?,
)))
}

Expand Down
11 changes: 0 additions & 11 deletions bindings/java/.cargo/config

This file was deleted.

28 changes: 28 additions & 0 deletions bindings/java/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# See also https://github.com/rust-lang/rust/issues/44991

[target.x86_64-unknown-linux-musl]
rustflags = [
"-C", "target-feature=-crt-static",
]

[target.aarch64-unknown-linux-musl]
rustflags = [
"-C", "target-feature=-crt-static",
]
1 change: 1 addition & 0 deletions bindings/java/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ impl Error {
ErrorKind::RateLimited => "RateLimited",
ErrorKind::IsSameFile => "IsSameFile",
ErrorKind::ConditionNotMatch => "ConditionNotMatch",
ErrorKind::RangeNotSatisfied => "RangeNotSatisfied",
_ => "Unexpected",
})?;
let message = env.new_string(format!("{:?}", self.inner))?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,6 @@ public enum Code {
RateLimited,
IsSameFile,
ConditionNotMatch,
ContentTruncated,
ContentIncomplete,
RangeNotSatisfied,
}
}
2 changes: 1 addition & 1 deletion bindings/java/src/operator_input_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ fn intern_construct_reader(
path: JString,
) -> crate::Result<jlong> {
let path = jstring_to_string(env, &path)?;
let reader = op.reader(&path)?.into_bytes_iterator(..);
let reader = op.reader(&path)?.into_bytes_iterator(..)?;
Ok(Box::into_raw(Box::new(reader)) as jlong)
}

Expand Down
3 changes: 1 addition & 2 deletions bindings/nodejs/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,10 +219,9 @@ impl Operator {
/// It could be used to read large files in a streaming way.
#[napi]
pub fn reader_sync(&self, path: String) -> Result<BlockingReader> {
let meta = self.0.blocking().stat(&path).map_err(format_napi_error)?;
let r = self.0.blocking().reader(&path).map_err(format_napi_error)?;
Ok(BlockingReader {
inner: r.into_std_read(0..meta.content_length()),
inner: r.into_std_read(..).map_err(format_napi_error)?,
})
}

Expand Down
4 changes: 2 additions & 2 deletions bindings/python/src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ enum FileState {
}

impl File {
pub fn new_reader(reader: ocore::BlockingReader, size: u64, capability: Capability) -> Self {
Self(FileState::Reader(reader.into_std_read(0..size)), capability)
pub fn new_reader(reader: ocore::StdReader, capability: Capability) -> Self {
Self(FileState::Reader(reader), capability)
}

pub fn new_writer(writer: ocore::BlockingWriter, capability: Capability) -> Self {
Expand Down
9 changes: 6 additions & 3 deletions bindings/python/src/operator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,12 @@ impl Operator {
let this = self.0.clone();
let capability = self.capability()?;
if mode == "rb" {
let meta = this.stat(&path).map_err(format_pyerr)?;
let r = this.reader(&path).map_err(format_pyerr)?;
Ok(File::new_reader(r, meta.content_length(), capability))
let r = this
.reader(&path)
.map_err(format_pyerr)?
.into_std_read(..)
.map_err(format_pyerr)?;
Ok(File::new_reader(r, capability))
} else if mode == "wb" {
let w = this.writer(&path).map_err(format_pyerr)?;
Ok(File::new_writer(w, capability))
Expand Down
2 changes: 1 addition & 1 deletion core/fuzz/fuzz_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ impl Arbitrary<'_> for FuzzInput {

for _ in 0..count {
let offset = u.int_in_range(0..=total_size)?;
let size = u.int_in_range(0..=total_size * 2)?;
let size = u.int_in_range(0..=total_size - offset)?;

actions.push(ReadAction::Read(offset, size));
}
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,8 +282,8 @@ impl<I> BlockingWrapper<I> {
}

impl<I: oio::Read + 'static> oio::BlockingRead for BlockingWrapper<I> {
fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
self.handle.block_on(self.inner.read_at(offset, limit))
fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
self.handle.block_on(self.inner.read_at(offset, size))
}
}

Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/chaos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,19 +171,19 @@ impl<R> ChaosReader<R> {
}

impl<R: oio::Read> oio::Read for ChaosReader<R> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
async fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
if self.i_feel_lucky() {
self.inner.read_at(offset, limit).await
self.inner.read_at(offset, size).await
} else {
Err(Self::unexpected_eof())
}
}
}

impl<R: oio::BlockingRead> oio::BlockingRead for ChaosReader<R> {
fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
if self.i_feel_lucky() {
self.inner.read_at(offset, limit)
self.inner.read_at(offset, size)
} else {
Err(Self::unexpected_eof())
}
Expand Down
30 changes: 24 additions & 6 deletions core/src/layers/complete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,22 +587,40 @@ pub type CompleteLister<A, P> =
pub struct CompleteReader<R>(R);

impl<R: oio::Read> oio::Read for CompleteReader<R> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
if limit == 0 {
async fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
if size == 0 {
return Ok(Buffer::new());
}

self.0.read_at(offset, limit).await
let buf = self.0.read_at(offset, size).await?;
if buf.len() != size {
return Err(Error::new(
ErrorKind::RangeNotSatisfied,
"service didn't return the expected size",
)
.with_context("expect", size.to_string())
.with_context("actual", buf.len().to_string()));
}
Ok(buf)
}
}

impl<R: oio::BlockingRead> oio::BlockingRead for CompleteReader<R> {
fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
if limit == 0 {
fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
if size == 0 {
return Ok(Buffer::new());
}

self.0.read_at(offset, limit)
let buf = self.0.read_at(offset, size)?;
if buf.len() != size {
return Err(Error::new(
ErrorKind::RangeNotSatisfied,
"service didn't return the expected size",
)
.with_context("expect", size.to_string())
.with_context("actual", buf.len().to_string()));
}
Ok(buf)
}
}

Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/concurrent_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,14 +250,14 @@ impl<R> ConcurrentLimitWrapper<R> {
}

impl<R: oio::Read> oio::Read for ConcurrentLimitWrapper<R> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
self.inner.read_at(offset, limit).await
async fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
self.inner.read_at(offset, size).await
}
}

impl<R: oio::BlockingRead> oio::BlockingRead for ConcurrentLimitWrapper<R> {
fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
self.inner.read_at(offset, limit)
fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
self.inner.read_at(offset, size)
}
}

Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/dtrace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,10 +340,10 @@ impl<R> DtraceLayerWrapper<R> {
}

impl<R: oio::Read> oio::Read for DtraceLayerWrapper<R> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
async fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, reader_read_start, c_path.as_ptr());
match self.inner.read_at(offset, limit).await {
match self.inner.read_at(offset, size).await {
Ok(bs) => {
probe_lazy!(opendal, reader_read_ok, c_path.as_ptr(), bs.remaining());
Ok(bs)
Expand All @@ -357,11 +357,11 @@ impl<R: oio::Read> oio::Read for DtraceLayerWrapper<R> {
}

impl<R: oio::BlockingRead> oio::BlockingRead for DtraceLayerWrapper<R> {
fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
let c_path = CString::new(self.path.clone()).unwrap();
probe_lazy!(opendal, blocking_reader_read_start, c_path.as_ptr());
self.inner
.read_at(offset, limit)
.read_at(offset, size)
.map(|bs| {
probe_lazy!(
opendal,
Expand Down
12 changes: 6 additions & 6 deletions core/src/layers/error_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,25 +339,25 @@ pub struct ErrorContextWrapper<T> {
}

impl<T: oio::Read> oio::Read for ErrorContextWrapper<T> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
self.inner.read_at(offset, limit).await.map_err(|err| {
async fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
self.inner.read_at(offset, size).await.map_err(|err| {
err.with_operation(ReadOperation::Read)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("offset", offset.to_string())
.with_context("limit", limit.to_string())
.with_context("size", size.to_string())
})
}
}

impl<T: oio::BlockingRead> oio::BlockingRead for ErrorContextWrapper<T> {
fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
self.inner.read_at(offset, limit).map_err(|err| {
fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
self.inner.read_at(offset, size).map_err(|err| {
err.with_operation(ReadOperation::BlockingRead)
.with_context("service", self.scheme)
.with_context("path", &self.path)
.with_context("offset", offset.to_string())
.with_context("limit", limit.to_string())
.with_context("size", size.to_string())
})
}
}
Expand Down
8 changes: 4 additions & 4 deletions core/src/layers/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -978,8 +978,8 @@ impl<R> Drop for LoggingReader<R> {
}

impl<R: oio::Read> oio::Read for LoggingReader<R> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
match self.inner.read_at(offset, limit).await {
async fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
match self.inner.read_at(offset, size).await {
Ok(bs) => {
self.read
.fetch_add(bs.remaining() as u64, Ordering::Relaxed);
Expand Down Expand Up @@ -1014,8 +1014,8 @@ impl<R: oio::Read> oio::Read for LoggingReader<R> {
}

impl<R: oio::BlockingRead> oio::BlockingRead for LoggingReader<R> {
fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
match self.inner.read_at(offset, limit) {
fn read_at(&self, offset: u64, size: usize) -> Result<Buffer> {
match self.inner.read_at(offset, size) {
Ok(bs) => {
self.read
.fetch_add(bs.remaining() as u64, Ordering::Relaxed);
Expand Down
4 changes: 2 additions & 2 deletions core/src/layers/madsim.rs
Original file line number Diff line number Diff line change
Expand Up @@ -261,9 +261,9 @@ pub struct MadsimReader {
}

impl oio::Read for MadsimReader {
async fn read_at(&self, offset: u64, limit: usize) -> crate::Result<Buffer> {
async fn read_at(&self, offset: u64, size: usize) -> crate::Result<Buffer> {
if let Some(ref data) = self.data {
let size = min(limit, data.len());
let size = min(size, data.len());
Ok(data.clone().split_to(size).into())
} else {
Ok(Buffer::new())
Expand Down
Loading

0 comments on commit 7d50e3c

Please sign in to comment.