Skip to content

Commit

Permalink
refactor: auto split large mget_database_names_by_ids into chunks with `KVPbApi::get_pb_values_vec()` (#17011)

Browse files Browse the repository at this point in the history

* refactor: auto split large mget_database_names_by_ids into chunks with `KVPbApi::get_pb_values_vec()`

Also fix the issue where, when the db-meta is not found, the returned db-name
was not set to None as it should be.
  • Loading branch information
drmingdrmer authored Dec 7, 2024
1 parent 0b3a480 commit 70c7768
Showing 1 changed file with 23 additions and 51 deletions.
74 changes: 23 additions & 51 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1648,38 +1648,23 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
) -> Result<Vec<Option<String>>, KVAppError> {
debug!(req :? =(&table_ids); "SchemaApi: {}", func_name!());

let mut id_name_kv_keys = Vec::with_capacity(table_ids.len());
for id in table_ids {
let k = TableIdToName { table_id: *id }.to_string_key();
id_name_kv_keys.push(k);
}

// Batch get all table-name by id
let seq_names = self.mget_kv(&id_name_kv_keys).await?;
let mut table_names = Vec::with_capacity(table_ids.len());

for seq_name in seq_names {
if let Some(seq_name) = seq_name {
let name_ident: DBIdTableName = deserialize_struct(&seq_name.data)?;
table_names.push(Some(name_ident.table_name));
} else {
table_names.push(None);
}
}
let id_to_name_idents = table_ids.iter().map(|id| TableIdToName { table_id: *id });

let mut meta_kv_keys = Vec::with_capacity(table_ids.len());
for id in table_ids {
let k = TableId { table_id: *id }.to_string_key();
meta_kv_keys.push(k);
}
let seq_names = self.get_pb_values_vec(id_to_name_idents).await?;
let mut table_names = seq_names
.into_iter()
.map(|seq_name| seq_name.map(|s| s.data.table_name))
.collect::<Vec<_>>();

let seq_metas = self.mget_kv(&meta_kv_keys).await?;
let id_idents = table_ids.iter().map(|id| TableId { table_id: *id });
let seq_metas = self.get_pb_values_vec(id_idents).await?;
for (i, seq_meta_opt) in seq_metas.iter().enumerate() {
if let Some(seq_meta) = seq_meta_opt {
let table_meta: TableMeta = deserialize_struct(&seq_meta.data)?;
if table_meta.drop_on.is_some() {
if seq_meta.data.drop_on.is_some() {
table_names[i] = None;
}
} else {
table_names[i] = None;
}
}

Expand Down Expand Up @@ -1714,39 +1699,26 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
) -> Result<Vec<Option<String>>, KVAppError> {
debug!(req :? =(&db_ids); "SchemaApi: {}", func_name!());

let mut kv_keys = Vec::with_capacity(db_ids.len());
for id in db_ids {
let k = DatabaseIdToName { db_id: *id }.to_string_key();
kv_keys.push(k);
}
let id_to_name_keys = db_ids.iter().map(|id| DatabaseIdToName { db_id: *id });

// Batch get all table-name by id
let seq_names = self.mget_kv(&kv_keys).await?;
// If multi drop/create db the capacity may not same
let mut db_names = Vec::with_capacity(db_ids.len());
let seq_names = self.get_pb_values_vec(id_to_name_keys).await?;

for seq_name in seq_names {
if let Some(seq_name) = seq_name {
let name_ident: DatabaseNameIdentRaw = deserialize_struct(&seq_name.data)?;
db_names.push(Some(name_ident.database_name().to_string()));
} else {
db_names.push(None);
}
}
let mut db_names = seq_names
.into_iter()
.map(|seq_name| seq_name.map(|s| s.data.database_name().to_string()))
.collect::<Vec<_>>();

let mut meta_kv_keys = Vec::with_capacity(db_ids.len());
for id in db_ids {
let k = DatabaseId { db_id: *id }.to_string_key();
meta_kv_keys.push(k);
}
let id_keys = db_ids.iter().map(|id| DatabaseId { db_id: *id });

let seq_metas = self.get_pb_values_vec(id_keys).await?;

let seq_metas = self.mget_kv(&meta_kv_keys).await?;
for (i, seq_meta_opt) in seq_metas.iter().enumerate() {
if let Some(seq_meta) = seq_meta_opt {
let db_meta: DatabaseMeta = deserialize_struct(&seq_meta.data)?;
if db_meta.drop_on.is_some() {
if seq_meta.data.drop_on.is_some() {
db_names[i] = None;
}
} else {
db_names[i] = None;
}
}
Ok(db_names)
Expand Down

0 comments on commit 70c7768

Please sign in to comment.