diff --git a/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs b/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs
index 0f2b2bcf8cb7..082cbc6a20ac 100644
--- a/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs
+++ b/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs
@@ -48,6 +48,9 @@ impl BlockReader {
         let column_array_cache = CacheManager::instance().get_table_data_array_cache();
         let mut cached_column_data = vec![];
         let mut cached_column_array = vec![];
+
+        let column_cache_key_builder = ColumnCacheKeyBuilder::new(location);
+
         for (_index, (column_id, ..)) in self.project_indices.iter() {
             if let Some(ignore_column_ids) = ignore_column_ids {
                 if ignore_column_ids.contains(column_id) {
@@ -58,7 +61,7 @@ impl BlockReader {
 
             if let Some(column_meta) = columns_meta.get(column_id) {
                 let (offset, len) = column_meta.offset_length();
-                let column_cache_key = TableDataCacheKey::new(location, *column_id, offset, len);
+                let column_cache_key = column_cache_key_builder.cache_key(column_id, column_meta);
 
                 // first, check in memory table data cache
                 // column_array_cache
@@ -91,20 +94,25 @@ impl BlockReader {
             .await?;
 
         if self.put_cache {
-            let table_data_cache = CacheManager::instance().get_table_data_cache();
             // add raw data (compressed raw bytes) to column cache
             for (column_id, (chunk_idx, range)) in &merge_io_result.columns_chunk_offsets {
-                let cache_key = TableDataCacheKey::new(
-                    &merge_io_result.block_path,
-                    *column_id,
-                    range.start as u64,
-                    (range.end - range.start) as u64,
-                );
+                // `range.start` should NOT be used as part of the cache key:
+                // it is not stable, and can vary for the same column depending on
+                // the query's projection. For instance:
+                // - `SELECT col1, col2 FROM t;`
+                // - `SELECT col2 FROM t;`
+                // may produce different ranges for `col2`, which can lead to
+                // cache misses or inconsistencies.
+
+                // Safe to unwrap here: this column has been fetched, so its meta must be present.
+                let column_meta = columns_meta.get(column_id).unwrap();
+                let column_cache_key = column_cache_key_builder.cache_key(column_id, column_meta);
+
                 let chunk_data = merge_io_result
                     .owner_memory
                     .get_chunk(*chunk_idx, &merge_io_result.block_path)?;
                 let data = chunk_data.slice(range.clone());
-                table_data_cache.insert(cache_key.as_ref().to_owned(), data);
+                column_data_cache.insert(column_cache_key.as_ref().to_owned(), data);
             }
         }
 
@@ -116,3 +124,17 @@ impl BlockReader {
         Ok(block_read_res)
     }
 }
+
+struct ColumnCacheKeyBuilder<'a> {
+    block_path: &'a str,
+}
+
+impl<'a> ColumnCacheKeyBuilder<'a> {
+    fn new(block_path: &'a str) -> Self {
+        Self { block_path }
+    }
+    fn cache_key(&self, column_id: &ColumnId, column_meta: &ColumnMeta) -> TableDataCacheKey {
+        let (offset, len) = column_meta.offset_length();
+        TableDataCacheKey::new(self.block_path, *column_id, offset, len)
+    }
+}
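
For illustration, below is a minimal, self-contained sketch of the idea behind `ColumnCacheKeyBuilder`: the cache key is derived from the column's on-disk meta (`offset`, `len`) rather than from the merged IO range, so the same column maps to the same key regardless of which other columns the query projects. The `ColumnId`, `ColumnMeta`, and `TableDataCacheKey` definitions here are simplified stand-ins for illustration, not databend's actual types.

```rust
// Simplified stand-ins for the real types used in the diff above.
type ColumnId = u32;

struct ColumnMeta {
    offset: u64,
    len: u64,
}

impl ColumnMeta {
    fn offset_length(&self) -> (u64, u64) {
        (self.offset, self.len)
    }
}

#[derive(Debug, PartialEq)]
struct TableDataCacheKey(String);

impl TableDataCacheKey {
    fn new(block_path: &str, column_id: ColumnId, offset: u64, len: u64) -> Self {
        Self(format!("{block_path}-{column_id}-{offset}-{len}"))
    }
}

struct ColumnCacheKeyBuilder<'a> {
    block_path: &'a str,
}

impl<'a> ColumnCacheKeyBuilder<'a> {
    fn new(block_path: &'a str) -> Self {
        Self { block_path }
    }
    fn cache_key(&self, column_id: &ColumnId, column_meta: &ColumnMeta) -> TableDataCacheKey {
        // Key on the column's stable on-disk location, never on the merged read range.
        let (offset, len) = column_meta.offset_length();
        TableDataCacheKey::new(self.block_path, *column_id, offset, len)
    }
}

fn main() {
    let builder = ColumnCacheKeyBuilder::new("1/2/_b/block-0001.parquet");
    let col2_meta = ColumnMeta { offset: 1024, len: 512 };

    // Whether the query was `SELECT col1, col2 FROM t` or `SELECT col2 FROM t`,
    // the key for col2 is derived from its on-disk meta, not from the merged
    // read range, so both queries address the same cache entry.
    let key_a = builder.cache_key(&2, &col2_meta);
    let key_b = builder.cache_key(&2, &col2_meta);
    assert_eq!(key_a, key_b);
}
```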