Skip to content

Commit

Permalink
[ISSUE #1452]🔥Refactor rocketmq-remoting crate error handle🚨 (#1455)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Nov 30, 2024
1 parent 0f4413a commit 5b151ff
Show file tree
Hide file tree
Showing 16 changed files with 285 additions and 211 deletions.
2 changes: 1 addition & 1 deletion rocketmq-broker/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use thiserror::Error;
#[derive(Debug, Error)]
pub enum BrokerError {
#[error("broker client error: {0}")]
BrokerClientError(#[from] rocketmq_remoting::error::Error),
BrokerClientError(#[from] rocketmq_remoting::remoting_error::RemotingError),

#[error("Common error: {0}")]
BrokerCommonError(#[from] rocketmq_common::error::Error),
Expand Down
2 changes: 1 addition & 1 deletion rocketmq-broker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,5 @@ pub(crate) mod topic;
mod transaction;
pub(crate) mod util;

type RemotingError = rocketmq_remoting::error::Error;
type RemotingError = rocketmq_remoting::remoting_error::RemotingError;
type Result<T> = std::result::Result<T, BrokerError>;
2 changes: 1 addition & 1 deletion rocketmq-client/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use rocketmq_remoting::error::Error as RemotingError;
use rocketmq_remoting::remoting_error::RemotingError;
use thiserror::Error;

#[derive(Debug, Error)]
Expand Down
12 changes: 6 additions & 6 deletions rocketmq-remoting/src/clients/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,12 @@ use crate::base::connection_net_event::ConnectionNetEvent;
use crate::base::response_future::ResponseFuture;
use crate::code::response_code::ResponseCode;
use crate::connection::Connection;
use crate::error::Error::ConnectionInvalid;
use crate::error::Error::Io;
use crate::error::Error::RemoteException;
use crate::net::channel::Channel;
use crate::protocol::remoting_command::RemotingCommand;
use crate::protocol::RemotingCommandType;
use crate::remoting_error::RemotingError::ConnectionInvalid;
use crate::remoting_error::RemotingError::Io;
use crate::remoting_error::RemotingError::RemoteError;
use crate::runtime::connection_handler_context::ConnectionHandlerContextWrapper;
use crate::runtime::processor::RequestProcessor;
use crate::Result;
Expand Down Expand Up @@ -270,11 +270,11 @@ impl Client {
.send((request, Some(tx), Some(timeout_millis)))
.await
{
return Err(RemoteException(err.to_string()));
return Err(RemoteError(err.to_string()));
}
match rx.await {
Ok(value) => value,
Err(error) => Err(RemoteException(error.to_string())),
Err(error) => Err(RemoteError(error.to_string())),
}
}

Expand Down Expand Up @@ -314,7 +314,7 @@ impl Client {
},
}*/
if let Err(err) = self.tx.send((request, None, None)).await {
return Err(RemoteException(err.to_string()));
return Err(RemoteError(err.to_string()));
}
Ok(())
}
Expand Down
12 changes: 6 additions & 6 deletions rocketmq-remoting/src/clients/rocketmq_default_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ use tracing::warn;
use crate::base::connection_net_event::ConnectionNetEvent;
use crate::clients::Client;
use crate::clients::RemotingClient;
use crate::error::Error;
use crate::protocol::remoting_command::RemotingCommand;
use crate::remoting::RemotingService;
use crate::remoting_error::RemotingError;
use crate::request_processor::default_request_processor::DefaultRemotingRequestProcessor;
use crate::runtime::config::client_config::TokioClientConfig;
use crate::runtime::processor::RequestProcessor;
Expand Down Expand Up @@ -333,7 +333,7 @@ impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingClient for RocketmqD
) -> Result<RemotingCommand> {
let client = self.get_and_create_client(addr).await;
match client {
None => Err(Error::RemoteException("get client failed".to_string())),
None => Err(RemotingError::RemoteError("get client failed".to_string())),
Some(mut client) => {
match self
.client_runtime
Expand All @@ -349,11 +349,11 @@ impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingClient for RocketmqD
Ok(result) => match result {
Ok(response) => match response {
Ok(value) => Ok(value),
Err(e) => Err(Error::RemoteException(e.to_string())),
Err(e) => Err(RemotingError::RemoteError(e.to_string())),
},
Err(err) => Err(Error::RemoteException(err.to_string())),
Err(err) => Err(RemotingError::RemoteError(err.to_string())),
},
Err(err) => Err(Error::RemoteException(err.to_string())),
Err(err) => Err(RemotingError::RemoteError(err.to_string())),
}
}
}
Expand All @@ -380,7 +380,7 @@ impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingClient for RocketmqD
.await
{
Ok(_) => Ok(()),
Err(err) => Err(Error::RemoteException(err.to_string())),
Err(err) => Err(RemotingError::RemoteError(err.to_string())),
}
});
}
Expand Down
8 changes: 4 additions & 4 deletions rocketmq-remoting/src/code/broker_request_code.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use std::str::FromStr;

