From 1b0556b80bb18487f67c8e856668b7e446c6dd21 Mon Sep 17 00:00:00 2001 From: Happy-debug-lang Date: Thu, 5 Dec 2024 22:51:55 +0800 Subject: [PATCH] =?UTF-8?q?[ISSUE=20#1507]=E2=99=BB=EF=B8=8FRefactor=20Bro?= =?UTF-8?q?kerHeartbeatRequestHeader=20with=20derive=20marco=20RequestHead?= =?UTF-8?q?erCodec?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../broker/broker_heartbeat_request_header.rs | 308 +++++++++--------- 1 file changed, 157 insertions(+), 151 deletions(-) diff --git a/rocketmq-remoting/src/protocol/header/broker/broker_heartbeat_request_header.rs b/rocketmq-remoting/src/protocol/header/broker/broker_heartbeat_request_header.rs index a37b4dcf..59a516b8 100644 --- a/rocketmq-remoting/src/protocol/header/broker/broker_heartbeat_request_header.rs +++ b/rocketmq-remoting/src/protocol/header/broker/broker_heartbeat_request_header.rs @@ -1,151 +1,157 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -use std::collections::HashMap; - -use anyhow::Error; -use cheetah_string::CheetahString; -use serde::Deserialize; -use serde::Serialize; - -use crate::protocol::command_custom_header::CommandCustomHeader; -use crate::protocol::command_custom_header::FromMap; - -#[derive(Debug, Serialize, Deserialize, Default)] -pub struct BrokerHeartbeatRequestHeader { - #[serde(rename = "clusterName")] - pub cluster_name: CheetahString, - #[serde(rename = "brokerAddr")] - pub broker_addr: CheetahString, - #[serde(rename = "brokerName")] - pub broker_name: CheetahString, - #[serde(rename = "brokerId")] - pub broker_id: Option, - pub epoch: Option, - #[serde(rename = "maxOffset")] - pub max_offset: Option, - #[serde(rename = "confirmOffset")] - pub confirm_offset: Option, - #[serde(rename = "heartbeatTimeoutMills")] - pub heartbeat_timeout_mills: Option, - #[serde(rename = "electionPriority")] - pub election_priority: Option, -} - -impl BrokerHeartbeatRequestHeader { - const BROKER_ADDR: &'static str = "brokerAddr"; - const BROKER_ID: &'static str = "brokerId"; - const BROKER_NAME: &'static str = "brokerName"; - const CLUSTER_NAME: &'static str = "clusterName"; - const CONFIRM_OFFSET: &'static str = "confirmOffset"; - const ELECTION_PRIORITY: &'static str = "electionPriority"; - const EPOCH: &'static str = "epoch"; - const HEARTBEAT_TIMEOUT_MILLIS: &'static str = "heartbeatTimeoutMills"; - const MAX_OFFSET: &'static str = "maxOffset"; - - pub fn new( - cluster_name: CheetahString, - broker_addr: CheetahString, - broker_name: CheetahString, - broker_id: Option, - epoch: Option, - max_offset: Option, - confirm_offset: Option, - heartbeat_timeout_mills: Option, - election_priority: Option, - ) -> Self { - Self { - cluster_name, - broker_addr, - broker_name, - broker_id, - epoch, - max_offset, - confirm_offset, - heartbeat_timeout_mills, - election_priority, - } - } -} - -impl FromMap for BrokerHeartbeatRequestHeader { - type Error = crate::remoting_error::RemotingError; - - type Target = BrokerHeartbeatRequestHeader; - - fn from(map: &HashMap) -> Result { - Ok(BrokerHeartbeatRequestHeader { - cluster_name: map - .get(&CheetahString::from_static_str( - BrokerHeartbeatRequestHeader::CLUSTER_NAME, - )) - .cloned() - .unwrap_or_default(), - broker_addr: map - .get(&CheetahString::from_static_str( - BrokerHeartbeatRequestHeader::BROKER_ADDR, - )) - .cloned() - .unwrap_or_default(), - broker_name: map - .get(&CheetahString::from_static_str( - BrokerHeartbeatRequestHeader::BROKER_NAME, - )) - .cloned() - .unwrap_or_default(), - broker_id: map - .get(&CheetahString::from_static_str( - BrokerHeartbeatRequestHeader::BROKER_ID, - )) - .and_then(|s| s.parse::().ok()), - epoch: map - .get(&CheetahString::from_static_str( - BrokerHeartbeatRequestHeader::EPOCH, - )) - .and_then(|s| s.parse::().ok()), - max_offset: map - .get(&CheetahString::from_static_str( - BrokerHeartbeatRequestHeader::MAX_OFFSET, - )) - .and_then(|s| s.parse::().ok()), - confirm_offset: map - .get(&CheetahString::from_static_str( - BrokerHeartbeatRequestHeader::CONFIRM_OFFSET, - )) - .and_then(|s| s.parse::().ok()), - heartbeat_timeout_mills: map - .get(&CheetahString::from_static_str( - BrokerHeartbeatRequestHeader::HEARTBEAT_TIMEOUT_MILLIS, - )) - .and_then(|s| s.parse::().ok()), - election_priority: map - .get(&CheetahString::from_static_str( - BrokerHeartbeatRequestHeader::ELECTION_PRIORITY, - )) - .and_then(|s| s.parse::().ok()), - }) - } -} - -impl CommandCustomHeader for BrokerHeartbeatRequestHeader { - fn check_fields(&self) -> anyhow::Result<(), Error> { - todo!() - } - - fn to_map(&self) -> Option> { - todo!() - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +use cheetah_string::CheetahString; +use rocketmq_macros::RequestHeaderCodec; +use serde::Deserialize; +use serde::Serialize; + +#[derive(Debug, Serialize, Deserialize, Default, RequestHeaderCodec)] +pub struct BrokerHeartbeatRequestHeader { + #[serde(rename = "clusterName")] + #[required] + pub cluster_name: CheetahString, + + #[serde(rename = "brokerAddr")] + #[required] + pub broker_addr: CheetahString, + + #[serde(rename = "brokerName")] + #[required] + pub broker_name: CheetahString, + + #[serde(rename = "brokerId")] + pub broker_id: Option, + + pub epoch: Option, + + #[serde(rename = "maxOffset")] + pub max_offset: Option, + + #[serde(rename = "confirmOffset")] + pub confirm_offset: Option, + + #[serde(rename = "heartbeatTimeoutMills")] + pub heartbeat_timeout_mills: Option, + + #[serde(rename = "electionPriority")] + pub election_priority: Option, +} + +#[cfg(test)] +mod tests { + use cheetah_string::CheetahString; + + use super::*; + + #[test] + fn broker_heartbeat_request_header_with_required_fields() { + let header = BrokerHeartbeatRequestHeader { + cluster_name: CheetahString::from("testCluster"), + broker_addr: CheetahString::from("testAddr"), + broker_name: CheetahString::from("testBroker"), + broker_id: Some(1), + epoch: Some(1), + max_offset: Some(100), + confirm_offset: Some(50), + heartbeat_timeout_mills: Some(3000), + election_priority: Some(1), + }; + assert_eq!(header.cluster_name, CheetahString::from("testCluster")); + assert_eq!(header.broker_addr, CheetahString::from("testAddr")); + assert_eq!(header.broker_name, CheetahString::from("testBroker")); + assert_eq!(header.broker_id, Some(1)); + assert_eq!(header.epoch, Some(1)); + assert_eq!(header.max_offset, Some(100)); + assert_eq!(header.confirm_offset, Some(50)); + assert_eq!(header.heartbeat_timeout_mills, Some(3000)); + assert_eq!(header.election_priority, Some(1)); + } + + #[test] + fn broker_heartbeat_request_header_with_optional_fields() { + let header = BrokerHeartbeatRequestHeader { + cluster_name: CheetahString::from("testCluster"), + broker_addr: CheetahString::from("testAddr"), + broker_name: CheetahString::from("testBroker"), + broker_id: None, + epoch: None, + max_offset: None, + confirm_offset: None, + heartbeat_timeout_mills: None, + election_priority: None, + }; + assert_eq!(header.cluster_name, CheetahString::from("testCluster")); + assert_eq!(header.broker_addr, CheetahString::from("testAddr")); + assert_eq!(header.broker_name, CheetahString::from("testBroker")); + assert!(header.broker_id.is_none()); + assert!(header.epoch.is_none()); + assert!(header.max_offset.is_none()); + assert!(header.confirm_offset.is_none()); + assert!(header.heartbeat_timeout_mills.is_none()); + assert!(header.election_priority.is_none()); + } + + #[test] + fn broker_heartbeat_request_header_with_empty_values() { + let header = BrokerHeartbeatRequestHeader { + cluster_name: CheetahString::from(""), + broker_addr: CheetahString::from(""), + broker_name: CheetahString::from(""), + broker_id: None, + epoch: None, + max_offset: None, + confirm_offset: None, + heartbeat_timeout_mills: None, + election_priority: None, + }; + assert_eq!(header.cluster_name, CheetahString::from("")); + assert_eq!(header.broker_addr, CheetahString::from("")); + assert_eq!(header.broker_name, CheetahString::from("")); + assert!(header.broker_id.is_none()); + assert!(header.epoch.is_none()); + assert!(header.max_offset.is_none()); + assert!(header.confirm_offset.is_none()); + assert!(header.heartbeat_timeout_mills.is_none()); + assert!(header.election_priority.is_none()); + } + + #[test] + fn broker_heartbeat_request_header_with_long_values() { + let long_string = "a".repeat(1000); + let header = BrokerHeartbeatRequestHeader { + cluster_name: CheetahString::from(&long_string), + broker_addr: CheetahString::from(&long_string), + broker_name: CheetahString::from(&long_string), + broker_id: Some(1), + epoch: Some(1), + max_offset: Some(100), + confirm_offset: Some(50), + heartbeat_timeout_mills: Some(3000), + election_priority: Some(1), + }; + assert_eq!(header.cluster_name, CheetahString::from(&long_string)); + assert_eq!(header.broker_addr, CheetahString::from(&long_string)); + assert_eq!(header.broker_name, CheetahString::from(&long_string)); + assert_eq!(header.broker_id, Some(1)); + assert_eq!(header.epoch, Some(1)); + assert_eq!(header.max_offset, Some(100)); + assert_eq!(header.confirm_offset, Some(50)); + assert_eq!(header.heartbeat_timeout_mills, Some(3000)); + assert_eq!(header.election_priority, Some(1)); + } +}