Skip to content

Commit

Permalink
[ISSUE #1721]⚡️Optimize EscapeBridge struct logic
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Dec 12, 2024
1 parent 1aa60f8 commit 996e256
Showing 1 changed file with 46 additions and 25 deletions.
71 changes: 46 additions & 25 deletions rocketmq-broker/src/failover/escape_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use rocketmq_rust::ArcMut;
use rocketmq_store::base::message_result::PutMessageResult;
use rocketmq_store::base::message_status_enum::PutMessageStatus;
use rocketmq_store::log_file::MessageStore;
use tracing::error;
use tracing::warn;

use crate::out_api::broker_outer_api::BrokerOuterAPI;
Expand Down Expand Up @@ -144,9 +145,19 @@ where
&& self.broker_config.enable_remote_escape
{
message_ext.set_wait_store_msg_ok(false);
let send_result = self.put_message_to_remote_broker(message_ext, None).await;
transform_send_result2put_result(send_result)
match self.put_message_to_remote_broker(message_ext, None).await {
Ok(send_result) => transform_send_result2put_result(send_result),
Err(e) => {
error!("sendMessageInFailover to remote failed, {}", e);
PutMessageResult::new(PutMessageStatus::PutToRemoteBrokerFail, None, true)

Check warning on line 152 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L148-L152

Added lines #L148 - L152 were not covered by tests
}
}
} else {
warn!(
"Put message failed, enableSlaveActingMaster={}, enableRemoteEscape={}.",
self.broker_config.enable_slave_acting_master,
self.broker_config.enable_remote_escape

Check warning on line 159 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L156-L159

Added lines #L156 - L159 were not covered by tests
);
PutMessageResult::new_default(PutMessageStatus::ServiceNotAvailable)
}
}
Expand All @@ -155,12 +166,16 @@ where
&mut self,
message_ext: MessageExtBrokerInner,
mut broker_name_to_send: Option<CheetahString>,
) -> Option<SendResult> {
if broker_name_to_send.is_some()
&& self.broker_config.broker_identity.broker_name.as_str()
== broker_name_to_send.as_ref().unwrap().as_str()
) -> crate::Result<Option<SendResult>> {
let broker_name = self.broker_config.broker_identity.broker_name.as_str();
if broker_name.is_empty()
|| broker_name
== broker_name_to_send
.as_ref()
.map_or("", |value| value.as_str())

Check warning on line 175 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L169-L175

Added lines #L169 - L175 were not covered by tests
{
return None;
// not remote broker
return Ok(None);

Check warning on line 178 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L178

Added line #L178 was not covered by tests
}
let is_trans_half_message =
TransactionalMessageUtil::build_half_topic() == message_ext.get_topic();
Expand All @@ -182,7 +197,7 @@ where
message_to_put.get_topic(),
message_to_put.message_ext_inner.msg_id
);
return None;
return Ok(None);

Check warning on line 200 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L200

Added line #L200 was not covered by tests
}
let topic_publish_info = topic_publish_info.unwrap();
let _mq_selected = if !broker_name_to_send
Expand All @@ -204,7 +219,7 @@ where
message_to_put.message_ext_inner.msg_id,
mq.get_broker_name()
);
return None;
return Ok(None);

Check warning on line 222 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L222

Added line #L222 was not covered by tests
}
mq
} else {
Expand All @@ -225,7 +240,7 @@ where
message_to_put.message_ext_inner.msg_id,
broker_name_to_send.as_ref().unwrap()
);
return None;
return Ok(None);

Check warning on line 243 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L243

Added line #L243 was not covered by tests
}
let producer_group = self.get_producer_group(&message_to_put);
let result = self
Expand All @@ -237,17 +252,11 @@ where
producer_group,
SEND_TIMEOUT,
)
.await;
match result {
Ok(value) => {
if value.send_status == SendStatus::SendOk {
Some(value)
} else {
None
}
}
Err(_) => None,
.await?;
if result.send_status == SendStatus::SendOk {
return Ok(Some(result));

Check warning on line 257 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L255-L257

Added lines #L255 - L257 were not covered by tests
}
Ok(None)

Check warning on line 259 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L259

Added line #L259 was not covered by tests
}

fn get_producer_group(&self, message_ext: &MessageExtBrokerInner) -> CheetahString {
Expand Down Expand Up @@ -328,7 +337,7 @@ where
.try_to_find_topic_publish_info(message_ext.get_topic())
.await;
if topic_publish_info.is_none() {
return PutMessageResult::new_default(PutMessageStatus::ServiceNotAvailable);
return PutMessageResult::new(PutMessageStatus::PutToRemoteBrokerFail, None, true);

Check warning on line 340 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L340

Added line #L340 was not covered by tests
}
let topic_publish_info = topic_publish_info.unwrap();

Expand All @@ -350,7 +359,7 @@ where
.topic_route_info_manager
.find_broker_address_in_publish(Some(broker_name_to_send));
let producer_group = self.get_producer_group(&message_ext);
let result = self
match self

Check warning on line 362 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L362

Added line #L362 was not covered by tests
.broker_outer_api
.send_message_to_specific_broker(
broker_addr_to_send.as_ref().unwrap(),
Expand All @@ -359,9 +368,20 @@ where
producer_group,
SEND_TIMEOUT,
)
.await;
transform_send_result2put_result(result.ok())
.await

Check warning on line 371 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L371

Added line #L371 was not covered by tests
{
Ok(result) => transform_send_result2put_result(Some(result)),
Err(e) => {
error!("sendMessageInFailover to remote failed, {}", e);
PutMessageResult::new(PutMessageStatus::PutToRemoteBrokerFail, None, true)

Check warning on line 376 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L373-L376

Added lines #L373 - L376 were not covered by tests
}
}
} else {
warn!(
"Put message failed, enableSlaveActingMaster={}, enableRemoteEscape={}.",
self.broker_config.enable_slave_acting_master,
self.broker_config.enable_remote_escape

Check warning on line 383 in rocketmq-broker/src/failover/escape_bridge.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-broker/src/failover/escape_bridge.rs#L380-L383

Added lines #L380 - L383 were not covered by tests
);
PutMessageResult::new_default(PutMessageStatus::ServiceNotAvailable)
}
}
Expand Down Expand Up @@ -390,9 +410,10 @@ fn transform_send_result2put_result(send_result: Option<SendResult>) -> PutMessa
mod tests {
use rocketmq_client_rust::producer::send_result::SendResult;
use rocketmq_client_rust::producer::send_status::SendStatus;
use rocketmq_common::common::message::message_ext_broker_inner::MessageExtBrokerInner;
use rocketmq_common::common::message::MessageConst;

use super::*;

#[test]
fn transform_send_result2put_result_handles_none() {
let result = transform_send_result2put_result(None);
Expand Down

0 comments on commit 996e256

Please sign in to comment.