catch up changes
Signed-off-by: tison <wander4096@gmail.com>
tisonkun committed Jun 27, 2024
1 parent b6585e3 commit 6e324fc
Showing 13 changed files with 67 additions and 57 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

4 changes: 3 additions & 1 deletion src/common/datasource/src/file_format.rs
@@ -149,7 +149,9 @@ pub fn open_with_decoder<T: ArrowDecoder, F: Fn() -> DataFusionResult<T>>(
.reader(&path)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?
.into_bytes_stream(..);
.into_bytes_stream(..)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;

let mut upstream = compression_type.convert_stream(reader).fuse();

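The recurring change across these files is that OpenDAL 0.47 turns the reader conversion helpers into async, fallible calls: `into_bytes_stream(..)` and `into_futures_async_read(range)` now return futures that resolve to a `Result`, so every call site gains an `.await` plus explicit error handling. A minimal sketch of the new shape, assuming an `opendal::Operator` named `store`; the function name and error handling here are illustrative, not part of the commit:

```rust
use opendal::Operator;

// Sketch of the 0.47-style read path: both creating the reader and
// converting it into a byte stream are awaited and can fail independently.
async fn open_stream(store: &Operator, path: &str) -> opendal::Result<()> {
    let reader = store.reader(path).await?;
    // In 0.46 this returned the stream directly; in 0.47 the conversion
    // itself is async and fallible, hence the extra `.await?`.
    let _stream = reader.into_bytes_stream(..).await?;
    Ok(())
}
```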
3 changes: 3 additions & 0 deletions src/common/datasource/src/file_format/csv.rs
@@ -169,11 +169,14 @@ impl FileFormat for CsvFormat {
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;

let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.await
.context(error::ReadObjectSnafu { path })?
.compat();

let decoded = self.compression_type.convert_async_read(reader);
3 changes: 3 additions & 0 deletions src/common/datasource/src/file_format/json.rs
@@ -87,11 +87,14 @@ impl FileFormat for JsonFormat {
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;

let reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.await
.context(error::ReadObjectSnafu { path })?
.compat();

let decoded = self.compression_type.convert_async_read(reader);
4 changes: 4 additions & 0 deletions src/common/datasource/src/file_format/parquet.rs
@@ -52,11 +52,14 @@ impl FileFormat for ParquetFormat {
.stat(path)
.await
.context(error::ReadObjectSnafu { path })?;

let mut reader = store
.reader(path)
.await
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.await
.context(error::ReadObjectSnafu { path })?
.compat();

let metadata = reader
@@ -129,6 +132,7 @@ impl LazyParquetFileReader {
.reader(&self.path)
.await?
.into_futures_async_read(0..meta.content_length())
.await?
.compat();
self.reader = Some(reader);
}
4 changes: 3 additions & 1 deletion src/mito2/src/cache/write_cache.rs
@@ -188,7 +188,9 @@ impl WriteCache {
.reader(&cache_path)
.await
.context(error::OpenDalSnafu)?
.into_futures_async_read(0..cached_value.content_length());
.into_futures_async_read(0..cached_value.content_length())
.await
.context(error::OpenDalSnafu)?;

let mut writer = remote_store
.writer_with(upload_path)
19 changes: 12 additions & 7 deletions src/mito2/src/sst/index/applier.rs
@@ -27,10 +27,7 @@ use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;

use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey};
use crate::error::{
ApplyIndexSnafu, PuffinBlobTypeNotFoundSnafu, PuffinReadBlobSnafu, PuffinReadMetadataSnafu,
Result,
};
use crate::error::{ApplyIndexSnafu, OpenDalSnafu, PuffinBlobTypeNotFoundSnafu, PuffinReadBlobSnafu, PuffinReadMetadataSnafu, Result};
use crate::metrics::{
INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE, INDEX_PUFFIN_READ_BYTES_TOTAL,
INDEX_PUFFIN_READ_OP_TOTAL, INDEX_PUFFIN_SEEK_OP_TOTAL,
@@ -128,11 +125,19 @@ impl SstIndexApplier {
return Ok(None);
};

Ok(file_cache
let Some(reader) = file_cache
.reader(IndexKey::new(self.region_id, file_id, FileType::Puffin))
.await
.map(|v| v.into_futures_async_read(0..indexed_value.file_size as u64))
.map(PuffinFileReader::new))
else {
return Ok(None);
};

let reader = reader
.into_futures_async_read(0..indexed_value.file_size as u64)
.await
.context(OpenDalSnafu)?;

Ok(Some(PuffinFileReader::new(reader)))
}

/// Helper function to create a [`PuffinFileReader`] from the remote index file.
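The restructuring in this applier hunk falls out of the same shift: now that `into_futures_async_read` is async and fallible, it can no longer be chained inside `Option::map`, so the code first unpacks the optional cached reader with `let ... else` and only then awaits the conversion. A self-contained sketch of that control-flow pattern with stand-in types (not the crate's real API):

```rust
// Stand-in types for the sketch; the real code wraps an OpenDAL reader
// in a PuffinFileReader.
struct RawReader;
struct AsyncReader;
struct PuffinReader(AsyncReader);

#[derive(Debug)]
struct Error;

impl RawReader {
    // Models the now-async, now-fallible conversion step.
    async fn into_async_read(self) -> Result<AsyncReader, Error> {
        Ok(AsyncReader)
    }
}

async fn cached_reader(cached: Option<RawReader>) -> Result<Option<PuffinReader>, Error> {
    // A missing cache entry is a normal outcome, not an error.
    let Some(raw) = cached else {
        return Ok(None);
    };
    // The conversion cannot live inside `Option::map` anymore, because it
    // must be awaited and may fail.
    let reader = raw.into_async_read().await?;
    Ok(Some(PuffinReader(reader)))
}
```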
4 changes: 3 additions & 1 deletion src/mito2/src/sst/index/store.rs
@@ -67,7 +67,9 @@ impl InstrumentedStore {
.reader(path)
.await
.context(OpenDalSnafu)?
.into_futures_async_read(0..meta.content_length());
.into_futures_async_read(0..meta.content_length())
.await
.context(OpenDalSnafu)?;
Ok(InstrumentedAsyncRead::new(
reader,
read_byte_count,
2 changes: 1 addition & 1 deletion src/object-store/Cargo.toml
@@ -17,7 +17,7 @@ futures.workspace = true
lazy_static.workspace = true
md5 = "0.7"
moka = { workspace = true, features = ["future"] }
opendal = { version = "0.46", features = [
opendal = { version = "0.47", features = [
"layers-tracing",
"services-azblob",
"services-fs",
4 changes: 1 addition & 3 deletions src/object-store/src/layers/lru_cache.rs
@@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use opendal::raw::oio::ReadDyn;
use opendal::raw::{
Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead,
@@ -75,7 +73,7 @@ pub struct LruCacheAccess<I> {

impl<I: Access> LayeredAccess for LruCacheAccess<I> {
type Inner = I;
type Reader = Arc<dyn ReadDyn>;
type Reader = Box<dyn ReadDyn>;
type BlockingReader = I::BlockingReader;
type Writer = I::Writer;
type BlockingWriter = I::BlockingWriter;
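One plausible reading of the `Arc<dyn ReadDyn>` to `Box<dyn ReadDyn>` switch, inferred from the new trait shape rather than stated in the commit: `oio::Read::read` now takes `&mut self`, and a reader shared behind an `Arc` cannot hand out mutable access, whereas an owned `Box` can. A tiny illustration of that constraint using a hypothetical trait:

```rust
use std::sync::Arc;

// Hypothetical stand-in for a reader trait whose method needs `&mut self`.
trait ReadLike {
    fn read(&mut self) -> Vec<u8>;
}

// An owned boxed trait object can call `&mut self` methods directly.
fn read_boxed(mut reader: Box<dyn ReadLike>) -> Vec<u8> {
    reader.read()
}

// A shared `Arc<dyn ReadLike>` cannot: data behind an `Arc` is not
// mutably borrowable, so `reader.read()` would fail to compile here.
fn read_shared(_reader: Arc<dyn ReadLike>) -> Vec<u8> {
    Vec::new() // placeholder; a real call would need a lock or ownership
}
```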
59 changes: 23 additions & 36 deletions src/object-store/src/layers/lru_cache/read_cache.rs
@@ -18,8 +18,8 @@ use common_telemetry::debug;
use futures::{FutureExt, StreamExt};
use moka::future::Cache;
use moka::notification::ListenerFuture;
use opendal::raw::oio::{Read, ReadDyn, Reader};
use opendal::raw::{Access, BytesRange, OpRead, OpStat, RpRead};
use opendal::raw::oio::{Read, Reader};
use opendal::raw::{Access, OpRead, RpRead};
use opendal::{Buffer, Error as OpendalError, ErrorKind, Operator, Result};

use crate::metrics::{
Expand Down Expand Up @@ -51,9 +51,9 @@ fn can_cache(path: &str) -> bool {
!path.ends_with("_last_checkpoint")
}

/// Generate an unique cache key for the read path and range.
fn read_cache_key(path: &str, range: BytesRange) -> String {
format!("{:x}.cache-{}", md5::compute(path), range.to_header())
/// Generate a unique cache key for the read path and range.
fn read_cache_key(path: &str) -> String {
format!("{:x}.cache", md5::compute(path))
}

/// Local read cache for files in object storage
@@ -161,32 +161,29 @@ impl ReadCache {
/// Read from a specific path using the OpRead operation.
/// It will attempt to retrieve the data from the local cache.
/// If the data is not found in the local cache,
/// it will fallback to retrieving it from remote object storage
/// it will fall back to retrieving it from remote object storage
/// and cache the result locally.
pub(crate) async fn read<I>(
&self,
inner: &I,
path: &str,
args: OpRead,
) -> Result<(RpRead, Arc<dyn ReadDyn>)>
) -> Result<(RpRead, Reader)>
where
I: Access,
{
if !can_cache(path) {
return inner.read(path, args).await.map(to_output_reader);
}

// FIXME: remove this block after opendal v0.47 released.
let meta = inner.stat(path, OpStat::new()).await?;
let (rp, reader) = inner.read(path, args).await?;
let reader: ReadCacheReader<I> = ReadCacheReader {
path: Arc::new(path.to_string()),
inner_reader: reader,
size: meta.into_metadata().content_length(),
file_cache: self.file_cache.clone(),
mem_cache: self.mem_cache.clone(),
};
Ok((rp, Arc::new(reader)))
Ok((rp, Box::new(reader)))
}
}

@@ -195,14 +192,6 @@ pub struct ReadCacheReader<I: Access> {
path: Arc<String>,
/// Remote file reader.
inner_reader: I::Reader,
/// FIXME: remove this field after opendal v0.47 released.
///
/// OpenDAL's read_at takes `offset, limit` which means the underlying storage
/// services could return less data than limit. We store size here as a workaround.
///
/// This API has been refactor into `offset, size` instead. After opendal v0.47 released,
/// we don't need this anymore.
size: u64,
/// Local file cache backend
file_cache: Operator,
/// Local memory cache to track local cache files
@@ -211,11 +200,11 @@ impl<I: Access> ReadCacheReader<I> {

impl<I: Access> ReadCacheReader<I> {
/// TODO: we can return the Buffer directly to avoid another read from cache.
async fn read_remote(&self, offset: u64, limit: usize) -> Result<ReadResult> {
async fn read_remote(&mut self) -> Result<ReadResult> {
OBJECT_STORE_LRU_CACHE_MISS.inc();

let buf = self.inner_reader.read_at(offset, limit).await?;
let result = self.try_write_cache(buf, offset).await;
let buf = self.inner_reader.read_all().await?;
let result = self.try_write_cache(buf).await;

match result {
Ok(read_bytes) => {
@@ -243,29 +232,27 @@ impl<I: Access> ReadCacheReader<I> {
}
}

async fn try_write_cache(&self, buf: Buffer, offset: u64) -> Result<usize> {
async fn try_write_cache(&self, buf: Buffer) -> Result<usize> {
let size = buf.len();
let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(size as _)));
let read_key = read_cache_key(&self.path);
self.file_cache.write(&read_key, buf).await?;
Ok(size)
}
}

impl<I: Access> Read for ReadCacheReader<I> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
let size = self.size.min(offset + limit as u64) - offset;
let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(size as _)));

let read_result = self
.mem_cache
.try_get_with(read_key.clone(), self.read_remote(offset, limit))
async fn read(&mut self) -> Result<Buffer> {
let read_key = read_cache_key(&self.path);
let mem_cache = self.mem_cache.clone();
let read_result = mem_cache
.try_get_with(read_key.clone(), self.read_remote())
.await
.map_err(|e| OpendalError::new(e.kind(), &e.to_string()))?;
.map_err(|e| OpendalError::new(e.kind(), e.to_string()))?;

match read_result {
ReadResult::Success(_) => {
// There is a concurrent issue here, the local cache may be purged
// while reading, we have to fallback to remote read
// while reading, we have to fall back to remote read
match self.file_cache.read(&read_key).await {
Ok(ret) => {
OBJECT_STORE_LRU_CACHE_HIT
@@ -275,7 +262,7 @@ impl<I: Access> Read for ReadCacheReader<I> {
}
Err(_) => {
OBJECT_STORE_LRU_CACHE_MISS.inc();
self.inner_reader.read_at(offset, limit).await
self.inner_reader.read().await
}
}
}
@@ -286,15 +273,15 @@

Err(OpendalError::new(
ErrorKind::NotFound,
&format!("File not found: {}", self.path),
format!("File not found: {}", self.path),
))
}
}
}
}

fn to_output_reader<R: Read + 'static>(input: (RpRead, R)) -> (RpRead, Reader) {
(input.0, Arc::new(input.1))
(input.0, Box::new(input.1))
}

#[cfg(test)]
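The deepest change in this file is the reader trait itself: the stateless `read_at(&self, offset, limit)` is replaced by a stateful, pull-based `read(&mut self)`, and the cache now reads the whole object with `read_all` and keys entries by path alone instead of path plus byte range. A minimal pass-through wrapper in the new style, sketched against the trait shape shown in this diff; the wrapper type and byte counter are hypothetical:

```rust
use opendal::raw::oio;
use opendal::{Buffer, Result};

/// Sketch: a layer-style reader that delegates to an inner reader using
/// the 0.47 pull-based `read(&mut self)` method shown in this commit.
struct InstrumentedRead<R> {
    inner: R,
    bytes_read: u64,
}

impl<R: oio::Read> oio::Read for InstrumentedRead<R> {
    async fn read(&mut self) -> Result<Buffer> {
        // Each call pulls the next chunk from the inner reader; count the
        // bytes before handing the buffer back to the caller.
        let buf = self.inner.read().await?;
        self.bytes_read += buf.len() as u64;
        Ok(buf)
    }
}
```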
10 changes: 5 additions & 5 deletions src/object-store/src/layers/prometheus.rs
@@ -63,7 +63,7 @@ fn increment_errors_total(op: Operation, kind: ErrorKind) {
);
}

/// Please refer to [prometheus](https://docs.rs/prometheus) for every operations.
/// Please refer to [prometheus](https://docs.rs/prometheus) for every operation.
///
/// # Prometheus Metrics
///
@@ -441,17 +441,17 @@ impl<R> PrometheusMetricWrapper<R> {
}

impl<R: oio::Read> oio::Read for PrometheusMetricWrapper<R> {
async fn read_at(&self, offset: u64, limit: usize) -> Result<Buffer> {
self.inner.read_at(offset, limit).await.map_err(|err| {
async fn read(&mut self) -> Result<Buffer> {
self.inner.read().await.map_err(|err| {
increment_errors_total(self.op, err.kind());
err
})
}
}

impl<R: oio::BlockingRead> oio::BlockingRead for PrometheusMetricWrapper<R> {
fn read_at(&self, offset: u64, limit: usize) -> opendal::Result<Buffer> {
self.inner.read_at(offset, limit).map_err(|err| {
fn read(&mut self) -> opendal::Result<Buffer> {
self.inner.read().map_err(|err| {
increment_errors_total(self.op, err.kind());
err
})
4 changes: 4 additions & 0 deletions src/operator/src/statement/copy_table_from.rs
@@ -156,6 +156,8 @@ impl StatementExecutor {
.await
.context(error::ReadObjectSnafu { path: &path })?
.into_futures_async_read(0..meta.content_length())
.await
.context(error::ReadObjectSnafu { path: &path })?
.compat();
let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default())
.await
@@ -301,6 +303,8 @@ impl StatementExecutor {
.await
.context(error::ReadObjectSnafu { path })?
.into_futures_async_read(0..meta.content_length())
.await
.context(error::ReadObjectSnafu { path })?
.compat();
let builder =
ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());
