Skip to content

Commit

Permalink
[ISSUE #1230]🚀Support broker receive transaction message-4
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm committed Nov 19, 2024
1 parent c2281c4 commit de83e75
Show file tree
Hide file tree
Showing 10 changed files with 402 additions and 4 deletions.
13 changes: 11 additions & 2 deletions rocketmq-broker/src/broker_runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use crate::broker::broker_hook::BrokerShutdownHook;
use crate::client::default_consumer_ids_change_listener::DefaultConsumerIdsChangeListener;
use crate::client::manager::consumer_manager::ConsumerManager;
use crate::client::manager::producer_manager::ProducerManager;
use crate::client::net::broker_to_client::Broker2Client;
use crate::client::rebalance::rebalance_lock_manager::RebalanceLockManager;
use crate::filter::manager::consumer_filter_manager::ConsumerFilterManager;
use crate::hook::batch_check_before_put_message::BatchCheckBeforePutMessageHook;
Expand Down Expand Up @@ -126,7 +127,9 @@ pub(crate) struct BrokerRuntime {
#[cfg(feature = "local_file_store")]
transactional_message_service:
Option<ArcMut<DefaultTransactionalMessageService<DefaultMessageStore>>>,
transactional_message_check_listener: Option<Arc<DefaultTransactionalMessageCheckListener>>,
#[cfg(feature = "local_file_store")]
transactional_message_check_listener:
Option<Arc<DefaultTransactionalMessageCheckListener<DefaultMessageStore>>>,
transactional_message_check_service: Option<Arc<TransactionalMessageCheckService>>,
transaction_metrics_flush_service: Option<Arc<TransactionMetricsFlushService>>,
}
Expand Down Expand Up @@ -689,7 +692,13 @@ impl BrokerRuntime {
}
}
self.transactional_message_check_listener =
Some(Arc::new(DefaultTransactionalMessageCheckListener));
Some(Arc::new(DefaultTransactionalMessageCheckListener::new(
self.broker_config.clone(),
self.producer_manager.clone(),
Broker2Client,
self.topic_config_manager.clone(),
self.message_store.as_ref().cloned().unwrap(),
)));
self.transactional_message_check_service = Some(Arc::new(TransactionalMessageCheckService));
self.transaction_metrics_flush_service = Some(Arc::new(TransactionMetricsFlushService));
}
Expand Down
23 changes: 23 additions & 0 deletions rocketmq-broker/src/client/manager/producer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/

use std::collections::HashMap;
use std::sync::atomic::AtomicI32;
use std::sync::Arc;

use cheetah_string::CheetahString;
use rocketmq_common::TimeUtils::get_current_millis;
Expand All @@ -31,13 +33,15 @@ pub struct ProducerManager {
HashMap<CheetahString /* group name */, HashMap<Channel, ClientChannelInfo>>,
>,
client_channel_table: parking_lot::Mutex<HashMap<CheetahString, Channel /* client ip:port */>>,
positive_atomic_counter: Arc<AtomicI32>,
}

impl ProducerManager {
pub fn new() -> Self {
Self {
group_channel_table: parking_lot::Mutex::new(HashMap::new()),
client_channel_table: parking_lot::Mutex::new(HashMap::new()),
positive_atomic_counter: Arc::new(Default::default()),
}
}
}
Expand Down Expand Up @@ -116,4 +120,23 @@ impl ProducerManager {
pub fn find_channel(&self, client_id: &str) -> Option<Channel> {
self.client_channel_table.lock().get(client_id).cloned()
}

pub fn get_available_channel(&self, group: Option<&CheetahString>) -> Option<Channel> {
let group = group?;
let group_channel_table = self.group_channel_table.lock();
let channel_map = group_channel_table.get(group);
if let Some(channel_map) = channel_map {
if channel_map.is_empty() {
return None;
}
let channels = channel_map.keys().collect::<Vec<&Channel>>();
let index = self
.positive_atomic_counter
.fetch_add(1, std::sync::atomic::Ordering::AcqRel);
let index = index.unsigned_abs() as usize % channels.len();
let channel = channels[index].clone();
return Some(channel);
}
None
}
}
31 changes: 31 additions & 0 deletions rocketmq-broker/src/client/net/broker_to_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use cheetah_string::CheetahString;
use rocketmq_common::common::message::message_ext::MessageExt;
use rocketmq_common::MessageDecoder;
use rocketmq_remoting::code::request_code::RequestCode;
use rocketmq_remoting::net::channel::Channel;
use rocketmq_remoting::protocol::header::check_transaction_state_request_header::CheckTransactionStateRequestHeader;
use rocketmq_remoting::protocol::remoting_command::RemotingCommand;

use crate::error::BrokerError::BrokerClientError;
use crate::error::BrokerError::BrokerCommonError;
use crate::Result;

#[derive(Default, Clone)]
Expand All @@ -35,4 +41,29 @@ impl Broker2Client {
Err(e) => Err(BrokerClientError(e)),
}
}

pub async fn check_producer_transaction_state(
&self,
_group: &CheetahString,
channel: &mut Channel,
request_header: CheckTransactionStateRequestHeader,
message_ext: MessageExt,
) -> Result<()> {
let mut request = RemotingCommand::create_request_command(
RequestCode::CheckTransactionState,
request_header,
);
match MessageDecoder::encode(&message_ext, false) {
Ok(body) => {
request.set_body_mut_ref(body);
}
Err(e) => {
return Err(BrokerCommonError(e));
}
}
match channel.send_one_way(request, 100).await {
Ok(_) => Ok(()),
Err(e) => Err(BrokerClientError(e)),
}
}
}
4 changes: 4 additions & 0 deletions rocketmq-broker/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@
*/
use thiserror::Error;

#[allow(clippy::enum_variant_names)]
#[derive(Debug, Error)]
pub enum BrokerError {
#[error("broker client error: {0}")]
BrokerClientError(#[from] rocketmq_remoting::error::Error),

#[error("Common error: {0}")]
BrokerCommonError(#[from] rocketmq_common::error::Error),

#[error("Client exception occurred: CODE:{0}, broker address:{2}, Message:{1}")]
MQBrokerError(i32, String, String),
}
1 change: 1 addition & 0 deletions rocketmq-broker/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,6 @@ pub(crate) mod operation_result;
pub(crate) mod queue;
pub(crate) mod transaction_metrics;
pub(crate) mod transaction_metrics_flush_service;
pub(crate) mod transactional_message_check_listener;
pub(crate) mod transactional_message_check_service;
pub(crate) mod transactional_message_service;
Loading

0 comments on commit de83e75

Please sign in to comment.