Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Only when all tables are included, the db can be dropped #16539

Merged
merged 1 commit into from
Sep 28, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 18 additions & 95 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2684,7 +2684,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
let table_infos = do_get_table_history(self, db_filter, capacity).await?;

// A DB can be removed only when all its tables are removed.
if vacuum_db && capacity >= table_infos.len() {
if vacuum_db && capacity > table_infos.len() {
vacuum_ids.push(DroppedId::Db {
db_id: db_info.database_id.db_id,
db_name: db_info.name_ident.database_name().to_string(),
Expand Down Expand Up @@ -3578,120 +3578,42 @@ async fn batch_filter_table_info(
Ok(())
}

type TableFilterInfoList<'a> = Vec<(
Range<Option<DateTime<Utc>>>,
&'a Arc<DatabaseInfo>,
u64,
String,
)>;

#[logcall::logcall(input = "")]
#[fastrace::trace]
async fn get_gc_table_info(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
limit: usize,
table_id_list: &TableFilterInfoList<'_>,
) -> Result<Vec<(Arc<TableInfo>, u64)>, KVAppError> {
let table_id_list = &table_id_list[..std::cmp::min(limit, table_id_list.len())];

let mut filter_tb_infos = vec![];

for chunk in table_id_list.chunks(DEFAULT_MGET_SIZE) {
batch_filter_table_info(kv_api, chunk, &mut filter_tb_infos).await?;
}

Ok(filter_tb_infos)
}

#[logcall::logcall(input = "")]
#[fastrace::trace]
async fn do_get_table_history(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
db_filter: (Range<Option<DateTime<Utc>>>, Arc<DatabaseInfo>),
limit: usize,
) -> Result<Vec<(Arc<TableInfo>, u64)>, KVAppError> {
let mut filter_tb_infos = vec![];

// step 1: list db table name with db id
let mut filter_db_info_with_table_id_key_list: Vec<_> = vec![];

let (drop_time_range, db_info) = db_filter;
let db_id = db_info.database_id.db_id;

// List tables by tenant, db_id, table_name.
let dbid_tbname_idlist = TableIdHistoryIdent {
database_id: db_id,
table_name: "dummy".to_string(),
};
let dir_name = DirName::new(dbid_tbname_idlist);
let strm = kv_api.list_pb_keys(&dir_name).await?;
let table_id_list_keys = strm.try_collect::<Vec<_>>().await?;

let keys = table_id_list_keys
.iter()
.map(|table_id_list_key| (drop_time_range.clone(), &db_info, table_id_list_key.clone()))
.collect::<Vec<_>>();

filter_db_info_with_table_id_key_list.extend(keys);

// step 2: list all table id of table by table name
let keys = filter_db_info_with_table_id_key_list
.iter()
.map(|(_, db_info, table_id_list_key)| TableIdHistoryIdent {
database_id: db_info.database_id.db_id,
table_name: table_id_list_key.table_name.clone(),
})
.collect::<Vec<_>>();

let mut filter_db_info_with_table_id_list: TableFilterInfoList<'_> = vec![];
let mut table_id_list_keys_iter = filter_db_info_with_table_id_key_list.into_iter();
for c in keys.chunks(DEFAULT_MGET_SIZE) {
let strm = kv_api.get_pb_values(c.to_vec()).await?;
let table_id_list_vec = strm
.try_filter_map(|x| async move { Ok(x) })
.try_collect::<Vec<_>>()
.await?;
let table_history_kvs = kv_api.list_pb_vec(&dir_name).await?;

for seq_table_id_list in table_id_list_vec {
let (filter, db_info, table_id_list_key) = table_id_list_keys_iter.next().unwrap();
let tb_id_list = seq_table_id_list.data;

let id_list: Vec<_> = tb_id_list
.id_list
.iter()
.map(|id| {
(
filter.clone(),
db_info,
*id,
table_id_list_key.table_name.clone(),
)
})
.collect();
let mut the_list = vec![];

filter_db_info_with_table_id_list.extend(id_list);
if filter_db_info_with_table_id_list.len() < DEFAULT_MGET_SIZE {
continue;
}

let ret = get_gc_table_info(kv_api, limit, &filter_db_info_with_table_id_list).await?;
filter_tb_infos.extend(ret);
filter_db_info_with_table_id_list.clear();

if filter_tb_infos.len() >= limit {
return Ok(filter_tb_infos);
}
for (ident, table_history) in table_history_kvs {
for table_id in table_history.id_list.iter() {
the_list.push((
drop_time_range.clone(),
&db_info,
*table_id,
ident.table_name.clone(),
));
}
}

if !filter_db_info_with_table_id_list.is_empty() {
let ret = get_gc_table_info(kv_api, limit, &filter_db_info_with_table_id_list).await?;
filter_tb_infos.extend(ret);
filter_db_info_with_table_id_list.clear();
let mut filter_tb_infos = vec![];

if filter_tb_infos.len() >= limit {
return Ok(filter_tb_infos);
}
}
for c in the_list[..std::cmp::min(limit, the_list.len())].chunks(DEFAULT_MGET_SIZE) {
let mut infos = vec![];
batch_filter_table_info(kv_api, c, &mut infos).await?;
filter_tb_infos.extend(infos);
}

Ok(filter_tb_infos)
Expand Down Expand Up @@ -3880,6 +3802,7 @@ async fn update_txn_to_remove_table_history(
txn.if_then
.push(txn_op_put_pb(table_id_history_ident, &history, None)?);
}

Ok(())
}

Expand Down
Loading