Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: incorrect table data disk cache key #16837

Merged
merged 2 commits into from
Nov 14, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ impl BlockReader {
let column_array_cache = CacheManager::instance().get_table_data_array_cache();
let mut cached_column_data = vec![];
let mut cached_column_array = vec![];

let column_cache_key_builder = ColumnCacheKeyBuilder::new(location);

for (_index, (column_id, ..)) in self.project_indices.iter() {
if let Some(ignore_column_ids) = ignore_column_ids {
if ignore_column_ids.contains(column_id) {
Expand All @@ -58,7 +61,7 @@ impl BlockReader {
if let Some(column_meta) = columns_meta.get(column_id) {
let (offset, len) = column_meta.offset_length();

let column_cache_key = TableDataCacheKey::new(location, *column_id, offset, len);
let column_cache_key = column_cache_key_builder.cache_cache(column_id, column_meta);

// first, check in memory table data cache
// column_array_cache
Expand Down Expand Up @@ -91,20 +94,25 @@ impl BlockReader {
.await?;

if self.put_cache {
let table_data_cache = CacheManager::instance().get_table_data_cache();
// add raw data (compressed raw bytes) to column cache
for (column_id, (chunk_idx, range)) in &merge_io_result.columns_chunk_offsets {
let cache_key = TableDataCacheKey::new(
&merge_io_result.block_path,
*column_id,
range.start as u64,
(range.end - range.start) as u64,
);
// Should NOT use `range.start` as part of the cache key,
// as they are not stable and can vary for the same column depending on the query's projection.
// For instance:
// - `SELECT col1, col2 FROM t;`
// - `SELECT col2 FROM t;`
// may result in different ranges for `col2`
// This can lead to cache missing or INCONSISTENCIES

// Safe to unwrap here, since this column has been fetched, its meta must be present.
let column_meta = columns_meta.get(column_id).unwrap();
let column_cache_key = column_cache_key_builder.cache_cache(column_id, column_meta);

let chunk_data = merge_io_result
.owner_memory
.get_chunk(*chunk_idx, &merge_io_result.block_path)?;
let data = chunk_data.slice(range.clone());
table_data_cache.insert(cache_key.as_ref().to_owned(), data);
column_data_cache.insert(column_cache_key.as_ref().to_owned(), data);
}
}

Expand All @@ -116,3 +124,17 @@ impl BlockReader {
Ok(block_read_res)
}
}

struct ColumnCacheKeyBuilder<'a> {
block_path: &'a str,
}

impl<'a> ColumnCacheKeyBuilder<'a> {
fn new(block_path: &'a str) -> Self {
Self { block_path }
}
fn cache_cache(&self, column_id: &ColumnId, column_meta: &ColumnMeta) -> TableDataCacheKey {
let (offset, len) = column_meta.offset_length();
TableDataCacheKey::new(self.block_path, *column_id, offset, len)
}
}
Loading