Skip to content

Commit

Permalink
Merge branch 'main' into improve_consumer_assertion
Browse files Browse the repository at this point in the history
  • Loading branch information
rukai authored Oct 18, 2024
2 parents 5d420b3 + d8b0e59 commit 110567a
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 23 deletions.
18 changes: 7 additions & 11 deletions shotover-proxy/tests/kafka_int_tests/test_cases.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tokio_bin_process::BinProcess;
async fn admin_setup(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
admin
.create_topics(&[
.create_topics_and_wait(&[
NewTopic {
name: "partitions1",
num_partitions: 1,
Expand Down Expand Up @@ -442,7 +442,7 @@ pub async fn produce_consume_partitions1_kafka_node_goes_down(
{
let admin = connection_builder.connect_admin().await;
admin
.create_topics(&[NewTopic {
.create_topics_and_wait(&[NewTopic {
name: topic_name,
num_partitions: 1,
replication_factor: 3,
Expand Down Expand Up @@ -568,7 +568,7 @@ pub async fn produce_consume_partitions1_shotover_nodes_go_down(
{
let admin = connection_builder.connect_admin().await;
admin
.create_topics(&[NewTopic {
.create_topics_and_wait(&[NewTopic {
name: topic_name,
num_partitions: 1,
replication_factor: 3,
Expand Down Expand Up @@ -1204,16 +1204,13 @@ async fn produce_consume_acks0(connection_builder: &KafkaConnectionBuilder) {
pub async fn test_broker_idle_timeout(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
admin
.create_topics(&[NewTopic {
.create_topics_and_wait(&[NewTopic {
name: "partitions3",
num_partitions: 3,
replication_factor: 1,
}])
.await;

// cpp driver hits race condition here
tokio::time::sleep(Duration::from_secs(2)).await;

let mut producer = connection_builder.connect_producer("all", 0).await;
let mut consumer = connection_builder
.connect_consumer(
Expand Down Expand Up @@ -1286,7 +1283,7 @@ pub async fn standard_test_suite(connection_builder: &KafkaConnectionBuilder) {
let admin = connection_builder.connect_admin().await;
admin.delete_topics(&["partitions1"]).await;
admin
.create_topics(&[NewTopic {
.create_topics_and_wait(&[NewTopic {
name: "partitions1",
num_partitions: 1,
replication_factor: 1,
Expand Down Expand Up @@ -1368,7 +1365,7 @@ pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
standard_test_suite(connection_builder).await;
let admin = connection_builder.connect_admin().await;
admin
.create_topics(&[
.create_topics_and_wait(&[
NewTopic {
name: "partitions1_rf3",
num_partitions: 1,
Expand All @@ -1381,7 +1378,6 @@ pub async fn cluster_test_suite(connection_builder: &KafkaConnectionBuilder) {
},
])
.await;
tokio::time::sleep(Duration::from_secs(10)).await;
produce_consume_partitions1(connection_builder, "partitions1_rf3").await;
produce_consume_partitions3(connection_builder, "partitions3_rf3", 1, 500).await;
}
Expand Down Expand Up @@ -1428,7 +1424,7 @@ pub async fn assert_topic_creation_is_denied_due_to_acl(connection: &KafkaConnec
// * The request succeeds because user has AclOperation::Describe.
// * But no topic is found since the topic creation was denied.
assert_eq!(
admin.describe_topic("acl_check_topic").await.unwrap_err().to_string(),
admin.describe_topics(&["acl_check_topic"]).await.unwrap_err().to_string(),
"org.apache.kafka.common.errors.UnknownTopicOrPartitionException: This server does not host this topic-partition.\n"
)
}
36 changes: 30 additions & 6 deletions test-helpers/src/connection/kafka/java.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::{
Acl, AclOperation, AclPermissionType, AlterConfig, ConsumerConfig, ExpectedResponse,
ListOffsetsResultInfo, NewPartition, NewTopic, OffsetAndMetadata, OffsetSpec, ProduceResult,
Record, ResourcePatternType, ResourceSpecifier, ResourceType, TopicDescription, TopicPartition,
TopicPartitionInfo,
};
use crate::connection::java::{Jvm, Value};
use anyhow::Result;
Expand Down Expand Up @@ -437,16 +438,39 @@ impl KafkaAdminJava {
self.create_topics_fallible(topics).await.unwrap();
}

pub async fn describe_topic(&self, topic_name: &str) -> Result<TopicDescription> {
let topics = self
.jvm
.new_list("java.lang.String", vec![self.jvm.new_string(topic_name)]);
pub async fn describe_topics(&self, topic_names: &[&str]) -> Result<Vec<TopicDescription>> {
let topics = self.jvm.new_list(
"java.lang.String",
topic_names
.iter()
.map(|topic| self.jvm.new_string(topic))
.collect(),
);

self.admin
let topic_names_to_info = self
.admin
.call("describeTopics", vec![topics])
.call_async_fallible("allTopicNames", vec![])
.await?;
Ok(TopicDescription {})

Ok(topic_names_to_info
.call("values", vec![])
.call("iterator", vec![])
.into_iter()
.map(|java_topic_description| {
let java_topic_description =
java_topic_description.cast("org.apache.kafka.clients.admin.TopicDescription");
TopicDescription {
topic_name: java_topic_description.call("name", vec![]).into_rust(),
partitions: java_topic_description
.call("partitions", vec![])
.call("iterator", vec![])
.into_iter()
.map(|_| TopicPartitionInfo {})
.collect(),
}
})
.collect())
}

pub async fn create_topics_fallible(&self, topics: &[NewTopic<'_>]) -> Result<()> {
Expand Down
59 changes: 53 additions & 6 deletions test-helpers/src/connection/kafka/mod.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use pretty_assertions::assert_eq;
use std::{
collections::{HashMap, HashSet},
time::Duration,
time::{Duration, Instant},
};

#[cfg(feature = "kafka-cpp-driver-tests")]
Expand Down Expand Up @@ -380,11 +380,56 @@ impl KafkaAdmin {
}
}

pub async fn describe_topic(&self, topic_name: &str) -> Result<TopicDescription> {
pub async fn create_topics_and_wait(&self, topics: &[NewTopic<'_>]) {
self.create_topics(topics).await;

match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
KafkaAdmin::Cpp(_) => {
// rdkafka-rs driver doesnt support describe_topics so just wait instead :/
tokio::time::sleep(Duration::from_secs(5)).await;
}
KafkaAdmin::Java(_) => {
let instant = Instant::now();
let topics_to_wait_for: Vec<&str> = topics.iter().map(|x| x.name).collect();
loop {
if self.are_topics_ready(&topics_to_wait_for).await {
break;
} else {
tokio::time::sleep(Duration::from_millis(1)).await;
if instant.elapsed() > Duration::from_secs(30) {
panic!("Timedout while waiting for created topics to be available. Was waiting for topics {topics_to_wait_for:?}")
}
}
}
}
}
}

async fn are_topics_ready(&self, topics_to_wait_for: &[&str]) -> bool {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
KafkaAdmin::Cpp(_) => unimplemented!(),
KafkaAdmin::Java(java) => java.describe_topic(topic_name).await,
KafkaAdmin::Java(java) => match java.describe_topics(topics_to_wait_for).await {
Ok(topics) => {
for topic in topics {
if topic.partitions.is_empty() {
return false;
}
}
true
}

Err(_) => false,
},
}
}

pub async fn describe_topics(&self, topic_names: &[&str]) -> Result<Vec<TopicDescription>> {
match self {
#[cfg(feature = "kafka-cpp-driver-tests")]
KafkaAdmin::Cpp(_) => unimplemented!(),
KafkaAdmin::Java(java) => java.describe_topics(topic_names).await,
}
}

Expand Down Expand Up @@ -526,11 +571,13 @@ pub enum AclPermissionType {

#[derive(Debug)]
pub struct TopicDescription {
// None of our tests actually make use of the contents of TopicDescription,
// instead they just check if the describe succeeded or failed,
// so this is intentionally left empty for now
pub topic_name: String,
pub partitions: Vec<TopicPartitionInfo>,
}

#[derive(Debug)]
pub struct TopicPartitionInfo {}

pub enum OffsetSpec {
Earliest,
Latest,
Expand Down

0 comments on commit 110567a

Please sign in to comment.