Skip to content

Commit

Permalink
KafkaSinkCluster: Split OffsetFetch
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai committed Oct 28, 2024
1 parent 1b52506 commit ccbefb0
Show file tree
Hide file tree
Showing 2 changed files with 93 additions and 13 deletions.
71 changes: 61 additions & 10 deletions shotover/src/transforms/kafka/sink_cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use kafka_protocol::messages::fetch_response::LeaderIdAndEpoch as FetchResponseL
use kafka_protocol::messages::list_offsets_request::ListOffsetsTopic;
use kafka_protocol::messages::metadata_request::MetadataRequestTopic;
use kafka_protocol::messages::metadata_response::MetadataResponseBroker;
use kafka_protocol::messages::offset_fetch_request::OffsetFetchRequestGroup;
use kafka_protocol::messages::offset_for_leader_epoch_request::OffsetForLeaderTopic;
use kafka_protocol::messages::produce_request::TopicProduceData;
use kafka_protocol::messages::produce_response::{
Expand All @@ -29,10 +30,10 @@ use kafka_protocol::messages::{
BrokerId, DeleteGroupsRequest, DeleteGroupsResponse, EndTxnRequest, FetchRequest,
FetchResponse, FindCoordinatorRequest, FindCoordinatorResponse, GroupId, HeartbeatRequest,
InitProducerIdRequest, JoinGroupRequest, LeaveGroupRequest, ListOffsetsRequest,
ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetForLeaderEpochRequest,
OffsetForLeaderEpochResponse, ProduceRequest, ProduceResponse, RequestHeader,
SaslAuthenticateRequest, SaslAuthenticateResponse, SaslHandshakeRequest, SyncGroupRequest,
TopicName, TransactionalId, TxnOffsetCommitRequest,
ListOffsetsResponse, MetadataRequest, MetadataResponse, OffsetFetchRequest,
OffsetFetchResponse, OffsetForLeaderEpochRequest, OffsetForLeaderEpochResponse, ProduceRequest,
ProduceResponse, RequestHeader, SaslAuthenticateRequest, SaslAuthenticateResponse,
SaslHandshakeRequest, SyncGroupRequest, TopicName, TransactionalId, TxnOffsetCommitRequest,
};
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::ResponseError;
Expand All @@ -48,8 +49,8 @@ use serde::{Deserialize, Serialize};
use shotover_node::{ShotoverNode, ShotoverNodeConfig};
use split::{
AddPartitionsToTxnRequestSplitAndRouter, DeleteGroupsSplitAndRouter,
ListOffsetsRequestSplitAndRouter, OffsetForLeaderEpochRequestSplitAndRouter,
ProduceRequestSplitAndRouter, RequestSplitAndRouter,
ListOffsetsRequestSplitAndRouter, OffsetFetchSplitAndRouter,
OffsetForLeaderEpochRequestSplitAndRouter, ProduceRequestSplitAndRouter, RequestSplitAndRouter,
};
use std::collections::{HashMap, HashSet, VecDeque};
use std::hash::Hasher;
Expand Down Expand Up @@ -899,18 +900,18 @@ impl KafkaSinkCluster {
body: RequestBody::OffsetFetch(offset_fetch),
header,
})) => {
let group_id = if header.request_api_version <= 7 {
offset_fetch.group_id.clone()
if header.request_api_version <= 7 {
let group_id = offset_fetch.group_id.clone();
self.route_to_group_coordinator(message, group_id);
} else {
// This is possibly dangerous.
// The client could construct a message which is valid for a specific shotover node, but not for any single kafka broker.
// We may need to add some logic to split the request into multiple messages going to different destinations,
// and then reconstruct the response back into a single response
//
// For now just pick the first group as that is sufficient for the simple cases.
offset_fetch.groups.first().unwrap().group_id.clone()
self.split_and_route_request::<OffsetFetchSplitAndRouter>(message)?;
};
self.route_to_group_coordinator(message, group_id);
}
Some(Frame::Kafka(KafkaFrame::Request {
body: RequestBody::OffsetCommit(offset_commit),
Expand Down Expand Up @@ -1405,6 +1406,32 @@ impl KafkaSinkCluster {
result
}

/// Drains every group out of the OffsetFetch request and returns the groups bucketed by destination broker.
/// Groups that have no entry in `group_to_coordinator_broker` are collected under `BrokerId(-1)`.
fn split_offset_fetch_request_by_destination(
    &mut self,
    body: &mut OffsetFetchRequest,
) -> HashMap<BrokerId, Vec<OffsetFetchRequestGroup>> {
    let mut groups_by_broker: HashMap<BrokerId, Vec<OffsetFetchRequestGroup>> = HashMap::new();

    for group in body.groups.drain(..) {
        // Decide where this group goes first, then insert it in exactly one place.
        let broker = match self.group_to_coordinator_broker.get(&group.group_id) {
            Some(coordinator) => *coordinator,
            None => {
                tracing::warn!(
                    "no known coordinator for group {:?}, routing message to a random broker so that a NOT_COORDINATOR or similar error is returned to the client",
                    group.group_id
                );
                // -1 marks the group as unroutable.
                BrokerId(-1)
            }
        };
        groups_by_broker.entry(broker).or_default().push(group);
    }

    groups_by_broker
}

/// This method removes all topics from the OffsetForLeaderEpoch request and returns them split up by their destination
/// If any topics are unroutable they will have their BrokerId set to -1
fn split_offset_for_leader_epoch_request_by_destination(
Expand Down Expand Up @@ -1948,6 +1975,10 @@ impl KafkaSinkCluster {
body: ResponseBody::DeleteGroups(base),
..
})) => Self::combine_delete_groups_responses(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::OffsetFetch(base),
..
})) => Self::combine_offset_fetch(base, drain)?,
Some(Frame::Kafka(KafkaFrame::Response {
body: ResponseBody::AddPartitionsToTxn(base),
version,
Expand Down Expand Up @@ -2149,6 +2180,25 @@ impl KafkaSinkCluster {
Ok(())
}

/// Merges the groups of every OffsetFetch response in `drain` into `base_offset_fetch`.
///
/// Each drained OffsetFetch response has its `groups` moved out (leaving it empty) and appended,
/// in iteration order, to the base response. Messages that are not OffsetFetch responses are ignored.
/// Always returns `Ok(())`.
fn combine_offset_fetch(
    base_offset_fetch: &mut OffsetFetchResponse,
    drain: impl Iterator<Item = Message>,
) -> Result<()> {
    for mut response in drain {
        match response.frame() {
            Some(Frame::Kafka(KafkaFrame::Response {
                body: ResponseBody::OffsetFetch(other),
                ..
            })) => {
                // Move the groups out rather than cloning them.
                let moved_groups = std::mem::take(&mut other.groups);
                base_offset_fetch.groups.extend(moved_groups);
            }
            // Any other frame contributes nothing to the combined response.
            _ => {}
        }
    }

    Ok(())
}

fn combine_add_partitions_to_txn(
base_add_partitions_to_txn: &mut AddPartitionsToTxnResponse,
drain: impl Iterator<Item = Message>,
Expand Down Expand Up @@ -2357,6 +2407,7 @@ impl KafkaSinkCluster {
body: ResponseBody::OffsetFetch(offset_fetch),
..
})) => {
// TODO: suspicious
self.handle_group_coordinator_routing_error(&request_ty, offset_fetch.error_code)
}
Some(Frame::Kafka(KafkaFrame::Response {
Expand Down
35 changes: 32 additions & 3 deletions shotover/src/transforms/kafka/sink_cluster/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ use crate::{
};
use kafka_protocol::messages::{
add_partitions_to_txn_request::AddPartitionsToTxnTransaction,
list_offsets_request::ListOffsetsTopic, offset_for_leader_epoch_request::OffsetForLeaderTopic,
produce_request::TopicProduceData, AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest,
GroupId, ListOffsetsRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
list_offsets_request::ListOffsetsTopic, offset_fetch_request::OffsetFetchRequestGroup,
offset_for_leader_epoch_request::OffsetForLeaderTopic, produce_request::TopicProduceData,
AddPartitionsToTxnRequest, BrokerId, DeleteGroupsRequest, GroupId, ListOffsetsRequest,
OffsetFetchRequest, OffsetForLeaderEpochRequest, ProduceRequest, TopicName,
};
use std::collections::HashMap;

Expand Down Expand Up @@ -164,3 +165,31 @@ impl RequestSplitAndRouter for DeleteGroupsSplitAndRouter {
request.groups_names = item;
}
}

/// Marker type that implements `RequestSplitAndRouter` for `OffsetFetchRequest`,
/// splitting the request by its groups.
pub struct OffsetFetchSplitAndRouter;

impl RequestSplitAndRouter for OffsetFetchSplitAndRouter {
    type Request = OffsetFetchRequest;
    type SubRequests = Vec<OffsetFetchRequestGroup>;

    /// Delegates to `KafkaSinkCluster::split_offset_fetch_request_by_destination`,
    /// returning the request's groups keyed by destination broker.
    fn split_by_destination(
        transform: &mut KafkaSinkCluster,
        request: &mut Self::Request,
    ) -> HashMap<BrokerId, Self::SubRequests> {
        transform.split_offset_fetch_request_by_destination(request)
    }

    /// Returns the OffsetFetch request body held by the message,
    /// or `None` when the message's frame is anything else.
    fn get_request_frame(request: &mut Message) -> Option<&mut Self::Request> {
        if let Some(Frame::Kafka(KafkaFrame::Request {
            body: RequestBody::OffsetFetch(offset_fetch),
            ..
        })) = request.frame()
        {
            Some(offset_fetch)
        } else {
            None
        }
    }

    /// Replaces the request's groups with the given sub-request groups.
    fn reassemble(request: &mut Self::Request, item: Self::SubRequests) {
        request.groups = item;
    }
}

0 comments on commit ccbefb0

Please sign in to comment.