diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs index 86139849d33b..7fd9e9c3ad82 100644 --- a/src/meta/api/src/schema_api_impl.rs +++ b/src/meta/api/src/schema_api_impl.rs @@ -642,144 +642,68 @@ impl + ?Sized> SchemaApi for KV { ) -> Result>, KVAppError> { debug!(req :? =(&req); "SchemaApi: {}", func_name!()); - // List tables by tenant, db_id, table_name. - let dbid_tbname_idlist = DatabaseIdHistoryIdent::new(&req.tenant, "dummy"); - let dir_name = DirName::new(dbid_tbname_idlist); - let db_id_list_keys = list_keys(self, &dir_name).await?; + let name_ident = DatabaseIdHistoryIdent::new(&req.tenant, "dummy"); + let dir_name = DirName::new(name_ident); - let mut db_info_list = vec![]; - let now = Utc::now(); - let keys: Vec = db_id_list_keys - .iter() - .map(|db_id_list_key| db_id_list_key.to_string_key()) - .collect(); - let mut db_id_list_keys_iter = db_id_list_keys.into_iter(); - let include_drop_db = matches!(&req.filter, Some(DatabaseInfoFilter::IncludeDropped)); - for c in keys.chunks(DEFAULT_MGET_SIZE) { - let db_id_list_seq_and_list: Vec<(u64, Option)> = - mget_pb_values(self, c).await?; + let name_idlists = self.list_pb_vec(&dir_name).await?; - for (db_id_list_seq, db_id_list_opt) in db_id_list_seq_and_list { - let db_id_list_key = db_id_list_keys_iter.next().unwrap(); - let db_id_list = if db_id_list_seq == 0 { - continue; - } else { - match db_id_list_opt { - Some(list) => list, - None => { - continue; - } - } - }; + let mut dbs = BTreeMap::new(); - let inner_keys: Vec = db_id_list - .id_list - .iter() - .map(|db_id| DatabaseId { db_id: *db_id }.to_string_key()) - .collect(); - let mut db_id_list_iter = db_id_list.id_list.into_iter(); - for c in inner_keys.chunks(DEFAULT_MGET_SIZE) { - let db_meta_seq_meta_vec: Vec<(u64, Option)> = - mget_pb_values(self, c).await?; - - for (db_meta_seq, db_meta) in db_meta_seq_meta_vec { - let db_id = db_id_list_iter.next().unwrap(); - if db_meta_seq == 0 || db_meta.is_none() { - 
error!("get_database_history cannot find {:?} db_meta", db_id); - continue; - } - let db_meta = db_meta.unwrap(); - // if include drop db, then no need to fill out of retention time db - if !include_drop_db - && is_drop_time_out_of_retention_time(&db_meta.drop_on, &now) - { - continue; - } + for (db_id_list_key, db_id_list) in name_idlists { + let ids = db_id_list + .id_list + .iter() + .map(|db_id| DatabaseId { db_id: *db_id }) + .collect::>(); - let db = DatabaseInfo { - database_id: DatabaseId::new(db_id), - name_ident: DatabaseNameIdent::new_from(db_id_list_key.clone()), - meta: SeqV::new(db_meta_seq, db_meta), - }; + for db_ids in ids.chunks(DEFAULT_MGET_SIZE) { + let id_metas = self.get_pb_vec(db_ids.iter().cloned()).await?; - db_info_list.push(Arc::new(db)); - } + for (db_id, db_meta) in id_metas { + let Some(db_meta) = db_meta else { + error!("get_database_history cannot find {:?} db_meta", db_id); + continue; + }; + + let db = DatabaseInfo { + database_id: db_id, + name_ident: DatabaseNameIdent::new_from(db_id_list_key.clone()), + meta: db_meta, + }; + + dbs.insert(db_id.db_id, Arc::new(db)); } } } - // `list_database` can list db which has no `DbIdListKey` - if include_drop_db { - // if `include_drop_db` is true, return all db info which not exist in db_info_list - let db_id_set: HashSet = db_info_list - .iter() - .map(|db_info| db_info.database_id.db_id) - .collect(); + // Find out dbs that are not included in any DbIdListKey. + // Because the DbIdListKey feature was added after the first release of the system, + // there may be dbs that do not have a corresponding DbIdListKey. 
- let all_dbs = self.list_databases(req).await?; - for db_info in all_dbs { - if !db_id_set.contains(&db_info.database_id.db_id) { - warn!( - "get db history db:{:?}, db_id:{:?} has no DbIdListKey", - db_info.name_ident, db_info.database_id.db_id - ); - db_info_list.push(db_info); - } - } - } else { - // if `include_drop_db` is false, filter out db which drop_on time out of retention time - let db_id_set: HashSet = db_info_list - .iter() - .map(|db_info| db_info.database_id.db_id) - .collect(); + let list_dbs = self.list_databases(req.clone()).await?; + for db_info in list_dbs { + dbs.entry(db_info.database_id.db_id).or_insert_with(|| { + warn!( + "get db history db:{:?}, db_id:{:?} has no DbIdListKey", + db_info.name_ident, db_info.database_id.db_id + ); - let all_dbs = self.list_databases(req).await?; - let mut add_dbinfo_map = HashMap::new(); - let mut db_id_list = Vec::new(); - for db_info in all_dbs { - if !db_id_set.contains(&db_info.database_id.db_id) { - warn!( - "get db history db:{:?}, db_id:{:?} has no DbIdListKey", - db_info.name_ident, db_info.database_id.db_id - ); - db_id_list.push(DatabaseId { - db_id: db_info.database_id.db_id, - }); - add_dbinfo_map.insert(db_info.database_id.db_id, db_info); - } - } - let inner_keys: Vec = db_id_list - .iter() - .map(|db_id| db_id.to_string_key()) - .collect(); - let mut db_id_list_iter = db_id_list.into_iter(); - for c in inner_keys.chunks(DEFAULT_MGET_SIZE) { - let db_meta_seq_meta_vec: Vec<(u64, Option)> = - mget_pb_values(self, c).await?; - - for (db_meta_seq, db_meta) in db_meta_seq_meta_vec { - let db_id = db_id_list_iter.next().unwrap().db_id; - if db_meta_seq == 0 || db_meta.is_none() { - error!("get_database_history cannot find {:?} db_meta", db_id); - continue; - } - let db_meta = db_meta.unwrap(); - // if include drop db, then no need to fill out of retention time db - if is_drop_time_out_of_retention_time(&db_meta.drop_on, &now) { - continue; - } - if let Some(db_info) = add_dbinfo_map.get(&db_id) { - 
warn!( - "get db history db:{:?}, db_id:{:?} has no DbIdListKey", - db_info.name_ident, db_info.database_id.db_id - ); - db_info_list.push(db_info.clone()); - } - } - } + db_info + }); } - return Ok(db_info_list); + let now = Utc::now(); + + // Returns not only retainable subjects, but also non-retainable subjects. + let include_non_retainable = + matches!(&req.filter, Some(DatabaseInfoFilter::IncludeNonRetainable)); + + let dbs = dbs + .into_values() + .filter(|x| include_non_retainable || is_drop_time_retainable(x.meta.drop_on, now)) + .collect(); + + return Ok(dbs); } #[logcall::logcall] @@ -2781,7 +2705,7 @@ impl + ?Sized> SchemaApi for KV { .get_database_history(ListDatabaseReq { tenant: req.inner.tenant().clone(), // need to get all db(include drop db) - filter: Some(DatabaseInfoFilter::IncludeDropped), + filter: Some(DatabaseInfoFilter::IncludeNonRetainable), }) .await?; @@ -3255,7 +3179,7 @@ async fn get_table_meta_history( continue; }; - if is_drop_time_out_of_retention_time(&table_meta.drop_on, now) { + if !is_drop_time_retainable(table_meta.drop_on, *now) { continue; } tb_metas.push((k, table_meta)); @@ -3561,17 +3485,25 @@ async fn list_table_copied_files( Ok(copied_files) } -// Return true if drop time is out of `DATA_RETENTION_TIME_IN_DAYS option, -// use DEFAULT_DATA_RETENTION_SECONDS by default. -fn is_drop_time_out_of_retention_time( - drop_on: &Option>, - now: &DateTime, -) -> bool { - if let Some(drop_on) = drop_on { - return now.timestamp() - drop_on.timestamp() >= DEFAULT_DATA_RETENTION_SECONDS; - } +/// Get the retention boundary time before which the data can be permanently removed. +fn get_retention_boundary(now: DateTime) -> DateTime { + now - Duration::from_secs(DEFAULT_DATA_RETENTION_SECONDS as u64) +} - false +/// Determines if an item is within the retention period based on its drop time. +/// +/// # Arguments +/// * `drop_on` - The optional timestamp when the item was marked for deletion. 
+/// * `now` - The current timestamp used as a reference point. +/// +/// Items without a drop time (`None`) are always considered retainable. +/// The retention period is defined by `DATA_RETENTION_TIME_IN_DAYS`. +fn is_drop_time_retainable(drop_on: Option>, now: DateTime) -> bool { + let retention_boundary = get_retention_boundary(now); + + // If it is None, fill it with a very big time. + let drop_on = drop_on.unwrap_or(DateTime::::MAX_UTC); + drop_on > retention_boundary } /// Get db id and its seq by name, returns (db_id_seq, db_id) diff --git a/src/meta/app/src/schema/database.rs b/src/meta/app/src/schema/database.rs index e5997e5416ad..7c041b1e89ae 100644 --- a/src/meta/app/src/schema/database.rs +++ b/src/meta/app/src/schema/database.rs @@ -313,10 +313,10 @@ impl GetDatabaseReq { } } -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] +#[derive(Clone, Debug, PartialEq, Eq)] pub enum DatabaseInfoFilter { - // include all dropped databases - IncludeDropped, + /// Include all databases, even those that are before retention boundary time. + IncludeNonRetainable, } #[derive(Clone, Debug, PartialEq, Eq)] diff --git a/src/meta/app/src/schema/database_id_history_ident.rs b/src/meta/app/src/schema/database_id_history_ident.rs index 96922e28bfaf..565d36531cd1 100644 --- a/src/meta/app/src/schema/database_id_history_ident.rs +++ b/src/meta/app/src/schema/database_id_history_ident.rs @@ -15,10 +15,10 @@ use crate::tenant_key::ident::TIdent; /// The key of the list of database ids that have been used in history by a database name. 
-pub type DatabaseIdHistoryIdent = TIdent; -pub type DatabaseIdHistoryIdentRaw = TIdentRaw; +pub type DatabaseIdHistoryIdent = TIdent; +pub type DatabaseIdHistoryIdentRaw = TIdentRaw; -pub use kvapi_impl::Resource; +pub use kvapi_impl::DatabaseIdHistoryRsc; use crate::tenant_key::raw::TIdentRaw; @@ -44,8 +44,8 @@ mod kvapi_impl { use crate::schema::DbIdList; use crate::tenant_key::resource::TenantResource; - pub struct Resource; - impl TenantResource for Resource { + pub struct DatabaseIdHistoryRsc; + impl TenantResource for DatabaseIdHistoryRsc { const PREFIX: &'static str = "__fd_db_id_list"; const TYPE: &'static str = "DatabaseIdHistoryIdent"; const HAS_TENANT: bool = true;