Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1496]♻️Refactor SendMessageRequestHeader with RequestHeaderCodec derive macro #1497

Merged
merged 2 commits into from
Dec 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,45 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use std::collections::HashMap;

use cheetah_string::CheetahString;
use rocketmq_macros::RequestHeaderCodec;
use serde::Deserialize;
use serde::Serialize;

use crate::code::request_code::RequestCode;
use crate::protocol::command_custom_header::CommandCustomHeader;
use crate::protocol::command_custom_header::FromMap;
use crate::protocol::header::message_operation_header::send_message_request_header_v2::SendMessageRequestHeaderV2;
use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
use crate::protocol::remoting_command::RemotingCommand;
use crate::rpc::topic_request_header::TopicRequestHeader;

#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[derive(Debug, Clone, Serialize, Deserialize, Default, RequestHeaderCodec)]
#[serde(rename_all = "camelCase")]
pub struct SendMessageRequestHeader {
#[required]
pub producer_group: CheetahString,

#[required]
pub topic: CheetahString,

#[required]
pub default_topic: CheetahString,

#[required]
pub default_topic_queue_nums: i32,

#[required]
pub queue_id: Option<i32>,

#[required]
pub sys_flag: i32,

#[required]
pub born_timestamp: i64,

#[required]
Comment on lines +32 to +53
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

Inconsistency with #[required] annotations on optional fields.

The fields queue_id (line 44), which is of type Option<i32>, is marked as #[required]. Typically, required fields should not be optional. This could lead to confusion or runtime errors when the field is expected to be always present.

Consider updating the field to reflect its required status:

