Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

warn when shotover doesn't know how to route a request #1779

Merged
merged 2 commits into from
Oct 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
168 changes: 162 additions & 6 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as FetchResponseL
use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic;
use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
use kafka_protocol::messages::metadata_response::MetadataResponseBroker;
use kafka_protocol::messages::offset_for_leader_epoch_request::OffsetForLeaderTopic;
use kafka_protocol::messages::produce_request::TopicProduceData;
use kafka_protocol::messages::produce_response::{
LeaderIdAndEpoch as ProduceResponseLeaderIdAndEpoch, TopicProduceResponse,
Expand All @@ -28,9 +29,9 @@ use kafka_protocol::messages::{
BrokerId, EndTxnRequest, FetchRequest, FetchResponse, FindCoordinatorRequest,
FindCoordinatorResponse, GroupId, HeartbeatRequest, InitProducerIdRequest, JoinGroupRequest,
LeaveGroupRequest, ListOffsetsRequest, ListOffsetsResponse, MetadataRequest, MetadataResponse,
ProduceRequest, ProduceResponse, RequestHeader, SaslAuthenticateRequest,
SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId,
TxnOffsetCommitRequest,
OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse,
RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest,
SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
Expand All @@ -46,7 +47,7 @@ use serde::{Deserialize, Serialize};
use shotover_node::{ShotoverNode, ShotoverNodeConfig};
use split::{
AddPartitionsToTxnRequestSplitAndRouter, ListOffsetsRequestSplitAndRouter,
ProduceRequestSplitAndRouter, RequestSplitAndRouter,
OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
};
use std::collections::{HashMap, HashSet, VecDeque};
use std::hash::Hasher;
Expand Down Expand Up @@ -675,6 +676,14 @@ impl KafkaSinkCluster {
self.store_topic_names(&mut topic_names, topic.name.clone());
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetForLeaderEpoch(body),
..
})) => {
for topic in &body.topics {
self.store_topic_names(&mut topic_names, topic.topic.clone());
}
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::Fetch(fetch),
..
Expand Down Expand Up @@ -852,6 +861,12 @@ impl KafkaSinkCluster {
body: RequestBody::ListOffsets(_),
..
})) => self.split_and_route_request::<ListOffsetsRequestSplitAndRouter>(message)?,
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetForLeaderEpoch(_),
..
})) => self.split_and_route_request::<OffsetForLeaderEpochRequestSplitAndRouter>(
message,
)?,

// route to group coordinator
Some(Frame::Kafka(KafkaFrame::Request {
Expand Down Expand Up @@ -965,7 +980,33 @@ impl KafkaSinkCluster {
body: RequestBody::CreateTopics(_),
..
})) => self.route_to_controller(message),
_ => self.route_to_random_broker(message),

// route to random broker
Some(Frame::Kafka(KafkaFrame::Request {
body:
RequestBody::Metadata(_)
| RequestBody::DescribeConfigs(_)
| RequestBody::AlterConfigs(_)
| RequestBody::CreatePartitions(_)
| RequestBody::DeleteTopics(_)
| RequestBody::CreateAcls(_),
..
})) => self.route_to_random_broker(message),

// error handling
Some(Frame::Kafka(KafkaFrame::Request { header, .. })) => {
let request_type =
format!("{:?}", ApiKey::try_from(header.request_api_key).unwrap());
// remove Key postfix, since its not part of the actual message name which is confusing.
let request_type = request_type.trim_end_matches("Key");
tracing::warn!("Routing for request of type {request_type:?} has not been implemented yet.");
self.route_to_random_broker(message)
}
Some(_) => unreachable!("Must be a kafka request"),
None => {
tracing::warn!("Unable to parse request, routing to a random node");
self.route_to_random_broker(message)
}
}
}
Ok(())
Expand Down Expand Up @@ -1330,6 +1371,59 @@ impl KafkaSinkCluster {
result
}

