Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: SchemaApi::get_table_meta_history() #16586

Merged
merged 1 commit into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,16 @@ pub trait SchemaApi: Send + Sync {
/// Get a [`TableNIV`] by `database_id, table_name`.
async fn get_table_in_db(&self, req: &DBIdTableName) -> Result<Option<TableNIV>, MetaError>;

async fn get_table_meta_history(
/// Retrieves the tables under the given `database-id, table-name`
/// that is dropped after retention boundary time, i.e., the table that can be undropped.
async fn get_retainable_tables(
&self,
database_name: &str,
table_id_history: &TableIdHistoryIdent,
) -> Result<Vec<(TableId, SeqV<TableMeta>)>, KVAppError>;
history_ident: &TableIdHistoryIdent,
) -> Result<Vec<(TableId, SeqV<TableMeta>)>, MetaError>;

async fn get_tables_history(&self, req: ListTableReq) -> Result<Vec<TableNIV>, KVAppError>;
/// Get history of all tables in the specified database,
/// that are dropped after retention boundary time, i.e., the tables that can be undropped.
async fn list_retainable_tables(&self, req: ListTableReq) -> Result<Vec<TableNIV>, KVAppError>;

/// List all tables in the database.
///
Expand Down
47 changes: 15 additions & 32 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1547,36 +1547,20 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

#[logcall::logcall]
#[fastrace::trace]
async fn get_table_meta_history(
async fn get_retainable_tables(
&self,
database_name: &str,
table_id_history: &TableIdHistoryIdent,
) -> Result<Vec<(TableId, SeqV<TableMeta>)>, KVAppError> {
let table_name = &table_id_history.table_name;

let (_seq, table_id_list) = self.get_pb_seq_and_value(table_id_history).await?;

let table_id_list = table_id_list.ok_or_else(|| {
AppError::from(UnknownTable::new(
table_name,
format!("get_table_history: {}.{}", database_name, table_name),
))
})?;

let now = Utc::now();

let metas = get_table_meta_history(self, &now, table_id_list).await?;
history_ident: &TableIdHistoryIdent,
) -> Result<Vec<(TableId, SeqV<TableMeta>)>, MetaError> {
let Some(seq_table_id_list) = self.get_pb(history_ident).await? else {
Copy link
Collaborator

@TCeason TCeason Oct 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this means : if a table under a db which contains 100 tables is invalid, list table from this db will return empty result?

cc @flaneur2020

I think it's better to print a warn or error log to point the err.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do think so. 🤔

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my mistake。the method name should be get-retainable-x x x

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed. Please review it again

return Ok(vec![]);
};

debug!(
name :% =(&table_id_history);
"get_table_meta_history"
);
return Ok(metas);
get_retainable_table_metas(self, &Utc::now(), seq_table_id_list.data).await
}

#[logcall::logcall]
#[fastrace::trace]
async fn get_tables_history(&self, req: ListTableReq) -> Result<Vec<TableNIV>, KVAppError> {
async fn list_retainable_tables(&self, req: ListTableReq) -> Result<Vec<TableNIV>, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

// List tables by tenant, db_id.
Expand All @@ -1595,7 +1579,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
for (ident, history) in ident_histories {
debug!(name :% =(&ident); "get_tables_history");

let id_metas = get_table_meta_history(self, &now, history.data).await?;
let id_metas = get_retainable_table_metas(self, &now, history.data).await?;

let table_nivs = id_metas.into_iter().map(|(table_id, seq_meta)| {
let name = DBIdTableName::new(ident.database_id, ident.table_name.clone());
Expand Down Expand Up @@ -3050,27 +3034,26 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
}
}

async fn get_table_meta_history(
async fn get_retainable_table_metas(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
now: &DateTime<Utc>,
tb_id_list: TableIdList,
) -> Result<Vec<(TableId, SeqV<TableMeta>)>, KVAppError> {
) -> Result<Vec<(TableId, SeqV<TableMeta>)>, MetaError> {
let mut tb_metas = vec![];

let inner_keys = tb_id_list.id_list.into_iter().map(TableId::new);
let table_ids = tb_id_list.id_list.into_iter().map(TableId::new);

let kvs = kv_api.get_pb_vec(inner_keys).await?;
let kvs = kv_api.get_pb_vec(table_ids).await?;

for (k, table_meta) in kvs {
let Some(table_meta) = table_meta else {
error!("get_table_history cannot find {:?} table_meta", k);
continue;
};

if !is_drop_time_retainable(table_meta.drop_on, *now) {
continue;
if is_drop_time_retainable(table_meta.drop_on, *now) {
tb_metas.push((k, table_meta));
}
tb_metas.push((k, table_meta));
}

Ok(tb_metas)
Expand Down
26 changes: 13 additions & 13 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1835,7 +1835,7 @@ impl SchemaApiTestSuite {
let cur_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;

let got = mt
.get_table_meta_history(db_name, &TableIdHistoryIdent {
.get_retainable_tables(&TableIdHistoryIdent {
database_id: cur_db.database_id.db_id,
table_name: tbl_name.to_string(),
})
Expand Down Expand Up @@ -4003,7 +4003,7 @@ impl SchemaApiTestSuite {
assert!(table_id >= 1, "table id >= 1");

let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;

assert_eq!(res.len(), 1);
Expand All @@ -4021,7 +4021,7 @@ impl SchemaApiTestSuite {
upsert_test_data(mt.as_kv_api(), &tbid, data).await?;
// assert not return out of retention time data
let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;

assert_eq!(res.len(), 0);
Expand Down Expand Up @@ -4659,7 +4659,7 @@ impl SchemaApiTestSuite {
assert!(res.table_id >= 1, "table id >= 1");

let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;

calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
Expand Down Expand Up @@ -4691,7 +4691,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;
calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
name: DBIdTableName::new(*db_id, tbl_name).to_string_key(),
Expand All @@ -4709,7 +4709,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;
calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
name: DBIdTableName::new(*db_id, tbl_name).to_string_key(),
Expand All @@ -4736,7 +4736,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;
calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
name: DBIdTableName::new(*db_id, tbl_name).to_string_key(),
Expand All @@ -4759,7 +4759,7 @@ impl SchemaApiTestSuite {
assert!(res.table_id >= 1, "table id >= 1");

let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;

calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
Expand Down Expand Up @@ -4787,7 +4787,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;
calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
name: DBIdTableName::new(*db_id, tbl_name).to_string_key(),
Expand All @@ -4804,7 +4804,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;

calc_and_compare_drop_on_table_result(res, vec![DroponInfo {
Expand Down Expand Up @@ -4837,7 +4837,7 @@ impl SchemaApiTestSuite {
let old_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;
let _res = mt.create_table(req.clone()).await?;
let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;
let cur_db = mt.get_database(Self::req_get_db(&tenant, db_name)).await?;
assert!(old_db.meta.seq < cur_db.meta.seq);
Expand Down Expand Up @@ -4876,7 +4876,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;
calc_and_compare_drop_on_table_result(res, vec![
DroponInfo {
Expand Down Expand Up @@ -4905,7 +4905,7 @@ impl SchemaApiTestSuite {
assert!(old_db.meta.seq < cur_db.meta.seq);

let res = mt
.get_tables_history(ListTableReq::new(&tenant, db_id))
.list_retainable_tables(ListTableReq::new(&tenant, db_id))
.await?;
calc_and_compare_drop_on_table_result(res, vec![
DroponInfo {
Expand Down
13 changes: 5 additions & 8 deletions src/query/service/src/databases/default/default_database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,13 +183,10 @@ impl Database for DefaultDatabase {
let metas = self
.ctx
.meta
.get_table_meta_history(
self.db_info.name_ident.database_name(),
&TableIdHistoryIdent {
database_id: self.db_info.database_id.db_id,
table_name: table_name.to_string(),
},
)
.get_retainable_tables(&TableIdHistoryIdent {
database_id: self.db_info.database_id.db_id,
table_name: table_name.to_string(),
})
.await?;

let table_infos: Vec<Arc<TableInfo>> = metas
Expand Down Expand Up @@ -233,7 +230,7 @@ impl Database for DefaultDatabase {
let mut dropped = self
.ctx
.meta
.get_tables_history(ListTableReq::new(
.list_retainable_tables(ListTableReq::new(
self.get_tenant(),
self.db_info.database_id,
))
Expand Down
Loading