Skip to content

Commit

Permalink
refactor: simplify vacuum drop table handling
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Sep 12, 2024
1 parent 2ca0036 commit 4ae07ca
Show file tree
Hide file tree
Showing 15 changed files with 88 additions and 126 deletions.
8 changes: 2 additions & 6 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ use databend_common_meta_app::schema::TruncateTableReq;
use databend_common_meta_app::schema::UndropDatabaseReply;
use databend_common_meta_app::schema::UndropDatabaseReq;
use databend_common_meta_app::schema::UndropTableByIdReq;
use databend_common_meta_app::schema::UndropTableReply;
use databend_common_meta_app::schema::UndropTableReq;
use databend_common_meta_app::schema::UpdateDictionaryReply;
use databend_common_meta_app::schema::UpdateDictionaryReq;
Expand Down Expand Up @@ -189,12 +188,9 @@ pub trait SchemaApi: Send + Sync {
req: CommitTableMetaReq,
) -> Result<CommitTableMetaReply, KVAppError>;

async fn undrop_table(&self, req: UndropTableReq) -> Result<UndropTableReply, KVAppError>;
async fn undrop_table(&self, req: UndropTableReq) -> Result<(), KVAppError>;

async fn undrop_table_by_id(
&self,
req: UndropTableByIdReq,
) -> Result<UndropTableReply, KVAppError>;
async fn undrop_table_by_id(&self, req: UndropTableByIdReq) -> Result<(), KVAppError>;

async fn rename_table(&self, req: RenameTableReq) -> Result<RenameTableReply, KVAppError>;

Expand Down
159 changes: 70 additions & 89 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,6 @@ use databend_common_meta_app::schema::TruncateTableReq;
use databend_common_meta_app::schema::UndropDatabaseReply;
use databend_common_meta_app::schema::UndropDatabaseReq;
use databend_common_meta_app::schema::UndropTableByIdReq;
use databend_common_meta_app::schema::UndropTableReply;
use databend_common_meta_app::schema::UndropTableReq;
use databend_common_meta_app::schema::UpdateDictionaryReply;
use databend_common_meta_app::schema::UpdateDictionaryReq;
Expand Down Expand Up @@ -210,6 +209,7 @@ use crate::send_txn;
use crate::serialize_struct;
use crate::serialize_u64;
use crate::txn_backoff::txn_backoff;
use crate::txn_cond_eq_seq;
use crate::txn_cond_seq;
use crate::txn_op_del;
use crate::txn_op_get;
Expand All @@ -220,6 +220,7 @@ use crate::util::deserialize_struct_get_response;
use crate::util::get_table_by_id_or_err;
use crate::util::list_tables_from_unshare_db;
use crate::util::mget_pb_values;
use crate::util::txn_op_put_pb;
use crate::util::unknown_database_error;
use crate::SchemaApi;
use crate::DEFAULT_MGET_SIZE;
Expand Down Expand Up @@ -1361,17 +1362,14 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

#[logcall::logcall]
#[fastrace::trace]
async fn undrop_table(&self, req: UndropTableReq) -> Result<UndropTableReply, KVAppError> {
async fn undrop_table(&self, req: UndropTableReq) -> Result<(), KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());
handle_undrop_table(self, req).await
}