 #[required]
-pub queue_id: Option<i32>,
+pub queue_id: i32,

Alternatively, if the field is truly optional, remove the #[required] annotation:

-#[required]
 pub queue_id: Option<i32>,

Committable suggestion skipped: line range outside the PR's diff.

pub flag: i32,

pub properties: Option<CheetahString>,
pub reconsume_times: Option<i32>,
pub unit_mode: Option<bool>,
Expand All @@ -48,7 +62,7 @@ pub struct SendMessageRequestHeader {
pub topic_request_header: Option<TopicRequestHeader>,
}

impl SendMessageRequestHeader {
/*impl SendMessageRequestHeader {
pub const PRODUCER_GROUP: &'static str = "producerGroup";
pub const TOPIC: &'static str = "topic";
pub const DEFAULT_TOPIC: &'static str = "defaultTopic";
Expand Down Expand Up @@ -239,7 +253,7 @@ impl FromMap for SendMessageRequestHeader {
topic_request_header: Some(<TopicRequestHeader as FromMap>::from(map)?),
})
}
}
}*/

impl TopicRequestHeaderTrait for SendMessageRequestHeader {
fn set_lo(&mut self, lo: Option<bool>) {
Expand Down Expand Up @@ -366,3 +380,273 @@ pub fn parse_request_header(
None => request.decode_command_custom_header::<SendMessageRequestHeader>(),
}
}

#[cfg(test)]
mod tests {
use std::collections::HashMap;

use cheetah_string::CheetahString;

use super::*;
use crate::code::request_code::RequestCode;
use crate::protocol::command_custom_header::CommandCustomHeader;
use crate::protocol::command_custom_header::FromMap;
use crate::protocol::remoting_command::RemotingCommand;

#[test]
fn parse_request_header_handles_invalid_request_code() {
let request = RemotingCommand::create_remoting_command(RequestCode::SendBatchMessage);
let request_code = RequestCode::SendBatchMessage;
let result = parse_request_header(&request, request_code);
assert!(result.is_err());
}

#[test]
fn parse_request_header_handles_missing_header() {
let request = RemotingCommand::create_remoting_command(RequestCode::SendMessageV2);
let request_code = RequestCode::SendMessageV2;
let result = parse_request_header(&request, request_code);
assert!(result.is_err());
}

#[test]
fn send_message_request_header_serializes_correctly() {
let header = SendMessageRequestHeader {
producer_group: CheetahString::from_static_str("test_producer_group"),
topic: CheetahString::from_static_str("test_topic"),
default_topic: CheetahString::from_static_str("test_default_topic"),
default_topic_queue_nums: 8,
queue_id: Some(1),
sys_flag: 0,
born_timestamp: 1622547800000,
flag: 0,
properties: Some(CheetahString::from_static_str("test_properties")),
reconsume_times: Some(3),
unit_mode: Some(true),
batch: Some(false),
max_reconsume_times: Some(5),
topic_request_header: None,
};
let map = header.to_map().unwrap();
assert_eq!(
map.get(&CheetahString::from_static_str("producerGroup"))
.unwrap(),
"test_producer_group"
);
assert_eq!(
map.get(&CheetahString::from_static_str("topic")).unwrap(),
"test_topic"
);
assert_eq!(
map.get(&CheetahString::from_static_str("defaultTopic"))
.unwrap(),
"test_default_topic"
);
assert_eq!(
map.get(&CheetahString::from_static_str("defaultTopicQueueNums"))
.unwrap(),
"8"
);
assert_eq!(
map.get(&CheetahString::from_static_str("queueId")).unwrap(),
"1"
);
assert_eq!(
map.get(&CheetahString::from_static_str("sysFlag")).unwrap(),
"0"
);
assert_eq!(
map.get(&CheetahString::from_static_str("bornTimestamp"))
.unwrap(),
"1622547800000"
);
assert_eq!(
map.get(&CheetahString::from_static_str("flag")).unwrap(),
"0"
);
assert_eq!(
map.get(&CheetahString::from_static_str("properties"))
.unwrap(),
"test_properties"
);
assert_eq!(
map.get(&CheetahString::from_static_str("reconsumeTimes"))
.unwrap(),
"3"
);
assert_eq!(
map.get(&CheetahString::from_static_str("unitMode"))
.unwrap(),
"true"
);
assert_eq!(
map.get(&CheetahString::from_static_str("batch")).unwrap(),
"false"
);
assert_eq!(
map.get(&CheetahString::from_static_str("maxReconsumeTimes"))
.unwrap(),
"5"
);
}

#[test]
fn send_message_request_header_deserializes_correctly() {
let mut map = HashMap::new();
map.insert(
CheetahString::from_static_str("producerGroup"),
CheetahString::from_static_str("test_producer_group"),
);
map.insert(
CheetahString::from_static_str("topic"),
CheetahString::from_static_str("test_topic"),
);
map.insert(
CheetahString::from_static_str("defaultTopic"),
CheetahString::from_static_str("test_default_topic"),
);
map.insert(
CheetahString::from_static_str("defaultTopicQueueNums"),
CheetahString::from_static_str("8"),
);
map.insert(
CheetahString::from_static_str("queueId"),
CheetahString::from_static_str("1"),
);
map.insert(
CheetahString::from_static_str("sysFlag"),
CheetahString::from_static_str("0"),
);
map.insert(
CheetahString::from_static_str("bornTimestamp"),
CheetahString::from_static_str("1622547800000"),
);
map.insert(
CheetahString::from_static_str("flag"),
CheetahString::from_static_str("0"),
);
map.insert(
CheetahString::from_static_str("properties"),
CheetahString::from_static_str("test_properties"),
);
map.insert(
CheetahString::from_static_str("reconsumeTimes"),
CheetahString::from_static_str("3"),
);
map.insert(
CheetahString::from_static_str("unitMode"),
CheetahString::from_static_str("true"),
);
map.insert(
CheetahString::from_static_str("batch"),
CheetahString::from_static_str("false"),
);
map.insert(
CheetahString::from_static_str("maxReconsumeTimes"),
CheetahString::from_static_str("5"),
);

let header = <SendMessageRequestHeader as FromMap>::from(&map).unwrap();
assert_eq!(header.producer_group, "test_producer_group");
assert_eq!(header.topic, "test_topic");
assert_eq!(header.default_topic, "test_default_topic");
assert_eq!(header.default_topic_queue_nums, 8);
assert_eq!(header.queue_id.unwrap(), 1);
assert_eq!(header.sys_flag, 0);
assert_eq!(header.born_timestamp, 1622547800000);
assert_eq!(header.flag, 0);
assert_eq!(header.properties.unwrap(), "test_properties");
assert_eq!(header.reconsume_times.unwrap(), 3);
assert_eq!(header.unit_mode.unwrap(), true);
assert_eq!(header.batch.unwrap(), false);
assert_eq!(header.max_reconsume_times.unwrap(), 5);
}

#[test]
fn send_message_request_header_handles_missing_optional_fields() {
let mut map = HashMap::new();
map.insert(
CheetahString::from_static_str("queueId"),
CheetahString::from_static_str("1"),
);
map.insert(
CheetahString::from_static_str("producerGroup"),
CheetahString::from_static_str("test_producer_group"),
);
map.insert(
CheetahString::from_static_str("topic"),
CheetahString::from_static_str("test_topic"),
);
map.insert(
CheetahString::from_static_str("defaultTopic"),
CheetahString::from_static_str("test_default_topic"),
);
map.insert(
CheetahString::from_static_str("defaultTopicQueueNums"),
CheetahString::from_static_str("8"),
);
map.insert(
CheetahString::from_static_str("sysFlag"),
CheetahString::from_static_str("0"),
);
map.insert(
CheetahString::from_static_str("bornTimestamp"),
CheetahString::from_static_str("1622547800000"),
);
map.insert(
CheetahString::from_static_str("flag"),
CheetahString::from_static_str("0"),
);

let header = <SendMessageRequestHeader as FromMap>::from(&map).unwrap();
assert_eq!(header.producer_group, "test_producer_group");
assert_eq!(header.topic, "test_topic");
assert_eq!(header.default_topic, "test_default_topic");
assert_eq!(header.default_topic_queue_nums, 8);
assert!(header.queue_id.is_some());
assert_eq!(header.sys_flag, 0);
assert_eq!(header.born_timestamp, 1622547800000);
assert_eq!(header.flag, 0);
assert!(header.properties.is_none());
assert!(header.reconsume_times.is_none());
assert!(header.unit_mode.is_none());
assert!(header.batch.is_none());
assert!(header.max_reconsume_times.is_none());
}

#[test]
fn send_message_request_header_handles_invalid_data() {
let mut map = HashMap::new();
map.insert(
CheetahString::from_static_str("producerGroup"),
CheetahString::from_static_str("test_producer_group"),
);
map.insert(
CheetahString::from_static_str("topic"),
CheetahString::from_static_str("test_topic"),
);
map.insert(
CheetahString::from_static_str("defaultTopic"),
CheetahString::from_static_str("test_default_topic"),
);
map.insert(
CheetahString::from_static_str("defaultTopicQueueNums"),
CheetahString::from_static_str("invalid"),
);
map.insert(
CheetahString::from_static_str("sysFlag"),
CheetahString::from_static_str("invalid"),
);
map.insert(
CheetahString::from_static_str("bornTimestamp"),
CheetahString::from_static_str("invalid"),
);
map.insert(
CheetahString::from_static_str("flag"),
CheetahString::from_static_str("invalid"),
);

let result = <SendMessageRequestHeader as FromMap>::from(&map);
assert!(result.is_err());
}
}
Loading