Skip to content

Commit

Permalink
Merge pull request #8568 from drmingdrmer/51-inv-reply
Browse files Browse the repository at this point in the history
refactor(meta): return InvalidReply if fail to decode a message
  • Loading branch information
mergify[bot] authored Nov 1, 2022
2 parents 82d07df + c93a4f4 commit 9b61641
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 29 deletions.
35 changes: 12 additions & 23 deletions src/meta/service/src/meta_service/raftmeta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,30 +640,19 @@ impl MetaNode {
Ok(r) => {
let reply = r.into_inner();

if reply.error.is_empty() {
// No error. It does not have to decode an error from an old databend-meta

let res: Result<ForwardResponse, MetaAPIError> = reply.into();
match res {
Ok(v) => {
info!("join cluster via {} success: {:?}", addr, v);
return Ok(Ok(()));
}
Err(e) => {
error!("join cluster via {} fail: {}", addr, e.to_string());
errors.push(
AnyError::new(&e)
.add_context(|| format!("join via: {}", addr.clone())),
);
}
let res: Result<ForwardResponse, MetaAPIError> = reply.into();
match res {
Ok(v) => {
info!("join cluster via {} success: {:?}", addr, v);
return Ok(Ok(()));
}
Err(e) => {
error!("join cluster via {} fail: {}", addr, e.to_string());
errors.push(
AnyError::new(&e)
.add_context(|| format!("join via: {}", addr.clone())),
);
}
} else {
// TODO: workaround: error type changed. new version of databend-meta does not understand old databend-meta error.
error!("join cluster via {} fail: {}", addr, &reply.error);
errors.push(
AnyError::error(&reply.error)
.add_context(|| format!("join via: {}", addr.clone())),
);
}
}
Err(s) => {
Expand Down
4 changes: 2 additions & 2 deletions src/meta/service/tests/it/meta_node/meta_node_seq_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use common_meta_types::protobuf::raft_service_client::RaftServiceClient;
use common_meta_types::AppliedState;
use common_meta_types::Cmd;
use common_meta_types::LogEntry;
use common_meta_types::RetryableError;
use common_meta_types::MetaError;
use databend_meta::init_meta_ut;
use databend_meta::meta_service::MetaNode;

Expand All @@ -46,7 +46,7 @@ async fn test_meta_node_incr_seq() -> anyhow::Result<()> {
};
let raft_reply = client.write(req).await?.into_inner();

let res: Result<AppliedState, RetryableError> = raft_reply.into();
let res: Result<AppliedState, MetaError> = raft_reply.into();
let resp: AppliedState = res?;
match resp {
AppliedState::Seq { seq } => {
Expand Down
8 changes: 8 additions & 0 deletions src/meta/types/src/errors/kv_app_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use common_meta_stoerr::MetaStorageError;
use tonic::Status;

use crate::AppError;
use crate::InvalidReply;
use crate::MetaAPIError;
use crate::MetaClientError;
use crate::MetaError;
Expand Down Expand Up @@ -90,3 +91,10 @@ impl From<MetaAPIError> for KVAppError {
Self::MetaError(meta_err)
}
}

impl From<InvalidReply> for KVAppError {
fn from(e: InvalidReply) -> Self {
let meta_err = MetaError::from(e);
Self::MetaError(meta_err)
}
}
8 changes: 8 additions & 0 deletions src/meta/types/src/errors/meta_api_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use openraft::error::ChangeMembershipError;
use openraft::error::Fatal;
use openraft::error::ForwardToLeader;

use crate::InvalidReply;
use crate::MetaNetworkError;

/// Errors raised when meta-service handling a request.
Expand Down Expand Up @@ -125,3 +126,10 @@ impl From<MetaDataReadError> for MetaOperationError {
MetaOperationError::from(de)
}
}

impl From<InvalidReply> for MetaAPIError {
fn from(e: InvalidReply) -> Self {
let net_err = MetaNetworkError::from(e);
Self::NetworkError(net_err)
}
}
8 changes: 8 additions & 0 deletions src/meta/types/src/errors/meta_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use serde::Deserialize;
use serde::Serialize;
use thiserror::Error;

use crate::InvalidReply;
use crate::MetaAPIError;
use crate::MetaClientError;
use crate::MetaNetworkError;
Expand Down Expand Up @@ -58,3 +59,10 @@ impl From<tonic::Status> for MetaError {
MetaError::NetworkError(net_err)
}
}

impl From<InvalidReply> for MetaError {
fn from(e: InvalidReply) -> Self {
let api_err = MetaAPIError::from(e);
Self::APIError(api_err)
}
}
68 changes: 64 additions & 4 deletions src/meta/types/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::AppliedState;
use crate::Endpoint;
use crate::GetKVReply;
use crate::GetKVReq;
use crate::InvalidReply;
use crate::ListKVReply;
use crate::ListKVReq;
use crate::LogEntry;
Expand Down Expand Up @@ -215,14 +216,16 @@ impl From<AppliedState> for RaftReply {
impl<T, E> From<RaftReply> for Result<T, E>
where
T: DeserializeOwned,
E: DeserializeOwned,
E: DeserializeOwned + From<InvalidReply>,
{
fn from(msg: RaftReply) -> Self {
if !msg.data.is_empty() {
let resp: T = serde_json::from_str(&msg.data).expect("fail to deserialize");
Ok(resp)
let res: T = serde_json::from_str(&msg.data)
.map_err(|e| InvalidReply::new("can not decode RaftReply.data", &e))?;
Ok(res)
} else {
let err: E = serde_json::from_str(&msg.error).expect("fail to deserialize");
let err: E = serde_json::from_str(&msg.error)
.map_err(|e| InvalidReply::new("can not decode RaftReply.error", &e))?;
Err(err)
}
}
Expand Down Expand Up @@ -267,3 +270,60 @@ where E: DeserializeOwned
}
}
}

#[cfg(test)]
mod tests {

#[derive(serde::Serialize, serde::Deserialize)]
struct Foo {
i: i32,
}

use crate::protobuf::RaftReply;
use crate::MetaNetworkError;

#[test]
fn test_valid_reply() -> anyhow::Result<()> {
// Unable to decode `.data`

let msg = RaftReply {
data: "foo".to_string(),
error: "".to_string(),
};
let res: Result<Foo, MetaNetworkError> = msg.into();
match res {
Err(MetaNetworkError::InvalidReply(inv_reply)) => {
assert!(
inv_reply
.to_string()
.starts_with("InvalidReply: can not decode RaftReply.data")
);
}
_ => {
unreachable!("expect InvalidReply")
}
}

// Unable to decode `.error`

let msg = RaftReply {
data: "".to_string(),
error: "foo".to_string(),
};
let res: Result<Foo, MetaNetworkError> = msg.into();
match res {
Err(MetaNetworkError::InvalidReply(inv_reply)) => {
assert!(
inv_reply
.to_string()
.starts_with("InvalidReply: can not decode RaftReply.error")
);
}
_ => {
unreachable!("expect InvalidReply")
}
}

Ok(())
}
}

1 comment on commit 9b61641

@vercel
Copy link

@vercel vercel bot commented on 9b61641 Nov 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Successfully deployed to the following URLs:

databend – ./

databend.rs
databend-git-main-databend.vercel.app
databend-databend.vercel.app
databend.vercel.app

Please sign in to comment.