Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: auto split large mget_database_names_by_ids into chunks with KVPbApi::get_pb_values_vec() #17011

Merged
merged 2 commits into from
Dec 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 23 additions & 51 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1648,38 +1648,23 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
) -> Result<Vec<Option<String>>, KVAppError> {
debug!(req :? =(&table_ids); "SchemaApi: {}", func_name!());

let mut id_name_kv_keys = Vec::with_capacity(table_ids.len());
for id in table_ids {
let k = TableIdToName { table_id: *id }.to_string_key();
id_name_kv_keys.push(k);
}

// Batch get all table-name by id
let seq_names = self.mget_kv(&id_name_kv_keys).await?;
let mut table_names = Vec::with_capacity(table_ids.len());

for seq_name in seq_names {
if let Some(seq_name) = seq_name {
let name_ident: DBIdTableName = deserialize_struct(&seq_name.data)?;
table_names.push(Some(name_ident.table_name));
} else {
table_names.push(None);
}
}
let id_to_name_idents = table_ids.iter().map(|id| TableIdToName { table_id: *id });

let mut meta_kv_keys = Vec::with_capacity(table_ids.len());
for id in table_ids {
let k = TableId { table_id: *id }.to_string_key();
meta_kv_keys.push(k);
}
let seq_names = self.get_pb_values_vec(id_to_name_idents).await?;
let mut table_names = seq_names
.into_iter()
.map(|seq_name| seq_name.map(|s| s.data.table_name))
.collect::<Vec<_>>();

let seq_metas = self.mget_kv(&meta_kv_keys).await?;
let id_idents = table_ids.iter().map(|id| TableId { table_id: *id });
let seq_metas = self.get_pb_values_vec(id_idents).await?;
for (i, seq_meta_opt) in seq_metas.iter().enumerate() {
if let Some(seq_meta) = seq_meta_opt {
let table_meta: TableMeta = deserialize_struct(&seq_meta.data)?;
if table_meta.drop_on.is_some() {
if seq_meta.data.drop_on.is_some() {
table_names[i] = None;
}
} else {
table_names[i] = None;
}
}

Expand Down Expand Up @@ -1714,39 +1699,26 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
) -> Result<Vec<Option<String>>, KVAppError> {
debug!(req :? =(&db_ids); "SchemaApi: {}", func_name!());

let mut kv_keys = Vec::with_capacity(db_ids.len());
for id in db_ids {
let k = DatabaseIdToName { db_id: *id }.to_string_key();
kv_keys.push(k);
}
let id_to_name_keys = db_ids.iter().map(|id| DatabaseIdToName { db_id: *id });

// Batch get all table-name by id
let seq_names = self.mget_kv(&kv_keys).await?;
// If multi drop/create db the capacity may not same
let mut db_names = Vec::with_capacity(db_ids.len());
let seq_names = self.get_pb_values_vec(id_to_name_keys).await?;

for seq_name in seq_names {
if let Some(seq_name) = seq_name {
let name_ident: DatabaseNameIdentRaw = deserialize_struct(&seq_name.data)?;
db_names.push(Some(name_ident.database_name().to_string()));
} else {
db_names.push(None);
}
}
let mut db_names = seq_names
.into_iter()
.map(|seq_name| seq_name.map(|s| s.data.database_name().to_string()))
.collect::<Vec<_>>();

let mut meta_kv_keys = Vec::with_capacity(db_ids.len());
for id in db_ids {
let k = DatabaseId { db_id: *id }.to_string_key();
meta_kv_keys.push(k);
}
let id_keys = db_ids.iter().map(|id| DatabaseId { db_id: *id });

let seq_metas = self.get_pb_values_vec(id_keys).await?;

let seq_metas = self.mget_kv(&meta_kv_keys).await?;
for (i, seq_meta_opt) in seq_metas.iter().enumerate() {
if let Some(seq_meta) = seq_meta_opt {
let db_meta: DatabaseMeta = deserialize_struct(&seq_meta.data)?;
if db_meta.drop_on.is_some() {
if seq_meta.data.drop_on.is_some() {
db_names[i] = None;
}
} else {
db_names[i] = None;
}
}
Ok(db_names)
Expand Down
Loading