Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #1671]⚡️Optimize MQClientInstance#find_broker_address_in_subscribe method signature🔥 #1673

Merged
merged 1 commit into from
Dec 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@
let broker_id = self.recalculate_pull_from_which_node(mq);
let mut find_broker_result = self
.client_instance
.find_broker_address_in_subscribe(broker_name.as_str(), broker_id, false)
.find_broker_address_in_subscribe(&broker_name, broker_id, false)

Check warning on line 275 in rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs#L275

Added line #L275 was not covered by tests
.await;

if find_broker_result.is_none() {
Expand All @@ -286,11 +286,7 @@
let broker_id_again = self.recalculate_pull_from_which_node(mq);
find_broker_result = self
.client_instance
.find_broker_address_in_subscribe(
broker_name_again.as_str(),
broker_id_again,
false,
)
.find_broker_address_in_subscribe(&broker_name_again, broker_id_again, false)

Check warning on line 289 in rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/pull_api_wrapper.rs#L289

Added line #L289 was not covered by tests
.await;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@
let client = self.client_instance.as_mut().unwrap();
let broker_name = client.get_broker_name_from_message_queue(mq).await;
let find_broker_result = client
.find_broker_address_in_subscribe(broker_name.as_str(), mix_all::MASTER_ID, true)
.find_broker_address_in_subscribe(&broker_name, mix_all::MASTER_ID, true)

Check warning on line 848 in rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs#L848

Added line #L848 was not covered by tests
.await;
if let Some(find_broker_result) = find_broker_result {
let mut request_body = LockBatchRequestBody {
Expand Down Expand Up @@ -900,11 +900,7 @@
}
let client = client_instance.as_mut().unwrap();
let find_broker_result = client
.find_broker_address_in_subscribe(
broker_name.as_str(),
mix_all::MASTER_ID,
true,
)
.find_broker_address_in_subscribe(&broker_name, mix_all::MASTER_ID, true)

Check warning on line 903 in rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs#L903

Added line #L903 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Add unit tests for broker locking in RebalanceImpl

The broker locking mechanism in RebalanceImpl lacks test coverage. While there are tests for the broker-side locking manager (RebalanceLockManager), the consumer-side implementation in rebalance_impl.rs needs test coverage for:

  • Lock acquisition through find_broker_address_in_subscribe
  • Concurrent access scenarios
  • Lock expiration handling
🔗 Analysis chain

Add test coverage for broker locking mechanism.

This broker locking logic is critical for message queue operations but lacks test coverage. Additionally, verify that the locking mechanism properly handles concurrent access scenarios.


Let me check the actual test coverage by examining the test implementations.


Let me check one more thing to verify if there are any integration tests or related test files for the consumer implementation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check existing test coverage for broker locking
rg -l "lock_all.*test"

Length of output: 87


Script:

#!/bin/bash
# Check the actual test implementations for lock_all
rg "lock_all" rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs -B 2 -A 5

# Check if there are any tests in the consumer implementation
ast-grep --pattern 'mod tests {
  $$$
}'  rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs

Length of output: 1487


Script:

#!/bin/bash
# Find any test files related to rebalance implementation in the consumer
fd "test.*rebalance" rocketmq-client/src/consumer/
fd "rebalance.*test" rocketmq-client/src/consumer/

# Check for any test patterns in the rebalance implementation file
rg "test|#\[test\]" rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs

Length of output: 199

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 903-903: rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs#L903
Added line #L903 was not covered by tests

.await;
if let Some(find_broker_result) = find_broker_result {
let request_body = LockBatchRequestBody {
Expand Down Expand Up @@ -1018,7 +1014,7 @@
}
let client = self.client_instance.as_mut().unwrap();
let find_broker_result = client
.find_broker_address_in_subscribe(broker_name.as_str(), mix_all::MASTER_ID, true)
.find_broker_address_in_subscribe(&broker_name, mix_all::MASTER_ID, true)

Check warning on line 1017 in rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs#L1017

Added line #L1017 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Add test coverage for broker unlocking mechanism

The verification confirms that there is no test coverage for the unlock mechanism in both rebalance_impl.rs and rebalance_push_impl.rs. The code includes critical unlocking logic with error handling, logging, and state management that should be tested:

  • unlock_all implementation in rebalance_impl.rs handles broker-wide unlocking
  • unlock method in rebalance_push_impl.rs manages individual message queue unlocking
  • Complex state transitions including force unlocking, lock acquisition failures, and batch operations

Add test cases covering:

  • Normal unlock flow with successful broker response
  • Error handling for broker communication failures
  • Force unlock scenarios with timeout conditions
  • Lock acquisition failures and retry behavior
  • Batch unlock operations across multiple message queues
🔗 Analysis chain

Add test coverage for broker unlocking mechanism.

The broker unlocking logic lacks test coverage. Ensure proper testing of unlock scenarios, especially error cases and potential deadlock situations.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check existing test coverage for broker unlocking
rg -l "unlock_all.*test"

Length of output: 26


Script:

#!/bin/bash
# Search for test files and unlock patterns more broadly
rg -l "test" rocketmq-client/src/consumer/consumer_impl/re_balance/
rg "unlock" rocketmq-client/src/consumer/consumer_impl/re_balance/ -C 2

Length of output: 9476


Script:

#!/bin/bash
# Look for test modules and unlock_all usage
ast-grep --pattern 'mod test { $$$ }'  rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs
ast-grep --pattern 'fn unlock_all($$$) { $$$ }' rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs

Length of output: 233

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 1017-1017: rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_impl.rs#L1017
Added line #L1017 was not covered by tests

.await;
if let Some(find_broker_result) = find_broker_result {
let request_body = UnlockBatchRequestBody {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@
};
let broker_name = client.get_broker_name_from_message_queue(mq).await;
let find_broker_result = client
.find_broker_address_in_subscribe(broker_name.as_str(), mix_all::MASTER_ID, true)
.find_broker_address_in_subscribe(&broker_name, mix_all::MASTER_ID, true)

Check warning on line 444 in rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs#L444

Added line #L444 was not covered by tests
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Critical unlock functionality lacks test coverage in consumer implementation

The unlock functionality in rebalance_push_impl.rs is not covered by unit tests, while only the broker's lock manager has test coverage. This is concerning because:

  • The unlock method handles critical message queue management
  • The implementation contains error handling for client instance availability
  • There's also an unimplemented unlock_all function marked as todo!()

Action items:

  • Add unit tests for the unlock method in rebalance_push_impl.rs
  • Implement the unlock_all method which is currently marked as todo!()
  • Consider adding integration tests for the consumer-broker unlock interaction
🔗 Analysis chain

Method signature optimization is consistent but critical path needs tests.

The change aligns with the optimization pattern of passing broker names as references. However, this is in the unlock method which is critical for message queue management.

Let's verify the test coverage for the unlock functionality:


Let me gather more information about the unlock functionality and its test coverage.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check test coverage for unlock functionality
# Look for existing tests
rg -l "unlock.*test" 

# Check test patterns
ast-grep --pattern 'fn test_$$$unlock$$$'

Length of output: 129


Script:

#!/bin/bash
# Check the implementation and usage of unlock in rebalance_push_impl.rs
rg -A 5 "fn.*unlock" rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs

# Check test modules in rebalance_push_impl.rs
rg -A 5 "mod test" rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs

# Look for any test files related to rebalance push
fd "rebalance.*test" rocketmq-client/src/consumer/

# Check if unlock is tested in the broker's test file we found earlier
rg -A 5 "unlock.*test" rocketmq-broker/src/client/rebalance/rebalance_lock_manager.rs

Length of output: 1226

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 444-444: rocketmq-client/src/consumer/consumer_impl/re_balance/rebalance_push_impl.rs#L444
Added line #L444 was not covered by tests

.await;
if let Some(find_broker_result) = find_broker_result {
let mut request_body = UnlockBatchRequestBody {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
let mut find_broker_result = self
.client_instance
.mut_from_ref()
.find_broker_address_in_subscribe(broker_name.as_str(), mix_all::MASTER_ID, true)
.find_broker_address_in_subscribe(&broker_name, mix_all::MASTER_ID, true)

Check warning on line 65 in rocketmq-client/src/consumer/store/remote_broker_offset_store.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/store/remote_broker_offset_store.rs#L65

Added line #L65 was not covered by tests
.await;

if find_broker_result.is_none() {
Expand All @@ -77,7 +77,7 @@
find_broker_result = self
.client_instance
.mut_from_ref()
.find_broker_address_in_subscribe(broker_name.as_str(), mix_all::MASTER_ID, false)
.find_broker_address_in_subscribe(&broker_name, mix_all::MASTER_ID, false)

Check warning on line 80 in rocketmq-client/src/consumer/store/remote_broker_offset_store.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/store/remote_broker_offset_store.rs#L80

Added line #L80 was not covered by tests
.await;
}
if let Some(find_broker_result) = find_broker_result {
Expand Down Expand Up @@ -261,7 +261,7 @@
.await;
let mut find_broker_result = self
.client_instance
.find_broker_address_in_subscribe(broker_name.as_str(), mix_all::MASTER_ID, false)
.find_broker_address_in_subscribe(&broker_name, mix_all::MASTER_ID, false)

Check warning on line 264 in rocketmq-client/src/consumer/store/remote_broker_offset_store.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/store/remote_broker_offset_store.rs#L264

Added line #L264 was not covered by tests
.await;

if find_broker_result.is_none() {
Expand All @@ -274,7 +274,7 @@
.await;
find_broker_result = self
.client_instance
.find_broker_address_in_subscribe(broker_name.as_str(), mix_all::MASTER_ID, false)
.find_broker_address_in_subscribe(&broker_name, mix_all::MASTER_ID, false)

Check warning on line 277 in rocketmq-client/src/consumer/store/remote_broker_offset_store.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/consumer/store/remote_broker_offset_store.rs#L277

Added line #L277 was not covered by tests
.await;
}

Expand Down
2 changes: 1 addition & 1 deletion rocketmq-client/src/factory/mq_client_instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1035,7 +1035,7 @@

pub async fn find_broker_address_in_subscribe(
&mut self,
broker_name: &str,
broker_name: &CheetahString,

Check warning on line 1038 in rocketmq-client/src/factory/mq_client_instance.rs

View check run for this annotation

Codecov / codecov/patch

rocketmq-client/src/factory/mq_client_instance.rs#L1038

Added line #L1038 was not covered by tests
broker_id: u64,
only_this_broker: bool,
) -> Option<FindBrokerResult> {
Expand Down
Loading