Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Add item count check to KVPbApi::get_pb_stream() #16377

Merged
merged 2 commits into from
Sep 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/meta/api/src/kv_pb_api/errors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,4 +23,5 @@ pub use self::decode_error::PbDecodeError;
pub use self::encode_error::PbEncodeError;
pub use self::read_error::NoneValue;
pub use self::read_error::PbApiReadError;
pub use self::read_error::StreamReadEof;
pub use self::write_error::PbApiWriteError;
22 changes: 22 additions & 0 deletions src/meta/api/src/kv_pb_api/errors/read_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use databend_common_meta_kvapi::kvapi;
use databend_common_meta_types::anyerror::AnyError;
use databend_common_meta_types::InvalidReply;
use databend_common_meta_types::MetaError;

Expand All @@ -33,13 +34,31 @@ impl NoneValue {
}
}

/// Error reporting that a stream yielded fewer items than expected.
///
/// The rendered message contains both the expected and the received item counts.
#[derive(Clone, Debug, PartialEq, thiserror::Error)]
#[error("StreamReadEOF: expected {expected} items but only received {received} items")]
pub struct StreamReadEof {
/// Number of items that were expected.
expected: u64,
/// Number of items that were actually received; can be updated via `set_received()`.
received: u64,
}

impl StreamReadEof {
pub fn new(expected: u64, received: u64) -> Self {
StreamReadEof { expected, received }
}

pub fn set_received(&mut self, received: u64) {
self.received = received;
}
}

