Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(meta): return InvalidReply if fail to decode a message #8568

Merged
merged 1 commit into from
Nov 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 12 additions & 23 deletions src/meta/service/src/meta_service/raftmeta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,30 +640,19 @@ impl MetaNode {
Ok(r) => {
let reply = r.into_inner();

if reply.error.is_empty() {
// No error. It does not have to decode an error from an old databend-meta

let res: Result<ForwardResponse, MetaAPIError> = reply.into();
match res {
Ok(v) => {
info!("join cluster via {} success: {:?}", addr, v);
return Ok(Ok(()));
}
Err(e) => {
error!("join cluster via {} fail: {}", addr, e.to_string());
errors.push(
AnyError::new(&e)
.add_context(|| format!("join via: {}", addr.clone())),
);
}
let res: Result<ForwardResponse, MetaAPIError> = reply.into();
match res {
Ok(v) => {
info!("join cluster via {} success: {:?}", addr, v);
return Ok(Ok(()));
}
Err(e) => {
error!("join cluster via {} fail: {}", addr, e.to_string());
errors.push(
AnyError::new(&e)
.add_context(|| format!("join via: {}", addr.clone())),
);
}
} else {
// TODO: workaround: error type changed. new version of databend-meta does not understand old databend-meta error.
error!("join cluster via {} fail: {}", addr, &reply.error);
errors.push(
AnyError::error(&reply.error)
.add_context(|| format!("join via: {}", addr.clone())),
);
}
}
Err(s) => {
Expand Down
4 changes: 2 additions & 2 deletions src/meta/service/tests/it/meta_node/meta_node_seq_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use common_meta_types::protobuf::raft_service_client::RaftServiceClient;
use common_meta_types::AppliedState;
use common_meta_types::Cmd;
use common_meta_types::LogEntry;
use common_meta_types::RetryableError;
use common_meta_types::MetaError;
use databend_meta::init_meta_ut;
use databend_meta::meta_service::MetaNode;

Expand All @@ -46,7 +46,7 @@ async fn test_meta_node_incr_seq() -> anyhow::Result<()> {
};
let raft_reply = client.write(req).await?.into_inner();

let res: Result<AppliedState, RetryableError> = raft_reply.into();
let res: Result<AppliedState, MetaError> = raft_reply.into();
let resp: AppliedState = res?;
match resp {
AppliedState::Seq { seq } => {
Expand Down
8 changes: 8 additions & 0 deletions src/meta/types/src/errors/kv_app_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use common_meta_stoerr::MetaStorageError;
use tonic::Status;

use crate::AppError;
use crate::InvalidReply;
use crate::MetaAPIError;
use crate::MetaClientError;
use crate::MetaError;
Expand Down Expand Up @@ -90,3 +91,10 @@ impl From<MetaAPIError> for KVAppError {
Self::MetaError(meta_err)
}
}

impl From<InvalidReply> for KVAppError {
fn from(e: InvalidReply) -> Self {
let meta_err = MetaError::from(e);
Self::MetaError(meta_err)
}
}
8 changes: 8 additions & 0 deletions src/meta/types/src/errors/meta_api_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use openraft::error::ChangeMembershipError;
use openraft::error::Fatal;
use openraft::error::ForwardToLeader;

use crate::InvalidReply;
use crate::MetaNetworkError;

/// Errors raised when meta-service handling a request.
Expand Down Expand Up @@ -125,3 +126,10 @@ impl From<MetaDataReadError> for MetaOperationError {
MetaOperationError::from(de)
}
}

impl From<InvalidReply> for MetaAPIError {
fn from(e: InvalidReply) -> Self {
let net_err = MetaNetworkError::from(e);
Self::NetworkError(net_err)
}
}
8 changes: 8 additions & 0 deletions src/meta/types/src/errors/meta_errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use serde::Deserialize;
use serde::Serialize;
use thiserror::Error;

use crate::InvalidReply;
use crate::MetaAPIError;
use crate::MetaClientError;
use crate::MetaNetworkError;
Expand Down Expand Up @@ -58,3 +59,10 @@ impl From<tonic::Status> for MetaError {
MetaError::NetworkError(net_err)
}
}

impl From<InvalidReply> for MetaError {
fn from(e: InvalidReply) -> Self {
let api_err = MetaAPIError::from(e);
Self::APIError(api_err)
}
}
68 changes: 64 additions & 4 deletions src/meta/types/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::AppliedState;
use crate::Endpoint;
use crate::GetKVReply;
use crate::GetKVReq;
use crate::InvalidReply;
use crate::ListKVReply;
use crate::ListKVReq;
use crate::LogEntry;
Expand Down Expand Up @@ -215,14 +216,16 @@ impl From<AppliedState> for RaftReply {
impl<T, E> From<RaftReply> for Result<T, E>
where
T: DeserializeOwned,
E: DeserializeOwned,
E: DeserializeOwned + From<InvalidReply>,
{
fn from(msg: RaftReply) -> Self {
if !msg.data.is_empty() {
let resp: T = serde_json::from_str(&msg.data).expect("fail to deserialize");
Ok(resp)
let res: T = serde_json::from_str(&msg.data)
.map_err(|e| InvalidReply::new("can not decode RaftReply.data", &e))?;
Ok(res)
} else {
let err: E = serde_json::from_str(&msg.error).expect("fail to deserialize");
let err: E = serde_json::from_str(&msg.error)
.map_err(|e| InvalidReply::new("can not decode RaftReply.error", &e))?;
Err(err)
}
}
Expand Down Expand Up @@ -267,3 +270,60 @@ where E: DeserializeOwned
}
}
}

#[cfg(test)]
mod tests {

#[derive(serde::Serialize, serde::Deserialize)]
struct Foo {
i: i32,
}

use crate::protobuf::RaftReply;
use crate::MetaNetworkError;

#[test]
fn test_valid_reply() -> anyhow::Result<()> {
// Unable to decode `.data`

let msg = RaftReply {
data: "foo".to_string(),
error: "".to_string(),
};
let res: Result<Foo, MetaNetworkError> = msg.into();
match res {
Err(MetaNetworkError::InvalidReply(inv_reply)) => {
assert!(
inv_reply
.to_string()
.starts_with("InvalidReply: can not decode RaftReply.data")
);
}
_ => {
unreachable!("expect InvalidReply")
}
}

// Unable to decode `.error`

let msg = RaftReply {
data: "".to_string(),
error: "foo".to_string(),
};
let res: Result<Foo, MetaNetworkError> = msg.into();
match res {
Err(MetaNetworkError::InvalidReply(inv_reply)) => {
assert!(
inv_reply
.to_string()
.starts_with("InvalidReply: can not decode RaftReply.error")
);
}
_ => {
unreachable!("expect InvalidReply")
}
}

Ok(())
}
}