From c9e4f20bbb73ad17181c1fbce2b43e0cea3fc514 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E7=82=8E=E6=B3=BC?= Date: Wed, 18 Sep 2024 20:09:01 +0800 Subject: [PATCH] refactor: refactor vacuum (#16454) refactor: WIP: refactor vacuum --- src/meta/api/src/schema_api_impl.rs | 308 +++++++++--------- src/meta/api/src/schema_api_test_suite.rs | 22 +- src/meta/app/src/schema/table.rs | 13 +- .../builders/builder_copy_into_table.rs | 2 +- 4 files changed, 180 insertions(+), 165 deletions(-) diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs index 0cb6ce446bd8..97d76d5a2573 100644 --- a/src/meta/api/src/schema_api_impl.rs +++ b/src/meta/api/src/schema_api_impl.rs @@ -24,6 +24,7 @@ use std::time::Duration; use chrono::DateTime; use chrono::Utc; use databend_common_base::base::uuid::Uuid; +use databend_common_base::display::display_slice::DisplaySliceExt; use databend_common_meta_app::app_error::AppError; use databend_common_meta_app::app_error::CommitTableMetaError; use databend_common_meta_app::app_error::CreateAsDropTableWithoutDropTime; @@ -157,7 +158,6 @@ use databend_common_meta_app::schema::UpdateMultiTableMetaResult; use databend_common_meta_app::schema::UpdateStreamMetaReq; use databend_common_meta_app::schema::UpdateTableMetaReply; use databend_common_meta_app::schema::UpdateVirtualColumnReq; -use databend_common_meta_app::schema::UpsertTableCopiedFileReq; use databend_common_meta_app::schema::UpsertTableOptionReply; use databend_common_meta_app::schema::UpsertTableOptionReq; use databend_common_meta_app::schema::VirtualColumnIdent; @@ -185,9 +185,11 @@ use databend_common_meta_types::TxnOp; use databend_common_meta_types::TxnRequest; use databend_common_proto_conv::FromToProto; use fastrace::func_name; +use futures::StreamExt; use futures::TryStreamExt; use log::debug; use log::error; +use log::info; use log::warn; use ConditionResult::Eq; @@ -219,6 +221,7 @@ use crate::util::deserialize_struct_get_response; use crate::util::get_table_by_id_or_err; use crate::util::list_tables_from_unshare_db; use crate::util::mget_pb_values; +use crate::util::txn_delete_exact; use crate::util::txn_op_put_pb; use crate::util::unknown_database_error; use crate::SchemaApi; @@ -2390,7 +2393,7 @@ impl + ?Sized> SchemaApi for KV { } = req; let mut tbl_seqs = HashMap::new(); - let mut txn_req = TxnRequest::default(); + let mut txn = TxnRequest::default(); let mut mismatched_tbs = vec![]; let tid_vec = update_table_metas .iter() @@ -2436,13 +2439,10 @@ impl + ?Sized> SchemaApi for KV { new_table_meta.shared_by = table_meta.shared_by.clone(); tbl_seqs.insert(req.0.table_id, *tb_meta_seq); - txn_req - .condition - .push(txn_cond_seq(&tbid, Eq, *tb_meta_seq)); - txn_req - .if_then + txn.condition.push(txn_cond_seq(&tbid, Eq, *tb_meta_seq)); + txn.if_then .push(txn_op_put(&tbid, serialize_struct(&new_table_meta)?)); - txn_req.else_then.push(TxnOp { + txn.else_then.push(TxnOp { request: Some(Request::Get(TxnGetRequest { key: tbid.to_string_key(), })), @@ -2450,16 +2450,35 @@ impl + ?Sized> SchemaApi for KV { new_table_meta_map.insert(req.0.table_id, new_table_meta); } - for (tbid, req) in copied_files { - let tbid = TableId { table_id: tbid }; - let (conditions, match_operations) = build_upsert_table_copied_file_info_conditions( - &tbid, - &req, - tbl_seqs[&tbid.table_id], - req.fail_if_duplicated, - )?; - txn_req.condition.extend(conditions); - txn_req.if_then.extend(match_operations) + + // `remove_table_copied_files` and `upsert_table_copied_file_info` + // all modify `TableCopiedFileInfo`, + // so there used to has `TableCopiedFileLockKey` in these two functions + // to protect TableCopiedFileInfo modification. + // In issue: https://github.com/datafuselabs/databend/issues/8897, + // there is chance that if copy files concurrently, `upsert_table_copied_file_info` + // may return `TxnRetryMaxTimes`. + // So now, in case that `TableCopiedFileInfo` has expire time, remove `TableCopiedFileLockKey` + // in each function. In this case there is chance that some `TableCopiedFileInfo` may not be + // removed in `remove_table_copied_files`, but these data can be purged in case of expire time. + + for (table_id, req) in copied_files { + let tbid = TableId { table_id }; + + let table_meta_seq = tbl_seqs[&tbid.table_id]; + txn.condition.push(txn_cond_eq_seq(&tbid, table_meta_seq)); + + for (file_name, file_info) in req.file_info { + let key = TableCopiedFileNameIdent { + table_id: tbid.table_id, + file: file_name, + }; + + if req.insert_if_not_exists { + txn.condition.push(txn_cond_eq_seq(&key, 0)); + } + txn.if_then.push(txn_op_put_pb(&key, &file_info, req.ttl)?) + } } let sid_vec = update_stream_metas @@ -2500,20 +2519,17 @@ impl + ?Sized> SchemaApi for KV { new_stream_meta.options = req.options.clone(); new_stream_meta.updated_on = Utc::now(); - txn_req - .condition + txn.condition .push(txn_cond_seq(&stream_id, Eq, stream_meta_seq)); - txn_req - .if_then + txn.if_then .push(txn_op_put(&stream_id, serialize_struct(&new_stream_meta)?)); } for deduplicated_label in deduplicated_labels { - txn_req - .if_then + txn.if_then .push(build_upsert_table_deduplicated_label(deduplicated_label)); } - let (succ, responses) = send_txn(self, txn_req).await?; + let (succ, responses) = send_txn(self, txn).await?; if succ { return Ok(Ok(UpdateTableMetaReply {})); } @@ -3521,47 +3537,69 @@ async fn drop_database_meta( Ok(*seq_db_id.data) } -/// remove copied files for a table. +/// Remove copied files for a dropped table. /// -/// Returns number of files that are going to be removed. -async fn remove_table_copied_files( +/// Dropped table can not be accessed by any query, +/// so it is safe to remove all the copied files in multiple sub transactions. +async fn remove_copied_files_for_dropped_table( kv_api: &(impl kvapi::KVApi + ?Sized), - table_id: u64, - txn: &mut TxnRequest, -) -> Result { - let mut n = 0; - let chunk_size = DEFAULT_MGET_SIZE; - - // `list_keys` list all the `TableCopiedFileNameIdent` of the table. - // But if a upsert_table_copied_file_info run concurrently, there is chance that - // `list_keys` may lack of some new inserted TableCopiedFileNameIdent. - // But since TableCopiedFileNameIdent has expire time, they can be purged by expire time. - let copied_files = list_table_copied_files(kv_api, table_id).await?; - - for chunk in copied_files.chunks(chunk_size) { - // Load the `seq` of every copied file - let seqs = { - let str_keys: Vec<_> = chunk.iter().map(|f| f.to_string_key()).collect(); + table_id: &TableId, +) -> Result<(), MetaError> { + let batch_size = 1024; + + // Loop until: + // - all cleaned + // - or table is removed from meta-service + // - or is no longer in `droppped` state. + for i in 0..usize::MAX { + let mut txn = TxnRequest::default(); + + let seq_meta = kv_api.get_pb(table_id).await?; + let Some(seq_table_meta) = seq_meta else { + return Ok(()); + }; - let seq_infos: Vec<(u64, Option)> = - mget_pb_values(kv_api, &str_keys).await?; + // TODO: enable this check. Currently when gc db, the table may not be dropped. + // if seq_table_meta.data.drop_on.is_none() { + // return Ok(()); + // } - seq_infos.into_iter().map(|(seq, _)| seq) + // Make sure the table meta is not changed, such as being un-dropped. + txn.condition + .push(txn_cond_eq_seq(table_id, seq_table_meta.seq)); + + let copied_file_ident = TableCopiedFileNameIdent { + table_id: table_id.table_id, + file: "dummy".to_string(), }; - for (copied_seq, copied_ident) in seqs.zip(chunk) { - if copied_seq == 0 { - continue; - } + let dir_name = DirName::new(copied_file_ident); - txn.condition - .push(txn_cond_seq(copied_ident, Eq, copied_seq)); + let key_stream = kv_api.list_pb_keys(&dir_name).await?; + let copied_files = key_stream.take(batch_size).try_collect::>().await?; + + if copied_files.is_empty() { + return Ok(()); + } + + for copied_ident in copied_files.iter() { + // It is a dropped table, thus there is no data will be written to the table. + // Therefore, we only need to assert the table_meta seq, and there is no need to assert + // seq of each copied file. txn.if_then.push(txn_op_del(copied_ident)); - n += 1; } - } - Ok(n) + info!( + "remove_copied_files_for_dropped_table {}: {}-th batch remove: {} items: {}", + table_id, + i, + copied_files.len(), + copied_files.display() + ); + + send_txn(kv_api, txn).await?; + } + unreachable!() } /// List the copied file identities belonging to a table. @@ -3685,46 +3723,6 @@ fn table_has_to_not_exist( } } -fn build_upsert_table_copied_file_info_conditions( - table_id: &TableId, - req: &UpsertTableCopiedFileReq, - tb_meta_seq: u64, - fail_if_duplicated: bool, -) -> Result<(Vec, Vec), KVAppError> { - let mut condition = vec![txn_cond_seq(table_id, Eq, tb_meta_seq)]; - let mut if_then = vec![]; - - // `remove_table_copied_files` and `upsert_table_copied_file_info` - // all modify `TableCopiedFileInfo`, - // so there used to has `TableCopiedFileLockKey` in these two functions - // to protect TableCopiedFileInfo modification. - // In issue: https://github.com/datafuselabs/databend/issues/8897, - // there is chance that if copy files concurrently, `upsert_table_copied_file_info` - // may return `TxnRetryMaxTimes`. - // So now, in case that `TableCopiedFileInfo` has expire time, remove `TableCopiedFileLockKey` - // in each function. In this case there is chance that some `TableCopiedFileInfo` may not be - // removed in `remove_table_copied_files`, but these data can be purged in case of expire time. - - let file_name_infos = req.file_info.clone().into_iter(); - - for (file_name, file_info) in file_name_infos { - let key = TableCopiedFileNameIdent { - table_id: table_id.table_id, - file: file_name.to_owned(), - }; - if fail_if_duplicated { - // "fail_if_duplicated" mode, assumes files are absent - condition.push(txn_cond_seq(&key, Eq, 0)); - } - if_then.push(TxnOp::put_with_ttl( - key.to_string_key(), - serialize_struct(&file_info)?, - req.ttl, - )) - } - Ok((condition, if_then)) -} - fn build_upsert_table_deduplicated_label(deduplicated_label: String) -> TxnOp { TxnOp::put_with_ttl( deduplicated_label, @@ -3994,8 +3992,11 @@ async fn gc_dropped_db_by_id( }; for tb_id in tb_id_list.id_list { - gc_dropped_table_data(kv_api, tb_id, &mut txn).await?; - gc_dropped_table_index(kv_api, tenant, tb_id, &mut txn).await?; + let table_id_ident = TableId { table_id: tb_id }; + remove_copied_files_for_dropped_table(kv_api, &table_id_ident).await?; + remove_data_for_dropped_table(kv_api, &table_id_ident, &mut txn).await?; + remove_index_for_dropped_table(kv_api, tenant, &table_id_ident, &mut txn) + .await?; } let id_key = iter.next().unwrap(); @@ -4040,100 +4041,103 @@ async fn gc_dropped_table_by_id( table_name: String, ) -> Result<(), KVAppError> { // first get TableIdList - let dbid_tbname_idlist = TableIdHistoryIdent { + let table_id_history_ident = TableIdHistoryIdent { database_id: db_id, table_name, }; - let (tb_id_list_seq, tb_id_list_opt): (_, Option) = - get_pb_value(kv_api, &dbid_tbname_idlist).await?; - let mut tb_id_list = match tb_id_list_opt { - Some(list) => list, - None => return Ok(()), + + let seq_id_list = kv_api.get_pb(&table_id_history_ident).await?; + + let Some(seq_id_list) = seq_id_list else { + return Ok(()); }; - for (i, tb_id) in tb_id_list.id_list.iter().enumerate() { - if *tb_id != table_id { - continue; - } + let seq = seq_id_list.seq; + let mut tb_id_list = seq_id_list.data; - tb_id_list.id_list.remove(i); + // remove table_id from tb_id_list: + { + let index = tb_id_list.id_list.iter().position(|&x| x == table_id); + let Some(index) = index else { + return Ok(()); + }; - let mut txn = TxnRequest::default(); + tb_id_list.id_list.remove(index); + } - // construct the txn request - txn.condition.push( - // condition: table id list not changed - txn_cond_seq(&dbid_tbname_idlist, Eq, tb_id_list_seq), - ); + let mut txn = TxnRequest::default(); - if tb_id_list.id_list.is_empty() { - txn.if_then.push(txn_op_del(&dbid_tbname_idlist)); - } else { - // save new table id list - txn.if_then.push(txn_op_put( - &dbid_tbname_idlist, - serialize_struct(&tb_id_list)?, - )); - } - gc_dropped_table_data(kv_api, table_id, &mut txn).await?; - gc_dropped_table_index(kv_api, tenant, table_id, &mut txn).await?; + // construct the txn request + txn.condition.push( + // condition: table id list not changed + txn_cond_eq_seq(&table_id_history_ident, seq), + ); - let _resp = kv_api.transaction(txn).await?; - break; + if tb_id_list.id_list.is_empty() { + txn.if_then.push(txn_op_del(&table_id_history_ident)); + } else { + // save new table id list + txn.if_then + .push(txn_op_put_pb(&table_id_history_ident, &tb_id_list, None)?); } + let table_id_ident = TableId { table_id }; + remove_copied_files_for_dropped_table(kv_api, &table_id_ident).await?; + remove_data_for_dropped_table(kv_api, &table_id_ident, &mut txn).await?; + remove_index_for_dropped_table(kv_api, tenant, &table_id_ident, &mut txn).await?; + + let _resp = kv_api.transaction(txn).await?; + Ok(()) } -async fn gc_dropped_table_data( +async fn remove_data_for_dropped_table( kv_api: &(impl kvapi::KVApi + ?Sized), - table_id: u64, + table_id: &TableId, txn: &mut TxnRequest, ) -> Result<(), KVAppError> { - let tbid = TableId { table_id }; - let id_to_name = TableIdToName { table_id }; + let seq_meta = kv_api.get_pb(table_id).await?; - // Get meta data - let (tb_meta_seq, tb_meta): (_, Option) = get_pb_value(kv_api, &tbid).await?; - - if tb_meta_seq == 0 || tb_meta.is_none() { + let Some(seq_meta) = seq_meta else { error!( "gc_dropped_table_by_id cannot find {:?} table_meta", table_id ); return Ok(()); - } + }; + + // TODO: enable this check. Currently when gc db, the table may not be dropped. + // if seq_meta.data.drop_on.is_none() { + // warn!("gc_dropped_table_by_id {:?} is not dropped", table_id); + // return Ok(()); + // } + + txn_delete_exact(txn, table_id, seq_meta.seq); // Get id -> name mapping - let (name_seq, _name): (_, Option) = get_pb_value(kv_api, &id_to_name).await?; + let id_to_name = TableIdToName { + table_id: table_id.table_id, + }; + let seq_name = kv_api.get_pb(&id_to_name).await?; - // table id not changed - txn.condition.push(txn_cond_seq(&tbid, Eq, tb_meta_seq)); // consider only when TableIdToName exist - if name_seq != 0 { - // table id to name not changed - txn.condition.push(txn_cond_seq(&id_to_name, Eq, name_seq)); - // remove table id to name - txn.if_then.push(txn_op_del(&id_to_name)); + if let Some(seq_name) = seq_name { + txn_delete_exact(txn, &id_to_name, seq_name.seq); } - // remove table meta - txn.if_then.push(txn_op_del(&tbid)); - - remove_table_copied_files(kv_api, table_id, txn).await?; Ok(()) } -async fn gc_dropped_table_index( +async fn remove_index_for_dropped_table( kv_api: &(impl kvapi::KVApi + ?Sized), tenant: &Tenant, - table_id: u64, + table_id: &TableId, txn: &mut TxnRequest, ) -> Result<(), KVAppError> { let name_id_metas = kv_api .list_indexes(ListIndexesReq { tenant: tenant.clone(), - table_id: Some(table_id), + table_id: Some(table_id.table_id), }) .await?; diff --git a/src/meta/api/src/schema_api_test_suite.rs b/src/meta/api/src/schema_api_test_suite.rs index 917b99ee945a..45d20efbea1c 100644 --- a/src/meta/api/src/schema_api_test_suite.rs +++ b/src/meta/api/src/schema_api_test_suite.rs @@ -2599,7 +2599,7 @@ impl SchemaApiTestSuite { let upsert_source_table = UpsertTableCopiedFileReq { file_info, ttl: None, - fail_if_duplicated: true, + insert_if_not_exists: true, }; let req = UpdateTableMetaReq { @@ -2649,7 +2649,7 @@ impl SchemaApiTestSuite { let upsert_source_table = UpsertTableCopiedFileReq { file_info, ttl: None, - fail_if_duplicated: true, + insert_if_not_exists: true, }; let req = UpdateTableMetaReq { table_id, @@ -2698,7 +2698,7 @@ impl SchemaApiTestSuite { let upsert_source_table = UpsertTableCopiedFileReq { file_info, ttl: None, - fail_if_duplicated: true, + insert_if_not_exists: true, }; let req = UpdateTableMetaReq { table_id, @@ -3558,7 +3558,7 @@ impl SchemaApiTestSuite { let copied_file_req = UpsertTableCopiedFileReq { file_info: file_info.clone(), ttl: Some(std::time::Duration::from_secs(86400)), - fail_if_duplicated: true, + insert_if_not_exists: true, }; let req = UpdateTableMetaReq { @@ -3722,7 +3722,7 @@ impl SchemaApiTestSuite { let copied_file_req = UpsertTableCopiedFileReq { file_info: file_info.clone(), ttl: Some(std::time::Duration::from_secs(86400)), - fail_if_duplicated: true, + insert_if_not_exists: true, }; let req = UpdateTableMetaReq { @@ -5734,7 +5734,7 @@ impl SchemaApiTestSuite { let copied_file_req = UpsertTableCopiedFileReq { file_info: file_info.clone(), ttl: Some(std::time::Duration::from_secs(86400)), - fail_if_duplicated: true, + insert_if_not_exists: true, }; let req = UpdateTableMetaReq { @@ -5784,7 +5784,7 @@ impl SchemaApiTestSuite { file_info: file_info.clone(), // Make it expire at once. ttl: Some(std::time::Duration::from_secs(0)), - fail_if_duplicated: true, + insert_if_not_exists: true, }; let req = UpdateTableMetaReq { @@ -7213,7 +7213,7 @@ impl SchemaApiTestSuite { let copied_file_req = UpsertTableCopiedFileReq { file_info: file_info.clone(), ttl: Some(std::time::Duration::from_secs(86400)), - fail_if_duplicated: true, + insert_if_not_exists: true, }; let req = UpdateTableMetaReq { @@ -7271,7 +7271,7 @@ impl SchemaApiTestSuite { let copied_file_req = UpsertTableCopiedFileReq { file_info: file_info.clone(), ttl: Some(std::time::Duration::from_secs(86400)), - fail_if_duplicated: true, + insert_if_not_exists: true, }; let req = UpdateTableMetaReq { @@ -7326,7 +7326,7 @@ impl SchemaApiTestSuite { let copied_file_req = UpsertTableCopiedFileReq { file_info: file_info.clone(), ttl: Some(std::time::Duration::from_secs(86400)), - fail_if_duplicated: false, + insert_if_not_exists: false, }; let req = UpdateTableMetaReq { @@ -7687,7 +7687,7 @@ where MT: SchemaApi + kvapi::AsKVApi let copied_file_req = UpsertTableCopiedFileReq { file_info: file_infos.clone(), ttl: Some(std::time::Duration::from_secs(86400)), - fail_if_duplicated: true, + insert_if_not_exists: true, }; let req = UpdateTableMetaReq { diff --git a/src/meta/app/src/schema/table.rs b/src/meta/app/src/schema/table.rs index b083c0e13535..dab3a40c2a35 100644 --- a/src/meta/app/src/schema/table.rs +++ b/src/meta/app/src/schema/table.rs @@ -961,6 +961,16 @@ pub struct TableCopiedFileNameIdent { pub file: String, } +impl fmt::Display for TableCopiedFileNameIdent { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "TableCopiedFileNameIdent{{table_id:{}, file:{}}}", + self.table_id, self.file + ) + } +} + #[derive(Clone, Debug, Eq, PartialEq, Default)] pub struct TableCopiedFileInfo { pub etag: Option, @@ -984,7 +994,8 @@ pub struct UpsertTableCopiedFileReq { pub file_info: BTreeMap, /// If not None, specifies the time-to-live for the keys. pub ttl: Option, - pub fail_if_duplicated: bool, + /// If there is already existing key, ignore inserting + pub insert_if_not_exists: bool, } #[derive(Clone, Debug, PartialEq, Eq)] diff --git a/src/query/service/src/pipelines/builders/builder_copy_into_table.rs b/src/query/service/src/pipelines/builders/builder_copy_into_table.rs index ecf20454f27f..361cb573b406 100644 --- a/src/query/service/src/pipelines/builders/builder_copy_into_table.rs +++ b/src/query/service/src/pipelines/builders/builder_copy_into_table.rs @@ -231,7 +231,7 @@ impl PipelineBuilder { let req = UpsertTableCopiedFileReq { file_info: copied_file_tree, ttl: Some(Duration::from_hours(expire_hours)), - fail_if_duplicated: !force, + insert_if_not_exists: !force, }; Some(req) }