refactor: Automatically chunk large key sets in KVPbApi methods (#16561)
- Modify KVPbApi::get_pb_vec() and KVPbApi::get_pb_values_vec()
  to automatically split large key sets into chunks of 256 keys

- Simplify caller logic by eliminating the need for manual key-set splitting
  (see the before/after sketch below)
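
A hedged before/after sketch of a caller (the `table_ids` vector and `kv_api` handle are illustrative, not part of this commit): previously, call sites such as those in schema_api_impl.rs chunked key sets themselves with `DEFAULT_MGET_SIZE`; after this change they pass the whole key set and the trait method chunks internally.

// Before: manual chunking at every call site (sketch).
let mut seq_metas = vec![];
for chunk in table_ids.chunks(DEFAULT_MGET_SIZE) {
    let got = kv_api.get_pb_values_vec(chunk.to_vec()).await?;
    seq_metas.extend(got);
}

// After: pass all keys at once; the method issues one request per 256-key chunk.
let seq_metas = kv_api.get_pb_values_vec(table_ids.clone()).await?;
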
drmingdrmer authored Oct 9, 2024
1 parent 25c2e4d commit 0c550c5
Showing 5 changed files with 152 additions and 73 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions src/meta/api/Cargo.toml
@@ -27,6 +27,7 @@
databend-common-proto-conv = { workspace = true }
fastrace = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
logcall = { workspace = true }
maplit = "1.0.2"
111 changes: 100 additions & 11 deletions src/meta/api/src/kv_pb_api/mod.rs
@@ -40,6 +40,7 @@
use futures::stream::BoxStream;
use futures::stream::StreamExt;
use futures::TryStreamExt;
use itertools::Itertools;

pub(crate) use self::codec::decode_non_empty_item;
pub(crate) use self::codec::decode_seqv;
@@ -52,6 +53,9 @@

/// This trait provides a way to access a kv store with `kvapi::Key` type key and protobuf encoded value.
pub trait KVPbApi: KVApi {
/// The number of keys in one batch get.
const CHUNK_SIZE: usize = 256;

/// Update or insert a protobuf encoded value by kvapi::Key.
///
/// The key will be converted to string and the value is encoded by `FromToProto`.
@@ -189,21 +193,35 @@
}

/// Same as [`get_pb_values`](Self::get_pb_values) but collect the result in a `Vec` instead of a stream.
///
/// If the number of keys is larger than [`Self::CHUNK_SIZE`], it will be split into multiple requests.
fn get_pb_values_vec<K, I>(
&self,
keys: I,
) -> impl Future<Output = Result<Vec<Option<SeqV<K::ValueType>>>, Self::Error>> + Send
where
K: kvapi::Key + 'static,
K: kvapi::Key + Send + 'static,
K::ValueType: FromToProto + Send + 'static,
I: IntoIterator<Item = K> + Send,
I::IntoIter: Send,
Self::Error: From<PbApiReadError<Self::Error>>,
{
let it = keys.into_iter();
let key_chunks = it
.chunks(Self::CHUNK_SIZE)
.into_iter()
.map(|x| x.collect::<Vec<_>>())
.collect::<Vec<_>>();

async move {
self.get_pb_values(keys)
.await?
.try_collect::<Vec<_>>()
.await
let mut res = vec![];
for chunk in key_chunks {
let strm = self.get_pb_values(chunk).await?;

let vec = strm.try_collect::<Vec<_>>().await?;
res.extend(vec);
}
Ok(res)
}
}
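
The chunks above are materialized into owned `Vec`s before the async block: itertools' `chunks()` adapter borrows its input and its sub-iterators are not `Send`, so keeping them out of the async block is what lets the returned future stay `Send`. A standalone sketch of the idiom (the `chunk_keys` helper is illustrative, not part of this commit):

use itertools::Itertools;

/// Illustrative helper: split any key iterator into owned chunks of at most
/// `chunk_size` keys, mirroring the pattern used by `get_pb_values_vec` and
/// `get_pb_vec`.
fn chunk_keys<K>(keys: impl IntoIterator<Item = K>, chunk_size: usize) -> Vec<Vec<K>> {
    keys.into_iter()
        .chunks(chunk_size)
        .into_iter()
        .map(|chunk| chunk.collect::<Vec<_>>())
        .collect()
}

// e.g. chunk_keys(0..1000u64, 256) yields four chunks of 256, 256, 256 and 232 keys.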

@@ -241,6 +259,8 @@
}

/// Same as [`get_pb_stream`](Self::get_pb_stream) but collect the result in a `Vec` instead of a stream.
///
/// If the number of keys is larger than [`Self::CHUNK_SIZE`], it will be split into multiple requests.
fn get_pb_vec<K, I>(
&self,
keys: I,
@@ -249,11 +269,25 @@
K: kvapi::Key + Send + 'static,
K::ValueType: FromToProto + Send + 'static,
I: IntoIterator<Item = K> + Send,
I::IntoIter: Send,
Self::Error: From<PbApiReadError<Self::Error>>,
{
let it = keys.into_iter();
let key_chunks = it
.chunks(Self::CHUNK_SIZE)
.into_iter()
.map(|x| x.collect::<Vec<_>>())
.collect::<Vec<_>>();

async move {
let kvs = self.get_pb_stream(keys).await?.try_collect().await?;
Ok(kvs)
let mut res = vec![];
for chunk in key_chunks {
let strm = self.get_pb_stream(chunk).await?;

let vec = strm.try_collect::<Vec<_>>().await?;
res.extend(vec);
}
Ok(res)
}
}
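
A hedged usage sketch of the batched `get_pb_vec` (the 1000-key range and the `kv_api` binding are illustrative): the method now accepts arbitrarily many keys, issues one request per `CHUNK_SIZE` chunk internally, and yields one `(key, Option<SeqV<_>>)` pair per input key in input order, so callers no longer cap batch sizes themselves.

// Sketch only: assumes `kv_api` implements KVPbApi; keys absent from the
// store come back as `None`.
let keys = (1..=1000u64).map(|id| TableId { table_id: id });
let kvs: Vec<(TableId, Option<SeqV<TableMeta>>)> = kv_api.get_pb_vec(keys).await?;
assert_eq!(kvs.len(), 1000); // one entry per requested key, regardless of chunking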

@@ -497,14 +531,14 @@
use crate::kv_pb_api::KVPbApi;

//
struct Foo {
struct FooKV {
/// Whether to return without exhausting the input for `get_kv_stream`.
early_return: Option<usize>,
kvs: BTreeMap<String, SeqV>,
}

#[async_trait]
impl KVApi for Foo {
impl KVApi for FooKV {
type Error = MetaError;

async fn upsert_kv(&self, _req: UpsertKVReq) -> Result<UpsertKVReply, Self::Error> {
@@ -560,7 +594,7 @@
};
let v = catalog_meta.to_pb()?.encode_to_vec();

let foo = Foo {
let foo = FooKV {
early_return: Some(2),
kvs: vec![
(s("__fd_catalog_by_id/1"), SeqV::new(1, v.clone())),
@@ -613,7 +647,7 @@
};
let v = catalog_meta.to_pb()?.encode_to_vec();

let foo = Foo {
let foo = FooKV {
early_return: None,
kvs: vec![
(s("__fd_catalog_by_id/1"), SeqV::new(1, v.clone())),
@@ -675,6 +709,61 @@
Ok(())
}

#[tokio::test]
async fn test_get_pb_vec_span_chunk() -> anyhow::Result<()> {
let catalog_meta = CatalogMeta {
catalog_option: CatalogOption::Hive(HiveCatalogOption {
address: "127.0.0.1:10000".to_string(),
storage_params: None,
}),
created_on: DateTime::<Utc>::MIN_UTC,
};
let catalog_bytes = catalog_meta.to_pb()?.encode_to_vec();

let n = 1024;
let mut kvs = vec![];
for i in 1..=n {
let key = s(format!("__fd_catalog_by_id/{}", i));
let value = SeqV::new(i, catalog_bytes.clone());
kvs.push((key, value));
}

let foo = FooKV {
early_return: None,
kvs: kvs.into_iter().collect(),
};

assert!(FooKV::CHUNK_SIZE < n as usize);

let tenant = Tenant::new_literal("dummy");

{
let got = foo
.get_pb_vec((1..=n).map(|i| CatalogIdIdent::new(&tenant, i)))
.await?;

for i in 1..=n {
let key = CatalogIdIdent::new(&tenant, i);
assert_eq!(key, got[i as usize - 1].0.clone());
let value = got[i as usize - 1].1.clone().unwrap();
assert_eq!(i, value.seq());
}
}

{
let got = foo
.get_pb_values_vec((1..=n).map(|i| CatalogIdIdent::new(&tenant, i)))
.await?;

for i in 1..=n {
let value = got[i as usize - 1].clone().unwrap();
assert_eq!(i, value.seq());
}
}

Ok(())
}

fn s(x: impl ToString) -> String {
x.to_string()
}
7 changes: 4 additions & 3 deletions src/meta/api/src/schema_api.rs
@@ -124,14 +124,15 @@
async fn list_databases(
&self,
req: ListDatabaseReq,
) -> Result<Vec<Arc<DatabaseInfo>>, KVAppError>;
) -> Result<Vec<Arc<DatabaseInfo>>, MetaError>;

async fn rename_database(
&self,
req: RenameDatabaseReq,
) -> Result<RenameDatabaseReply, KVAppError>;

/// Retrieves all databases for a specific tenant, including those marked as dropped.
/// Retrieves all databases for a specific tenant,
/// optionally including those marked as dropped.
///
/// * `include_non_retainable` -
/// If true, includes databases that are beyond the retention period.
@@ -140,7 +141,7 @@
&self,
req: ListDatabaseReq,
include_non_retainable: bool,
) -> Result<Vec<Arc<DatabaseInfo>>, KVAppError>;
) -> Result<Vec<Arc<DatabaseInfo>>, MetaError>;

// index

105 changes: 46 additions & 59 deletions src/meta/api/src/schema_api_impl.rs
@@ -638,7 +638,7 @@
&self,
req: ListDatabaseReq,
include_non_retainable: bool,
) -> Result<Vec<Arc<DatabaseInfo>>, KVAppError> {
) -> Result<Vec<Arc<DatabaseInfo>>, MetaError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let name_ident = DatabaseIdHistoryIdent::new(&req.tenant, "dummy");
@@ -652,26 +652,23 @@
let ids = db_id_list
.id_list
.iter()
.map(|db_id| DatabaseId { db_id: *db_id })
.collect::<Vec<_>>();
.map(|db_id| DatabaseId { db_id: *db_id });

for db_ids in ids.chunks(DEFAULT_MGET_SIZE) {
let id_metas = self.get_pb_vec(db_ids.iter().cloned()).await?;
let id_metas = self.get_pb_vec(ids).await?;

for (db_id, db_meta) in id_metas {
let Some(db_meta) = db_meta else {
error!("get_database_history cannot find {:?} db_meta", db_id);
continue;
};
for (db_id, db_meta) in id_metas {
let Some(db_meta) = db_meta else {
error!("get_database_history cannot find {:?} db_meta", db_id);
continue;
};

let db = DatabaseInfo {
database_id: db_id,
name_ident: DatabaseNameIdent::new_from(db_id_list_key.clone()),
meta: db_meta,
};
let db = DatabaseInfo {
database_id: db_id,
name_ident: DatabaseNameIdent::new_from(db_id_list_key.clone()),
meta: db_meta,
};

dbs.insert(db_id.db_id, Arc::new(db));
}
dbs.insert(db_id.db_id, Arc::new(db));
}
}

@@ -706,7 +703,7 @@
async fn list_databases(
&self,
req: ListDatabaseReq,
) -> Result<Vec<Arc<DatabaseInfo>>, KVAppError> {
) -> Result<Vec<Arc<DatabaseInfo>>, MetaError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let name_key = DatabaseNameIdent::new(req.tenant(), "dummy");
@@ -1659,11 +1656,7 @@
.map(|id| TableId { table_id: id })
.collect::<Vec<_>>();

let mut seq_metas = vec![];
for chunk in ids.chunks(DEFAULT_MGET_SIZE) {
let got = self.get_pb_values_vec(chunk.to_vec()).await?;
seq_metas.extend(got);
}
let seq_metas = self.get_pb_values_vec(ids.clone()).await?;

let res = names
.into_iter()
@@ -3091,27 +3084,22 @@
) -> Result<Vec<(TableId, SeqV<TableMeta>)>, KVAppError> {
let mut tb_metas = vec![];

let inner_keys = tb_id_list
.id_list
.into_iter()
.map(TableId::new)
.collect::<Vec<_>>();
let inner_keys = tb_id_list.id_list.into_iter().map(TableId::new);

for c in inner_keys.chunks(DEFAULT_MGET_SIZE) {
let kvs = kv_api.get_pb_vec(c.iter().cloned()).await?;
let kvs = kv_api.get_pb_vec(inner_keys).await?;

for (k, table_meta) in kvs {
let Some(table_meta) = table_meta else {
error!("get_table_history cannot find {:?} table_meta", k);
continue;
};
for (k, table_meta) in kvs {
let Some(table_meta) = table_meta else {
error!("get_table_history cannot find {:?} table_meta", k);
continue;
};

if !is_drop_time_retainable(table_meta.drop_on, *now) {
continue;
}
tb_metas.push((k, table_meta));
if !is_drop_time_retainable(table_meta.drop_on, *now) {
continue;
}
tb_metas.push((k, table_meta));
}

Ok(tb_metas)
}

@@ -3541,30 +3529,29 @@

let mut filter_tb_infos = vec![];

for chunk in args[..std::cmp::min(limit, args.len())].chunks(DEFAULT_MGET_SIZE) {
let table_id_idents = chunk.iter().map(|(table_id, _)| table_id.clone());
let limited_args = &args[..std::cmp::min(limit, args.len())];
let table_id_idents = limited_args.iter().map(|(table_id, _)| table_id.clone());

let seq_metas = kv_api.get_pb_values_vec(table_id_idents).await?;
let seq_metas = kv_api.get_pb_values_vec(table_id_idents).await?;

for (seq_meta, (table_id, table_name)) in seq_metas.into_iter().zip(chunk.iter()) {
let Some(seq_meta) = seq_meta else {
error!(
"batch_filter_table_info cannot find {:?} table_meta",
table_id
);
continue;
};

if !drop_time_range.contains(&seq_meta.data.drop_on) {
continue;
}
for (seq_meta, (table_id, table_name)) in seq_metas.into_iter().zip(limited_args.iter()) {
let Some(seq_meta) = seq_meta else {
error!(
"batch_filter_table_info cannot find {:?} table_meta",
table_id
);
continue;
};

filter_tb_infos.push(TableNIV::new(
DBIdTableName::new(db_id, table_name.clone()),
table_id.clone(),
seq_meta,
));
if !drop_time_range.contains(&seq_meta.data.drop_on) {
continue;
}

filter_tb_infos.push(TableNIV::new(
DBIdTableName::new(db_id, table_name.clone()),
table_id.clone(),
seq_meta,
));
}

Ok(filter_tb_infos)
