From c368bad78793af11f96c903659868a36a023b341 Mon Sep 17 00:00:00 2001
From: dantengsky
Date: Wed, 13 Nov 2024 22:22:49 +0800
Subject: [PATCH 1/2] fix: incorrect table data disk cache key

Use `offset` and `len` of the column meta as the corresponding parts of
the table data cache key.
---
 .../read/block/block_reader_merge_io_async.rs | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs b/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs
index 0f2b2bcf8cb7..de812771f685 100644
--- a/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs
+++ b/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs
@@ -94,12 +94,19 @@ impl BlockReader {
             let table_data_cache = CacheManager::instance().get_table_data_cache();
             // add raw data (compressed raw bytes) to column cache
             for (column_id, (chunk_idx, range)) in &merge_io_result.columns_chunk_offsets {
-                let cache_key = TableDataCacheKey::new(
-                    &merge_io_result.block_path,
-                    *column_id,
-                    range.start as u64,
-                    (range.end - range.start) as u64,
-                );
+                // Safe to unwrap here: since this column has been fetched, its meta must be present.
+                let column_meta = columns_meta.get(column_id).unwrap();
+                let (offset, len) = column_meta.offset_length();
+
+                // Do NOT use `range.start` as part of the cache key: it is not stable
+                // and can vary for the same column depending on the query's projection.
+                // For instance:
+                // - `SELECT col1, col2 FROM t;`
+                // - `SELECT col2 FROM t;`
+                // may produce different ranges for `col2`, leading to cache misses
+                // or inconsistencies.
+
+                let cache_key = TableDataCacheKey::new(location, *column_id, offset, len);
                 let chunk_data = merge_io_result
                     .owner_memory
                     .get_chunk(*chunk_idx, &merge_io_result.block_path)?;
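To make the instability concrete: with merged IO, a column's `range` describes
where its bytes landed inside a particular merged read, so the same physical
column bytes can carry different ranges under different projections. Below is a
minimal sketch of the effect; the types, the key format, and the byte layout
are invented for illustration, only the `offset`/`len` idea comes from the
patch above.

    use std::ops::Range;

    type ColumnId = u32;

    // Hypothetical stand-in for the real column meta: where the column's
    // bytes physically sit inside the block file. This is stable.
    struct ColumnMeta {
        offset: u64,
        len: u64,
    }

    fn key(block_path: &str, id: ColumnId, offset: u64, len: u64) -> String {
        // Made-up key format, just for comparison purposes.
        format!("{block_path}-{id}-{offset}-{len}")
    }

    fn main() {
        let block = "blk_0001.parquet";
        // Suppose col2 physically occupies bytes 100..180 of the block file.
        let col2_meta = ColumnMeta { offset: 100, len: 80 };

        // `SELECT col1, col2 FROM t;`: one merged read covering bytes 0..180;
        // col2's range inside that read is 100..180.
        let range_a: Range<u64> = 100..180;
        // `SELECT col2 FROM t;`: only bytes 100..180 are read;
        // col2's range inside that read is 0..80.
        let range_b: Range<u64> = 0..80;

        // Range-based keys: two different keys for the very same bytes.
        let k_a = key(block, 2, range_a.start, range_a.end - range_a.start);
        let k_b = key(block, 2, range_b.start, range_b.end - range_b.start);
        assert_ne!(k_a, k_b); // cache miss, or a duplicate/inconsistent entry

        // Meta-based key: projection-independent.
        let k = key(block, 2, col2_meta.offset, col2_meta.len);
        assert_eq!(k, key(block, 2, col2_meta.offset, col2_meta.len));
    }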
From 634792fac64bcd3d669f6cb68dbd9d64607849a2 Mon Sep 17 00:00:00 2001
From: dantengsky
Date: Thu, 14 Nov 2024 00:36:58 +0800
Subject: [PATCH 2/2] refactor: enforce using the same column data cache key

---
 .../read/block/block_reader_merge_io_async.rs | 31 ++++++++++++++--------
 1 file changed, 23 insertions(+), 8 deletions(-)

diff --git a/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs b/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs
index de812771f685..082cbc6a20ac 100644
--- a/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs
+++ b/src/query/storages/fuse/src/io/read/block/block_reader_merge_io_async.rs
@@ -48,6 +48,9 @@ impl BlockReader {
         let column_array_cache = CacheManager::instance().get_table_data_array_cache();
         let mut cached_column_data = vec![];
         let mut cached_column_array = vec![];
+
+        let column_cache_key_builder = ColumnCacheKeyBuilder::new(location);
+
         for (_index, (column_id, ..)) in self.project_indices.iter() {
             if let Some(ignore_column_ids) = ignore_column_ids {
                 if ignore_column_ids.contains(column_id) {
@@ -58,7 +61,7 @@ impl BlockReader {
 
             if let Some(column_meta) = columns_meta.get(column_id) {
                 let (offset, len) = column_meta.offset_length();
-                let column_cache_key = TableDataCacheKey::new(location, *column_id, offset, len);
+                let column_cache_key = column_cache_key_builder.cache_key(column_id, column_meta);
 
                 // first, check in memory table data cache
                 // column_array_cache
@@ -91,13 +94,8 @@ impl BlockReader {
             .await?;
 
         if self.put_cache {
-            let table_data_cache = CacheManager::instance().get_table_data_cache();
             // add raw data (compressed raw bytes) to column cache
             for (column_id, (chunk_idx, range)) in &merge_io_result.columns_chunk_offsets {
-                // Safe to unwrap here: since this column has been fetched, its meta must be present.
-                let column_meta = columns_meta.get(column_id).unwrap();
-                let (offset, len) = column_meta.offset_length();
-
                 // Do NOT use `range.start` as part of the cache key: it is not stable
                 // and can vary for the same column depending on the query's projection.
                 // For instance:
@@ -106,12 +104,15 @@ impl BlockReader {
                 // may produce different ranges for `col2`, leading to cache misses
                 // or inconsistencies.
 
-                let cache_key = TableDataCacheKey::new(location, *column_id, offset, len);
+                // Safe to unwrap here: since this column has been fetched, its meta must be present.
+                let column_meta = columns_meta.get(column_id).unwrap();
+                let column_cache_key = column_cache_key_builder.cache_key(column_id, column_meta);
+
                 let chunk_data = merge_io_result
                     .owner_memory
                     .get_chunk(*chunk_idx, &merge_io_result.block_path)?;
                 let data = chunk_data.slice(range.clone());
-                table_data_cache.insert(cache_key.as_ref().to_owned(), data);
+                column_data_cache.insert(column_cache_key.as_ref().to_owned(), data);
             }
         }
 
@@ -123,3 +124,17 @@ impl BlockReader {
         Ok(block_read_res)
     }
 }
+
+struct ColumnCacheKeyBuilder<'a> {
+    block_path: &'a str,
+}
+
+impl<'a> ColumnCacheKeyBuilder<'a> {
+    fn new(block_path: &'a str) -> Self {
+        Self { block_path }
+    }
+    fn cache_key(&self, column_id: &ColumnId, column_meta: &ColumnMeta) -> TableDataCacheKey {
+        let (offset, len) = column_meta.offset_length();
+        TableDataCacheKey::new(self.block_path, *column_id, offset, len)
+    }
+}
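The design point of the refactor: the cache probe before IO and the cache
insert after IO previously each built their own key, which is exactly how they
drifted apart. Routing both through `ColumnCacheKeyBuilder` makes the key
derivation a single function. A self-contained sketch of the pattern, with
simplified stand-ins for `ColumnMeta`, `TableDataCacheKey`, and the key format
(the real definitions live elsewhere in the databend codebase):

    type ColumnId = u32;

    // Simplified stand-ins; only the shape matters here.
    struct ColumnMeta {
        offset: u64,
        len: u64,
    }

    impl ColumnMeta {
        fn offset_length(&self) -> (u64, u64) {
            (self.offset, self.len)
        }
    }

    #[derive(Debug, PartialEq)]
    struct TableDataCacheKey(String);

    impl TableDataCacheKey {
        fn new(block_path: &str, column_id: ColumnId, offset: u64, len: u64) -> Self {
            // Hypothetical key format.
            Self(format!("{block_path}-{column_id}-{offset}-{len}"))
        }
    }

    // One place that knows how a column cache key is derived: both the
    // probe path and the populate path call this.
    struct ColumnCacheKeyBuilder<'a> {
        block_path: &'a str,
    }

    impl<'a> ColumnCacheKeyBuilder<'a> {
        fn new(block_path: &'a str) -> Self {
            Self { block_path }
        }

        fn cache_key(&self, column_id: &ColumnId, column_meta: &ColumnMeta) -> TableDataCacheKey {
            let (offset, len) = column_meta.offset_length();
            TableDataCacheKey::new(self.block_path, *column_id, offset, len)
        }
    }

    fn main() {
        let builder = ColumnCacheKeyBuilder::new("blk_0001.parquet");
        let meta = ColumnMeta { offset: 100, len: 80 };
        // Probe side and populate side necessarily agree on the key.
        assert_eq!(builder.cache_key(&2u32, &meta), builder.cache_key(&2u32, &meta));
    }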