Skip to content

Commit

Permalink
refactor: gc_dropped_db_by_id()
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Sep 28, 2024
1 parent 0189b01 commit b850e3f
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 79 deletions.
1 change: 1 addition & 0 deletions src/common/base/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub mod headers;
pub mod mem_allocator;
pub mod rangemap;
pub mod runtime;
pub mod vec_ext;
pub mod version;

pub use runtime::dump_backtrace;
Expand Down
27 changes: 27 additions & 0 deletions src/common/base/src/vec_ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub trait VecExt<T> {
/// Remove the first element that is equal to the given item.
fn remove_first(&mut self, item: &T) -> Option<T>
where T: PartialEq;
}

impl<T> VecExt<T> for Vec<T> {
fn remove_first(&mut self, item: &T) -> Option<T>
where T: PartialEq {
let pos = self.iter().position(|x| x == item)?;
Some(self.remove(pos))
}
}
143 changes: 64 additions & 79 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use chrono::DateTime;
use chrono::Utc;
use databend_common_base::base::uuid::Uuid;
use databend_common_base::display::display_slice::DisplaySliceExt;
use databend_common_base::vec_ext::VecExt;
use databend_common_meta_app::app_error::AppError;
use databend_common_meta_app::app_error::CommitTableMetaError;
use databend_common_meta_app::app_error::CreateAsDropTableWithoutDropTime;
Expand Down Expand Up @@ -175,7 +176,6 @@ use databend_common_meta_types::ConditionResult;
use databend_common_meta_types::MatchSeqExt;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::MetaId;
use databend_common_meta_types::TxnCondition;
use databend_common_meta_types::TxnGetRequest;
use databend_common_meta_types::TxnGetResponse;
use databend_common_meta_types::TxnOp;
Expand Down Expand Up @@ -218,6 +218,7 @@ use crate::util::deserialize_struct_get_response;
use crate::util::mget_pb_values;
use crate::util::txn_delete_exact;
use crate::util::txn_op_put_pb;
use crate::util::txn_put_pb;
use crate::util::txn_replace_exact;
use crate::util::unknown_database_error;
use crate::SchemaApi;
Expand Down Expand Up @@ -3707,100 +3708,84 @@ async fn gc_dropped_db_by_id(
db_name: String,
) -> Result<(), KVAppError> {
// List tables by tenant, db_id, table_name.
let dbid_idlist = DatabaseIdHistoryIdent::new(tenant, db_name);
let (db_id_list_seq, db_id_list_opt) = kv_api.get_pb_seq_and_value(&dbid_idlist).await?;
let db_id_history_ident = DatabaseIdHistoryIdent::new(tenant, db_name);
let Some(seq_dbid_list) = kv_api.get_pb(&db_id_history_ident).await? else {
return Ok(());
};

let mut db_id_list = seq_dbid_list.data;

// If the db_id is not in the list, return.
if db_id_list.id_list.remove_first(&db_id).is_none() {
return Ok(());
}

let mut db_id_list = match db_id_list_opt {
Some(list) => list,
None => return Ok(()),
let dbid = DatabaseId { db_id };
let Some(seq_db_meta) = kv_api.get_pb(&dbid).await? else {
return Ok(());
};
for (i, dbid) in db_id_list.id_list.iter().enumerate() {
if *dbid != db_id {
continue;
}
let dbid = DatabaseId { db_id };
let (db_meta_seq, _db_meta) = kv_api.get_pb_seq_and_value(&dbid).await?;
if db_meta_seq == 0 {
return Ok(());
}
let id_to_name = DatabaseIdToName { db_id };
let (name_ident_seq, _name_ident) = kv_api.get_pb_seq_and_value(&id_to_name).await?;
if name_ident_seq == 0 {
return Ok(());
}

let dbid_tbname_idlist = TableIdHistoryIdent {
database_id: db_id,
table_name: "".to_string(),
};
// TODO: enable this when gc_in_progress is set.
// if !seq_db_meta.gc_in_progress {
// let err = UnknownDatabaseId::new(
// db_id,
// "database is not in gc_in_progress state, \
// can not gc. \
// First mark the database as gc_in_progress, \
// then gc is allowed.",
// );
// return Err(AppError::from(err).into());
// }

let dir_name = DirName::new(dbid_tbname_idlist);
let id_to_name = DatabaseIdToName { db_id };
let Some(seq_name) = kv_api.get_pb(&id_to_name).await? else {
return Ok(());
};

let table_id_list_keys = list_keys(kv_api, &dir_name).await?;
let keys: Vec<String> = table_id_list_keys
.iter()
.map(|table_id_list_key| {
TableIdHistoryIdent {
database_id: db_id,
table_name: table_id_list_key.table_name.clone(),
}
.to_string_key()
})
.collect();
let table_history_ident = TableIdHistoryIdent {
database_id: db_id,
table_name: "dummy".to_string(),
};
let dir_name = DirName::new(table_history_ident);

let mut txn = TxnRequest::default();
let table_history_items = kv_api.list_pb_vec(&dir_name).await?;

for c in keys.chunks(DEFAULT_MGET_SIZE) {
let tb_id_list_seq_vec: Vec<(u64, Option<TableIdList>)> =
mget_pb_values(kv_api, c).await?;
let mut iter = c.iter();
for (tb_id_list_seq, tb_id_list_opt) in tb_id_list_seq_vec {
let tb_id_list = match tb_id_list_opt {
Some(list) => list,
None => {
continue;
}
};
let mut txn = TxnRequest::default();

for tb_id in tb_id_list.id_list {
let table_id_ident = TableId { table_id: tb_id };
remove_copied_files_for_dropped_table(kv_api, &table_id_ident).await?;
remove_data_for_dropped_table(kv_api, &table_id_ident, &mut txn).await?;
remove_index_for_dropped_table(kv_api, tenant, &table_id_ident, &mut txn)
.await?;
}
for (ident, table_history) in table_history_items {
for tb_id in table_history.id_list.iter() {
let table_id_ident = TableId { table_id: *tb_id };

let id_key = iter.next().unwrap();
txn.if_then.push(TxnOp::delete(id_key));
txn.condition
.push(TxnCondition::eq_seq(id_key, tb_id_list_seq));
}
// TODO: mark table as gc_in_progress

// for id_key in c {
// if_then.push(txn_op_del(id_key));
// }
}
db_id_list.id_list.remove(i);
txn.condition
.push(txn_cond_seq(&dbid_idlist, Eq, db_id_list_seq));
if db_id_list.id_list.is_empty() {
txn.if_then.push(txn_op_del(&dbid_idlist));
} else {
// save new db id list
txn.if_then
.push(txn_op_put(&dbid_idlist, serialize_struct(&db_id_list)?));
remove_copied_files_for_dropped_table(kv_api, &table_id_ident).await?;
remove_data_for_dropped_table(kv_api, &table_id_ident, &mut txn).await?;
remove_index_for_dropped_table(kv_api, tenant, &table_id_ident, &mut txn).await?;
}

txn.condition.push(txn_cond_seq(&dbid, Eq, db_meta_seq));
txn.if_then.push(txn_op_del(&dbid));
txn.condition
.push(txn_cond_seq(&id_to_name, Eq, name_ident_seq));
txn.if_then.push(txn_op_del(&id_to_name));
.push(txn_cond_eq_seq(&ident, table_history.seq));
txn.if_then.push(txn_op_del(&ident));
}

let _resp = kv_api.transaction(txn).await?;
break;
txn.condition
.push(txn_cond_eq_seq(&db_id_history_ident, seq_dbid_list.seq));
if db_id_list.id_list.is_empty() {
txn.if_then.push(txn_op_del(&db_id_history_ident));
} else {
// save new db id list
txn.if_then
.push(txn_put_pb(&db_id_history_ident, &db_id_list)?);
}

txn.condition.push(txn_cond_eq_seq(&dbid, seq_db_meta.seq));
txn.if_then.push(txn_op_del(&dbid));
txn.condition
.push(txn_cond_eq_seq(&id_to_name, seq_name.seq));
txn.if_then.push(txn_op_del(&id_to_name));

let _resp = kv_api.transaction(txn).await?;

Ok(())
}

Expand Down
14 changes: 14 additions & 0 deletions src/meta/api/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,20 @@ pub fn txn_cond_seq(key: &impl kvapi::Key, op: ConditionResult, seq: u64) -> Txn
}
}

pub fn txn_put_pb<K>(key: &K, value: &K::ValueType) -> Result<TxnOp, InvalidArgument>
where
K: kvapi::Key,
K::ValueType: FromToProto + 'static,
{
let p = value.to_pb().map_err(|e| InvalidArgument::new(e, ""))?;

let mut buf = vec![];
prost::Message::encode(&p, &mut buf).map_err(|e| InvalidArgument::new(e, ""))?;

Ok(TxnOp::put(key.to_string_key(), buf))
}

/// Deprecate this. Replace it with `txn_put_pb().with_ttl()`
pub fn txn_op_put_pb<K>(
key: &K,
value: &K::ValueType,
Expand Down

0 comments on commit b850e3f

Please sign in to comment.