From e11a5f5d319b688ea3798aa5394522e3a2efa203 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 19 Oct 2022 20:53:27 +0800 Subject: [PATCH 1/2] fix: "optimize purge" may incorrectly delete data of pending txs --- src/query/storages/fuse/src/io/snapshots.rs | 8 +++++++- src/query/storages/fuse/src/operations/gc.rs | 8 ++++---- .../src/table_functions/fuse_snapshots/fuse_snapshot.rs | 2 +- 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/query/storages/fuse/src/io/snapshots.rs b/src/query/storages/fuse/src/io/snapshots.rs index b395935909f9..5ef3ac473c19 100644 --- a/src/query/storages/fuse/src/io/snapshots.rs +++ b/src/query/storages/fuse/src/io/snapshots.rs @@ -17,6 +17,8 @@ use std::collections::HashSet; use std::path::Path; use std::sync::Arc; +use chrono::DateTime; +use chrono::Utc; use common_base::base::tokio::sync::Semaphore; use common_base::base::Runtime; use common_catalog::table_context::TableContext; @@ -113,7 +115,8 @@ impl SnapshotsIO { &self, root_snapshot_file: String, limit: Option, - with_segment_locations: bool, // Return segment location or not + with_segment_locations: bool, + min_snapshot_timestamp: Option>, ) -> Result<(Vec, HashSet)> { let ctx = self.ctx.clone(); let data_accessor = self.operator.clone(); @@ -138,6 +141,9 @@ impl SnapshotsIO { info!("Finish to read_snapshots, chunk:[{}]", idx); for snapshot in results.into_iter().flatten() { + if snapshot.timestamp > min_snapshot_timestamp { + continue; + } let snapshot_lite = TableSnapshotLite::from(snapshot.as_ref()); snapshot_lites.push(snapshot_lite); diff --git a/src/query/storages/fuse/src/operations/gc.rs b/src/query/storages/fuse/src/operations/gc.rs index 781f88cd9558..eef933f1367f 100644 --- a/src/query/storages/fuse/src/operations/gc.rs +++ b/src/query/storages/fuse/src/operations/gc.rs @@ -53,14 +53,14 @@ impl FuseTable { // 1. Root snapshot. let mut segments_referenced_by_root = HashSet::new(); let mut blocks_referenced_by_root = HashSet::new(); - let root_snapshot_id = if let Some(root_snapshot) = snapshot_opt { + let (root_snapshot_id, root_snapshot_ts) = if let Some(root_snapshot) = snapshot_opt { let segments = root_snapshot.segments.clone(); blocks_referenced_by_root = self.get_block_locations(ctx.clone(), &segments).await?; segments_referenced_by_root = HashSet::from_iter(segments); - root_snapshot.snapshot_id + (root_snapshot.snapshot_id, root_snapshot.timestamp) } else { - SnapshotId::new_v4() + (SnapshotId::new_v4(), None) }; // 2. Get all snapshot(including root snapshot). @@ -73,7 +73,7 @@ impl FuseTable { self.snapshot_format_version().await?, ); (all_snapshot_lites, all_segment_locations) = snapshots_io - .read_snapshot_lites(root_snapshot_location, None, true) + .read_snapshot_lites(root_snapshot_location, None, true, root_snapshot_ts) .await?; } diff --git a/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs b/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs index 379a74570ac0..ba9a4f28628a 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs @@ -45,7 +45,7 @@ impl<'a> FuseSnapshot<'a> { snapshot_version, ); let (snapshots, _) = snapshots_io - .read_snapshot_lites(snapshot_location, limit, false) + .read_snapshot_lites(snapshot_location, limit, false, None) .await?; return self.to_block(&meta_location_generator, &snapshots, snapshot_version); } From d89a06d457e747dcd5f8274ecbdf9b8feb9485b8 Mon Sep 17 00:00:00 2001 From: dantengsky Date: Wed, 19 Oct 2022 21:28:23 +0800 Subject: [PATCH 2/2] fix ut --- .../storages/fuse/table_functions/fuse_snapshot_table.rs | 1 + .../src/table_functions/fuse_snapshots/fuse_snapshot.rs | 8 +++++++- tests/logictest/http_connector.py | 3 +-- tests/logictest/http_runner.py | 1 + 4 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/query/service/tests/it/storages/fuse/table_functions/fuse_snapshot_table.rs b/src/query/service/tests/it/storages/fuse/table_functions/fuse_snapshot_table.rs index 469526f2556c..1043656c5cfa 100644 --- a/src/query/service/tests/it/storages/fuse/table_functions/fuse_snapshot_table.rs +++ b/src/query/service/tests/it/storages/fuse/table_functions/fuse_snapshot_table.rs @@ -147,6 +147,7 @@ async fn test_fuse_snapshot_table_read() -> Result<()> { } { + // previously, inserted 5 blocks, 3 rows per block // another 5 blocks, 15 rows here append_sample_data(5, &fixture).await?; let expected = vec![ diff --git a/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs b/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs index ba9a4f28628a..415dc5b325c9 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_snapshots/fuse_snapshot.rs @@ -37,6 +37,7 @@ impl<'a> FuseSnapshot<'a> { pub async fn get_snapshots(self, limit: Option) -> Result { let meta_location_generator = self.table.meta_location_generator.clone(); let snapshot_location = self.table.snapshot_loc().await?; + let snapshot = self.table.read_table_snapshot(self.ctx.clone()).await?; if let Some(snapshot_location) = snapshot_location { let snapshot_version = self.table.snapshot_format_version().await?; let snapshots_io = SnapshotsIO::create( @@ -45,7 +46,12 @@ impl<'a> FuseSnapshot<'a> { snapshot_version, ); let (snapshots, _) = snapshots_io - .read_snapshot_lites(snapshot_location, limit, false, None) + .read_snapshot_lites( + snapshot_location, + limit, + false, + snapshot.and_then(|s| s.timestamp), + ) .await?; return self.to_block(&meta_location_generator, &snapshots, snapshot_version); } diff --git a/tests/logictest/http_connector.py b/tests/logictest/http_connector.py index 13aaa44076f2..8eb834415af4 100644 --- a/tests/logictest/http_connector.py +++ b/tests/logictest/http_connector.py @@ -160,8 +160,7 @@ def query_with_session(self, statement): while response['next_uri'] is not None: resp = self.next_page(response['next_uri']) response = json.loads(resp.content) - log.debug( - f"Sql in progress, fetch next_uri content: {response}") + log.debug(f"Sql in progress, fetch next_uri content: {response}") check_error(response) session = response['session'] if session: diff --git a/tests/logictest/http_runner.py b/tests/logictest/http_runner.py index a02ddcc3096c..8e56b2efebdc 100644 --- a/tests/logictest/http_runner.py +++ b/tests/logictest/http_runner.py @@ -4,6 +4,7 @@ import http_connector from mysql.connector.errors import Error + class TestHttp(logictest.SuiteRunner, ABC): def __init__(self, kind, args):