From c590b8d931b86b2b59ebd6909485d4fa8c139825 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Fri, 30 Aug 2024 16:07:20 +0800 Subject: [PATCH 1/3] feat: introduce an intermediate version that is compatible with both the old version and vacuum2 --- Cargo.toml | 2 +- src/query/catalog/src/plan/internal_column.rs | 17 +++++------ src/query/catalog/src/plan/stream_column.rs | 22 ++++---------- .../common/table_meta/src/meta/mod.rs | 3 ++ .../common/table_meta/src/meta/utils.rs | 29 +++++++++++++++++++ .../common/table_meta/src/meta/v4/snapshot.rs | 5 ++++ src/query/storages/fuse/src/io/locations.rs | 18 +++++++++--- 7 files changed, 65 insertions(+), 31 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1b222ecd2953..6ce7957f9093 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -322,7 +322,7 @@ tonic = { version = "0.11.0", features = ["transport", "codegen", "prost", "tls- tonic-build = { version = "0.11" } tonic-reflection = { version = "0.11.0" } typetag = "0.2.3" -uuid = { version = "1.1.2", features = ["serde", "v4"] } +uuid = { version = "1.10.0", features = ["serde", "v4", "v7"] } walkdir = "2.3.2" xorfilter-rs = "0.5" diff --git a/src/query/catalog/src/plan/internal_column.rs b/src/query/catalog/src/plan/internal_column.rs index cf2c93635793..2872e0dc4df9 100644 --- a/src/query/catalog/src/plan/internal_column.rs +++ b/src/query/catalog/src/plan/internal_column.rs @@ -12,8 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::path::Path; - use databend_common_arrow::arrow::bitmap::MutableBitmap; use databend_common_exception::ErrorCode; use databend_common_exception::Result; @@ -44,6 +42,7 @@ use databend_common_expression::SEARCH_MATCHED_COLUMN_ID; use databend_common_expression::SEARCH_SCORE_COLUMN_ID; use databend_common_expression::SEGMENT_NAME_COLUMN_ID; use databend_common_expression::SNAPSHOT_NAME_COLUMN_ID; +use databend_storages_common_table_meta::meta::try_extract_uuid_str_from_path; use databend_storages_common_table_meta::meta::NUM_BLOCK_ID_BITS; // Segment and Block id Bits when generate internal column `_row_id` @@ -261,13 +260,13 @@ impl InternalColumn { ) } InternalColumnType::BaseRowId => { - let file_stem = Path::new(&meta.block_location).file_stem().unwrap(); - let file_strs = file_stem - .to_str() - .unwrap_or("") - .split('_') - .collect::>(); - let uuid = file_strs[0]; + let uuid = + try_extract_uuid_str_from_path(&meta.block_location).unwrap_or_else(|e| { + panic!( + "Internal error: block_location {} should be a valid table object key: {}", + &meta.block_location, e + ) + }); let mut row_ids = Vec::with_capacity(num_rows); if let Some(offsets) = &meta.offsets { for i in offsets { diff --git a/src/query/catalog/src/plan/stream_column.rs b/src/query/catalog/src/plan/stream_column.rs index 3368c33eae4c..fd10a1c80bd2 100644 --- a/src/query/catalog/src/plan/stream_column.rs +++ b/src/query/catalog/src/plan/stream_column.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::any::Any; -use std::path::Path; use std::sync::Arc; use databend_common_base::base::uuid::Uuid; @@ -40,6 +39,7 @@ use databend_common_expression::ORIGIN_BLOCK_ID_COLUMN_ID; use databend_common_expression::ORIGIN_BLOCK_ROW_NUM_COLUMN_ID; use databend_common_expression::ORIGIN_VERSION_COLUMN_ID; use databend_common_expression::ROW_VERSION_COLUMN_ID; +use databend_storages_common_table_meta::meta::try_extract_uuid_str_from_path; use crate::plan::PartInfo; use crate::plan::PartInfoPtr; @@ -222,22 +222,10 @@ impl StreamColumn { } pub fn block_id_from_location(path: &str) -> Result { - if let Some(file_stem) = Path::new(path).file_stem() { - let file_strs = file_stem - .to_str() - .unwrap_or("") - .split('_') - .collect::>(); - let block_id = Uuid::parse_str(file_strs[0]) - .map_err(|e| e.to_string())? - .as_u128(); - Ok(block_id as i128) - } else { - Err(ErrorCode::Internal(format!( - "Illegal meta file format: {}", - path - ))) - } + let uuid = try_extract_uuid_str_from_path(path) + .map_err(|e| e.add_message(format!("invalid block path {}", path)))?; + let block_id = Uuid::parse_str(uuid).map_err(|e| e.to_string())?.as_u128(); + Ok(block_id as i128) } pub fn gen_mutation_stream_meta( diff --git a/src/query/storages/common/table_meta/src/meta/mod.rs b/src/query/storages/common/table_meta/src/meta/mod.rs index 38ae8b5e03c3..5ec5d5a36c2a 100644 --- a/src/query/storages/common/table_meta/src/meta/mod.rs +++ b/src/query/storages/common/table_meta/src/meta/mod.rs @@ -38,6 +38,9 @@ pub use statistics::*; // currently, used by versioned readers only pub(crate) use testing::*; pub use utils::parse_storage_prefix; +pub use utils::trim_vacuum2_object_prefix; +pub use utils::try_extract_uuid_str_from_path; +pub use utils::VACUUM2_OBJECT_KEY_PREFIX; pub(crate) use utils::*; pub use versions::testify_version; pub use versions::SegmentInfoVersion; diff --git a/src/query/storages/common/table_meta/src/meta/utils.rs b/src/query/storages/common/table_meta/src/meta/utils.rs index 99c274675878..02e2175d0bcc 100644 --- a/src/query/storages/common/table_meta/src/meta/utils.rs +++ b/src/query/storages/common/table_meta/src/meta/utils.rs @@ -14,6 +14,7 @@ use std::collections::BTreeMap; use std::ops::Add; +use std::path::Path; use chrono::DateTime; use chrono::Datelike; @@ -29,6 +30,7 @@ use crate::table::OPT_KEY_STORAGE_PREFIX; use crate::table::OPT_KEY_TEMP_PREFIX; const TEMP_TABLE_STORAGE_PREFIX: &str = "_tmp_tbl"; +pub const VACUUM2_OBJECT_KEY_PREFIX: &str = "g"; pub fn trim_timestamp_to_micro_second(ts: DateTime) -> DateTime { Utc.with_ymd_and_hms( @@ -78,3 +80,30 @@ pub fn parse_storage_prefix(options: &BTreeMap, table_id: u64) - } Ok(prefix) } + +#[inline] +pub fn trim_vacuum2_object_prefix(key: &str) -> &str { + key.strip_prefix(VACUUM2_OBJECT_KEY_PREFIX).unwrap_or(key) +} + +// Extracts the UUID part from the object key. +// For example, given a path like: +// bucket/root/115/122/_b/g0191114d30fd78b89fae8e5c88327725_v2.parquet +// bucket/root/115/122/_b/0191114d30fd78b89fae8e5c88327725_v2.parquet +// The function should return: 0191114d30fd78b89fae8e5c88327725 +pub fn try_extract_uuid_str_from_path(path: &str) -> databend_common_exception::Result<&str> { + if let Some(file_stem) = Path::new(path).file_stem() { + let file_name = file_stem + .to_str() + .unwrap() // path is always valid utf8 string + .split('_') + .collect::>(); + let uuid = trim_vacuum2_object_prefix(file_name[0]); + Ok(uuid) + } else { + Err(ErrorCode::StorageOther(format!( + "Illegal object key, no file stem found: {}", + path + ))) + } +} diff --git a/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs b/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs index fd484c1d3087..e6fc53ae206a 100644 --- a/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs +++ b/src/query/storages/common/table_meta/src/meta/v4/snapshot.rs @@ -87,6 +87,8 @@ pub struct TableSnapshot { /// The metadata of the cluster keys. pub cluster_key_meta: Option, pub table_statistics_location: Option, + + pub least_visible_timestamp: Option>, } impl TableSnapshot { @@ -120,6 +122,7 @@ impl TableSnapshot { segments, cluster_key_meta, table_statistics_location, + least_visible_timestamp: None, } } @@ -234,6 +237,7 @@ impl From for TableSnapshot { segments: s.segments, cluster_key_meta: s.cluster_key_meta, table_statistics_location: s.table_statistics_location, + least_visible_timestamp: None, } } } @@ -256,6 +260,7 @@ where T: Into segments: s.segments, cluster_key_meta: s.cluster_key_meta, table_statistics_location: s.table_statistics_location, + least_visible_timestamp: None, } } } diff --git a/src/query/storages/fuse/src/io/locations.rs b/src/query/storages/fuse/src/io/locations.rs index 64bbb81cc639..234e0ee27dd7 100644 --- a/src/query/storages/fuse/src/io/locations.rs +++ b/src/query/storages/fuse/src/io/locations.rs @@ -16,12 +16,15 @@ use std::marker::PhantomData; use databend_common_exception::Result; use databend_common_expression::DataBlock; +use databend_storages_common_table_meta::meta::trim_vacuum2_object_prefix; use databend_storages_common_table_meta::meta::Location; use databend_storages_common_table_meta::meta::SegmentInfo; use databend_storages_common_table_meta::meta::SnapshotVersion; use databend_storages_common_table_meta::meta::TableSnapshotStatisticsVersion; use databend_storages_common_table_meta::meta::Versioned; +use databend_storages_common_table_meta::meta::VACUUM2_OBJECT_KEY_PREFIX; use uuid::Uuid; +use uuid::Version; use crate::constants::FUSE_TBL_BLOCK_PREFIX; use crate::constants::FUSE_TBL_SEGMENT_PREFIX; @@ -34,7 +37,6 @@ use crate::FUSE_TBL_AGG_INDEX_PREFIX; use crate::FUSE_TBL_INVERTED_INDEX_PREFIX; use crate::FUSE_TBL_LAST_SNAPSHOT_HINT; use crate::FUSE_TBL_XOR_BLOOM_INDEX_PREFIX; - static SNAPSHOT_V0: SnapshotVersion = SnapshotVersion::V0(PhantomData); static SNAPSHOT_V1: SnapshotVersion = SnapshotVersion::V1(PhantomData); static SNAPSHOT_V2: SnapshotVersion = SnapshotVersion::V2(PhantomData); @@ -170,7 +172,7 @@ impl TableMetaLocationGenerator { let splits = loc.split('/').collect::>(); let len = splits.len(); let prefix = splits[..len - 2].join("/"); - let block_name = splits[len - 1]; + let block_name = trim_vacuum2_object_prefix(splits[len - 1]); format!("{prefix}/{FUSE_TBL_AGG_INDEX_PREFIX}/{index_id}/{block_name}") } @@ -182,7 +184,7 @@ impl TableMetaLocationGenerator { let splits = loc.split('/').collect::>(); let len = splits.len(); let prefix = splits[..len - 2].join("/"); - let block_name = splits[len - 1]; + let block_name = trim_vacuum2_object_prefix(splits[len - 1]); let id: String = block_name.chars().take(32).collect(); let short_ver: String = index_version.chars().take(7).collect(); format!( @@ -204,8 +206,16 @@ trait SnapshotLocationCreator { impl SnapshotLocationCreator for SnapshotVersion { fn create(&self, id: &Uuid, prefix: impl AsRef) -> String { + let vacuum_prefix = if id + .get_version() + .is_some_and(|v| matches!(v, Version::SortRand)) + { + VACUUM2_OBJECT_KEY_PREFIX + } else { + "" + }; format!( - "{}/{}/{}{}", + "{}/{}/{vacuum_prefix}{}{}", prefix.as_ref(), FUSE_TBL_SNAPSHOT_PREFIX, id.simple(), From fb3c4e78d084181f608c45ca7f38109e5559723b Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Wed, 4 Sep 2024 15:01:05 +0800 Subject: [PATCH 2/3] make lint --- src/query/storages/common/table_meta/src/meta/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/query/storages/common/table_meta/src/meta/mod.rs b/src/query/storages/common/table_meta/src/meta/mod.rs index b1a0572fc941..165c8bfe758f 100644 --- a/src/query/storages/common/table_meta/src/meta/mod.rs +++ b/src/query/storages/common/table_meta/src/meta/mod.rs @@ -40,8 +40,8 @@ pub(crate) use testing::*; pub use utils::parse_storage_prefix; pub use utils::trim_vacuum2_object_prefix; pub use utils::try_extract_uuid_str_from_path; -pub use utils::VACUUM2_OBJECT_KEY_PREFIX; pub use utils::TEMP_TABLE_STORAGE_PREFIX; +pub use utils::VACUUM2_OBJECT_KEY_PREFIX; pub(crate) use utils::*; pub use versions::testify_version; pub use versions::SegmentInfoVersion; From da8f9ace26053e05fe6da925ba1f864ff7be6161 Mon Sep 17 00:00:00 2001 From: sky <3374614481@qq.com> Date: Thu, 5 Sep 2024 15:08:33 +0800 Subject: [PATCH 3/3] add ut --- Cargo.lock | 1 + .../common/table_meta/src/meta/utils.rs | 38 +++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 0547f2ed6388..879b4d103179 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3634,6 +3634,7 @@ dependencies = [ "databend-common-meta-stoerr", "databend-common-meta-types", "databend-common-proto-conv", + "enumflags2", "fastrace", "futures", "log", diff --git a/src/query/storages/common/table_meta/src/meta/utils.rs b/src/query/storages/common/table_meta/src/meta/utils.rs index 14fa5b77c52f..f0d6f6edef11 100644 --- a/src/query/storages/common/table_meta/src/meta/utils.rs +++ b/src/query/storages/common/table_meta/src/meta/utils.rs @@ -107,3 +107,41 @@ pub fn try_extract_uuid_str_from_path(path: &str) -> databend_common_exception:: ))) } } + +#[cfg(test)] +mod tests { + use databend_common_base::base::uuid::Uuid; + + use super::*; + + #[test] + fn test_trim_vacuum2_object_prefix() { + let uuid = Uuid::now_v7(); + assert_eq!( + trim_vacuum2_object_prefix(&format!("g{}", uuid)), + uuid.to_string() + ); + assert_eq!( + trim_vacuum2_object_prefix(&uuid.to_string()), + uuid.to_string() + ); + } + + #[test] + fn test_try_extract_uuid_str_from_path() { + let test_cases = vec![ + ( + "bucket/root/115/122/_b/g0191114d30fd78b89fae8e5c88327725_v2.parquet", + "0191114d30fd78b89fae8e5c88327725", + ), + ( + "bucket/root/115/122/_b/0191114d30fd78b89fae8e5c88327725_v2.parquet", + "0191114d30fd78b89fae8e5c88327725", + ), + ]; + + for (input, expected) in test_cases { + assert_eq!(try_extract_uuid_str_from_path(input).unwrap(), expected); + } + } +}