From ccbefb03219d3a8c7a9ee40cb552932f7a02b779 Mon Sep 17 00:00:00 2001 From: Lucas Kent Date: Mon, 28 Oct 2024 14:40:09 +1100 Subject: [PATCH] KafkaSinkCluster: Split OffsetFetch --- .../src/transforms/kafka/sink_cluster/mod.rs | 71 ++++++++++++++++--- .../transforms/kafka/sink_cluster/split.rs | 35 ++++++++- 2 files changed, 93 insertions(+), 13 deletions(-) diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 6ccbbb7fd..bbf29164d 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -19,6 +19,7 @@ use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as FetchResponseL use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic; use kafka_protocol::messages::metadata_request::MetadataRequestTopic; use kafka_protocol::messages::metadata_response::MetadataResponseBroker; +use kafka_protocol::messages::offset_fetch_request::OffsetFetchRequestGroup; use kafka_protocol::messages::offset_for_leader_epoch_request::OffsetForLeaderTopic; use kafka_protocol::messages::produce_request::TopicProduceData; use kafka_protocol::messages::produce_response::{ @@ -29,10 +30,10 @@ use kafka_protocol::messages::{ BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListOffsetsRequest, - ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetForLeaderEpochRequest, - OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader, - SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, - TopicName, TransactionalId, TxnOffsetCommitRequest, + ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest, + OffsetFetchResponse, OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, 
ProduceRequest, + ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, + SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest, }; use kafka_protocol::protocol::StrBytes; use kafka_protocol::ResponseError; @@ -48,8 +49,8 @@ use serde::{Deserialize, Serialize}; use shotover_node::{ShotoverNode, ShotoverNodeConfig}; use split::{ AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter, - ListOffsetsRequestSplitAndRouter, OffsetForLeaderEpochRequestSplitAndRouter, - ProduceRequestSplitAndRouter, RequestSplitAndRouter, + ListOffsetsRequestSplitAndRouter, OffsetFetchSplitAndRouter, + OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter, }; use std::collections::{HashMap, HashSet, VecDeque}; use std::hash::Hasher; @@ -899,8 +900,9 @@ impl KafkaSinkCluster { body: RequestBody::OffsetFetch(offset_fetch), header, })) => { - let group_id = if header.request_api_version <= 7 { - offset_fetch.group_id.clone() + if header.request_api_version <= 7 { + let group_id = offset_fetch.group_id.clone(); + self.route_to_group_coordinator(message, group_id); } else { // This is possibly dangerous. // The client could construct a message which is valid for a specific shotover node, but not for any single kafka broker. @@ -908,9 +910,8 @@ impl KafkaSinkCluster { // and then reconstruct the response back into a single response // // For now just pick the first group as that is sufficient for the simple cases. - offset_fetch.groups.first().unwrap().group_id.clone() + self.split_and_route_request::<OffsetFetchSplitAndRouter>(message)?; }; - self.route_to_group_coordinator(message, group_id); } Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::OffsetCommit(offset_commit), @@ -1405,6 +1406,32 @@ impl KafkaSinkCluster { result } + /// This method removes all groups from the OffsetFetch request and returns them split up by their destination.
+ /// If any groups are unroutable they will have their BrokerId set to -1 + fn split_offset_fetch_request_by_destination( + &mut self, + body: &mut OffsetFetchRequest, + ) -> HashMap<BrokerId, Vec<OffsetFetchRequestGroup>> { + let mut result: HashMap<BrokerId, Vec<OffsetFetchRequestGroup>> = Default::default(); + + for group in body.groups.drain(..) { + if let Some(destination) = self.group_to_coordinator_broker.get(&group.group_id) { + let dest_groups = result.entry(*destination).or_default(); + dest_groups.push(group); + } else { + tracing::warn!( + "no known coordinator for group {:?}, routing message to a random broker so that a NOT_COORDINATOR or similar error is returned to the client", + group.group_id + ); + let destination = BrokerId(-1); + let dest_groups = result.entry(destination).or_default(); + dest_groups.push(group); + } + } + + result + } + /// This method removes all topics from the list offsets request and returns them split up by their destination /// If any topics are unroutable they will have their BrokerId set to -1 fn split_offset_for_leader_epoch_request_by_destination( @@ -1948,6 +1975,10 @@ impl KafkaSinkCluster { body: ResponseBody::DeleteGroups(base), .. })) => Self::combine_delete_groups_responses(base, drain)?, + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::OffsetFetch(base), + .. + })) => Self::combine_offset_fetch(base, drain)?, Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::AddPartitionsToTxn(base), version, @@ -2149,6 +2180,25 @@ impl KafkaSinkCluster { Ok(()) } + fn combine_offset_fetch( + base_offset_fetch: &mut OffsetFetchResponse, + drain: impl Iterator<Item = Message>, + ) -> Result<()> { + for mut next in drain { + if let Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::OffsetFetch(next_offset_fetch), + ..
+ })) = next.frame() + { + base_offset_fetch + .groups + .extend(std::mem::take(&mut next_offset_fetch.groups)) + } + } + + Ok(()) + } + fn combine_add_partitions_to_txn( base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse, drain: impl Iterator<Item = Message>, @@ -2357,6 +2407,7 @@ impl KafkaSinkCluster { body: ResponseBody::OffsetFetch(offset_fetch), .. })) => { + // TODO: suspicious, only the top level error_code is checked but split OffsetFetch responses (api version > 7) may report errors per group, needs confirming. self.handle_group_coordinator_routing_error(&request_ty, offset_fetch.error_code) } Some(Frame::Kafka(KafkaFrame::Response { diff --git a/shotover/src/transforms/kafka/sink_cluster/split.rs b/shotover/src/transforms/kafka/sink_cluster/split.rs index d5af7a6a5..b5f03d97d 100644 --- a/shotover/src/transforms/kafka/sink_cluster/split.rs +++ b/shotover/src/transforms/kafka/sink_cluster/split.rs @@ -8,9 +8,10 @@ use crate::{ }; use kafka_protocol::messages::{ add_partitions_to_txn_request::AddPartitionsToTxnTransaction, - list_offsets_request::ListOffsetsTopic, offset_for_leader_epoch_request::OffsetForLeaderTopic, - produce_request::TopicProduceData, AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, - GroupId, ListOffsetsRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName, + list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup, + offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData, + AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, GroupId, ListOffsetsRequest, + OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName, }; use std::collections::HashMap; @@ -164,3 +165,31 @@ impl RequestSplitAndRouter for DeleteGroupsSplitAndRouter { request.groups_names = item; } } + +pub struct OffsetFetchSplitAndRouter; + +impl RequestSplitAndRouter for OffsetFetchSplitAndRouter { + type Request = OffsetFetchRequest; + type SubRequests = Vec<OffsetFetchRequestGroup>; + + fn split_by_destination( + transform: &mut KafkaSinkCluster, + request: &mut Self::Request, + ) -> HashMap<BrokerId, Self::SubRequests> {
transform.split_offset_fetch_request_by_destination(request) + } + + fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> { + match request.frame() { + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::OffsetFetch(request), + .. + })) => Some(request), + _ => None, + } + } + + fn reassemble(request: &mut Self::Request, item: Self::SubRequests) { + request.groups = item; + } +}