diff --git a/shotover/src/transforms/kafka/sink_cluster/mod.rs b/shotover/src/transforms/kafka/sink_cluster/mod.rs index 118e2edfc..a7cc95800 100644 --- a/shotover/src/transforms/kafka/sink_cluster/mod.rs +++ b/shotover/src/transforms/kafka/sink_cluster/mod.rs @@ -19,6 +19,7 @@ use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as FetchResponseL use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic; use kafka_protocol::messages::metadata_request::MetadataRequestTopic; use kafka_protocol::messages::metadata_response::MetadataResponseBroker; +use kafka_protocol::messages::offset_for_leader_epoch_request::OffsetForLeaderTopic; use kafka_protocol::messages::produce_request::TopicProduceData; use kafka_protocol::messages::produce_response::{ LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch, TopicProduceResponse, @@ -28,9 +29,9 @@ use kafka_protocol::messages::{ BrokerId, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse, - ProduceRequest, ProduceResponse, RequestHeader, SaslAuthenticateRequest, - SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, - TxnOffsetCommitRequest, + OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, + RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, + SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest, }; use kafka_protocol::protocol::StrBytes; use kafka_protocol::ResponseError; @@ -46,7 +47,7 @@ use serde::{Deserialize, Serialize}; use shotover_node::{ShotoverNode, ShotoverNodeConfig}; use split::{ AddPartitionsToTxnRequestSplitAndRouter, ListOffsetsRequestSplitAndRouter, - ProduceRequestSplitAndRouter, RequestSplitAndRouter, + OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter, }; use std::collections::{HashMap, HashSet, VecDeque}; use std::hash::Hasher; @@ -675,6 +676,14 @@ impl KafkaSinkCluster { self.store_topic_names(&mut topic_names, topic.name.clone()); } } + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::OffsetForLeaderEpoch(body), + .. + })) => { + for topic in &body.topics { + self.store_topic_names(&mut topic_names, topic.topic.clone()); + } + } Some(Frame::Kafka(KafkaFrame::Request { body: RequestBody::Fetch(fetch), .. @@ -852,6 +861,12 @@ impl KafkaSinkCluster { body: RequestBody::ListOffsets(_), .. })) => self.split_and_route_request::(message)?, + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::OffsetForLeaderEpoch(_), + .. + })) => self.split_and_route_request::( + message, + )?, // route to group coordinator Some(Frame::Kafka(KafkaFrame::Request { @@ -980,7 +995,10 @@ impl KafkaSinkCluster { // error handling Some(Frame::Kafka(KafkaFrame::Request { header, .. })) => { - let request_type = ApiKey::try_from(header.request_api_key).unwrap(); + let request_type = + format!("{:?}", ApiKey::try_from(header.request_api_key).unwrap()); + // remove Key postfix it makes it sound its part of the message type name which is confusing. + let request_type = request_type.trim_end_matches("Key"); tracing::warn!("Routing for request of type {request_type:?} has not been implemented yet."); self.route_to_random_broker(message) } @@ -1353,6 +1371,59 @@ impl KafkaSinkCluster { result } + /// This method removes all topics from the list offsets request and returns them split up by their destination + /// If any topics are unroutable they will have their BrokerId set to -1 + fn split_offset_for_leader_epoch_request_by_destination( + &mut self, + body: &mut OffsetForLeaderEpochRequest, + ) -> HashMap> { + let mut result: HashMap> = Default::default(); + + for mut topic in body.topics.drain(..) { + let topic_name = &topic.topic; + if let Some(topic_meta) = self.topic_by_name.get(topic_name) { + for partition in std::mem::take(&mut topic.partitions) { + let partition_index = partition.partition as usize; + let destination = if let Some(partition) = + topic_meta.partitions.get(partition_index) + { + if partition.leader_id == -1 { + tracing::warn!( + "leader_id is unknown for {topic_name:?} at partition index {partition_index}", + ); + } + partition.leader_id + } else { + let partition_len = topic_meta.partitions.len(); + tracing::warn!("no known partition for {topic_name:?} at partition index {partition_index} out of {partition_len} partitions, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client"); + BrokerId(-1) + }; + tracing::debug!( + "Routing OffsetForLeaderEpoch request portion of partition {partition_index} in {topic_name:?} to broker {}", + destination.0 + ); + let dest_topics = result.entry(destination).or_default(); + if let Some(dest_topic) = + dest_topics.iter_mut().find(|x| x.topic == topic.topic) + { + dest_topic.partitions.push(partition); + } else { + let mut topic = topic.clone(); + topic.partitions.push(partition); + dest_topics.push(topic); + } + } + } else { + tracing::warn!("no known partition replica for {topic_name:?}, routing message to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client"); + let destination = BrokerId(-1); + let dest_topics = result.entry(destination).or_default(); + dest_topics.push(topic); + } + } + + result + } + /// This method removes all transactions from the AddPartitionsToTxn request and returns them split up by their destination /// If any topics are unroutable they will have their BrokerId set to -1 fn split_add_partition_to_txn_request_by_destination( @@ -1831,6 +1902,10 @@ impl KafkaSinkCluster { body: ResponseBody::ListOffsets(base), .. })) => Self::combine_list_offsets_responses(base, drain)?, + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::OffsetForLeaderEpoch(base), + .. + })) => Self::combine_offset_for_leader_epoch_responses(base, drain)?, Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::Produce(base), .. @@ -1926,7 +2001,46 @@ impl KafkaSinkCluster { } } else { return Err(anyhow!( - "Combining ListOffests responses but received another message type" + "Combining ListOffsets responses but received another message type" + )); + } + } + + Ok(()) + } + + fn combine_offset_for_leader_epoch_responses( + base_list_offsets: &mut OffsetForLeaderEpochResponse, + drain: impl Iterator, + ) -> Result<()> { + for mut next in drain { + if let Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::OffsetForLeaderEpoch(next_body), + .. + })) = next.frame() + { + for next_topic in std::mem::take(&mut next_body.topics) { + if let Some(base_topic) = base_list_offsets + .topics + .iter_mut() + .find(|topic| topic.topic == next_topic.topic) + { + for next_partition in &next_topic.partitions { + for base_partition in &base_topic.partitions { + if next_partition.partition == base_partition.partition { + tracing::warn!("Duplicate partition indexes in combined OffsetForLeaderEpoch response, if this ever occurs we should investigate the repercussions") + } + } + } + // A partition can only be contained in one response so there is no risk of duplicating partitions + base_topic.partitions.extend(next_topic.partitions) + } else { + base_list_offsets.topics.push(next_topic); + } + } + } else { + return Err(anyhow!( + "Combining OffsetForLeaderEpoch responses but received another message type" )); } } @@ -2155,6 +2269,25 @@ impl KafkaSinkCluster { } } } + Some(Frame::Kafka(KafkaFrame::Response { + body: ResponseBody::OffsetForLeaderEpoch(body), + .. + })) => { + for topic in &mut body.topics { + for partition in &mut topic.partitions { + if let Some(ResponseError::NotLeaderOrFollower) = + ResponseError::try_from_code(partition.error_code) + { + self.topic_by_name.remove(&topic.topic); + tracing::info!( + "OffsetForLeaderEpoch response included error NOT_LEADER_OR_FOLLOWER and so cleared metadata for topic {:?}", + topic.topic, + ); + break; + } + } + } + } Some(Frame::Kafka(KafkaFrame::Response { body: ResponseBody::Heartbeat(heartbeat), .. diff --git a/shotover/src/transforms/kafka/sink_cluster/split.rs b/shotover/src/transforms/kafka/sink_cluster/split.rs index 9762a64f8..658dbfc4a 100644 --- a/shotover/src/transforms/kafka/sink_cluster/split.rs +++ b/shotover/src/transforms/kafka/sink_cluster/split.rs @@ -8,8 +8,9 @@ use crate::{ }; use kafka_protocol::messages::{ add_partitions_to_txn_request::AddPartitionsToTxnTransaction, - list_offsets_request::ListOffsetsTopic, produce_request::TopicProduceData, - AddPartitionsToTxnRequest, BrokerId, ListOffsetsRequest, ProduceRequest, TopicName, + list_offsets_request::ListOffsetsTopic, offset_for_leader_epoch_request::OffsetForLeaderTopic, + produce_request::TopicProduceData, AddPartitionsToTxnRequest, BrokerId, ListOffsetsRequest, + OffsetForLeaderEpochRequest, ProduceRequest, TopicName, }; use std::collections::HashMap; @@ -107,3 +108,31 @@ impl RequestSplitAndRouter for ListOffsetsRequestSplitAndRouter { request.topics = item; } } + +pub struct OffsetForLeaderEpochRequestSplitAndRouter; + +impl RequestSplitAndRouter for OffsetForLeaderEpochRequestSplitAndRouter { + type Request = OffsetForLeaderEpochRequest; + type SubRequests = Vec; + + fn split_by_destination( + transform: &mut KafkaSinkCluster, + request: &mut Self::Request, + ) -> HashMap { + transform.split_offset_for_leader_epoch_request_by_destination(request) + } + + fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> { + match request.frame() { + Some(Frame::Kafka(KafkaFrame::Request { + body: RequestBody::OffsetForLeaderEpoch(request), + .. + })) => Some(request), + _ => None, + } + } + + fn reassemble(request: &mut Self::Request, item: Self::SubRequests) { + request.topics = item; + } +}