diff --git a/src/common/base/src/range_read.rs b/src/common/base/src/range_read.rs index 91f865d17ef6..61f28cb629fd 100644 --- a/src/common/base/src/range_read.rs +++ b/src/common/base/src/range_read.rs @@ -36,6 +36,11 @@ pub struct Metadata { /// `RangeReader` reads a range of bytes from a source. #[async_trait] pub trait RangeReader: Send + Unpin { + /// Sets the file size hint for the reader. + /// + /// It's used to optimize the reading process by reducing the number of remote requests. + fn with_file_size_hint(&mut self, file_size_hint: u64); + /// Returns the metadata of the source. async fn metadata(&mut self) -> io::Result; @@ -70,6 +75,10 @@ pub trait RangeReader: Send + Unpin { #[async_trait] impl RangeReader for &mut R { + fn with_file_size_hint(&mut self, file_size_hint: u64) { + (*self).with_file_size_hint(file_size_hint) + } + async fn metadata(&mut self) -> io::Result { (*self).metadata().await } @@ -186,6 +195,10 @@ impl AsyncRead for AsyncReadAdapter { #[async_trait] impl RangeReader for Vec { + fn with_file_size_hint(&mut self, _file_size_hint: u64) { + // do nothing + } + async fn metadata(&mut self) -> io::Result { Ok(Metadata { content_length: self.len() as u64, @@ -222,6 +235,10 @@ impl FileReader { #[async_trait] impl RangeReader for FileReader { + fn with_file_size_hint(&mut self, _file_size_hint: u64) { + // do nothing + } + async fn metadata(&mut self) -> io::Result { Ok(Metadata { content_length: self.content_length, diff --git a/src/mito2/src/sst/file.rs b/src/mito2/src/sst/file.rs index 4353ae55e3e9..5a9932ab433b 100644 --- a/src/mito2/src/sst/file.rs +++ b/src/mito2/src/sst/file.rs @@ -146,13 +146,33 @@ pub enum IndexType { } impl FileMeta { + /// Returns true if the file has an inverted index pub fn inverted_index_available(&self) -> bool { self.available_indexes.contains(&IndexType::InvertedIndex) } + /// Returns true if the file has a fulltext index pub fn fulltext_index_available(&self) -> bool { self.available_indexes.contains(&IndexType::FulltextIndex) } + + /// Returns the size of the inverted index file + pub fn inverted_index_size(&self) -> Option { + if self.available_indexes.len() == 1 && self.inverted_index_available() { + Some(self.index_file_size) + } else { + None + } + } + + /// Returns the size of the fulltext index file + pub fn fulltext_index_size(&self) -> Option { + if self.available_indexes.len() == 1 && self.fulltext_index_available() { + Some(self.index_file_size) + } else { + None + } + } } /// Handle to a SST file. diff --git a/src/mito2/src/sst/index/inverted_index/applier.rs b/src/mito2/src/sst/index/inverted_index/applier.rs index bf5206ef44be..d060d4bec17b 100644 --- a/src/mito2/src/sst/index/inverted_index/applier.rs +++ b/src/mito2/src/sst/index/inverted_index/applier.rs @@ -113,7 +113,7 @@ impl InvertedIndexApplier { } /// Applies predicates to the provided SST file id and returns the relevant row group ids - pub async fn apply(&self, file_id: FileId) -> Result { + pub async fn apply(&self, file_id: FileId, file_size_hint: Option) -> Result { let _timer = INDEX_APPLY_ELAPSED .with_label_values(&[TYPE_INVERTED_INDEX]) .start_timer(); @@ -129,8 +129,7 @@ impl InvertedIndexApplier { if let Err(err) = other { warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.") } - - self.remote_blob_reader(file_id).await? + self.remote_blob_reader(file_id, file_size_hint).await? } }; @@ -181,16 +180,22 @@ impl InvertedIndexApplier { } /// Creates a blob reader from the remote index file. - async fn remote_blob_reader(&self, file_id: FileId) -> Result { + async fn remote_blob_reader( + &self, + file_id: FileId, + file_size_hint: Option, + ) -> Result { let puffin_manager = self .puffin_manager_factory .build(self.store.clone()) .with_puffin_metadata_cache(self.puffin_metadata_cache.clone()); + let file_path = location::index_file_path(&self.region_dir, file_id); puffin_manager .reader(&file_path) .await .context(PuffinBuildReaderSnafu)? + .with_file_size_hint(file_size_hint) .blob(INDEX_BLOB_TYPE) .await .context(PuffinReadBlobSnafu)? @@ -250,7 +255,7 @@ mod tests { Box::new(mock_index_applier), puffin_manager_factory, ); - let output = sst_index_applier.apply(file_id).await.unwrap(); + let output = sst_index_applier.apply(file_id, None).await.unwrap(); assert_eq!( output, ApplyOutput { @@ -290,7 +295,7 @@ mod tests { Box::new(mock_index_applier), puffin_manager_factory, ); - let res = sst_index_applier.apply(file_id).await; + let res = sst_index_applier.apply(file_id, None).await; assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found")); } } diff --git a/src/mito2/src/sst/index/inverted_index/creator.rs b/src/mito2/src/sst/index/inverted_index/creator.rs index 029a0da8484f..43cf54fa2811 100644 --- a/src/mito2/src/sst/index/inverted_index/creator.rs +++ b/src/mito2/src/sst/index/inverted_index/creator.rs @@ -464,7 +464,7 @@ mod tests { .unwrap(); Box::pin(async move { applier - .apply(sst_file_id) + .apply(sst_file_id, None) .await .unwrap() .matched_segment_ids diff --git a/src/mito2/src/sst/index/store.rs b/src/mito2/src/sst/index/store.rs index 2750c69fc249..7322bd4db496 100644 --- a/src/mito2/src/sst/index/store.rs +++ b/src/mito2/src/sst/index/store.rs @@ -68,6 +68,7 @@ impl InstrumentedStore { path: path.to_string(), read_byte_count, read_count, + file_size_hint: None, }) } @@ -262,15 +263,27 @@ pub(crate) struct InstrumentedRangeReader<'a> { path: String, read_byte_count: &'a IntCounter, read_count: &'a IntCounter, + file_size_hint: Option, } #[async_trait] impl RangeReader for InstrumentedRangeReader<'_> { + fn with_file_size_hint(&mut self, file_size_hint: u64) { + self.file_size_hint = Some(file_size_hint); + } + async fn metadata(&mut self) -> io::Result { - let stat = self.store.stat(&self.path).await?; - Ok(Metadata { - content_length: stat.content_length(), - }) + match self.file_size_hint { + Some(file_size_hint) => Ok(Metadata { + content_length: file_size_hint, + }), + None => { + let stat = self.store.stat(&self.path).await?; + Ok(Metadata { + content_length: stat.content_length(), + }) + } + } } async fn read(&mut self, range: Range) -> io::Result { diff --git a/src/mito2/src/sst/parquet/reader.rs b/src/mito2/src/sst/parquet/reader.rs index b73026a7a6e3..02c5c2cf3cba 100644 --- a/src/mito2/src/sst/parquet/reader.rs +++ b/src/mito2/src/sst/parquet/reader.rs @@ -475,8 +475,11 @@ impl ParquetReaderBuilder { if !self.file_handle.meta_ref().inverted_index_available() { return false; } - - let apply_output = match index_applier.apply(self.file_handle.file_id()).await { + let file_size_hint = self.file_handle.meta_ref().inverted_index_size(); + let apply_output = match index_applier + .apply(self.file_handle.file_id(), file_size_hint) + .await + { Ok(output) => output, Err(err) => { if cfg!(any(test, feature = "test")) { diff --git a/src/puffin/src/partial_reader/async.rs b/src/puffin/src/partial_reader/async.rs index 3de40cb3a190..4eedd1ee31f5 100644 --- a/src/puffin/src/partial_reader/async.rs +++ b/src/puffin/src/partial_reader/async.rs @@ -23,6 +23,10 @@ use crate::partial_reader::PartialReader; #[async_trait] impl RangeReader for PartialReader { + fn with_file_size_hint(&mut self, _file_size_hint: u64) { + // do nothing + } + async fn metadata(&mut self) -> io::Result { Ok(Metadata { content_length: self.size, diff --git a/src/puffin/src/puffin_manager.rs b/src/puffin/src/puffin_manager.rs index 17101b1662e8..204bc2c66e2e 100644 --- a/src/puffin/src/puffin_manager.rs +++ b/src/puffin/src/puffin_manager.rs @@ -73,11 +73,12 @@ pub struct PutOptions { /// The `PuffinReader` trait provides methods for reading blobs and directories from a Puffin file. #[async_trait] -#[auto_impl::auto_impl(Arc)] pub trait PuffinReader { type Blob: BlobGuard; type Dir: DirGuard; + fn with_file_size_hint(self, file_size_hint: Option) -> Self; + /// Reads a blob from the Puffin file. /// /// The returned `BlobGuard` is used to access the blob data. diff --git a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs index 2e1ae594adc6..a5da2f75f858 100644 --- a/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs +++ b/src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs @@ -43,6 +43,9 @@ pub struct FsPuffinReader { /// The name of the puffin file. puffin_file_name: String, + /// The file size hint. + file_size_hint: Option, + /// The stager. stager: S, @@ -62,6 +65,7 @@ impl FsPuffinReader { ) -> Self { Self { puffin_file_name, + file_size_hint: None, stager, puffin_file_accessor, puffin_file_metadata_cache, @@ -78,11 +82,19 @@ where type Blob = Either, S::Blob>; type Dir = S::Dir; + fn with_file_size_hint(mut self, file_size_hint: Option) -> Self { + self.file_size_hint = file_size_hint; + self + } + async fn blob(&self, key: &str) -> Result { - let reader = self + let mut reader = self .puffin_file_accessor .reader(&self.puffin_file_name) .await?; + if let Some(file_size_hint) = self.file_size_hint { + reader.with_file_size_hint(file_size_hint); + } let mut file = PuffinFileReader::new(reader); let metadata = self.get_puffin_file_metadata(&mut file).await?; @@ -303,6 +315,13 @@ where A: RangeReader, B: RangeReader, { + fn with_file_size_hint(&mut self, file_size_hint: u64) { + match self { + Either::L(a) => a.with_file_size_hint(file_size_hint), + Either::R(b) => b.with_file_size_hint(file_size_hint), + } + } + async fn metadata(&mut self) -> io::Result { match self { Either::L(a) => a.metadata().await,