/// An error that occurs when reading a protobuf encoded value from the kv store.
///
/// `E` is the error type returned by the underlying KVApi.
/// Every variant is rendered as `PbApiReadError: <inner error>`.
#[derive(Clone, Debug, PartialEq, thiserror::Error)]
#[error("PbApiReadError: {0}")]
pub enum PbApiReadError<E> {
/// Converted from a `PbDecodeError`.
PbDecodeError(#[from] PbDecodeError),
/// Converted from a `kvapi::KeyError`.
KeyError(#[from] kvapi::KeyError),
/// Converted from a `NoneValue` error.
NoneValue(#[from] NoneValue),
/// Converted from a `StreamReadEof` error: fewer items were received than expected.
StreamReadEof(#[from] StreamReadEof),
/// Error returned from KVApi.
KvApiError(E),
}
Expand All @@ -54,6 +73,9 @@ impl From<PbApiReadError<MetaError>> for MetaError {
PbApiReadError::PbDecodeError(e) => MetaError::from(e),
PbApiReadError::KeyError(e) => MetaError::from(InvalidReply::new("", &e)),
PbApiReadError::NoneValue(e) => MetaError::from(InvalidReply::new("", &e)),
PbApiReadError::StreamReadEof(e) => {
MetaError::from(InvalidReply::new(e.to_string(), &AnyError::error("")))
}
PbApiReadError::KvApiError(e) => e,
}
}
Expand Down
91 changes: 86 additions & 5 deletions src/meta/api/src/kv_pb_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use databend_common_meta_types::UpsertKV;
use databend_common_proto_conv::FromToProto;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use futures::stream;
use futures::stream::BoxStream;
use futures::stream::StreamExt;
use futures::TryStreamExt;
Expand All @@ -46,6 +47,7 @@ pub(crate) use self::codec::encode_operation;
pub use self::upsert_pb::UpsertPB;
use crate::kv_pb_api::errors::PbApiReadError;
use crate::kv_pb_api::errors::PbApiWriteError;
use crate::kv_pb_api::errors::StreamReadEof;

/// This trait provides a way to access a kv store with `kvapi::Key` type key and protobuf encoded value.
pub trait KVPbApi: KVApi {
Expand Down Expand Up @@ -160,7 +162,9 @@ pub trait KVPbApi: KVApi {
}

/// Same as `get_pb_stream` but does not return keys, only values.
#[deprecated(note = "stream may be closed. The caller must check it")]
///
/// It is guaranteed to return the same number of results as the input keys.
/// If the backend stream is closed before all keys are processed, the remaining items are filled with a `StreamReadEof` error.
fn get_pb_values<K, I>(
&self,
keys: I,
Expand Down Expand Up @@ -195,7 +199,9 @@ pub trait KVPbApi: KVApi {
/// The key will be converted to string and the returned value is decoded by `FromToProto`.
/// It returns the same error as `KVApi::Error`,
/// thus it requires KVApi::Error can describe a decoding error, i.e., `impl From<PbApiReadError>`.
#[deprecated(note = "stream may be closed. The caller must check it")]
///
/// It is guaranteed to return the same number of results as the input keys.
/// If the backend stream is closed before all keys are processed, the remaining items are filled with a `StreamReadEof` error.
fn get_pb_stream<K, I>(
&self,
keys: I,
Expand Down Expand Up @@ -244,6 +250,8 @@ pub trait KVPbApi: KVApi {
.collect::<Vec<_>>();

async move {
let sent = keys.len();

let strm = self.get_kv_stream(&keys).await?;

let strm = strm.map(|r: Result<StreamItem, Self::Error>| {
Expand All @@ -261,6 +269,26 @@ pub trait KVPbApi: KVApi {
Ok((k, v))
});

// If the backend stream is closed, fill it with `StreamReadEof` error.

let strm = strm
// chain with a stream of `StreamReadEof` error but without received count set.
.chain(stream::once(async move {
Err(PbApiReadError::StreamReadEof(StreamReadEof::new(
sent as u64,
0,
)))
}))
.take(sent)
// set received count for `StreamReadEof` error after `sent`
.enumerate()
.map(move |(i, mut r)| {
if let Err(PbApiReadError::StreamReadEof(e)) = &mut r {
e.set_received(i as u64)
}
r
});

Ok(strm.boxed())
}
}
Expand Down Expand Up @@ -387,6 +415,8 @@ mod tests {

//
/// In-memory key-value store used by the tests in this module.
struct Foo {
/// If `Some(n)`, `get_kv_stream` stops after processing the first `n` keys,
/// i.e., it returns without exhausting its input. `None` processes every key.
early_return: Option<usize>,
/// The stored key-value pairs, looked up by key in `get_kv_stream`.
kvs: BTreeMap<String, SeqV>,
}

Expand All @@ -403,7 +433,14 @@ mod tests {
keys: &[String],
) -> Result<KVStream<Self::Error>, Self::Error> {
let mut res = Vec::with_capacity(keys.len());
for key in keys {
for (i, key) in keys.iter().enumerate() {
// For testing early return stream.
if let Some(early_return) = self.early_return {
if i >= early_return {
break;
}
}

let k = key.clone();
let v = self.kvs.get(key).cloned();

Expand All @@ -428,6 +465,51 @@ mod tests {
// TODO: test upsert_kv
// TODO: test list_kv

/// When the backend stream ends before every requested key has been served,
/// collecting the stream returned by `get_pb_stream` must fail with a
/// `StreamReadEof` error that reports the expected and received counts.
#[tokio::test]
async fn test_mget_early_return() -> anyhow::Result<()> {
    let hive_option = HiveCatalogOption {
        address: "127.0.0.1:10000".to_string(),
        storage_params: None,
    };
    let meta = CatalogMeta {
        catalog_option: CatalogOption::Hive(hive_option),
        created_on: DateTime::<Utc>::MIN_UTC,
    };
    let encoded = meta.to_pb()?.encode_to_vec();

    // Three records are stored, but the backend stops after serving two keys.
    let store = Foo {
        early_return: Some(2),
        kvs: vec![
            (s("__fd_catalog_by_id/1"), SeqV::new(1, encoded.clone())),
            (s("__fd_catalog_by_id/2"), SeqV::new(2, encoded.clone())),
            (s("__fd_catalog_by_id/3"), SeqV::new(3, encoded.clone())),
        ]
        .into_iter()
        .collect(),
    };

    let tenant = Tenant::new_literal("dummy");

    // Request three keys so that the stream is cut short by one item.
    let requested = [
        CatalogIdIdent::new(&tenant, 1),
        CatalogIdIdent::new(&tenant, 2),
        CatalogIdIdent::new(&tenant, 4),
    ];
    let stream = store.get_pb_stream(requested).await?;

    let err = stream.try_collect::<Vec<_>>().await.unwrap_err();
    assert_eq!(
        err.to_string(),
        r#"InvalidReply: StreamReadEOF: expected 3 items but only received 2 items; source: "#
    );

    Ok(())
}

#[tokio::test]
async fn test_mget() -> anyhow::Result<()> {
let catalog_meta = CatalogMeta {
Expand All @@ -449,6 +531,7 @@ mod tests {
let v = catalog_meta.to_pb()?.encode_to_vec();

let foo = Foo {
early_return: None,
kvs: vec![
(s("__fd_catalog_by_id/1"), SeqV::new(1, v.clone())),
(s("__fd_catalog_by_id/2"), SeqV::new(2, v.clone())),
Expand All @@ -462,7 +545,6 @@ mod tests {

// Get key value pairs
{
#[allow(deprecated)]
let strm = foo
.get_pb_stream([
CatalogIdIdent::new(&tenant, 1),
Expand All @@ -488,7 +570,6 @@ mod tests {

// Get values
{
#[allow(deprecated)]
let strm = foo
.get_pb_values([
CatalogIdIdent::new(&tenant, 1),
Expand Down
15 changes: 0 additions & 15 deletions src/meta/api/src/name_id_value_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@ use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_kvapi::kvapi::KVApi;
use databend_common_meta_types::anyerror::AnyError;
use databend_common_meta_types::Change;
use databend_common_meta_types::InvalidReply;
use databend_common_meta_types::MatchSeq;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::Operation;
Expand Down Expand Up @@ -281,22 +279,9 @@ where
id.into_t_ident(tenant)
});

#[allow(deprecated)]
let strm = self.get_pb_values(id_idents).await?;
let seq_metas = strm.try_collect::<Vec<_>>().await?;

if seq_metas.len() != name_ids.len() {
return Err(InvalidReply::new(
"seq_metas.len() {} != name_ids.len() {}",
&AnyError::error(format!(
"seq_metas.len() {} != name_ids.len() {}",
seq_metas.len(),
name_ids.len()
)),
)
.into());
}

let name_id_values =
name_ids
.into_iter()
Expand Down
15 changes: 0 additions & 15 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_types::anyerror::AnyError;
use databend_common_meta_types::protobuf as pb;
use databend_common_meta_types::seq_value::SeqV;
use databend_common_meta_types::seq_value::SeqValue;
Expand Down Expand Up @@ -3782,23 +3781,9 @@ impl<KV: kvapi::KVApi<Error = MetaError> + ?Sized> SchemaApi for KV {

// Batch get all catalog-metas.
// - A catalog-meta may be already deleted. It is Ok. Just ignore it.
#[allow(deprecated)]
let seq_metas = self.get_pb_values(kv_keys.clone()).await?;
let seq_metas = seq_metas.try_collect::<Vec<_>>().await?;

if seq_metas.len() != kv_keys.len() {
let err = InvalidReply::new(
"list_catalogs",
&AnyError::error(format!(
"mismatched catalog-meta count: got: {}, expect: {}",
seq_metas.len(),
kv_keys.len()
)),
);
let meta_net_err = MetaNetworkError::from(err);
return Err(KVAppError::MetaError(meta_net_err.into()));
}

let mut catalog_infos = Vec::with_capacity(kv_keys.len());

for (i, seq_meta_opt) in seq_metas.into_iter().enumerate() {
Expand Down
2 changes: 1 addition & 1 deletion src/meta/types/src/errors/meta_network_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ impl InvalidArgument {
}

#[derive(thiserror::Error, serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq)]
#[error("InvalidReply: {msg} source: {source}")]
#[error("InvalidReply: {msg}; source: {source}")]
pub struct InvalidReply {
msg: String,
#[source]
Expand Down
Loading