Skip to content

Commit

Permalink
feat(index): add file_size_hint for remote blob reader
Browse files Browse the repository at this point in the history
  • Loading branch information
WenyXu committed Dec 12, 2024
1 parent d53fbcb commit 55effd0
Show file tree
Hide file tree
Showing 9 changed files with 97 additions and 15 deletions.
17 changes: 17 additions & 0 deletions src/common/base/src/range_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ pub struct Metadata {
/// `RangeReader` reads a range of bytes from a source.
#[async_trait]
pub trait RangeReader: Send + Unpin {
/// Sets the file size hint for the reader.
///
/// It's used to optimize the reading process by reducing the number of remote requests.
fn with_file_size_hint(&mut self, file_size_hint: u64);

/// Returns the metadata of the source.
async fn metadata(&mut self) -> io::Result<Metadata>;

Expand Down Expand Up @@ -70,6 +75,10 @@ pub trait RangeReader: Send + Unpin {

#[async_trait]
impl<R: ?Sized + RangeReader> RangeReader for &mut R {
fn with_file_size_hint(&mut self, file_size_hint: u64) {
(*self).with_file_size_hint(file_size_hint)
}

async fn metadata(&mut self) -> io::Result<Metadata> {
(*self).metadata().await
}
Expand Down Expand Up @@ -186,6 +195,10 @@ impl<R: RangeReader + 'static> AsyncRead for AsyncReadAdapter<R> {

#[async_trait]
impl RangeReader for Vec<u8> {
fn with_file_size_hint(&mut self, _file_size_hint: u64) {
// do nothing
}

async fn metadata(&mut self) -> io::Result<Metadata> {
Ok(Metadata {
content_length: self.len() as u64,
Expand Down Expand Up @@ -222,6 +235,10 @@ impl FileReader {

#[async_trait]
impl RangeReader for FileReader {
fn with_file_size_hint(&mut self, _file_size_hint: u64) {
// do nothing
}

async fn metadata(&mut self) -> io::Result<Metadata> {
Ok(Metadata {
content_length: self.content_length,
Expand Down
20 changes: 20 additions & 0 deletions src/mito2/src/sst/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,33 @@ pub enum IndexType {
}

impl FileMeta {
/// Returns true if the file has an inverted index
pub fn inverted_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::InvertedIndex)
}

/// Returns true if the file has a fulltext index
pub fn fulltext_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::FulltextIndex)
}

/// Return the size of the inverted index file
pub fn inverted_index_size(&self) -> Option<u64> {
if self.available_indexes.len() == 1 && self.inverted_index_available() {
Some(self.index_file_size)
} else {
None
}
}

/// Returns the size of the fulltext index file
pub fn fulltext_index_size(&self) -> Option<u64> {
if self.available_indexes.len() == 1 && self.fulltext_index_available() {
Some(self.index_file_size)
} else {
None
}
}
}

/// Handle to a SST file.
Expand Down
17 changes: 11 additions & 6 deletions src/mito2/src/sst/index/inverted_index/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl InvertedIndexApplier {
}

/// Applies predicates to the provided SST file id and returns the relevant row group ids
pub async fn apply(&self, file_id: FileId) -> Result<ApplyOutput> {
pub async fn apply(&self, file_id: FileId, file_size_hint: Option<u64>) -> Result<ApplyOutput> {
let _timer = INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_INVERTED_INDEX])
.start_timer();
Expand All @@ -129,8 +129,7 @@ impl InvertedIndexApplier {
if let Err(err) = other {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}

self.remote_blob_reader(file_id).await?
self.remote_blob_reader(file_id, file_size_hint).await?
}
};

Expand Down Expand Up @@ -181,16 +180,22 @@ impl InvertedIndexApplier {
}

/// Creates a blob reader from the remote index file.
async fn remote_blob_reader(&self, file_id: FileId) -> Result<BlobReader> {
async fn remote_blob_reader(
&self,
file_id: FileId,
file_size_hint: Option<u64>,
) -> Result<BlobReader> {
let puffin_manager = self
.puffin_manager_factory
.build(self.store.clone())
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());

let file_path = location::index_file_path(&self.region_dir, file_id);
puffin_manager
.reader(&file_path)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
.blob(INDEX_BLOB_TYPE)
.await
.context(PuffinReadBlobSnafu)?
Expand Down Expand Up @@ -250,7 +255,7 @@ mod tests {
Box::new(mock_index_applier),
puffin_manager_factory,
);
let output = sst_index_applier.apply(file_id).await.unwrap();
let output = sst_index_applier.apply(file_id, None).await.unwrap();
assert_eq!(
output,
ApplyOutput {
Expand Down Expand Up @@ -290,7 +295,7 @@ mod tests {
Box::new(mock_index_applier),
puffin_manager_factory,
);
let res = sst_index_applier.apply(file_id).await;
let res = sst_index_applier.apply(file_id, None).await;
assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found"));
}
}
2 changes: 1 addition & 1 deletion src/mito2/src/sst/index/inverted_index/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ mod tests {
.unwrap();
Box::pin(async move {
applier
.apply(sst_file_id)
.apply(sst_file_id, None)
.await
.unwrap()
.matched_segment_ids
Expand Down
21 changes: 17 additions & 4 deletions src/mito2/src/sst/index/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl InstrumentedStore {
path: path.to_string(),
read_byte_count,
read_count,
file_size_hint: None,
})
}

Expand Down Expand Up @@ -262,15 +263,27 @@ pub(crate) struct InstrumentedRangeReader<'a> {
path: String,
read_byte_count: &'a IntCounter,
read_count: &'a IntCounter,
file_size_hint: Option<u64>,
}

#[async_trait]
impl RangeReader for InstrumentedRangeReader<'_> {
fn with_file_size_hint(&mut self, file_size_hint: u64) {
self.file_size_hint = Some(file_size_hint);
}

async fn metadata(&mut self) -> io::Result<Metadata> {
let stat = self.store.stat(&self.path).await?;
Ok(Metadata {
content_length: stat.content_length(),
})
match self.file_size_hint {
Some(file_size_hint) => Ok(Metadata {
content_length: file_size_hint,
}),
None => {
let stat = self.store.stat(&self.path).await?;
Ok(Metadata {
content_length: stat.content_length(),
})
}
}
}

async fn read(&mut self, range: Range<u64>) -> io::Result<Bytes> {
Expand Down
7 changes: 5 additions & 2 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,11 @@ impl ParquetReaderBuilder {
if !self.file_handle.meta_ref().inverted_index_available() {
return false;
}

let apply_output = match index_applier.apply(self.file_handle.file_id()).await {
let file_size_hint = self.file_handle.meta_ref().inverted_index_size();
let apply_output = match index_applier
.apply(self.file_handle.file_id(), file_size_hint)
.await
{
Ok(output) => output,
Err(err) => {
if cfg!(any(test, feature = "test")) {
Expand Down
4 changes: 4 additions & 0 deletions src/puffin/src/partial_reader/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ use crate::partial_reader::PartialReader;

#[async_trait]
impl<R: RangeReader> RangeReader for PartialReader<R> {
fn with_file_size_hint(&mut self, _file_size_hint: u64) {
// do nothing
}

async fn metadata(&mut self) -> io::Result<Metadata> {
Ok(Metadata {
content_length: self.size,
Expand Down
3 changes: 2 additions & 1 deletion src/puffin/src/puffin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,12 @@ pub struct PutOptions {

/// The `PuffinReader` trait provides methods for reading blobs and directories from a Puffin file.
#[async_trait]
#[auto_impl::auto_impl(Arc)]
pub trait PuffinReader {
type Blob: BlobGuard;
type Dir: DirGuard;

fn with_file_size_hint(self, file_size_hint: Option<u64>) -> Self;

/// Reads a blob from the Puffin file.
///
/// The returned `BlobGuard` is used to access the blob data.
Expand Down
21 changes: 20 additions & 1 deletion src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ pub struct FsPuffinReader<S, F> {
/// The name of the puffin file.
puffin_file_name: String,

/// The file size hint.
file_size_hint: Option<u64>,

/// The stager.
stager: S,

Expand All @@ -62,6 +65,7 @@ impl<S, F> FsPuffinReader<S, F> {
) -> Self {
Self {
puffin_file_name,
file_size_hint: None,
stager,
puffin_file_accessor,
puffin_file_metadata_cache,
Expand All @@ -78,11 +82,19 @@ where
type Blob = Either<RandomReadBlob<F>, S::Blob>;
type Dir = S::Dir;

fn with_file_size_hint(mut self, file_size_hint: Option<u64>) -> Self {
self.file_size_hint = file_size_hint;
self
}

async fn blob(&self, key: &str) -> Result<Self::Blob> {
let reader = self
let mut reader = self
.puffin_file_accessor
.reader(&self.puffin_file_name)
.await?;
if let Some(file_size_hint) = self.file_size_hint {
reader.with_file_size_hint(file_size_hint);
}
let mut file = PuffinFileReader::new(reader);

let metadata = self.get_puffin_file_metadata(&mut file).await?;
Expand Down Expand Up @@ -303,6 +315,13 @@ where
A: RangeReader,
B: RangeReader,
{
fn with_file_size_hint(&mut self, file_size_hint: u64) {
match self {
Either::L(a) => a.with_file_size_hint(file_size_hint),
Either::R(b) => b.with_file_size_hint(file_size_hint),
}
}

async fn metadata(&mut self) -> io::Result<Metadata> {
match self {
Either::L(a) => a.metadata().await,
Expand Down

0 comments on commit 55effd0

Please sign in to comment.