diff --git a/Cargo.lock b/Cargo.lock
index 496eb42ab184..dfd8c5cd904f 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3718,6 +3718,7 @@ dependencies = [
  "databend-common-proto-conv",
  "fastrace 0.7.2",
  "futures",
+ "itertools 0.10.5",
  "log",
  "logcall",
  "maplit",
diff --git a/src/meta/api/Cargo.toml b/src/meta/api/Cargo.toml
index ebaa2b0a68e8..b3e1231e4104 100644
--- a/src/meta/api/Cargo.toml
+++ b/src/meta/api/Cargo.toml
@@ -27,6 +27,7 @@ databend-common-meta-types = { workspace = true }
 databend-common-proto-conv = { workspace = true }
 fastrace = { workspace = true }
 futures = { workspace = true }
+itertools = { workspace = true }
 log = { workspace = true }
 logcall = { workspace = true }
 maplit = "1.0.2"
diff --git a/src/meta/api/src/kv_pb_api/mod.rs b/src/meta/api/src/kv_pb_api/mod.rs
index fc1caae81c9d..67b01bea97fb 100644
--- a/src/meta/api/src/kv_pb_api/mod.rs
+++ b/src/meta/api/src/kv_pb_api/mod.rs
@@ -40,6 +40,7 @@ use futures::stream;
 use futures::stream::BoxStream;
 use futures::stream::StreamExt;
 use futures::TryStreamExt;
+use itertools::Itertools;

 pub(crate) use self::codec::decode_non_empty_item;
 pub(crate) use self::codec::decode_seqv;
@@ -52,6 +53,9 @@ use crate::kv_pb_api::errors::StreamReadEof;

 /// This trait provides a way to access a kv store with `kvapi::Key` type key and protobuf encoded value.
 pub trait KVPbApi: KVApi {
+    /// The number of keys in one batch get.
+    const CHUNK_SIZE: usize = 256;
+
     /// Update or insert a protobuf encoded value by kvapi::Key.
     ///
     /// The key will be converted to string and the value is encoded by `FromToProto`.
@@ -189,21 +193,35 @@ pub trait KVPbApi: KVApi {
     }

     /// Same as [`get_pb_values`](Self::get_pb_values) but collect the result in a `Vec` instead of a stream.
+    ///
+    /// If the number of keys is larger than [`Self::CHUNK_SIZE`], it will be split into multiple requests.
     fn get_pb_values_vec<K, I>(
         &self,
         keys: I,
     ) -> impl Future<Output = Result<Vec<Option<SeqV<K::ValueType>>>, Self::Error>> + Send
     where
-        K: kvapi::Key + 'static,
+        K: kvapi::Key + Send + 'static,
         K::ValueType: FromToProto + Send + 'static,
         I: IntoIterator<Item = K> + Send,
+        I::IntoIter: Send,
         Self::Error: From<PbApiReadError<Self::Error>>,
     {
+        let it = keys.into_iter();
+        let key_chunks = it
+            .chunks(Self::CHUNK_SIZE)
+            .into_iter()
+            .map(|x| x.collect::<Vec<_>>())
+            .collect::<Vec<_>>();
+
         async move {
-            self.get_pb_values(keys)
-                .await?
-                .try_collect::<Vec<_>>()
-                .await
+            let mut res = vec![];
+            for chunk in key_chunks {
+                let strm = self.get_pb_values(chunk).await?;
+
+                let vec = strm.try_collect::<Vec<_>>().await?;
+                res.extend(vec);
+            }
+            Ok(res)
         }
     }

@@ -241,6 +259,8 @@ pub trait KVPbApi: KVApi {
     }

     /// Same as [`get_pb_stream`](Self::get_pb_stream) but collect the result in a `Vec` instead of a stream.
+    ///
+    /// If the number of keys is larger than [`Self::CHUNK_SIZE`], it will be split into multiple requests.
     fn get_pb_vec<K, I>(
         &self,
         keys: I,
@@ -249,11 +269,25 @@
         K: kvapi::Key + Send + 'static,
         K::ValueType: FromToProto + Send + 'static,
         I: IntoIterator<Item = K> + Send,
+        I::IntoIter: Send,
         Self::Error: From<PbApiReadError<Self::Error>>,
     {
+        let it = keys.into_iter();
+        let key_chunks = it
+            .chunks(Self::CHUNK_SIZE)
+            .into_iter()
+            .map(|x| x.collect::<Vec<_>>())
+            .collect::<Vec<_>>();
+
         async move {
-            let kvs = self.get_pb_stream(keys).await?.try_collect().await?;
-            Ok(kvs)
+            let mut res = vec![];
+            for chunk in key_chunks {
+                let strm = self.get_pb_stream(chunk).await?;
+
+                let vec = strm.try_collect::<Vec<_>>().await?;
+                res.extend(vec);
+            }
+            Ok(res)
         }
     }

@@ -497,14 +531,14 @@ mod tests {
     use crate::kv_pb_api::KVPbApi;

     //
-    struct Foo {
+    struct FooKV {
         /// Whether to return without exhausting the input for `get_kv_stream`.
         early_return: Option<usize>,
         kvs: BTreeMap<String, SeqV>,
     }

     #[async_trait]
-    impl KVApi for Foo {
+    impl KVApi for FooKV {
         type Error = MetaError;

         async fn upsert_kv(&self, _req: UpsertKVReq) -> Result<UpsertKVReply, Self::Error> {
@@ -560,7 +594,7 @@ mod tests {
         };
         let v = catalog_meta.to_pb()?.encode_to_vec();

-        let foo = Foo {
+        let foo = FooKV {
             early_return: Some(2),
             kvs: vec![
                 (s("__fd_catalog_by_id/1"), SeqV::new(1, v.clone())),
@@ -613,7 +647,7 @@ mod tests {
         };
         let v = catalog_meta.to_pb()?.encode_to_vec();

-        let foo = Foo {
+        let foo = FooKV {
             early_return: None,
             kvs: vec![
                 (s("__fd_catalog_by_id/1"), SeqV::new(1, v.clone())),
@@ -675,6 +709,61 @@ mod tests {
         Ok(())
     }

+    #[tokio::test]
+    async fn test_get_pb_vec_span_chunk() -> anyhow::Result<()> {
+        let catalog_meta = CatalogMeta {
+            catalog_option: CatalogOption::Hive(HiveCatalogOption {
+                address: "127.0.0.1:10000".to_string(),
+                storage_params: None,
+            }),
+            created_on: DateTime::<Utc>::MIN_UTC,
+        };
+        let catalog_bytes = catalog_meta.to_pb()?.encode_to_vec();
+
+        let n = 1024;
+        let mut kvs = vec![];
+        for i in 1..=n {
+            let key = s(format!("__fd_catalog_by_id/{}", i));
+            let value = SeqV::new(i, catalog_bytes.clone());
+            kvs.push((key, value));
+        }
+
+        let foo = FooKV {
+            early_return: None,
+            kvs: kvs.into_iter().collect(),
+        };
+
+        assert!(FooKV::CHUNK_SIZE < n as usize);
+
+        let tenant = Tenant::new_literal("dummy");
+
+        {
+            let got = foo
+                .get_pb_vec((1..=n).map(|i| CatalogIdIdent::new(&tenant, i)))
+                .await?;
+
+            for i in 1..=n {
+                let key = CatalogIdIdent::new(&tenant, i);
+                assert_eq!(key, got[i as usize - 1].0.clone());
+                let value = got[i as usize - 1].1.clone().unwrap();
+                assert_eq!(i, value.seq());
+            }
+        }
+
+        {
+            let got = foo
+                .get_pb_values_vec((1..=n).map(|i| CatalogIdIdent::new(&tenant, i)))
+                .await?;
+
+            for i in 1..=n {
+                let value = got[i as usize - 1].clone().unwrap();
+                assert_eq!(i, value.seq());
+            }
+        }
+
+        Ok(())
+    }
+
     fn s(x: impl ToString) -> String {
         x.to_string()
     }
diff --git a/src/meta/api/src/schema_api.rs b/src/meta/api/src/schema_api.rs
index 679f2aedd7e8..40dc98419346 100644
--- a/src/meta/api/src/schema_api.rs
+++ b/src/meta/api/src/schema_api.rs
@@ -124,14 +124,15 @@ pub trait SchemaApi: Send + Sync {
     async fn list_databases(
         &self,
         req: ListDatabaseReq,
-    ) -> Result<Vec<Arc<DatabaseInfo>>, KVAppError>;
+    ) -> Result<Vec<Arc<DatabaseInfo>>, MetaError>;

     async fn rename_database(
         &self,
         req: RenameDatabaseReq,
     ) -> Result<RenameDatabaseReply, KVAppError>;

-    /// Retrieves all databases for a specific tenant, including those marked as dropped.
+    /// Retrieves all databases for a specific tenant,
+    /// optionally including those marked as dropped.
     ///
     /// * `include_non_retainable` -
     ///   If true, includes databases that are beyond the retention period.
@@ -140,7 +141,7 @@ pub trait SchemaApi: Send + Sync {
         &self,
         req: ListDatabaseReq,
         include_non_retainable: bool,
-    ) -> Result<Vec<Arc<DatabaseInfo>>, KVAppError>;
+    ) -> Result<Vec<Arc<DatabaseInfo>>, MetaError>;

     // index

diff --git a/src/meta/api/src/schema_api_impl.rs b/src/meta/api/src/schema_api_impl.rs
index 7ec7a8ade59a..653ba10c9135 100644
--- a/src/meta/api/src/schema_api_impl.rs
+++ b/src/meta/api/src/schema_api_impl.rs
@@ -638,7 +638,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
         &self,
         req: ListDatabaseReq,
         include_non_retainable: bool,
-    ) -> Result<Vec<Arc<DatabaseInfo>>, KVAppError> {
+    ) -> Result<Vec<Arc<DatabaseInfo>>, MetaError> {
         debug!(req :? =(&req); "SchemaApi: {}", func_name!());

         let name_ident = DatabaseIdHistoryIdent::new(&req.tenant, "dummy");
@@ -652,26 +652,23 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
             let ids = db_id_list
                 .id_list
                 .iter()
-                .map(|db_id| DatabaseId { db_id: *db_id })
-                .collect::<Vec<_>>();
+                .map(|db_id| DatabaseId { db_id: *db_id });

-            for db_ids in ids.chunks(DEFAULT_MGET_SIZE) {
-                let id_metas = self.get_pb_vec(db_ids.iter().cloned()).await?;
+            let id_metas = self.get_pb_vec(ids).await?;

-                for (db_id, db_meta) in id_metas {
-                    let Some(db_meta) = db_meta else {
-                        error!("get_database_history cannot find {:?} db_meta", db_id);
-                        continue;
-                    };
+            for (db_id, db_meta) in id_metas {
+                let Some(db_meta) = db_meta else {
+                    error!("get_database_history cannot find {:?} db_meta", db_id);
+                    continue;
+                };

-                    let db = DatabaseInfo {
-                        database_id: db_id,
-                        name_ident: DatabaseNameIdent::new_from(db_id_list_key.clone()),
-                        meta: db_meta,
-                    };
+                let db = DatabaseInfo {
+                    database_id: db_id,
+                    name_ident: DatabaseNameIdent::new_from(db_id_list_key.clone()),
+                    meta: db_meta,
+                };

-                    dbs.insert(db_id.db_id, Arc::new(db));
-                }
+                dbs.insert(db_id.db_id, Arc::new(db));
             }
         }

@@ -706,7 +703,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
     async fn list_databases(
         &self,
         req: ListDatabaseReq,
-    ) -> Result<Vec<Arc<DatabaseInfo>>, KVAppError> {
+    ) -> Result<Vec<Arc<DatabaseInfo>>, MetaError> {
         debug!(req :? =(&req); "SchemaApi: {}", func_name!());
=(&req); "SchemaApi: {}", func_name!()); let name_key = DatabaseNameIdent::new(req.tenant(), "dummy"); @@ -1659,11 +1656,7 @@ impl + ?Sized> SchemaApi for KV { .map(|id| TableId { table_id: id }) .collect::>(); - let mut seq_metas = vec![]; - for chunk in ids.chunks(DEFAULT_MGET_SIZE) { - let got = self.get_pb_values_vec(chunk.to_vec()).await?; - seq_metas.extend(got); - } + let seq_metas = self.get_pb_values_vec(ids.clone()).await?; let res = names .into_iter() @@ -3091,27 +3084,22 @@ async fn get_table_meta_history( ) -> Result)>, KVAppError> { let mut tb_metas = vec![]; - let inner_keys = tb_id_list - .id_list - .into_iter() - .map(TableId::new) - .collect::>(); + let inner_keys = tb_id_list.id_list.into_iter().map(TableId::new); - for c in inner_keys.chunks(DEFAULT_MGET_SIZE) { - let kvs = kv_api.get_pb_vec(c.iter().cloned()).await?; + let kvs = kv_api.get_pb_vec(inner_keys).await?; - for (k, table_meta) in kvs { - let Some(table_meta) = table_meta else { - error!("get_table_history cannot find {:?} table_meta", k); - continue; - }; + for (k, table_meta) in kvs { + let Some(table_meta) = table_meta else { + error!("get_table_history cannot find {:?} table_meta", k); + continue; + }; - if !is_drop_time_retainable(table_meta.drop_on, *now) { - continue; - } - tb_metas.push((k, table_meta)); + if !is_drop_time_retainable(table_meta.drop_on, *now) { + continue; } + tb_metas.push((k, table_meta)); } + Ok(tb_metas) } @@ -3541,30 +3529,29 @@ async fn get_history_tables_for_gc( let mut filter_tb_infos = vec![]; - for chunk in args[..std::cmp::min(limit, args.len())].chunks(DEFAULT_MGET_SIZE) { - let table_id_idents = chunk.iter().map(|(table_id, _)| table_id.clone()); + let limited_args = &args[..std::cmp::min(limit, args.len())]; + let table_id_idents = limited_args.iter().map(|(table_id, _)| table_id.clone()); - let seq_metas = kv_api.get_pb_values_vec(table_id_idents).await?; + let seq_metas = kv_api.get_pb_values_vec(table_id_idents).await?; - for (seq_meta, (table_id, table_name)) in seq_metas.into_iter().zip(chunk.iter()) { - let Some(seq_meta) = seq_meta else { - error!( - "batch_filter_table_info cannot find {:?} table_meta", - table_id - ); - continue; - }; - - if !drop_time_range.contains(&seq_meta.data.drop_on) { - continue; - } + for (seq_meta, (table_id, table_name)) in seq_metas.into_iter().zip(limited_args.iter()) { + let Some(seq_meta) = seq_meta else { + error!( + "batch_filter_table_info cannot find {:?} table_meta", + table_id + ); + continue; + }; - filter_tb_infos.push(TableNIV::new( - DBIdTableName::new(db_id, table_name.clone()), - table_id.clone(), - seq_meta, - )); + if !drop_time_range.contains(&seq_meta.data.drop_on) { + continue; } + + filter_tb_infos.push(TableNIV::new( + DBIdTableName::new(db_id, table_name.clone()), + table_id.clone(), + seq_meta, + )); } Ok(filter_tb_infos)