Skip to content

Commit

Permalink
[ISSUE #2230]💫Implement BrokerOuterAPI#unregisterBrokerAll🧑‍💻 (#2231)
Browse files Browse the repository at this point in the history
  • Loading branch information
mxsm authored Jan 14, 2025
1 parent 9d3e451 commit 1807c33
Showing 1 changed file with 67 additions and 0 deletions.
67 changes: 67 additions & 0 deletions rocketmq-broker/src/out_api/broker_outer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use rocketmq_remoting::protocol::header::lock_batch_mq_request_header::LockBatch
use rocketmq_remoting::protocol::header::message_operation_header::send_message_request_header::SendMessageRequestHeader;
use rocketmq_remoting::protocol::header::message_operation_header::send_message_request_header_v2::SendMessageRequestHeaderV2;
use rocketmq_remoting::protocol::header::message_operation_header::send_message_response_header::SendMessageResponseHeader;
use rocketmq_remoting::protocol::header::namesrv::broker_request::UnRegisterBrokerRequestHeader;
use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerRequestHeader;
use rocketmq_remoting::protocol::header::namesrv::register_broker_header::RegisterBrokerResponseHeader;
use rocketmq_remoting::protocol::header::namesrv::topic_operation_header::RegisterTopicRequestHeader;
Expand Down Expand Up @@ -548,6 +549,72 @@ impl BrokerOuterAPI {
Err(e) => Ok((None, e.to_string(), true)),
}
}

pub async fn unregister_broker_all(
&self,
cluster_name: &CheetahString,
broker_name: &CheetahString,
broker_addr: &CheetahString,
broker_id: u64,
) {
let name_server_address_list = self.remoting_client.get_name_server_address_list();
for namesrv_addr in name_server_address_list.iter() {
match self
.unregister_broker(
namesrv_addr,
cluster_name,
broker_addr,
broker_name,
broker_id,
)
.await
{
Ok(_) => {
info!(
"Unregister broker from name remoting_server success, namesrv_addr={}",
namesrv_addr
);
}
Err(e) => {
error!(
"Unregister broker from name remoting_server error, namesrv_addr={}, \
error={}",
namesrv_addr, e
);
}
}
}
}
pub async fn unregister_broker(
&self,
namesrv_addr: &CheetahString,
cluster_name: &CheetahString,
broker_addr: &CheetahString,
broker_name: &CheetahString,
broker_id: u64,
) -> Result<()> {
let request_header = UnRegisterBrokerRequestHeader {
broker_name: broker_name.clone(),
broker_addr: broker_addr.clone(),
cluster_name: cluster_name.clone(),
broker_id,
};
let request =
RemotingCommand::create_request_command(RequestCode::UnregisterBroker, request_header);
let response = self
.remoting_client
.invoke_async(Some(namesrv_addr), request, 3000)
.await?;
if ResponseCode::from(response.code()) == ResponseCode::Success {
Ok(())
} else {
Err(BrokerError::MQBrokerError(
response.code(),
response.remark().map_or("".to_string(), |s| s.to_string()),
broker_addr.to_string(),
))
}
}
}

fn process_pull_result(
Expand Down

0 comments on commit 1807c33

Please sign in to comment.