/// This method removes all topics from the OffsetForLeaderEpoch request and returns them split up by their destination.
/// If any topics are unroutable they will have their BrokerId set to -1
fn split_offset_for_leader_epoch_request_by_destination(
    &mut self,
    body: &mut OffsetForLeaderEpochRequest,
) -> HashMap<BrokerId, Vec<OffsetForLeaderTopic>> {
    let mut result: HashMap<BrokerId, Vec<OffsetForLeaderTopic>> = Default::default();

    for mut topic in body.topics.drain(..) {
        let topic_name = &topic.topic;
        if let Some(topic_meta) = self.topic_by_name.get(topic_name) {
            // Known topic: route each of its partitions to that partition's leader broker.
            for partition in std::mem::take(&mut topic.partitions) {
                let partition_index = partition.partition as usize;
                let destination = if let Some(partition) =
                    topic_meta.partitions.get(partition_index)
                {
                    if partition.leader_id == -1 {
                        tracing::warn!(
                            "leader_id is unknown for {topic_name:?} at partition index {partition_index}",
                        );
                    }
                    partition.leader_id
                } else {
                    // Partition index is outside our known metadata; BrokerId(-1) marks the
                    // sub-request as unroutable so it falls back to a random broker.
                    let partition_len = topic_meta.partitions.len();
                    tracing::warn!("no known partition for {topic_name:?} at partition index {partition_index} out of {partition_len} partitions, routing request to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
                    BrokerId(-1)
                };
                tracing::debug!(
                    "Routing OffsetForLeaderEpoch request portion of partition {partition_index} in {topic_name:?} to broker {}",
                    destination.0
                );
                // Merge this partition into an existing topic entry for the destination,
                // or create a new topic entry carrying just this partition.
                let dest_topics = result.entry(destination).or_default();
                if let Some(dest_topic) =
                    dest_topics.iter_mut().find(|x| x.topic == topic.topic)
                {
                    dest_topic.partitions.push(partition);
                } else {
                    let mut topic = topic.clone();
                    topic.partitions.push(partition);
                    dest_topics.push(topic);
                }
            }
        } else {
            // Topic metadata is entirely unknown: send the whole topic to a random broker
            // (BrokerId(-1)) so the client receives a retriable error.
            tracing::warn!("no known partition replica for {topic_name:?}, routing message to a random broker so that a NOT_LEADER_OR_FOLLOWER or similar error is returned to the client");
            let destination = BrokerId(-1);
            let dest_topics = result.entry(destination).or_default();
            dest_topics.push(topic);
        }
    }

    result
}

/// This method removes all transactions from the AddPartitionsToTxn request and returns them split up by their destination
/// If any topics are unroutable they will have their BrokerId set to -1
fn split_add_partition_to_txn_request_by_destination(
Expand Down Expand Up @@ -1808,6 +1902,10 @@ impl KafkaSinkCluster {
body: ResponseBody::ListOffsets(base),
..
})) => Self::combine_list_offsets_responses(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::OffsetForLeaderEpoch(base),
..
})) => Self::combine_offset_for_leader_epoch_responses(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Produce(base),
..
Expand Down Expand Up @@ -1903,7 +2001,46 @@ impl KafkaSinkCluster {
}
} else {
return Err(anyhow!(
"Combining ListOffests responses but received another message type"
"Combining ListOffsets responses but received another message type"
));
}
}

Ok(())
}

fn combine_offset_for_leader_epoch_responses(
base_list_offsets: &mut OffsetForLeaderEpochResponse,
drain: impl Iterator<Item = Message>,
) -> Result<()> {
for mut next in drain {
if let Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::OffsetForLeaderEpoch(next_body),
..
})) = next.frame()
{
for next_topic in std::mem::take(&mut next_body.topics) {
if let Some(base_topic) = base_list_offsets
.topics
.iter_mut()
.find(|topic| topic.topic == next_topic.topic)
{
for next_partition in &next_topic.partitions {
for base_partition in &base_topic.partitions {
if next_partition.partition == base_partition.partition {
tracing::warn!("Duplicate partition indexes in combined OffsetForLeaderEpoch response, if this ever occurs we should investigate the repercussions")
}
}
}
// A partition can only be contained in one response so there is no risk of duplicating partitions
base_topic.partitions.extend(next_topic.partitions)
} else {
base_list_offsets.topics.push(next_topic);
}
}
} else {
return Err(anyhow!(
"Combining OffsetForLeaderEpoch responses but received another message type"
));
}
}
Expand Down Expand Up @@ -2132,6 +2269,25 @@ impl KafkaSinkCluster {
}
}
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::OffsetForLeaderEpoch(body),
..
})) => {
for topic in &mut body.topics {
for partition in &mut topic.partitions {
if let Some(ResponseError::NotLeaderOrFollower) =
ResponseError::try_from_code(partition.error_code)
{
self.topic_by_name.remove(&topic.topic);
tracing::info!(
"OffsetForLeaderEpoch response included error NOT_LEADER_OR_FOLLOWER and so cleared metadata for topic {:?}",
topic.topic,
);
break;
}
}
}
}
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::Heartbeat(heartbeat),
..
Expand Down
33 changes: 31 additions & 2 deletions shotover/src/transforms/kafka/sink_cluster/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ use crate::{
};
use kafka_protocol::messages::{
add_partitions_to_txn_request::AddPartitionsToTxnTransaction,
list_offsets_request::ListOffsetsTopic, produce_request::TopicProduceData,
AddPartitionsToTxnRequest, BrokerId, ListOffsetsRequest, ProduceRequest, TopicName,
list_offsets_request::ListOffsetsTopic, offset_for_leader_epoch_request::OffsetForLeaderTopic,
produce_request::TopicProduceData, AddPartitionsToTxnRequest, BrokerId, ListOffsetsRequest,
OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
};
use std::collections::HashMap;

Expand Down Expand Up @@ -107,3 +108,31 @@ impl RequestSplitAndRouter for ListOffsetsRequestSplitAndRouter {
request.topics = item;
}
}

/// Split/route implementation for OffsetForLeaderEpoch requests: extracts the
/// request frame, delegates splitting to the sink cluster, and writes the
/// per-broker topic subset back into the request.
pub struct OffsetForLeaderEpochRequestSplitAndRouter;

impl RequestSplitAndRouter for OffsetForLeaderEpochRequestSplitAndRouter {
    type Request = OffsetForLeaderEpochRequest;
    type SubRequests = Vec<OffsetForLeaderTopic>;

    fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
        // Only yield the body when the message parses as an OffsetForLeaderEpoch request.
        if let Some(Frame::Kafka(KafkaFrame::Request {
            body: RequestBody::OffsetForLeaderEpoch(request),
            ..
        })) = request.frame()
        {
            Some(request)
        } else {
            None
        }
    }

    fn split_by_destination(
        transform: &mut KafkaSinkCluster,
        request: &mut Self::Request,
    ) -> HashMap<BrokerId, Self::SubRequests> {
        transform.split_offset_for_leader_epoch_request_by_destination(request)
    }

    fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
        request.topics = item;
    }
}
Loading