Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(index): add file_size_hint for remote blob reader #5147

Merged
merged 1 commit into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions src/common/base/src/range_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ pub struct Metadata {
/// `RangeReader` reads a range of bytes from a source.
#[async_trait]
pub trait RangeReader: Send + Unpin {
/// Sets the file size hint for the reader.
///
/// It's used to optimize the reading process by reducing the number of remote requests.
fn with_file_size_hint(&mut self, file_size_hint: u64);

/// Returns the metadata of the source.
async fn metadata(&mut self) -> io::Result<Metadata>;

Expand Down Expand Up @@ -70,6 +75,10 @@ pub trait RangeReader: Send + Unpin {

#[async_trait]
impl<R: ?Sized + RangeReader> RangeReader for &mut R {
/// Forwards the file size hint to the reader behind this mutable reference.
fn with_file_size_hint(&mut self, file_size_hint: u64) {
    // `self` is `&mut &mut R` here; dereferencing twice names the underlying `R`
    // explicitly so the call is dispatched to its implementation.
    (**self).with_file_size_hint(file_size_hint);
}

/// Returns the metadata reported by the reader behind this mutable reference.
async fn metadata(&mut self) -> io::Result<Metadata> {
    // Delegate to the underlying `R`; this impl adds no behavior of its own.
    (**self).metadata().await
}
Expand Down Expand Up @@ -186,6 +195,10 @@ impl<R: RangeReader + 'static> AsyncRead for AsyncReadAdapter<R> {

#[async_trait]
impl RangeReader for Vec<u8> {
/// Accepts a file size hint and discards it.
///
/// `metadata` for a `Vec<u8>` reports the vector's own length, so a hint
/// provides no additional information.
fn with_file_size_hint(&mut self, _file_size_hint: u64) {
// Intentionally a no-op: the content length comes from `self.len()`.
}

async fn metadata(&mut self) -> io::Result<Metadata> {
Ok(Metadata {
content_length: self.len() as u64,
Expand Down Expand Up @@ -222,6 +235,10 @@ impl FileReader {

#[async_trait]
impl RangeReader for FileReader {
/// Accepts a file size hint and discards it.
///
/// `metadata` for a `FileReader` reports the `content_length` already stored
/// on the reader, so the hint is not needed.
fn with_file_size_hint(&mut self, _file_size_hint: u64) {
// Intentionally a no-op: the stored `content_length` is used instead.
}

async fn metadata(&mut self) -> io::Result<Metadata> {
Ok(Metadata {
content_length: self.content_length,
Expand Down
20 changes: 20 additions & 0 deletions src/mito2/src/sst/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,33 @@ pub enum IndexType {
}

impl FileMeta {
    /// Reports whether an inverted index is listed among this file's available indexes.
    pub fn inverted_index_available(&self) -> bool {
        self.available_indexes.contains(&IndexType::InvertedIndex)
    }

    /// Reports whether a fulltext index is listed among this file's available indexes.
    pub fn fulltext_index_available(&self) -> bool {
        self.available_indexes.contains(&IndexType::FulltextIndex)
    }

    /// Returns the index file size when the inverted index is the file's only index.
    ///
    /// Yields `None` when the inverted index is absent or when more than one entry is
    /// listed in `available_indexes`.
    // NOTE(review): presumably `index_file_size` cannot be attributed to a single index
    // once several are present — confirm against the writer side.
    pub fn inverted_index_size(&self) -> Option<u64> {
        let is_sole_index = self.available_indexes.len() == 1 && self.inverted_index_available();
        is_sole_index.then_some(self.index_file_size)
    }

    /// Returns the index file size when the fulltext index is the file's only index.
    ///
    /// Yields `None` when the fulltext index is absent or when more than one entry is
    /// listed in `available_indexes`.
    pub fn fulltext_index_size(&self) -> Option<u64> {
        let is_sole_index = self.available_indexes.len() == 1 && self.fulltext_index_available();
        is_sole_index.then_some(self.index_file_size)
    }
}

/// Handle to a SST file.
Expand Down
17 changes: 11 additions & 6 deletions src/mito2/src/sst/index/inverted_index/applier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ impl InvertedIndexApplier {
}

/// Applies predicates to the provided SST file id and returns the relevant row group ids
pub async fn apply(&self, file_id: FileId) -> Result<ApplyOutput> {
pub async fn apply(&self, file_id: FileId, file_size_hint: Option<u64>) -> Result<ApplyOutput> {
let _timer = INDEX_APPLY_ELAPSED
.with_label_values(&[TYPE_INVERTED_INDEX])
.start_timer();
Expand All @@ -129,8 +129,7 @@ impl InvertedIndexApplier {
if let Err(err) = other {
warn!(err; "An unexpected error occurred while reading the cached index file. Fallback to remote index file.")
}

self.remote_blob_reader(file_id).await?
self.remote_blob_reader(file_id, file_size_hint).await?
}
};

Expand Down Expand Up @@ -181,16 +180,22 @@ impl InvertedIndexApplier {
}

/// Creates a blob reader from the remote index file.
async fn remote_blob_reader(&self, file_id: FileId) -> Result<BlobReader> {
async fn remote_blob_reader(
&self,
file_id: FileId,
file_size_hint: Option<u64>,
) -> Result<BlobReader> {
let puffin_manager = self
.puffin_manager_factory
.build(self.store.clone())
.with_puffin_metadata_cache(self.puffin_metadata_cache.clone());

let file_path = location::index_file_path(&self.region_dir, file_id);
puffin_manager
.reader(&file_path)
.await
.context(PuffinBuildReaderSnafu)?
.with_file_size_hint(file_size_hint)
.blob(INDEX_BLOB_TYPE)
.await
.context(PuffinReadBlobSnafu)?
Expand Down Expand Up @@ -250,7 +255,7 @@ mod tests {
Box::new(mock_index_applier),
puffin_manager_factory,
);
let output = sst_index_applier.apply(file_id).await.unwrap();
let output = sst_index_applier.apply(file_id, None).await.unwrap();
assert_eq!(
output,
ApplyOutput {
Expand Down Expand Up @@ -290,7 +295,7 @@ mod tests {
Box::new(mock_index_applier),
puffin_manager_factory,
);
let res = sst_index_applier.apply(file_id).await;
let res = sst_index_applier.apply(file_id, None).await;
assert!(format!("{:?}", res.unwrap_err()).contains("Blob not found"));
}
}
2 changes: 1 addition & 1 deletion src/mito2/src/sst/index/inverted_index/creator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ mod tests {
.unwrap();
Box::pin(async move {
applier
.apply(sst_file_id)
.apply(sst_file_id, None)
.await
.unwrap()
.matched_segment_ids
Expand Down
21 changes: 17 additions & 4 deletions src/mito2/src/sst/index/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ impl InstrumentedStore {
path: path.to_string(),
read_byte_count,
read_count,
file_size_hint: None,
})
}

Expand Down Expand Up @@ -262,15 +263,27 @@ pub(crate) struct InstrumentedRangeReader<'a> {
path: String,
read_byte_count: &'a IntCounter,
read_count: &'a IntCounter,
file_size_hint: Option<u64>,
}

#[async_trait]
impl RangeReader for InstrumentedRangeReader<'_> {
/// Records the expected size of the file this reader points at.
///
/// Once a hint is set, `metadata` reports it as the content length instead of
/// issuing a `stat` request to the store. The value is stored as given and is
/// not checked against the actual file, so callers must pass an accurate size.
fn with_file_size_hint(&mut self, file_size_hint: u64) {
self.file_size_hint = Some(file_size_hint);
}

async fn metadata(&mut self) -> io::Result<Metadata> {
let stat = self.store.stat(&self.path).await?;
Ok(Metadata {
content_length: stat.content_length(),
})
match self.file_size_hint {
Some(file_size_hint) => Ok(Metadata {
content_length: file_size_hint,
}),
None => {
let stat = self.store.stat(&self.path).await?;
Ok(Metadata {
content_length: stat.content_length(),
})
}
}
}

async fn read(&mut self, range: Range<u64>) -> io::Result<Bytes> {
Expand Down
7 changes: 5 additions & 2 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,8 +475,11 @@ impl ParquetReaderBuilder {
if !self.file_handle.meta_ref().inverted_index_available() {
return false;
}

let apply_output = match index_applier.apply(self.file_handle.file_id()).await {
let file_size_hint = self.file_handle.meta_ref().inverted_index_size();
let apply_output = match index_applier
.apply(self.file_handle.file_id(), file_size_hint)
.await
{
Ok(output) => output,
Err(err) => {
if cfg!(any(test, feature = "test")) {
Expand Down
4 changes: 4 additions & 0 deletions src/puffin/src/partial_reader/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ use crate::partial_reader::PartialReader;

#[async_trait]
impl<R: RangeReader> RangeReader for PartialReader<R> {
/// Accepts a file size hint and discards it.
///
/// `metadata` for a `PartialReader` reports the reader's own `size`, so the
/// hint is not used here.
fn with_file_size_hint(&mut self, _file_size_hint: u64) {
// Intentionally a no-op: the content length comes from `self.size`.
}

async fn metadata(&mut self) -> io::Result<Metadata> {
Ok(Metadata {
content_length: self.size,
Expand Down
3 changes: 2 additions & 1 deletion src/puffin/src/puffin_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,12 @@ pub struct PutOptions {

/// The `PuffinReader` trait provides methods for reading blobs and directories from a Puffin file.
#[async_trait]
#[auto_impl::auto_impl(Arc)]
pub trait PuffinReader {
type Blob: BlobGuard;
type Dir: DirGuard;

fn with_file_size_hint(self, file_size_hint: Option<u64>) -> Self;

/// Reads a blob from the Puffin file.
///
/// The returned `BlobGuard` is used to access the blob data.
Expand Down
21 changes: 20 additions & 1 deletion src/puffin/src/puffin_manager/fs_puffin_manager/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ pub struct FsPuffinReader<S, F> {
/// The name of the puffin file.
puffin_file_name: String,

/// The file size hint.
file_size_hint: Option<u64>,

/// The stager.
stager: S,

Expand All @@ -62,6 +65,7 @@ impl<S, F> FsPuffinReader<S, F> {
) -> Self {
Self {
puffin_file_name,
file_size_hint: None,
stager,
puffin_file_accessor,
puffin_file_metadata_cache,
Expand All @@ -78,11 +82,19 @@ where
type Blob = Either<RandomReadBlob<F>, S::Blob>;
type Dir = S::Dir;

/// Consumes the reader and returns it with the given file size hint installed.
///
/// Passing `None` clears any previously stored hint. When a hint is present, `blob`
/// hands it to the underlying range reader before reading the puffin file.
fn with_file_size_hint(self, file_size_hint: Option<u64>) -> Self {
    let mut reader = self;
    reader.file_size_hint = file_size_hint;
    reader
}

async fn blob(&self, key: &str) -> Result<Self::Blob> {
let reader = self
let mut reader = self
.puffin_file_accessor
.reader(&self.puffin_file_name)
.await?;
if let Some(file_size_hint) = self.file_size_hint {
reader.with_file_size_hint(file_size_hint);
}
let mut file = PuffinFileReader::new(reader);

let metadata = self.get_puffin_file_metadata(&mut file).await?;
Expand Down Expand Up @@ -303,6 +315,13 @@ where
A: RangeReader,
B: RangeReader,
{
/// Passes the file size hint on to whichever reader this `Either` currently holds.
fn with_file_size_hint(&mut self, file_size_hint: u64) {
    match self {
        Either::L(left) => left.with_file_size_hint(file_size_hint),
        Either::R(right) => right.with_file_size_hint(file_size_hint),
    }
}

async fn metadata(&mut self) -> io::Result<Metadata> {
match self {
Either::L(a) => a.metadata().await,
Expand Down
Loading