diff --git a/Cargo.lock b/Cargo.lock index 717106c55f2a..d123ebfe8a8c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7051,9 +7051,9 @@ checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" [[package]] name = "opendal" -version = "0.46.0" +version = "0.47.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "328c4992328e8965e6a6ef102d38438b5fdc7d9b9107eda2377ba05379d9d544" +checksum = "876c6655dd5b410c83e0c9edf38be60fed540a1cc1c2f3a2ac31830eb8a8ff45" dependencies = [ "anyhow", "async-trait", diff --git a/src/common/datasource/src/file_format.rs b/src/common/datasource/src/file_format.rs index c555f763b59b..999830354226 100644 --- a/src/common/datasource/src/file_format.rs +++ b/src/common/datasource/src/file_format.rs @@ -149,7 +149,9 @@ pub fn open_with_decoder DataFusionResult>( .reader(&path) .await .map_err(|e| DataFusionError::External(Box::new(e)))? - .into_bytes_stream(..); + .into_bytes_stream(..) + .await + .map_err(|e| DataFusionError::External(Box::new(e)))?; let mut upstream = compression_type.convert_stream(reader).fuse(); diff --git a/src/common/datasource/src/file_format/csv.rs b/src/common/datasource/src/file_format/csv.rs index ade4e5409e42..1172004a9e19 100644 --- a/src/common/datasource/src/file_format/csv.rs +++ b/src/common/datasource/src/file_format/csv.rs @@ -169,11 +169,14 @@ impl FileFormat for CsvFormat { .stat(path) .await .context(error::ReadObjectSnafu { path })?; + let reader = store .reader(path) .await .context(error::ReadObjectSnafu { path })? .into_futures_async_read(0..meta.content_length()) + .await + .context(error::ReadObjectSnafu { path })? .compat(); let decoded = self.compression_type.convert_async_read(reader); diff --git a/src/common/datasource/src/file_format/json.rs b/src/common/datasource/src/file_format/json.rs index 97057f836200..3599fcd4ec9d 100644 --- a/src/common/datasource/src/file_format/json.rs +++ b/src/common/datasource/src/file_format/json.rs @@ -87,11 +87,14 @@ impl FileFormat for JsonFormat { .stat(path) .await .context(error::ReadObjectSnafu { path })?; + let reader = store .reader(path) .await .context(error::ReadObjectSnafu { path })? .into_futures_async_read(0..meta.content_length()) + .await + .context(error::ReadObjectSnafu { path })? .compat(); let decoded = self.compression_type.convert_async_read(reader); diff --git a/src/common/datasource/src/file_format/parquet.rs b/src/common/datasource/src/file_format/parquet.rs index 7994aafc4260..9988a311f51c 100644 --- a/src/common/datasource/src/file_format/parquet.rs +++ b/src/common/datasource/src/file_format/parquet.rs @@ -52,11 +52,14 @@ impl FileFormat for ParquetFormat { .stat(path) .await .context(error::ReadObjectSnafu { path })?; + let mut reader = store .reader(path) .await .context(error::ReadObjectSnafu { path })? .into_futures_async_read(0..meta.content_length()) + .await + .context(error::ReadObjectSnafu { path })? .compat(); let metadata = reader @@ -129,6 +132,7 @@ impl LazyParquetFileReader { .reader(&self.path) .await? .into_futures_async_read(0..meta.content_length()) + .await? .compat(); self.reader = Some(reader); } diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs index 8b64511598a8..6e6e5bea6813 100644 --- a/src/datanode/src/store.rs +++ b/src/datanode/src/store.rs @@ -20,6 +20,7 @@ mod gcs; mod oss; mod s3; +use std::sync::Arc; use std::time::Duration; use std::{env, path}; @@ -28,7 +29,7 @@ use common_telemetry::info; use object_store::layers::{LruCacheLayer, RetryLayer}; use object_store::services::Fs; use object_store::util::{join_dir, normalize_dir, with_instrument_layers}; -use object_store::{HttpClient, ObjectStore}; +use object_store::{HttpClient, ObjectStore, ObjectStoreBuilder}; use snafu::prelude::*; use crate::config::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; @@ -106,13 +107,14 @@ async fn create_object_store_with_cache( if let Some(path) = cache_path { let atomic_temp_dir = join_dir(path, ".tmp/"); clean_temp_dir(&atomic_temp_dir)?; - let mut builder = Fs::default(); - builder.root(path).atomic_write_dir(&atomic_temp_dir); - let cache_store = ObjectStore::new(builder) - .context(error::InitBackendSnafu)? - .finish(); - let cache_layer = LruCacheLayer::new(cache_store, cache_capacity.0 as usize) + let cache_store = { + let mut builder = Fs::default(); + builder.root(path).atomic_write_dir(&atomic_temp_dir); + builder.build().context(error::InitBackendSnafu)? + }; + + let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize) .await .context(error::InitBackendSnafu)?; diff --git a/src/mito2/src/cache/write_cache.rs b/src/mito2/src/cache/write_cache.rs index 26eea8f2d82a..f2731e25d0b2 100644 --- a/src/mito2/src/cache/write_cache.rs +++ b/src/mito2/src/cache/write_cache.rs @@ -188,7 +188,9 @@ impl WriteCache { .reader(&cache_path) .await .context(error::OpenDalSnafu)? - .into_futures_async_read(0..cached_value.content_length()); + .into_futures_async_read(0..cached_value.content_length()) + .await + .context(error::OpenDalSnafu)?; let mut writer = remote_store .writer_with(upload_path) diff --git a/src/mito2/src/sst/index/applier.rs b/src/mito2/src/sst/index/applier.rs index aba4534b2847..da06361568f5 100644 --- a/src/mito2/src/sst/index/applier.rs +++ b/src/mito2/src/sst/index/applier.rs @@ -28,8 +28,8 @@ use store_api::storage::RegionId; use crate::cache::file_cache::{FileCacheRef, FileType, IndexKey}; use crate::error::{ - ApplyIndexSnafu, PuffinBlobTypeNotFoundSnafu, PuffinReadBlobSnafu, PuffinReadMetadataSnafu, - Result, + ApplyIndexSnafu, OpenDalSnafu, PuffinBlobTypeNotFoundSnafu, PuffinReadBlobSnafu, + PuffinReadMetadataSnafu, Result, }; use crate::metrics::{ INDEX_APPLY_ELAPSED, INDEX_APPLY_MEMORY_USAGE, INDEX_PUFFIN_READ_BYTES_TOTAL, @@ -128,11 +128,19 @@ impl SstIndexApplier { return Ok(None); }; - Ok(file_cache + let Some(reader) = file_cache .reader(IndexKey::new(self.region_id, file_id, FileType::Puffin)) .await - .map(|v| v.into_futures_async_read(0..indexed_value.file_size as u64)) - .map(PuffinFileReader::new)) + else { + return Ok(None); + }; + + let reader = reader + .into_futures_async_read(0..indexed_value.file_size as u64) + .await + .context(OpenDalSnafu)?; + + Ok(Some(PuffinFileReader::new(reader))) } /// Helper function to create a [`PuffinFileReader`] from the remote index file. diff --git a/src/mito2/src/sst/index/store.rs b/src/mito2/src/sst/index/store.rs index 9d26118366ad..7dfcdc253cd6 100644 --- a/src/mito2/src/sst/index/store.rs +++ b/src/mito2/src/sst/index/store.rs @@ -67,7 +67,9 @@ impl InstrumentedStore { .reader(path) .await .context(OpenDalSnafu)? - .into_futures_async_read(0..meta.content_length()); + .into_futures_async_read(0..meta.content_length()) + .await + .context(OpenDalSnafu)?; Ok(InstrumentedAsyncRead::new( reader, read_byte_count, diff --git a/src/object-store/Cargo.toml b/src/object-store/Cargo.toml index 5f4c4c98ed8d..00bb5a93acfd 100644 --- a/src/object-store/Cargo.toml +++ b/src/object-store/Cargo.toml @@ -17,7 +17,7 @@ futures.workspace = true lazy_static.workspace = true md5 = "0.7" moka = { workspace = true, features = ["future"] } -opendal = { version = "0.46", features = [ +opendal = { version = "0.47", features = [ "layers-tracing", "services-azblob", "services-fs", diff --git a/src/object-store/src/layers/lru_cache.rs b/src/object-store/src/layers/lru_cache.rs index bcea36603ca6..ded6afe58bb6 100644 --- a/src/object-store/src/layers/lru_cache.rs +++ b/src/object-store/src/layers/lru_cache.rs @@ -14,26 +14,26 @@ use std::sync::Arc; -use opendal::raw::oio::ReadDyn; +use opendal::raw::oio::Reader; use opendal::raw::{ Access, Layer, LayeredAccess, OpDelete, OpList, OpRead, OpWrite, RpDelete, RpList, RpRead, RpWrite, }; -use opendal::{Operator, Result}; +use opendal::Result; mod read_cache; use common_telemetry::info; use read_cache::ReadCache; /// An opendal layer with local LRU file cache supporting. #[derive(Clone)] -pub struct LruCacheLayer { +pub struct LruCacheLayer { // The read cache - read_cache: ReadCache, + read_cache: ReadCache, } -impl LruCacheLayer { +impl LruCacheLayer { /// Create a `[LruCacheLayer]` with local file cache and capacity in bytes. - pub async fn new(file_cache: Operator, capacity: usize) -> Result { + pub async fn new(file_cache: Arc, capacity: usize) -> Result { let read_cache = ReadCache::new(file_cache, capacity); let (entries, bytes) = read_cache.recover_cache().await?; @@ -52,12 +52,12 @@ impl LruCacheLayer { /// Returns the read cache statistics info `(EntryCount, SizeInBytes)`. pub async fn read_cache_stat(&self) -> (u64, u64) { - self.read_cache.stat().await + self.read_cache.cache_stat().await } } -impl Layer for LruCacheLayer { - type LayeredAccess = LruCacheAccess; +impl Layer for LruCacheLayer { + type LayeredAccess = LruCacheAccess; fn layer(&self, inner: I) -> Self::LayeredAccess { LruCacheAccess { @@ -68,14 +68,14 @@ impl Layer for LruCacheLayer { } #[derive(Debug)] -pub struct LruCacheAccess { +pub struct LruCacheAccess { inner: I, - read_cache: ReadCache, + read_cache: ReadCache, } -impl LayeredAccess for LruCacheAccess { +impl LayeredAccess for LruCacheAccess { type Inner = I; - type Reader = Arc; + type Reader = Reader; type BlockingReader = I::BlockingReader; type Writer = I::Writer; type BlockingWriter = I::BlockingWriter; @@ -87,7 +87,9 @@ impl LayeredAccess for LruCacheAccess { } async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> { - self.read_cache.read(&self.inner, path, args).await + self.read_cache + .read_from_cache(&self.inner, path, args) + .await } async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { diff --git a/src/object-store/src/layers/lru_cache/read_cache.rs b/src/object-store/src/layers/lru_cache/read_cache.rs index 81415b8039ca..6519adf766f9 100644 --- a/src/object-store/src/layers/lru_cache/read_cache.rs +++ b/src/object-store/src/layers/lru_cache/read_cache.rs @@ -15,12 +15,12 @@ use std::sync::Arc; use common_telemetry::debug; -use futures::{FutureExt, StreamExt}; +use futures::FutureExt; use moka::future::Cache; use moka::notification::ListenerFuture; -use opendal::raw::oio::{Read, ReadDyn, Reader}; -use opendal::raw::{Access, BytesRange, OpRead, OpStat, RpRead}; -use opendal::{Buffer, Error as OpendalError, ErrorKind, Operator, Result}; +use opendal::raw::oio::{List, Read, Reader, Write}; +use opendal::raw::{Access, OpDelete, OpList, OpRead, OpStat, OpWrite, RpRead}; +use opendal::{Error as OpendalError, ErrorKind, Result}; use crate::metrics::{ OBJECT_STORE_LRU_CACHE_BYTES, OBJECT_STORE_LRU_CACHE_ENTRIES, OBJECT_STORE_LRU_CACHE_HIT, @@ -51,23 +51,36 @@ fn can_cache(path: &str) -> bool { !path.ends_with("_last_checkpoint") } -/// Generate an unique cache key for the read path and range. -fn read_cache_key(path: &str, range: BytesRange) -> String { - format!("{:x}.cache-{}", md5::compute(path), range.to_header()) +/// Generate a unique cache key for the read path and range. +fn read_cache_key(path: &str, args: &OpRead) -> String { + format!( + "{:x}.cache-{}", + md5::compute(path), + args.range().to_header() + ) } /// Local read cache for files in object storage -#[derive(Clone, Debug)] -pub(crate) struct ReadCache { +#[derive(Debug)] +pub(crate) struct ReadCache { /// Local file cache backend - file_cache: Operator, + file_cache: Arc, /// Local memory cache to track local cache files mem_cache: Cache, } -impl ReadCache { +impl Clone for ReadCache { + fn clone(&self) -> Self { + Self { + file_cache: self.file_cache.clone(), + mem_cache: self.mem_cache.clone(), + } + } +} + +impl ReadCache { /// Create a [`ReadCache`] with capacity in bytes. - pub(crate) fn new(file_cache: Operator, capacity: usize) -> Self { + pub(crate) fn new(file_cache: Arc, capacity: usize) -> Self { let file_cache_cloned = file_cache.clone(); let eviction_listener = move |read_key: Arc, read_result: ReadResult, cause| -> ListenerFuture { @@ -79,7 +92,7 @@ impl ReadCache { if let ReadResult::Success(size) = read_result { OBJECT_STORE_LRU_CACHE_BYTES.sub(size as i64); - let result = file_cache_cloned.delete(&read_key).await; + let result = file_cache_cloned.delete(&read_key, OpDelete::new()).await; debug!( "Deleted local cache file `{}`, result: {:?}, cause: {:?}.", read_key, result, cause @@ -104,7 +117,7 @@ impl ReadCache { } /// Returns the cache's entry count and total approximate entry size in bytes. - pub(crate) async fn stat(&self) -> (u64, u64) { + pub(crate) async fn cache_stat(&self) -> (u64, u64) { self.mem_cache.run_pending_tasks().await; (self.mem_cache.entry_count(), self.mem_cache.weighted_size()) @@ -129,17 +142,17 @@ impl ReadCache { /// Recover existing cache items from `file_cache` to `mem_cache`. /// Return entry count and total approximate entry size in bytes. pub(crate) async fn recover_cache(&self) -> Result<(u64, u64)> { - let mut pager = self.file_cache.lister("/").await?; + let (_, mut pager) = self.file_cache.list("/", OpList::default()).await?; - while let Some(entry) = pager.next().await.transpose()? { + while let Some(entry) = pager.next().await? { let read_key = entry.path(); // We can't retrieve the metadata from `[opendal::raw::oio::Entry]` directly, // because it's private field. let size = { - let stat = self.file_cache.stat(read_key).await?; + let stat = self.file_cache.stat(read_key, OpStat::default()).await?; - stat.content_length() + stat.into_metadata().content_length() }; OBJECT_STORE_LRU_CACHE_ENTRIES.inc(); @@ -149,26 +162,27 @@ impl ReadCache { .await; } - Ok(self.stat().await) + Ok(self.cache_stat().await) } /// Returns true when the read cache contains the specific file. pub(crate) async fn contains_file(&self, path: &str) -> bool { self.mem_cache.run_pending_tasks().await; - self.mem_cache.contains_key(path) && self.file_cache.stat(path).await.is_ok() + self.mem_cache.contains_key(path) + && self.file_cache.stat(path, OpStat::default()).await.is_ok() } /// Read from a specific path using the OpRead operation. /// It will attempt to retrieve the data from the local cache. /// If the data is not found in the local cache, - /// it will fallback to retrieving it from remote object storage + /// it will fall back to retrieving it from remote object storage /// and cache the result locally. - pub(crate) async fn read( + pub(crate) async fn read_from_cache( &self, inner: &I, path: &str, args: OpRead, - ) -> Result<(RpRead, Arc)> + ) -> Result<(RpRead, Reader)> where I: Access, { @@ -176,46 +190,82 @@ impl ReadCache { return inner.read(path, args).await.map(to_output_reader); } - // FIXME: remove this block after opendal v0.47 released. - let meta = inner.stat(path, OpStat::new()).await?; - let (rp, reader) = inner.read(path, args).await?; - let reader: ReadCacheReader = ReadCacheReader { - path: Arc::new(path.to_string()), - inner_reader: reader, - size: meta.into_metadata().content_length(), - file_cache: self.file_cache.clone(), - mem_cache: self.mem_cache.clone(), - }; - Ok((rp, Arc::new(reader))) + let read_key = read_cache_key(path, &args); + + let read_result = self + .mem_cache + .try_get_with( + read_key.clone(), + self.read_remote(inner, &read_key, path, args.clone()), + ) + .await + .map_err(|e| OpendalError::new(e.kind(), e.to_string()))?; + + match read_result { + ReadResult::Success(_) => { + // There is a concurrent issue here, the local cache may be purged + // while reading, we have to fall back to remote read + match self.file_cache.read(&read_key, OpRead::default()).await { + Ok(ret) => { + OBJECT_STORE_LRU_CACHE_HIT + .with_label_values(&["success"]) + .inc(); + Ok(to_output_reader(ret)) + } + Err(_) => { + OBJECT_STORE_LRU_CACHE_MISS.inc(); + inner.read(path, args).await.map(to_output_reader) + } + } + } + ReadResult::NotFound => { + OBJECT_STORE_LRU_CACHE_HIT + .with_label_values(&["not_found"]) + .inc(); + + Err(OpendalError::new( + ErrorKind::NotFound, + format!("File not found: {path}"), + )) + } + } } -} -pub struct ReadCacheReader { - /// Path of the file - path: Arc, - /// Remote file reader. - inner_reader: I::Reader, - /// FIXME: remove this field after opendal v0.47 released. - /// - /// OpenDAL's read_at takes `offset, limit` which means the underlying storage - /// services could return less data than limit. We store size here as a workaround. - /// - /// This API has been refactor into `offset, size` instead. After opendal v0.47 released, - /// we don't need this anymore. - size: u64, - /// Local file cache backend - file_cache: Operator, - /// Local memory cache to track local cache files - mem_cache: Cache, -} + async fn try_write_cache(&self, mut reader: I::Reader, read_key: &str) -> Result + where + I: Access, + { + let (_, mut writer) = self.file_cache.write(read_key, OpWrite::new()).await?; + let mut total = 0; + loop { + let bytes = reader.read().await?; + if bytes.is_empty() { + break; + } + + total += bytes.len(); + writer.write(bytes).await?; + } + // Call `close` to ensure data is written. + writer.close().await?; + Ok(total) + } -impl ReadCacheReader { - /// TODO: we can return the Buffer directly to avoid another read from cache. - async fn read_remote(&self, offset: u64, limit: usize) -> Result { + /// Read the file from remote storage. If success, write the content into local cache. + async fn read_remote( + &self, + inner: &I, + read_key: &str, + path: &str, + args: OpRead, + ) -> Result + where + I: Access, + { OBJECT_STORE_LRU_CACHE_MISS.inc(); - let buf = self.inner_reader.read_at(offset, limit).await?; - let result = self.try_write_cache(buf, offset).await; + let (_, reader) = inner.read(path, args).await?; + let result = self.try_write_cache::(reader, read_key).await; match result { Ok(read_bytes) => { @@ -242,59 +292,10 @@ impl ReadCacheReader { } } } - - async fn try_write_cache(&self, buf: Buffer, offset: u64) -> Result { - let size = buf.len(); - let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(size as _))); - self.file_cache.write(&read_key, buf).await?; - Ok(size) - } -} - -impl Read for ReadCacheReader { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - let size = self.size.min(offset + limit as u64) - offset; - let read_key = read_cache_key(&self.path, BytesRange::new(offset, Some(size as _))); - - let read_result = self - .mem_cache - .try_get_with(read_key.clone(), self.read_remote(offset, limit)) - .await - .map_err(|e| OpendalError::new(e.kind(), &e.to_string()))?; - - match read_result { - ReadResult::Success(_) => { - // There is a concurrent issue here, the local cache may be purged - // while reading, we have to fallback to remote read - match self.file_cache.read(&read_key).await { - Ok(ret) => { - OBJECT_STORE_LRU_CACHE_HIT - .with_label_values(&["success"]) - .inc(); - Ok(ret) - } - Err(_) => { - OBJECT_STORE_LRU_CACHE_MISS.inc(); - self.inner_reader.read_at(offset, limit).await - } - } - } - ReadResult::NotFound => { - OBJECT_STORE_LRU_CACHE_HIT - .with_label_values(&["not_found"]) - .inc(); - - Err(OpendalError::new( - ErrorKind::NotFound, - &format!("File not found: {}", self.path), - )) - } - } - } } fn to_output_reader(input: (RpRead, R)) -> (RpRead, Reader) { - (input.0, Arc::new(input.1)) + (input.0, Box::new(input.1)) } #[cfg(test)] diff --git a/src/object-store/src/layers/prometheus.rs b/src/object-store/src/layers/prometheus.rs index a609ce7203f5..6cc71a3bebcf 100644 --- a/src/object-store/src/layers/prometheus.rs +++ b/src/object-store/src/layers/prometheus.rs @@ -63,7 +63,7 @@ fn increment_errors_total(op: Operation, kind: ErrorKind) { ); } -/// Please refer to [prometheus](https://docs.rs/prometheus) for every operations. +/// Please refer to [prometheus](https://docs.rs/prometheus) for every operation. /// /// # Prometheus Metrics /// @@ -441,8 +441,8 @@ impl PrometheusMetricWrapper { } impl oio::Read for PrometheusMetricWrapper { - async fn read_at(&self, offset: u64, limit: usize) -> Result { - self.inner.read_at(offset, limit).await.map_err(|err| { + async fn read(&mut self) -> Result { + self.inner.read().await.map_err(|err| { increment_errors_total(self.op, err.kind()); err }) @@ -450,8 +450,8 @@ impl oio::Read for PrometheusMetricWrapper { } impl oio::BlockingRead for PrometheusMetricWrapper { - fn read_at(&self, offset: u64, limit: usize) -> opendal::Result { - self.inner.read_at(offset, limit).map_err(|err| { + fn read(&mut self) -> opendal::Result { + self.inner.read().map_err(|err| { increment_errors_total(self.op, err.kind()); err }) diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index a3d3800054c7..b5cedf6e651a 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -22,8 +22,10 @@ use object_store::layers::LruCacheLayer; use object_store::services::{Fs, S3}; use object_store::test_util::TempFolder; use object_store::{ObjectStore, ObjectStoreBuilder}; +use opendal::raw::oio::{List, Read}; +use opendal::raw::{Access, OpList, OpRead}; use opendal::services::{Azblob, Gcs, Oss}; -use opendal::{EntryMode, Operator, OperatorBuilder}; +use opendal::{EntryMode, OperatorBuilder}; async fn test_object_crud(store: &ObjectStore) -> Result<()> { // Create object handler. @@ -227,7 +229,7 @@ async fn test_file_backend_with_lru_cache() -> Result<()> { .root(&data_dir.path().to_string_lossy()) .atomic_write_dir(&tmp_dir.path().to_string_lossy()); - let store = ObjectStore::new(builder).unwrap().finish(); + let store = builder.build().unwrap(); let cache_dir = create_temp_dir("test_file_backend_with_lru_cache"); let cache_layer = { @@ -235,12 +237,14 @@ async fn test_file_backend_with_lru_cache() -> Result<()> { let _ = builder .root(&cache_dir.path().to_string_lossy()) .atomic_write_dir(&cache_dir.path().to_string_lossy()); - let file_cache = Operator::new(builder).unwrap().finish(); + let file_cache = Arc::new(builder.build().unwrap()); LruCacheLayer::new(file_cache, 32).await.unwrap() }; - let store = store.layer(cache_layer.clone()); + let store = OperatorBuilder::new(store) + .layer(cache_layer.clone()) + .finish(); test_object_crud(&store).await?; test_object_list(&store).await?; @@ -250,31 +254,36 @@ async fn test_file_backend_with_lru_cache() -> Result<()> { Ok(()) } -async fn assert_lru_cache(cache_layer: &LruCacheLayer, file_names: &[&str]) { +async fn assert_lru_cache(cache_layer: &LruCacheLayer, file_names: &[&str]) { for file_name in file_names { assert!(cache_layer.contains_file(file_name).await); } } -async fn assert_cache_files( - store: &Operator, +async fn assert_cache_files( + store: &C, file_names: &[&str], file_contents: &[&str], ) -> Result<()> { - let objects = store.list("/").await?; + let (_, mut lister) = store.list("/", OpList::default()).await?; + let mut objects = vec![]; + while let Some(e) = lister.next().await? { + objects.push(e); + } // compare the cache file with the expected cache file; ignore orders for o in objects { - let position = file_names.iter().position(|&x| x == o.name()); - assert!(position.is_some(), "file not found: {}", o.name()); + let position = file_names.iter().position(|&x| x == o.path()); + assert!(position.is_some(), "file not found: {}", o.path()); let position = position.unwrap(); - let bs = store.read(o.path()).await.unwrap(); + let (_, mut r) = store.read(o.path(), OpRead::default()).await.unwrap(); + let bs = r.read_all().await.unwrap(); assert_eq!( file_contents[position], String::from_utf8(bs.to_vec())?, "file content not match: {}", - o.name() + o.path() ); } @@ -303,7 +312,7 @@ async fn test_object_store_cache_policy() -> Result<()> { .root(&cache_dir.path().to_string_lossy()) .atomic_write_dir(&atomic_temp_dir.path().to_string_lossy()); let file_cache = Arc::new(builder.build().unwrap()); - let cache_store = OperatorBuilder::new(file_cache.clone()).finish(); + let cache_store = file_cache.clone(); // create operator for cache dir to verify cache file let cache_layer = LruCacheLayer::new(cache_store.clone(), 38).await.unwrap(); diff --git a/src/operator/src/statement/copy_table_from.rs b/src/operator/src/statement/copy_table_from.rs index 94892f10a8c9..a0818d6ea35e 100644 --- a/src/operator/src/statement/copy_table_from.rs +++ b/src/operator/src/statement/copy_table_from.rs @@ -156,6 +156,8 @@ impl StatementExecutor { .await .context(error::ReadObjectSnafu { path: &path })? .into_futures_async_read(0..meta.content_length()) + .await + .context(error::ReadObjectSnafu { path: &path })? .compat(); let metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default()) .await @@ -301,6 +303,8 @@ impl StatementExecutor { .await .context(error::ReadObjectSnafu { path })? .into_futures_async_read(0..meta.content_length()) + .await + .context(error::ReadObjectSnafu { path })? .compat(); let builder = ParquetRecordBatchStreamBuilder::new_with_metadata(reader, metadata.clone());