refactor: Automatically chunk large key sets in KVPbApi methods #16561

Merged: 1 commit, Oct 9, 2024
1 change: 1 addition & 0 deletions Cargo.lock

(Cargo.lock is a generated file; its diff is not rendered.)

1 change: 1 addition & 0 deletions src/meta/api/Cargo.toml
@@ -27,6 +27,7 @@ databend-common-meta-types = { workspace = true }
databend-common-proto-conv = { workspace = true }
fastrace = { workspace = true }
futures = { workspace = true }
itertools = { workspace = true }
log = { workspace = true }
logcall = { workspace = true }
maplit = "1.0.2"
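
For context, the new `itertools` dependency supplies the `Itertools::chunks` adapter used below to split a lazy key iterator into fixed-size batches. A minimal standalone sketch (not part of this diff; the key format and counts are illustrative only):

use itertools::Itertools;

const CHUNK_SIZE: usize = 256;

fn main() {
    // A lazy iterator of keys; nothing is materialized up front.
    let keys = (1..=1000u64).map(|i| format!("__fd_catalog_by_id/{i}"));

    // Split into fixed-size batches, mirroring the shape used in this PR.
    let batches: Vec<Vec<String>> = keys
        .chunks(CHUNK_SIZE)
        .into_iter()
        .map(|chunk| chunk.collect())
        .collect();

    assert_eq!(batches.len(), 4); // 256 + 256 + 256 + 232 = 1000
    assert_eq!(batches.last().unwrap().len(), 232);
}
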
111 changes: 100 additions & 11 deletions src/meta/api/src/kv_pb_api/mod.rs
@@ -40,6 +40,7 @@ use futures::stream;
use futures::stream::BoxStream;
use futures::stream::StreamExt;
use futures::TryStreamExt;
use itertools::Itertools;

pub(crate) use self::codec::decode_non_empty_item;
pub(crate) use self::codec::decode_seqv;
@@ -52,6 +53,9 @@ use crate::kv_pb_api::errors::StreamReadEof;

/// This trait provides a way to access a kv store with `kvapi::Key` type key and protobuf encoded value.
pub trait KVPbApi: KVApi {
/// The number of keys in one batch get.
const CHUNK_SIZE: usize = 256;

/// Update or insert a protobuf encoded value by kvapi::Key.
///
/// The key will be converted to string and the value is encoded by `FromToProto`.
@@ -189,21 +193,35 @@
}

/// Same as [`get_pb_values`](Self::get_pb_values) but collect the result in a `Vec` instead of a stream.
///
/// If the number of keys is larger than [`Self::CHUNK_SIZE`], it will be split into multiple requests.
fn get_pb_values_vec<K, I>(
&self,
keys: I,
) -> impl Future<Output = Result<Vec<Option<SeqV<K::ValueType>>>, Self::Error>> + Send
where
K: kvapi::Key + 'static,
K: kvapi::Key + Send + 'static,
K::ValueType: FromToProto + Send + 'static,
I: IntoIterator<Item = K> + Send,
I::IntoIter: Send,
Self::Error: From<PbApiReadError<Self::Error>>,
{
let it = keys.into_iter();
let key_chunks = it
.chunks(Self::CHUNK_SIZE)
.into_iter()
.map(|x| x.collect::<Vec<_>>())
.collect::<Vec<_>>();

async move {
self.get_pb_values(keys)
.await?
.try_collect::<Vec<_>>()
.await
let mut res = vec![];
for chunk in key_chunks {
let strm = self.get_pb_values(chunk).await?;

let vec = strm.try_collect::<Vec<_>>().await?;
res.extend(vec);
}
Ok(res)
}
}

@@ -241,6 +259,8 @@
}

/// Same as [`get_pb_stream`](Self::get_pb_stream) but collect the result in a `Vec` instead of a stream.
///
/// If the number of keys is larger than [`Self::CHUNK_SIZE`], it will be split into multiple requests.
fn get_pb_vec<K, I>(
&self,
keys: I,
@@ -249,11 +269,25 @@
K: kvapi::Key + Send + 'static,
K::ValueType: FromToProto + Send + 'static,
I: IntoIterator<Item = K> + Send,
I::IntoIter: Send,
Self::Error: From<PbApiReadError<Self::Error>>,
{
let it = keys.into_iter();
let key_chunks = it
.chunks(Self::CHUNK_SIZE)
.into_iter()
.map(|x| x.collect::<Vec<_>>())
.collect::<Vec<_>>();

async move {
let kvs = self.get_pb_stream(keys).await?.try_collect().await?;
Ok(kvs)
let mut res = vec![];
for chunk in key_chunks {
let strm = self.get_pb_stream(chunk).await?;

let vec = strm.try_collect::<Vec<_>>().await?;
res.extend(vec);
}
Ok(res)
}
}
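
Both helpers follow the same shape: one streaming read per chunk, with each chunk's results appended in order, so callers see output aligned with their input even when a request spans several chunks. A runnable toy model of that shape (a sketch only, with a stand-in `get_batch` in place of the real kv read; `tokio` and `futures` assumed as dependencies):

use futures::stream::{self, TryStreamExt};

// Stand-in for one chunked read: yields the chunk back as a fallible stream.
async fn get_batch(chunk: Vec<u64>) -> Result<Vec<u64>, std::io::Error> {
    stream::iter(chunk.into_iter().map(Ok)).try_collect().await
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
    let keys: Vec<u64> = (1..=1000).collect();

    let mut res = Vec::new();
    for chunk in keys.chunks(256) {
        // One request per chunk; extend() keeps results in input order.
        res.extend(get_batch(chunk.to_vec()).await?);
    }

    assert_eq!(res, keys);
    Ok(())
}
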

@@ -497,14 +531,14 @@ mod tests {
use crate::kv_pb_api::KVPbApi;

//
struct Foo {
struct FooKV {
/// Whether to return without exhausting the input for `get_kv_stream`.
early_return: Option<usize>,
kvs: BTreeMap<String, SeqV>,
}

#[async_trait]
impl KVApi for Foo {
impl KVApi for FooKV {
type Error = MetaError;

async fn upsert_kv(&self, _req: UpsertKVReq) -> Result<UpsertKVReply, Self::Error> {
@@ -560,7 +594,7 @@
};
let v = catalog_meta.to_pb()?.encode_to_vec();

let foo = Foo {
let foo = FooKV {
early_return: Some(2),
kvs: vec![
(s("__fd_catalog_by_id/1"), SeqV::new(1, v.clone())),
@@ -613,7 +647,7 @@
};
let v = catalog_meta.to_pb()?.encode_to_vec();

let foo = Foo {
let foo = FooKV {
early_return: None,
kvs: vec![
(s("__fd_catalog_by_id/1"), SeqV::new(1, v.clone())),
@@ -675,6 +709,61 @@
Ok(())
}

#[tokio::test]
async fn test_get_pb_vec_span_chunk() -> anyhow::Result<()> {
let catalog_meta = CatalogMeta {
catalog_option: CatalogOption::Hive(HiveCatalogOption {
address: "127.0.0.1:10000".to_string(),
storage_params: None,
}),
created_on: DateTime::<Utc>::MIN_UTC,
};
let catalog_bytes = catalog_meta.to_pb()?.encode_to_vec();

let n = 1024;
let mut kvs = vec![];
for i in 1..=n {
let key = s(format!("__fd_catalog_by_id/{}", i));
let value = SeqV::new(i, catalog_bytes.clone());
kvs.push((key, value));
}

let foo = FooKV {
early_return: None,
kvs: kvs.into_iter().collect(),
};

assert!(FooKV::CHUNK_SIZE < n as usize);

let tenant = Tenant::new_literal("dummy");

{
let got = foo
.get_pb_vec((1..=n).map(|i| CatalogIdIdent::new(&tenant, i)))
.await?;

for i in 1..=n {
let key = CatalogIdIdent::new(&tenant, i);
assert_eq!(key, got[i as usize - 1].0.clone());
let value = got[i as usize - 1].1.clone().unwrap();
assert_eq!(i, value.seq());
}
}

{
let got = foo
.get_pb_values_vec((1..=n).map(|i| CatalogIdIdent::new(&tenant, i)))
.await?;

for i in 1..=n {
let value = got[i as usize - 1].clone().unwrap();
assert_eq!(i, value.seq());
}
}

Ok(())
}

fn s(x: impl ToString) -> String {
x.to_string()
}
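
At call sites the chunking is now invisible; a hedged usage sketch, assuming an async context with a `kv` handle implementing `KVPbApi` and the `CatalogIdIdent`/`Tenant` types exercised by the test above:

// Sketch only: `kv` and `tenant` are assumed bindings, not from this diff.
// 1024 keys are split internally into ceil(1024 / 256) = 4 batched reads.
let keys = (1..=1024u64).map(|i| CatalogIdIdent::new(&tenant, i));
let kvs = kv.get_pb_vec(keys).await?;
assert_eq!(kvs.len(), 1024);

// Since CHUNK_SIZE is a defaulted associated const, an implementation could
// in principle override it in its `impl KVPbApi for ...` block, e.g.
// `const CHUNK_SIZE: usize = 64;` (hypothetical; this PR keeps the default).
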
7 changes: 4 additions & 3 deletions src/meta/api/src/schema_api.rs
@@ -124,14 +124,15 @@ pub trait SchemaApi: Send + Sync {
async fn list_databases(
&self,
req: ListDatabaseReq,
) -> Result<Vec<Arc<DatabaseInfo>>, KVAppError>;
) -> Result<Vec<Arc<DatabaseInfo>>, MetaError>;

async fn rename_database(
&self,
req: RenameDatabaseReq,
) -> Result<RenameDatabaseReply, KVAppError>;

/// Retrieves all databases for a specific tenant, including those marked as dropped.
/// Retrieves all databases for a specific tenant,
/// optionally including those marked as dropped.
///
/// * `include_non_retainable` -
/// If true, includes databases that are beyond the retention period.
@@ -140,7 +141,7 @@
&self,
req: ListDatabaseReq,
include_non_retainable: bool,
) -> Result<Vec<Arc<DatabaseInfo>>, KVAppError>;
) -> Result<Vec<Arc<DatabaseInfo>>, MetaError>;

// index

105 changes: 46 additions & 59 deletions src/meta/api/src/schema_api_impl.rs
@@ -638,7 +638,7 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {
&self,
req: ListDatabaseReq,
include_non_retainable: bool,
) -> Result<Vec<Arc<DatabaseInfo>>, KVAppError> {
) -> Result<Vec<Arc<DatabaseInfo>>, MetaError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let name_ident = DatabaseIdHistoryIdent::new(&req.tenant, "dummy");
@@ -652,26 +652,23 @@
let ids = db_id_list
.id_list
.iter()
.map(|db_id| DatabaseId { db_id: *db_id })
.collect::<Vec<_>>();
.map(|db_id| DatabaseId { db_id: *db_id });

for db_ids in ids.chunks(DEFAULT_MGET_SIZE) {
let id_metas = self.get_pb_vec(db_ids.iter().cloned()).await?;
let id_metas = self.get_pb_vec(ids).await?;

for (db_id, db_meta) in id_metas {
let Some(db_meta) = db_meta else {
error!("get_database_history cannot find {:?} db_meta", db_id);
continue;
};
for (db_id, db_meta) in id_metas {
let Some(db_meta) = db_meta else {
error!("get_database_history cannot find {:?} db_meta", db_id);
continue;
};

let db = DatabaseInfo {
database_id: db_id,
name_ident: DatabaseNameIdent::new_from(db_id_list_key.clone()),
meta: db_meta,
};
let db = DatabaseInfo {
database_id: db_id,
name_ident: DatabaseNameIdent::new_from(db_id_list_key.clone()),
meta: db_meta,
};

dbs.insert(db_id.db_id, Arc::new(db));
}
dbs.insert(db_id.db_id, Arc::new(db));
}
}

@@ -706,7 +703,7 @@
async fn list_databases(
&self,
req: ListDatabaseReq,
) -> Result<Vec<Arc<DatabaseInfo>>, KVAppError> {
) -> Result<Vec<Arc<DatabaseInfo>>, MetaError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let name_key = DatabaseNameIdent::new(req.tenant(), "dummy");
@@ -1659,11 +1656,7 @@
.map(|id| TableId { table_id: id })
.collect::<Vec<_>>();

let mut seq_metas = vec![];
for chunk in ids.chunks(DEFAULT_MGET_SIZE) {
let got = self.get_pb_values_vec(chunk.to_vec()).await?;
seq_metas.extend(got);
}
let seq_metas = self.get_pb_values_vec(ids.clone()).await?;

let res = names
.into_iter()
@@ -3091,27 +3084,22 @@ async fn get_table_meta_history(
) -> Result<Vec<(TableId, SeqV<TableMeta>)>, KVAppError> {
let mut tb_metas = vec![];

let inner_keys = tb_id_list
.id_list
.into_iter()
.map(TableId::new)
.collect::<Vec<_>>();
let inner_keys = tb_id_list.id_list.into_iter().map(TableId::new);

for c in inner_keys.chunks(DEFAULT_MGET_SIZE) {
let kvs = kv_api.get_pb_vec(c.iter().cloned()).await?;
let kvs = kv_api.get_pb_vec(inner_keys).await?;

for (k, table_meta) in kvs {
let Some(table_meta) = table_meta else {
error!("get_table_history cannot find {:?} table_meta", k);
continue;
};
for (k, table_meta) in kvs {
let Some(table_meta) = table_meta else {
error!("get_table_history cannot find {:?} table_meta", k);
continue;
};

if !is_drop_time_retainable(table_meta.drop_on, *now) {
continue;
}
tb_metas.push((k, table_meta));
if !is_drop_time_retainable(table_meta.drop_on, *now) {
continue;
}
tb_metas.push((k, table_meta));
}

Ok(tb_metas)
}

@@ -3541,30 +3529,29 @@ async fn get_history_tables_for_gc(

let mut filter_tb_infos = vec![];

for chunk in args[..std::cmp::min(limit, args.len())].chunks(DEFAULT_MGET_SIZE) {
let table_id_idents = chunk.iter().map(|(table_id, _)| table_id.clone());
let limited_args = &args[..std::cmp::min(limit, args.len())];
let table_id_idents = limited_args.iter().map(|(table_id, _)| table_id.clone());

let seq_metas = kv_api.get_pb_values_vec(table_id_idents).await?;
let seq_metas = kv_api.get_pb_values_vec(table_id_idents).await?;

for (seq_meta, (table_id, table_name)) in seq_metas.into_iter().zip(chunk.iter()) {
let Some(seq_meta) = seq_meta else {
error!(
"batch_filter_table_info cannot find {:?} table_meta",
table_id
);
continue;
};

if !drop_time_range.contains(&seq_meta.data.drop_on) {
continue;
}
for (seq_meta, (table_id, table_name)) in seq_metas.into_iter().zip(limited_args.iter()) {
let Some(seq_meta) = seq_meta else {
error!(
"batch_filter_table_info cannot find {:?} table_meta",
table_id
);
continue;
};

filter_tb_infos.push(TableNIV::new(
DBIdTableName::new(db_id, table_name.clone()),
table_id.clone(),
seq_meta,
));
if !drop_time_range.contains(&seq_meta.data.drop_on) {
continue;
}

filter_tb_infos.push(TableNIV::new(
DBIdTableName::new(db_id, table_name.clone()),
table_id.clone(),
seq_meta,
));
}

Ok(filter_tb_infos)
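
The recurring simplification in this file is the removal of the manual `DEFAULT_MGET_SIZE` loop, since `KVPbApi` now chunks internally; schematically, using the names from the hunks above:

// Before: each call site batched by hand.
let mut seq_metas = vec![];
for chunk in ids.chunks(DEFAULT_MGET_SIZE) {
    let got = self.get_pb_values_vec(chunk.to_vec()).await?;
    seq_metas.extend(got);
}

// After: hand over the full iterator; chunking happens inside KVPbApi.
let seq_metas = self.get_pb_values_vec(ids).await?;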