use crate::error::Error;
use crate::error::Error::FromStrError;
use crate::remoting_error::RemotingError;
use crate::remoting_error::RemotingError::FromStrErr;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum BrokerRequestCode {
Expand Down Expand Up @@ -34,14 +34,14 @@ impl BrokerRequestCode {
}

impl FromStr for BrokerRequestCode {
type Err = Error;
type Err = RemotingError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.to_ascii_uppercase().as_str() {
"REGISTERBROKER" => Ok(BrokerRequestCode::RegisterBroker),
"BROKERHEARTBEAT" => Ok(BrokerRequestCode::BrokerHeartbeat),
"GETBROKERCLUSTERINFO" => Ok(BrokerRequestCode::GetBrokerClusterInfo),
_ => Err(FromStrError(format!(
_ => Err(FromStrErr(format!(
"Parse from string error,Invalid BrokerRequestCode: {}",
s
))),
Expand Down
6 changes: 3 additions & 3 deletions rocketmq-remoting/src/codec/remoting_command_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ use bytes::BytesMut;
use tokio_util::codec::Decoder;
use tokio_util::codec::Encoder;

use crate::error::Error;
use crate::protocol::remoting_command::RemotingCommand;
use crate::remoting_error::RemotingError;

/// Encodes a `RemotingCommand` into a `BytesMut` buffer.
///
Expand Down Expand Up @@ -59,7 +59,7 @@ impl RemotingCommandCodec {
}

impl Decoder for RemotingCommandCodec {
type Error = Error;
type Error = RemotingError;
type Item = RemotingCommand;

/// Decodes a `RemotingCommand` from a `BytesMut` buffer.
Expand Down Expand Up @@ -136,7 +136,7 @@ impl Decoder for RemotingCommandCodec {
}

impl Encoder<RemotingCommand> for RemotingCommandCodec {
type Error = Error;
type Error = RemotingError;

/// Encodes a `RemotingCommand` into a `BytesMut` buffer.
///
Expand Down
145 changes: 0 additions & 145 deletions rocketmq-remoting/src/error.rs

This file was deleted.

6 changes: 3 additions & 3 deletions rocketmq-remoting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,12 @@ pub mod clients;
pub mod code;
pub mod codec;
pub mod connection;
pub mod error;
pub mod net;
pub mod protocol;
pub mod remoting_error;

use crate::error::Error;
pub use crate::protocol::rocketmq_serializable;
use crate::remoting_error::RemotingError;

pub mod base;
pub mod remoting;
Expand All @@ -37,4 +37,4 @@ pub mod request_processor;
pub mod rpc;
pub mod runtime;

pub type Result<T, E = Error> = std::result::Result<T, E>;
pub type Result<T, E = RemotingError> = std::result::Result<T, E>;
10 changes: 5 additions & 5 deletions rocketmq-remoting/src/net/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@ use uuid::Uuid;

use crate::base::response_future::ResponseFuture;
use crate::connection::Connection;
use crate::error::Error;
use crate::error::Error::ChannelSendRequestFailed;
use crate::error::Error::Io;
use crate::protocol::remoting_command::RemotingCommand;
use crate::remoting_error::RemotingError;
use crate::remoting_error::RemotingError::ChannelSendRequestFailed;
use crate::remoting_error::RemotingError::Io;
use crate::Result;

#[derive(Clone)]
Expand Down Expand Up @@ -208,12 +208,12 @@ impl Channel {
Ok(response) => response,
Err(e) => {
self.response_table.remove(&opaque);
Err(Error::ChannelRecvRequestFailed(e.to_string()))
Err(RemotingError::ChannelRecvRequestFailed(e.to_string()))
}
},
Err(e) => {
self.response_table.remove(&opaque);
Err(Error::ChannelRecvRequestFailed(e.to_string()))
Err(RemotingError::ChannelRecvRequestFailed(e.to_string()))
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions rocketmq-remoting/src/protocol/remoting_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ use tracing::error;
use super::RemotingCommandType;
use super::SerializeType;
use crate::code::response_code::RemotingSysResponseCode;
use crate::error::Error;
use crate::protocol::command_custom_header::CommandCustomHeader;
use crate::protocol::command_custom_header::FromMap;
use crate::protocol::LanguageCode;
use crate::remoting_error::RemotingError;
use crate::rocketmq_serializable::RocketMQSerializable;

pub const SERIALIZE_TYPE_PROPERTY: &str = "rocketmq.serialize.type";
Expand Down Expand Up @@ -474,7 +474,7 @@ impl RemotingCommand {
let ori_header_length = cmd_data.get_i32();
let header_length = parse_header_length(ori_header_length);
if header_length > total_size - 4 {
return Err(Error::RemotingCommandDecoderError(format!(
return Err(RemotingError::RemotingCommandDecoderError(format!(
"Header length {} is greater than total size {}",
header_length, total_size
)));
Expand Down Expand Up @@ -504,7 +504,7 @@ impl RemotingCommand {
let cmd =
SerdeJsonUtils::from_json_slice::<RemotingCommand>(src).map_err(|error| {
// Handle deserialization error gracefully
Error::RemotingCommandDecoderError(format!(
RemotingError::RemotingCommandDecoderError(format!(
"Deserialization error: {}",
error
))
Expand Down Expand Up @@ -734,7 +734,7 @@ pub fn parse_header_length(size: i32) -> usize {
pub fn parse_serialize_type(size: i32) -> crate::Result<SerializeType> {
let code = (size >> 24) as u8;
match SerializeType::value_of(code) {
None => Err(Error::NotSupportSerializeType(code)),
None => Err(RemotingError::NotSupportSerializeType(code)),
Some(value) => Ok(value),
}
}
Expand Down
Loading

0 comments on commit 5b151ff

Please sign in to comment.