#[logcall::logcall]
#[fastrace::trace]
async fn undrop_table_by_id(
&self,
req: UndropTableByIdReq,
) -> Result<UndropTableReply, KVAppError> {
async fn undrop_table_by_id(&self, req: UndropTableByIdReq) -> Result<(), KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());
handle_undrop_table(self, req).await
}
Expand Down Expand Up @@ -3753,51 +3751,52 @@ fn build_upsert_table_deduplicated_label(deduplicated_label: String) -> TxnOp {
#[fastrace::trace]
async fn batch_filter_table_info(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
inner_keys: &[String],
filter_db_info_with_table_name_list: &[(&TableInfoFilter, &Arc<DatabaseInfo>, u64, &String)],
filter_tb_infos: &mut Vec<(Arc<TableInfo>, u64)>,
) -> Result<(), KVAppError> {
let tb_meta_vec: Vec<(u64, Option<TableMeta>)> = mget_pb_values(kv_api, inner_keys).await?;
for (i, (tb_meta_seq, tb_meta)) in tb_meta_vec.iter().enumerate() {
let (filter, db_info, table_id, table_name) = filter_db_info_with_table_name_list[i];
if *tb_meta_seq == 0 || tb_meta.is_none() {
error!("get_table_history cannot find {:?} table_meta", table_id);
let table_id_idents = filter_db_info_with_table_name_list
.iter()
.map(|(_f, _db, table_id, _table_name)| TableId::new(*table_id));

let strm = kv_api.get_pb_values(table_id_idents).await?;
let seq_metas = strm.try_collect::<Vec<_>>().await?;

for (seq_meta, (filter, db_info, table_id, table_name)) in seq_metas
.into_iter()
.zip(filter_db_info_with_table_name_list.into_iter())
{
let Some(seq_meta) = seq_meta else {
error!(
"batch_filter_table_info cannot find {:?} table_meta",
table_id
);
continue;
}
// Safe unwrap() because: tb_meta_seq > 0
let tb_meta = tb_meta.clone().unwrap();
};

if let TableInfoFilter::Dropped(drop_on) = filter {
if let Some(drop_on) = drop_on {
if let Some(meta_drop_on) = &tb_meta.drop_on {
if let Some(meta_drop_on) = &seq_meta.drop_on {
if meta_drop_on.timestamp_millis() >= drop_on.timestamp_millis() {
continue;
}
} else {
continue;
}
} else if tb_meta.drop_on.is_none() {
continue;
} else {
if seq_meta.drop_on.is_none() {
continue;
}
}
}

let db_ident = &db_info.name_ident;

let tenant_dbname_tbname: TableNameIdent =
TableNameIdent::new(db_ident.tenant(), db_ident.database_name(), table_name);

let tb_info = TableInfo {
ident: TableIdent {
table_id,
seq: *tb_meta_seq,
table_id: *table_id,
seq: seq_meta.seq,
},
desc: format!(
"'{}'.'{}'",
db_ident.database_name(),
tenant_dbname_tbname.table_name
),
name: table_name.clone(),
meta: tb_meta,
desc: format!("'{}'.'{}'", db_info.name_ident.database_name(), table_name,),
name: (*table_name).clone(),
meta: seq_meta.data,
db_type: DatabaseType::NormalDB,
catalog_info: Default::default(),
};
Expand All @@ -3818,7 +3817,7 @@ async fn get_gc_table_info(
table_id_list: &TableFilterInfoList<'_>,
) -> Result<Vec<(Arc<TableInfo>, u64)>, KVAppError> {
let mut filter_tb_infos = vec![];
let mut inner_keys: Vec<String> = vec![];

let mut filter_db_info_with_table_name_list: Vec<(
&TableInfoFilter,
&Arc<DatabaseInfo>,
Expand All @@ -3828,25 +3827,17 @@ async fn get_gc_table_info(

for (filter, db_info, table_id, table_name) in table_id_list {
filter_db_info_with_table_name_list.push((filter, db_info, *table_id, table_name));
inner_keys.push(
TableId {
table_id: *table_id,
}
.to_string_key(),
);
if inner_keys.len() < DEFAULT_MGET_SIZE {
if filter_db_info_with_table_name_list.len() < DEFAULT_MGET_SIZE {
continue;
}

batch_filter_table_info(
kv_api,
&inner_keys,
&filter_db_info_with_table_name_list,
&mut filter_tb_infos,
)
.await?;

inner_keys.clear();
filter_db_info_with_table_name_list.clear();

// check if reach the limit
Expand All @@ -3857,10 +3848,9 @@ async fn get_gc_table_info(
}
}

if !inner_keys.is_empty() {
if !filter_db_info_with_table_name_list.is_empty() {
batch_filter_table_info(
kv_api,
&inner_keys,
&filter_db_info_with_table_name_list,
&mut filter_tb_infos,
)
Expand All @@ -3885,6 +3875,7 @@ async fn do_get_table_history(
&Arc<DatabaseInfo>,
TableIdHistoryIdent,
)> = vec![];

let (filter, db_info) = db_filter;
let db_id = db_info.database_id.db_id;

Expand All @@ -3893,45 +3884,38 @@ async fn do_get_table_history(
database_id: db_id,
table_name: "dummy".to_string(),
};

let dir_name = DirName::new(dbid_tbname_idlist);
let strm = kv_api.list_pb_keys(&dir_name).await?;
let table_id_list_keys = strm.try_collect::<Vec<_>>().await?;

let table_id_list_keys = list_keys(kv_api, &dir_name).await?;

let keys: Vec<(&TableInfoFilter, &Arc<DatabaseInfo>, TableIdHistoryIdent)> = table_id_list_keys
let keys = table_id_list_keys
.iter()
.map(|table_id_list_key| (&filter, &db_info, table_id_list_key.clone()))
.collect();
.collect::<Vec<_>>();

filter_db_info_with_table_id_key_list.extend(keys);

// step 2: list all table id of table by table name
let keys: Vec<String> = filter_db_info_with_table_id_key_list
let keys = filter_db_info_with_table_id_key_list
.iter()
.map(|(_, db_info, table_id_list_key)| {
TableIdHistoryIdent {
database_id: db_info.database_id.db_id,
table_name: table_id_list_key.table_name.clone(),
}
.to_string_key()
.map(|(_, db_info, table_id_list_key)| TableIdHistoryIdent {
database_id: db_info.database_id.db_id,
table_name: table_id_list_key.table_name.clone(),
})
.collect();
.collect::<Vec<_>>();

let mut filter_db_info_with_table_id_list: TableFilterInfoList<'_> = vec![];
let mut table_id_list_keys_iter = filter_db_info_with_table_id_key_list.into_iter();
for c in keys.chunks(DEFAULT_MGET_SIZE) {
let tb_id_list_seq_vec: Vec<(u64, Option<TableIdList>)> = mget_pb_values(kv_api, c).await?;
for (tb_id_list_seq, tb_id_list_opt) in tb_id_list_seq_vec {
let strm = kv_api.get_pb_values(c.to_vec()).await?;
let table_id_list_vec = strm
.try_filter_map(|x| async move { Ok(x) })
.try_collect::<Vec<_>>()
.await?;

for seq_table_id_list in table_id_list_vec {
let (filter, db_info, table_id_list_key) = table_id_list_keys_iter.next().unwrap();
let tb_id_list = if tb_id_list_seq == 0 {
continue;
} else {
match tb_id_list_opt {
Some(list) => list,
None => {
continue;
}
}
};
let tb_id_list = seq_table_id_list.data;

let id_list: Vec<(&TableInfoFilter, &Arc<DatabaseInfo>, u64, String)> = tb_id_list
.id_list
Expand Down Expand Up @@ -4357,7 +4341,7 @@ impl UndropTableStrategy for UndropTableByIdReq {
async fn handle_undrop_table(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
req: impl UndropTableStrategy + std::fmt::Debug,
) -> Result<UndropTableReply, KVAppError> {
) -> Result<(), KVAppError> {
let tenant_dbname_tbname = req.table_name_ident();

let mut trials = txn_backoff(None, func_name!());
Expand All @@ -4366,7 +4350,7 @@ async fn handle_undrop_table(

// Get db by name to ensure presence

let (db_id, db_meta) = req.refresh_target_db_meta(kv_api).await?;
let (db_id, seq_db_meta) = req.refresh_target_db_meta(kv_api).await?;

// Get table by tenant,db_id, table_name to assert presence.

Expand Down Expand Up @@ -4409,7 +4393,12 @@ async fn handle_undrop_table(

// get tb_meta of the last table id
let tbid = TableId { table_id };
let (tb_meta_seq, tb_meta): (_, Option<TableMeta>) = get_pb_value(kv_api, &tbid).await?;
let seq_table_meta = kv_api.get_pb(&tbid).await?;
let Some(mut seq_table_meta) = seq_table_meta else {
return Err(
AppError::from(UnknownTableId::new(tbid.table_id, "when undrop table")).into(),
);
};

debug!(
ident :% =(&tbid),
Expand All @@ -4419,37 +4408,29 @@ async fn handle_undrop_table(

{
// reset drop on time
let mut tb_meta = tb_meta.unwrap();
// undrop a table with no drop_on time
if tb_meta.drop_on.is_none() {
return Err(KVAppError::AppError(AppError::UndropTableWithNoDropTime(
UndropTableWithNoDropTime::new(&tenant_dbname_tbname.table_name),
)));
}
tb_meta.drop_on = None;
seq_table_meta.drop_on = None;

let txn_req = TxnRequest {
let txn = TxnRequest {
condition: vec![
// db has not to change, i.e., no new table is created.
// Renaming db is OK and does not affect the seq of db_meta.
txn_cond_seq(&DatabaseId { db_id }, Eq, db_meta.seq),
txn_cond_eq_seq(&DatabaseId { db_id }, seq_db_meta.seq),
// still this table id
txn_cond_seq(&dbid_tbname, Eq, dbid_tbname_seq),
txn_cond_eq_seq(&dbid_tbname, dbid_tbname_seq),
// table is not changed
txn_cond_seq(&tbid, Eq, tb_meta_seq),
txn_cond_eq_seq(&tbid, seq_table_meta.seq),
],
if_then: vec![
// Changing a table in a db has to update the seq of db_meta,
// to block the batch-delete-tables when deleting a db.
txn_op_put(&DatabaseId { db_id }, serialize_struct(&*db_meta)?), /* (db_id) -> db_meta */
txn_op_put_pb(&DatabaseId { db_id }, &seq_db_meta.data, None)?, /* (db_id) -> db_meta */
txn_op_put(&dbid_tbname, serialize_u64(table_id)?), /* (tenant, db_id, tb_name) -> tb_id */
// txn_op_put(&dbid_tbname_idlist, serialize_struct(&tb_id_list)?)?, // _fd_table_id_list/db_id/table_name -> tb_id_list
txn_op_put(&tbid, serialize_struct(&tb_meta)?), /* (tenant, db_id, tb_id) -> tb_meta */
txn_op_put_pb(&tbid, &seq_table_meta.data, None)?, /* (tenant, db_id, tb_id) -> tb_meta */
],
else_then: vec![],
};

let (succ, _responses) = send_txn(kv_api, txn_req).await?;
let (succ, _responses) = send_txn(kv_api, txn).await?;

debug!(
name :? =(tenant_dbname_tbname),
Expand All @@ -4459,7 +4440,7 @@ async fn handle_undrop_table(
);

if succ {
return Ok(UndropTableReply {});
return Ok(());
}
}
}
Expand Down
1 change: 0 additions & 1 deletion src/meta/app/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ pub use table::TableStatistics;
pub use table::TruncateTableReply;
pub use table::TruncateTableReq;
pub use table::UndropTableByIdReq;
pub use table::UndropTableReply;
pub use table::UndropTableReq;
pub use table::UpdateMultiTableMetaReq;
pub use table::UpdateMultiTableMetaResult;
Expand Down
3 changes: 0 additions & 3 deletions src/meta/app/src/schema/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -653,9 +653,6 @@ impl Display for UndropTableReq {
}
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct UndropTableReply {}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct RenameTableReq {
pub if_exists: bool,
Expand Down
5 changes: 2 additions & 3 deletions src/query/catalog/src/catalog/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ use databend_common_meta_app::schema::TruncateTableReq;
use databend_common_meta_app::schema::UndropDatabaseReply;
use databend_common_meta_app::schema::UndropDatabaseReq;
use databend_common_meta_app::schema::UndropTableByIdReq;
use databend_common_meta_app::schema::UndropTableReply;
use databend_common_meta_app::schema::UndropTableReq;
use databend_common_meta_app::schema::UpdateDictionaryReply;
use databend_common_meta_app::schema::UpdateDictionaryReq;
Expand Down Expand Up @@ -289,9 +288,9 @@ pub trait Catalog: DynClone + Send + Sync + Debug {

async fn drop_table_by_id(&self, req: DropTableByIdReq) -> Result<DropTableReply>;

async fn undrop_table(&self, req: UndropTableReq) -> Result<UndropTableReply>;
async fn undrop_table(&self, req: UndropTableReq) -> Result<()>;

async fn undrop_table_by_id(&self, _req: UndropTableByIdReq) -> Result<UndropTableReply> {
async fn undrop_table_by_id(&self, _req: UndropTableByIdReq) -> Result<()> {
unimplemented!("TODO")
}

Expand Down
Loading

0 comments on commit 4ae07ca

Please sign in to comment.