KafkaSinkCluster: route ListGroups request (#1790)
rukai authored Oct 30, 2024
1 parent e3bac43 commit e09cd43
Showing 6 changed files with 212 additions and 61 deletions.
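
In Kafka, a broker's ListGroups response lists only the consumer groups that broker itself coordinates, so no single broker can answer for the whole cluster. This commit therefore fans the request out to multiple brokers (`split_request_by_routing_to_all_brokers` below) and merges the per-broker responses into one (`combine_list_groups`). A minimal sketch of the merge semantics, using hypothetical group names:

```rust
/// Each broker reports only the consumer groups it coordinates; the
/// cluster-wide listing is the concatenation of the per-broker lists
/// (any duplicates are left for the client driver to filter out).
fn merge_group_lists(per_broker: Vec<Vec<String>>) -> Vec<String> {
    per_broker.into_iter().flatten().collect()
}

fn main() {
    // Hypothetical per-broker responses.
    let broker1 = vec!["orders-consumer".to_owned()];
    let broker2 = vec!["payments-consumer".to_owned()];
    let broker3: Vec<String> = vec![]; // coordinates no groups
    let merged = merge_group_lists(vec![broker1, broker2, broker3]);
    assert_eq!(merged, ["orders-consumer", "payments-consumer"]);
}
```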
6 changes: 3 additions & 3 deletions shotover-proxy/tests/kafka_int_tests/mod.rs
@@ -426,7 +426,7 @@ async fn cluster_1_rack_multi_shotover_with_1_shotover_down(#[case] driver: Kafk

// create a new connection and produce and consume messages
let new_connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9193");
test_cases::cluster_test_suite(&new_connection_builder).await;
test_cases::cluster_test_suite_with_lost_shotover_node(&new_connection_builder).await;

let mut expected_events = multi_shotover_events();
// Other shotover nodes should detect the killed node at least once
@@ -485,7 +485,7 @@ async fn cluster_3_racks_multi_shotover_with_2_shotover_down(#[case] driver: Kaf

// create a new connection and produce and consume messages
let new_connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9193");
test_cases::cluster_test_suite(&new_connection_builder).await;
test_cases::cluster_test_suite_with_lost_shotover_node(&new_connection_builder).await;

let mut expected_events = multi_shotover_events();
// The UP shotover node should detect the killed nodes at least once
@@ -537,7 +537,7 @@ async fn cluster_3_racks_multi_shotover_with_1_shotover_missing(#[case] driver:

// Send some produce and consume requests
let connection_builder = KafkaConnectionBuilder::new(driver, "localhost:9192");
test_cases::cluster_test_suite(&connection_builder).await;
test_cases::cluster_test_suite_with_lost_shotover_node(&connection_builder).await;

let mut expected_events = multi_shotover_events();
// Other shotover nodes should detect the missing node at least once
154 changes: 101 additions & 53 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
@@ -3,9 +3,10 @@ use std::{collections::HashMap, time::Duration};
use test_helpers::{
connection::kafka::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConfigEntry, ConsumerConfig,
ExpectedResponse, IsolationLevel, KafkaConnectionBuilder, KafkaConsumer, KafkaDriver,
KafkaProducer, ListOffsetsResultInfo, NewPartition, NewTopic, OffsetAndMetadata,
OffsetSpec, Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicPartition,
ExpectedResponse, IsolationLevel, KafkaAdmin, KafkaConnectionBuilder, KafkaConsumer,
KafkaDriver, KafkaProducer, ListOffsetsResultInfo, NewPartition, NewTopic,
OffsetAndMetadata, OffsetSpec, Record, ResourcePatternType, ResourceSpecifier,
ResourceType, TopicPartition,
},
docker_compose::DockerCompose,
};
@@ -25,11 +26,6 @@ async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
num_partitions: 1,
replication_factor: 1,
},
NewTopic {
name: "partitions3",
num_partitions: 3,
replication_factor: 1,
},
NewTopic {
name: "partitions3_case1",
num_partitions: 3,
@@ -1330,7 +1326,7 @@ async fn test_produce_consume_10_times(producer: &mut KafkaProducer, consumer: &
}
}

pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
async fn standard_test_suite_base(connection_builder: &KafkaConnectionBuilder) {
admin_setup(connection_builder).await;
produce_consume_partitions1(connection_builder, "partitions1").await;
produce_consume_partitions1(connection_builder, "unknown_topic").await;
@@ -1367,78 +1363,104 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
.await;
produce_consume_partitions1(connection_builder, "partitions1").await;

let results = admin
.list_offsets(HashMap::from([
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 0,
},
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 1,
},
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 2,
},
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions1".to_owned(),
partition: 0,
},
OffsetSpec::Latest,
),
]))
.await;
list_offsets(&admin).await;
}

produce_consume_acks0(connection_builder).await;
admin_cleanup(connection_builder).await;
}

let expected = HashMap::from([
async fn list_offsets(admin: &KafkaAdmin) {
let results = admin
.list_offsets(HashMap::from([
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 0,
},
ListOffsetsResultInfo { offset: 0 },
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 1,
},
ListOffsetsResultInfo { offset: 0 },
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 2,
},
ListOffsetsResultInfo { offset: 0 },
OffsetSpec::Earliest,
),
(
TopicPartition {
topic_name: "partitions1".to_owned(),
partition: 0,
},
ListOffsetsResultInfo { offset: 11 },
OffsetSpec::Latest,
),
]);
assert_eq!(results, expected);
}
]))
.await;

produce_consume_acks0(connection_builder).await;
admin_cleanup(connection_builder).await;
let expected = HashMap::from([
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 0,
},
ListOffsetsResultInfo { offset: 0 },
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 1,
},
ListOffsetsResultInfo { offset: 0 },
),
(
TopicPartition {
topic_name: "partitions3_case3".to_owned(),
partition: 2,
},
ListOffsetsResultInfo { offset: 0 },
),
(
TopicPartition {
topic_name: "partitions1".to_owned(),
partition: 0,
},
ListOffsetsResultInfo { offset: 11 },
),
]);
assert_eq!(results, expected);
}

pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
standard_test_suite(connection_builder).await;
async fn list_groups(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
let mut consumer = connection_builder
.connect_consumer(
ConsumerConfig::consume_from_topics(vec!["partitions1".to_owned()])
.with_group("list_groups_test"),
)
.await;
consumer
.assert_consume(ExpectedResponse {
message: "initial".to_owned(),
key: Some("Key".to_owned()),
topic_name: "partitions1".to_owned(),
offset: Some(0),
})
.await;

let actual_results = admin.list_groups().await;
if !actual_results.contains(&"list_groups_test".to_owned()) {
panic!("Expected to find list_groups_test in {actual_results:?} but it was missing")
}
}

async fn cluster_test_suite_base(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
admin
.create_topics_and_wait(&[
@@ -1458,6 +1480,32 @@ pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
produce_consume_partitions3(connection_builder, "partitions3_rf3", 1, 500).await;
}

pub async fn tests_requiring_all_shotover_nodes(connection_builder: &KafkaConnectionBuilder) {
// rdkafka-rs doesn't support these methods
#[allow(irrefutable_let_patterns)]
if let KafkaConnectionBuilder::Java(_) = connection_builder {
list_groups(connection_builder).await;
}
}
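
`list_groups` lives in `tests_requiring_all_shotover_nodes` because a fanned-out ListGroups can return an incomplete group listing while part of the cluster is unreachable; the degraded-cluster suite variant below therefore omits it. The `#[allow(irrefutable_let_patterns)]` is presumably needed because, with the rdkafka driver compiled out, `KafkaConnectionBuilder` is left with a single variant and the `if let` always matches. The lint in isolation (enum and variant here are illustrative):

```rust
// Matching the only variant of a single-variant enum is irrefutable,
// so rustc warns that the `if let` is unnecessary unless silenced.
enum ConnectionBuilder {
    Java(String),
}

fn main() {
    let builder = ConnectionBuilder::Java("localhost:9192".to_owned());
    #[allow(irrefutable_let_patterns)]
    if let ConnectionBuilder::Java(address) = builder {
        println!("running java-only test against {address}");
    }
}
```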

pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
standard_test_suite_base(connection_builder).await;
tests_requiring_all_shotover_nodes(connection_builder).await;
}

pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
standard_test_suite_base(connection_builder).await;
cluster_test_suite_base(connection_builder).await;
tests_requiring_all_shotover_nodes(connection_builder).await;
}

pub async fn cluster_test_suite_with_lost_shotover_node(
connection_builder: &KafkaConnectionBuilder,
) {
standard_test_suite_base(connection_builder).await;
cluster_test_suite_base(connection_builder).await;
}

pub async fn setup_basic_user_acls(connection: &KafkaConnectionBuilder, username: &str) {
let admin = connection.connect_admin().await;
admin
54 changes: 51 additions & 3 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
@@ -29,8 +29,8 @@ use kafka_protocol::messages::{
AddOffsetsToTxnRequest, AddPartitionsToTxnRequest, AddPartitionsToTxnResponse, ApiKey,
BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, EndTxnRequest, FetchRequest,
FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListOffsetsRequest,
ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest,
InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListGroupsResponse,
ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest,
OffsetFetchResponse, OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest,
ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse,
SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
@@ -48,7 +48,7 @@ use scram_over_mtls::{
use serde::{Deserialize, Serialize};
use shotover_node::{ShotoverNode, ShotoverNodeConfig};
use split::{
AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter,
AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter, ListGroupsSplitAndRouter,
ListOffsetsRequestSplitAndRouter, OffsetFetchSplitAndRouter,
OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
};
@@ -986,6 +986,11 @@ impl KafkaSinkCluster {
..
})) => self.route_to_controller(request),

Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListGroups(_),
..
})) => self.split_and_route_request::<ListGroupsSplitAndRouter>(request)?,

// route to random broker
Some(Frame::Kafka(KafkaFrame::Request {
body:
@@ -1400,6 +1405,26 @@ impl KafkaSinkCluster {
result
}

fn split_request_by_routing_to_all_brokers(&mut self) -> HashMap<BrokerId, ()> {
let mut result: HashMap<BrokerId, ()> = Default::default();

for broker in self.nodes.iter().filter(|node| {
node.is_up()
&& node
.rack
.as_ref()
.map(|rack| rack == &self.rack)
// If the cluster is not using racks, include all brokers in the list.
// This ensures we get full coverage of the cluster.
// The client driver can filter out the resulting duplicates.
.unwrap_or(true)
}) {
result.insert(broker.broker_id, ());
}

result
}
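
When racks are configured, the filter above restricts the fan-out to live brokers in this shotover node's own rack; when they are not, every live broker is queried and (per the comment) the client driver drops any duplicates. A self-contained sketch of the predicate, with simplified stand-ins for the real node types in this module:

```rust
// Simplified stand-in for the broker metadata the filter inspects.
struct Node {
    broker_id: i32,
    rack: Option<String>,
    up: bool,
}

/// Brokers to fan a request out to: live brokers in the local rack,
/// or every live broker when the cluster does not use racks.
fn brokers_to_query(nodes: &[Node], local_rack: &str) -> Vec<i32> {
    nodes
        .iter()
        .filter(|node| {
            node.up
                && node
                    .rack
                    .as_deref()
                    .map(|rack| rack == local_rack)
                    .unwrap_or(true)
        })
        .map(|node| node.broker_id)
        .collect()
}

fn main() {
    let nodes = [
        Node { broker_id: 0, rack: Some("rack1".to_owned()), up: true },
        Node { broker_id: 1, rack: Some("rack2".to_owned()), up: true },
        Node { broker_id: 2, rack: None, up: false },
    ];
    // Broker 1 is filtered out by rack, broker 2 because it is down.
    assert_eq!(brokers_to_query(&nodes, "rack1"), vec![0]);
}
```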

/// This method removes all groups from the OffsetFetch request and returns them split up by their destination.
/// If any groups are unroutable they will have their BrokerId set to -1
fn split_offset_fetch_request_by_destination(
Expand Down Expand Up @@ -1973,6 +1998,10 @@ impl KafkaSinkCluster {
body: ResponseBody::OffsetFetch(base),
..
})) => Self::combine_offset_fetch(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ListGroups(base),
..
})) => Self::combine_list_groups(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::AddPartitionsToTxn(base),
version,
Expand Down Expand Up @@ -2193,6 +2222,25 @@ impl KafkaSinkCluster {
Ok(())
}

fn combine_list_groups(
base_list_groups: &mut ListGroupsResponse,
drain: impl Iterator<Item = Message>,
) -> Result<()> {
for mut next in drain {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::ListGroups(next_list_groups),
..
})) = next.frame()
{
base_list_groups
.groups
.extend(std::mem::take(&mut next_list_groups.groups));
}
}

Ok(())
}
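
The `std::mem::take` above moves each drained sub-response's group list into the base response without cloning, leaving an empty `Vec` behind in the message that is about to be discarded. The same move in isolation:

```rust
fn main() {
    let mut groups = vec!["group_a".to_owned(), "group_b".to_owned()];
    // take() swaps in Vec::default() and returns the original contents,
    // so extending the base response never copies the group entries.
    let taken = std::mem::take(&mut groups);
    assert_eq!(taken.len(), 2);
    assert!(groups.is_empty());
}
```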

fn combine_add_partitions_to_txn(
base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse,
drain: impl Iterator<Item = Message>,
32 changes: 30 additions & 2 deletions shotover/src/transforms/kafka/sink_cluster/split.rs
@@ -10,8 +10,8 @@ use kafka_protocol::messages::{
add_partitions_to_txn_request::AddPartitionsToTxnTransaction,
list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup,
offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData,
AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, GroupId, ListOffsetsRequest,
OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, GroupId, ListGroupsRequest,
ListOffsetsRequest, OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
};
use std::collections::HashMap;

@@ -166,6 +166,34 @@ impl RequestSplitAndRouter for DeleteGroupsSplitAndRouter {
}
}

pub struct ListGroupsSplitAndRouter;

impl RequestSplitAndRouter for ListGroupsSplitAndRouter {
type Request = ListGroupsRequest;
type SubRequests = ();

fn split_by_destination(
transform: &mut KafkaSinkCluster,
_request: &mut Self::Request,
) -> HashMap<BrokerId, Self::SubRequests> {
transform.split_request_by_routing_to_all_brokers()
}

fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
match request.frame() {
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::ListGroups(request),
..
})) => Some(request),
_ => None,
}
}

fn reassemble(_request: &mut Self::Request, _item: Self::SubRequests) {
// No need to reassemble, each ListGroups is an exact clone of the original
}
}
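
Because every broker receives an unmodified clone of the original ListGroups request, there is nothing request-specific to carry per destination: `SubRequests` is the unit type and `reassemble` does nothing. A trimmed-down sketch of this corner of the contract (the trait shape here is an assumption, not shotover's exact definition):

```rust
use std::collections::HashMap;

// Assumed, simplified shape of the split-and-route contract: `split`
// picks the destinations, `reassemble` rebuilds each broker's request.
trait SplitAndRoute {
    type SubRequests;
    fn split(&self) -> HashMap<i32, Self::SubRequests>;
    fn reassemble(&self, item: Self::SubRequests);
}

struct ListGroupsFanOut {
    live_broker_ids: Vec<i32>,
}

impl SplitAndRoute for ListGroupsFanOut {
    // Every broker gets an identical clone, so the per-destination
    // payload carries no information at all.
    type SubRequests = ();

    fn split(&self) -> HashMap<i32, ()> {
        self.live_broker_ids.iter().map(|id| (*id, ())).collect()
    }

    fn reassemble(&self, _item: ()) {
        // Nothing to rebuild: the original request is reused as-is.
    }
}

fn main() {
    let router = ListGroupsFanOut { live_broker_ids: vec![0, 1, 2] };
    assert_eq!(router.split().len(), 3);
}
```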

pub struct OffsetFetchSplitAndRouter;

impl RequestSplitAndRouter for OffsetFetchSplitAndRouter {