Skip to content

Commit

Permalink
refactor: get_tableinfos_by_ids()
Browse files Browse the repository at this point in the history
  • Loading branch information
drmingdrmer committed Sep 24, 2024
1 parent 8e79625 commit 351de96
Showing 1 changed file with 17 additions and 77 deletions.
94 changes: 17 additions & 77 deletions src/meta/api/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@ use databend_common_meta_app::schema::database_name_ident::DatabaseNameIdent;
use databend_common_meta_app::schema::DBIdTableName;
use databend_common_meta_app::schema::DatabaseType;
use databend_common_meta_app::schema::TableId;
use databend_common_meta_app::schema::TableIdToName;
use databend_common_meta_app::schema::TableIdent;
use databend_common_meta_app::schema::TableInfo;
use databend_common_meta_app::schema::TableMeta;
use databend_common_meta_app::schema::TableNameIdent;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_kvapi::kvapi::UpsertKVReq;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::seq_value::SeqValue;
Expand Down Expand Up @@ -472,98 +470,41 @@ pub async fn get_table_by_id_or_err(
Ok((seq, table_meta))
}

pub async fn get_table_names_by_ids(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
ids: &[u64],
) -> Result<Vec<String>, KVAppError> {
let mut table_names = vec![];

let keys: Vec<String> = ids
.iter()
.map(|id| TableIdToName { table_id: *id }.to_string_key())
.collect();

let mut id_iter = ids.iter();

for c in keys.chunks(DEFAULT_MGET_SIZE) {
let table_seq_name: Vec<(u64, Option<DBIdTableName>)> = mget_pb_values(kv_api, c).await?;
for (_seq, table_name_opt) in table_seq_name {
let id = id_iter.next().unwrap();
match table_name_opt {
Some(table_name) => table_names.push(table_name.table_name),
None => {
return Err(KVAppError::AppError(AppError::UnknownTableId(
UnknownTableId::new(*id, "get_table_names_by_ids"),
)));
}
}
}
}

Ok(table_names)
}

pub async fn get_tableinfos_by_ids(
kv_api: &(impl kvapi::KVApi<Error = MetaError> + ?Sized),
ids: &[u64],
name_ids: Vec<(DBIdTableName, u64)>,
tenant_dbname: &DatabaseNameIdent,
dbid_tbl_names: Option<Vec<DBIdTableName>>,
db_type: DatabaseType,
) -> Result<Vec<Arc<TableInfo>>, KVAppError> {
let mut tb_meta_keys = Vec::with_capacity(ids.len());
for id in ids.iter() {
let table_id = TableId { table_id: *id };

tb_meta_keys.push(table_id.to_string_key());
}

// mget() corresponding table_metas

let mut seq_tbl_metas = Vec::with_capacity(ids.len());
let mut res = Vec::with_capacity(name_ids.len());
let chunk_size = DEFAULT_MGET_SIZE;

for table_ids in tb_meta_keys.chunks(chunk_size) {
let got = kv_api.mget_kv(table_ids).await?;
seq_tbl_metas.extend(got);
}

let mut tbl_infos = Vec::with_capacity(ids.len());

let tbl_names = match dbid_tbl_names {
Some(dbid_tbnames) => Vec::<String>::from_iter(
dbid_tbnames
.into_iter()
.map(|dbid_tbname| dbid_tbname.table_name),
),
for chunk in name_ids.chunks(chunk_size) {
let id_idents = chunk.iter().map(|(_, id)| TableId { table_id: *id });
let seq_metas = kv_api.get_pb_values_vec(id_idents).await?;

None => get_table_names_by_ids(kv_api, ids).await?,
};

for (i, seq_meta_opt) in seq_tbl_metas.iter().enumerate() {
if let Some(seq_meta) = seq_meta_opt {
let tbl_meta: TableMeta = deserialize_struct(&seq_meta.data)?;
for ((name_ident, id), seq_meta) in chunk.iter().zip(seq_metas) {
let table_name = &name_ident.table_name;
let Some(seq_meta) = seq_meta else {
continue;
};

let tb_info = TableInfo {
ident: TableIdent {
table_id: ids[i],
table_id: *id,
seq: seq_meta.seq,
},
desc: format!("'{}'.'{}'", tenant_dbname.database_name(), tbl_names[i]),
meta: tbl_meta,
name: tbl_names[i].clone(),
desc: format!("'{}'.'{}'", tenant_dbname.database_name(), table_name),
meta: seq_meta.data,
name: table_name.clone(),
db_type: db_type.clone(),
catalog_info: Default::default(),
};
tbl_infos.push(Arc::new(tb_info));
} else {
debug!(
k = &tb_meta_keys[i];
"db_meta not found, maybe just deleted after listing names and before listing meta"
);
res.push(Arc::new(tb_info));
}
}

Ok(tbl_infos)
Ok(res)
}

pub async fn list_tables_from_unshare_db(
Expand All @@ -583,9 +524,8 @@ pub async fn list_tables_from_unshare_db(

get_tableinfos_by_ids(
kv_api,
&ids,
dbid_tbnames.into_iter().zip(ids).collect(),
tenant_dbname,
Some(dbid_tbnames),
DatabaseType::NormalDB,
)
.await
Expand Down

0 comments on commit 351de96

Please sign in to comment.