Commit

Merge pull request #645 from fede1024/davidblewett/tune-docker-compose
Disable valgrind in CI temporarily
davidblewett authored Jan 11, 2024
2 parents 0c5c131 + 321c040 commit 738590b
Showing 9 changed files with 84 additions and 72 deletions.
50 changes: 26 additions & 24 deletions test_suite.sh
@@ -29,34 +29,36 @@ run_with_valgrind() {
# Initialize.

git submodule update --init
-cargo test --no-run
docker-compose up -d
+cargo test

# Run unit tests.

-echo_good "*** Run unit tests ***"
-for test_file in target/debug/deps/rdkafka-*
-do
-    if [[ -x "$test_file" ]]
-    then
-        echo_good "Executing "$test_file""
-        run_with_valgrind "$test_file"
-    fi
-done
-echo_good "*** Unit tests succeeded ***"
-
-# Run integration tests.
-
-echo_good "*** Run unit tests ***"
-for test_file in target/debug/deps/test_*
-do
-    if [[ -x "$test_file" ]]
-    then
-        echo_good "Executing "$test_file""
-        run_with_valgrind "$test_file"
-    fi
-done
-echo_good "*** Integration tests succeeded ***"
+#echo_good "*** Run unit tests ***"
+#for test_file in target/debug/deps/rdkafka-*
+#do
+#    if [[ -x "$test_file" ]]
+#    then
+#        echo_good "Executing "$test_file""
+#        run_with_valgrind "$test_file"
+#    fi
+#done
+#echo_good "*** Unit tests succeeded ***"
+#
+## Run integration tests.
+#
+#echo_good "*** Run integration tests ***"
+#for test_file in target/debug/deps/test_*
+#do
+#    if [[ -x "$test_file" ]]
+#    then
+#        #echo_good "*** Restarting kafka/zk ***"
+#        #docker-compose restart --timeout 30
+#        echo_good "Executing "$test_file""
+#        run_with_valgrind "$test_file"
+#    fi
+#done
+#echo_good "*** Integration tests succeeded ***"

# Run smol runtime example.

14 changes: 7 additions & 7 deletions tests/test_admin.rs
@@ -32,7 +32,7 @@ fn create_admin_client() -> AdminClient<DefaultClientContext> {

async fn create_consumer_group(consumer_group_name: &str) {
    let admin_client = create_admin_client();
-    let topic_name = &rand_test_topic();
+    let topic_name = &rand_test_topic(consumer_group_name);
    let consumer: BaseConsumer = create_config()
        .set("group.id", consumer_group_name.clone())
        .create()
@@ -124,8 +124,8 @@ async fn test_topics() {
    // Verify that topics are created as specified, and that they can later
    // be deleted.
    {
-        let name1 = rand_test_topic();
-        let name2 = rand_test_topic();
+        let name1 = rand_test_topic("test_topics");
+        let name2 = rand_test_topic("test_topics");

        // Test both the builder API and the literal construction.
        let topic1 =
@@ -254,7 +254,7 @@ async fn test_topics() {
    // Verify that incorrect replication configurations are ignored when
    // creating partitions.
    {
-        let name = rand_test_topic();
+        let name = rand_test_topic("test_topics");
        let topic = NewTopic::new(&name, 1, TopicReplication::Fixed(1));

        let res = admin_client
@@ -291,7 +291,7 @@ async fn test_topics() {

    // Verify that deleting a non-existent topic fails.
    {
-        let name = rand_test_topic();
+        let name = rand_test_topic("test_topics");
        let res = admin_client
            .delete_topics(&[&name], &opts)
            .await
@@ -305,8 +305,8 @@ async fn test_topics() {
    // Verify that mixed-success operations properly report the successful and
    // failing operators.
    {
-        let name1 = rand_test_topic();
-        let name2 = rand_test_topic();
+        let name1 = rand_test_topic("test_topics");
+        let name2 = rand_test_topic("test_topics");

        let topic1 = NewTopic::new(&name1, 1, TopicReplication::Fixed(1));
        let topic2 = NewTopic::new(&name2, 1, TopicReplication::Fixed(1));
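Note: the change in this file (and in the test files below) threads the calling test's name into rand_test_topic, so stray topics left on the broker after a failed run can be traced back to the test that created them. The helper itself lives in the shared test utilities, which are not part of this diff; the following is only a plausible sketch of the updated signature, with the prefix format and the rand crate usage assumed rather than taken from the commit:

use rand::distributions::Alphanumeric;
use rand::Rng;

// Hypothetical sketch (not from this diff): prefix the random suffix with the
// calling test's name so leftover topics identify their creator.
pub fn rand_test_topic(test_name: &str) -> String {
    let suffix: String = rand::thread_rng()
        .sample_iter(&Alphanumeric)
        .take(10)
        .map(char::from)
        .collect();
    format!("__{}_{}", test_name, suffix)
}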
34 changes: 19 additions & 15 deletions tests/test_high_consumers.rs
@@ -4,7 +4,7 @@ use std::collections::HashMap;
use std::error::Error;
use std::sync::Arc;

-use futures::future::{self, FutureExt};
+use futures::future;
use futures::stream::StreamExt;
use maplit::hashmap;
use rdkafka_sys::RDKafkaErrorCode;
@@ -70,7 +70,7 @@ async fn test_produce_consume_base() {
    let _r = env_logger::try_init();

    let start_time = current_time_millis();
-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_produce_consume_base");
    let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await;
    let consumer = create_stream_consumer(&rand_test_group(), None);
    consumer.subscribe(&[topic_name.as_str()]).unwrap();
@@ -105,7 +105,7 @@ async fn test_produce_consume_base() {
async fn test_produce_consume_base_concurrent() {
    let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_produce_consume_base_concurrent");
    populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await;

    let consumer = Arc::new(create_stream_consumer(&rand_test_group(), None));
@@ -135,7 +135,7 @@ async fn test_produce_consume_base_concurrent() {
async fn test_produce_consume_base_assign() {
    let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_produce_consume_base_assign");
    populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
    populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await;
    populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await;
@@ -170,7 +170,7 @@ async fn test_produce_consume_base_assign() {
async fn test_produce_consume_base_unassign() {
    let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_produce_consume_base_unassign");
    populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
    populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await;
    populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await;
@@ -195,7 +195,7 @@ async fn test_produce_consume_base_unassign() {
async fn test_produce_consume_base_incremental_assign_and_unassign() {
    let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_produce_consume_base_incremental_assign_and_unassign");
    populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
    populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(1), None).await;
    populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(2), None).await;
@@ -236,7 +236,7 @@ async fn test_produce_consume_base_incremental_assign_and_unassign() {
async fn test_produce_consume_with_timestamp() {
    let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_produce_consume_with_timestamp");
    let message_map =
        populate_topic(&topic_name, 100, &value_fn, &key_fn, Some(0), Some(1111)).await;
    let consumer = create_stream_consumer(&rand_test_group(), None);
@@ -277,7 +277,7 @@ async fn test_produce_consume_with_timestamp() {
async fn test_consumer_commit_message() {
    let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_consumer_commit_message");
    populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
    populate_topic(&topic_name, 11, &value_fn, &key_fn, Some(1), None).await;
    populate_topic(&topic_name, 12, &value_fn, &key_fn, Some(2), None).await;
@@ -355,7 +355,7 @@ async fn test_consumer_commit_message() {
async fn test_consumer_store_offset_commit() {
    let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_consumer_store_offset_commit");
    populate_topic(&topic_name, 10, &value_fn, &key_fn, Some(0), None).await;
    populate_topic(&topic_name, 11, &value_fn, &key_fn, Some(1), None).await;
    populate_topic(&topic_name, 12, &value_fn, &key_fn, Some(2), None).await;
@@ -440,7 +440,7 @@ async fn test_consumer_store_offset_commit() {
async fn test_consumer_commit_metadata() -> Result<(), Box<dyn Error>> {
    let _ = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_consumer_commit_metadata");
    let group_name = rand_test_group();
    populate_topic(&topic_name, 10, &value_fn, &key_fn, None, None).await;

@@ -491,11 +491,11 @@ async fn test_consumer_commit_metadata() -> Result<(), Box<dyn Error>> {
    Ok(())
}

-#[tokio::test]
+#[tokio::test(flavor = "multi_thread")]
async fn test_consume_partition_order() {
    let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_consume_partition_order");
    populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(0), None).await;
    populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(1), None).await;
    populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(2), None).await;
@@ -545,8 +545,8 @@ async fn test_consume_partition_order() {
    let partition1 = consumer.split_partition_queue(&topic_name, 1).unwrap();

    let mut i = 0;
-    while i < 12 {
-        if let Some(m) = consumer.recv().now_or_never() {
+    while i < 5 {
+        if let Ok(m) = time::timeout(Duration::from_millis(1000), consumer.recv()).await {
            // retry on transient errors until we get a message
            let m = match m {
                Err(KafkaError::MessageConsumption(
@@ -564,9 +564,11 @@ async fn test_consume_partition_order() {
            let partition: i32 = m.partition();
            assert!(partition == 0 || partition == 2);
            i += 1;
+        } else {
+            panic!("Timeout receiving message");
        }

-        if let Some(m) = partition1.recv().now_or_never() {
+        if let Ok(m) = time::timeout(Duration::from_millis(1000), partition1.recv()).await {
            // retry on transient errors until we get a message
            let m = match m {
                Err(KafkaError::MessageConsumption(
@@ -583,6 +583,8 @@ async fn test_consume_partition_order() {
            };
            assert_eq!(m.partition(), 1);
            i += 1;
+        } else {
+            panic!("Timeout receiving message");
        }
    }
}
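Note: the test_consume_partition_order changes above replace now_or_never(), which polls the receive future exactly once and fails spuriously if a message has not yet been delivered, with a bounded wait via tokio's timeout; the test now only panics on a genuine timeout. The switch to #[tokio::test(flavor = "multi_thread")] pairs with this, since the timer needs the runtime to keep making progress while recv() is pending. A minimal sketch of the pattern, using a hypothetical helper name that is not part of the diff:

use std::time::Duration;

use rdkafka::consumer::StreamConsumer;
use rdkafka::error::KafkaError;
use rdkafka::message::BorrowedMessage;
use tokio::time;

// Hypothetical helper (not in the diff): wait up to one second for the next
// message instead of polling the receive future a single time.
async fn recv_within(consumer: &StreamConsumer) -> Result<BorrowedMessage<'_>, KafkaError> {
    match time::timeout(Duration::from_millis(1000), consumer.recv()).await {
        Ok(result) => result, // a message, or a possibly transient Kafka error
        Err(_) => panic!("Timeout receiving message"),
    }
}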
4 changes: 2 additions & 2 deletions tests/test_high_producers.rs
@@ -30,7 +30,7 @@ fn future_producer(config_overrides: HashMap<&str, &str>) -> FutureProducer<Defa
#[tokio::test]
async fn test_future_producer_send() {
    let producer = future_producer(HashMap::new());
-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_future_producer_send");

    let results: FuturesUnordered<_> = (0..10)
        .map(|_| {
@@ -60,7 +60,7 @@ async fn test_future_producer_send_full() {
    config.insert("message.timeout.ms", "5000");
    config.insert("queue.buffering.max.messages", "1");
    let producer = &future_producer(config);
-    let topic_name = &rand_test_topic();
+    let topic_name = &rand_test_topic("test_future_producer_send_full");

    // Fill up the queue.
    producer
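Note: test_future_producer_send above relies on collecting one delivery future per record into a FuturesUnordered, so all ten sends are in flight concurrently before any delivery report is awaited. A self-contained sketch of that pattern follows; the broker address and topic name are placeholders, not values from the repository's test configuration:

use std::time::Duration;

use futures::stream::{FuturesUnordered, StreamExt};
use rdkafka::config::ClientConfig;
use rdkafka::producer::{FutureProducer, FutureRecord};

// Sketch: enqueue ten records concurrently, then await each delivery report.
async fn send_ten() {
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092") // placeholder broker
        .create()
        .expect("producer creation failed");

    let sends: FuturesUnordered<_> = (0..10)
        .map(|_| {
            producer.send(
                FutureRecord::to("example-topic").payload("payload").key("key"),
                Duration::from_secs(5),
            )
        })
        .collect();

    // Each report is Result<(partition, offset), (KafkaError, OwnedMessage)>.
    let reports: Vec<_> = sends.collect().await;
    for report in reports {
        let (partition, offset) = report.expect("delivery failed");
        println!("delivered to partition {partition} at offset {offset}");
    }
}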
12 changes: 6 additions & 6 deletions tests/test_low_consumers.rs
@@ -31,7 +31,7 @@ fn create_base_consumer(
async fn test_produce_consume_seek() {
    let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_produce_consume_seek");
    populate_topic(&topic_name, 5, &value_fn, &key_fn, Some(0), None).await;
    let consumer = create_base_consumer(&rand_test_group(), None);
    consumer.subscribe(&[topic_name.as_str()]).unwrap();
@@ -96,7 +96,7 @@ async fn test_produce_consume_seek() {
async fn test_produce_consume_seek_partitions() {
    let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_produce_consume_seek_partitions");
    populate_topic(&topic_name, 30, &value_fn, &key_fn, None, None).await;

    let consumer = create_base_consumer(&rand_test_group(), None);
@@ -158,7 +158,7 @@ async fn test_produce_consume_iter() {
    let _r = env_logger::try_init();

    let start_time = current_time_millis();
-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_produce_consume_iter");
    let message_map = populate_topic(&topic_name, 100, &value_fn, &key_fn, None, None).await;
    let consumer = create_base_consumer(&rand_test_group(), None);
    consumer.subscribe(&[topic_name.as_str()]).unwrap();
@@ -196,7 +196,7 @@ async fn test_pause_resume_consumer_iter() {

    let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_pause_resume_consumer_iter");
    populate_topic(
        &topic_name,
        MESSAGE_COUNT,
@@ -237,7 +237,7 @@ async fn test_pause_resume_consumer_iter() {
async fn test_consume_partition_order() {
    let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_consume_partition_order");
    populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(0), None).await;
    populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(1), None).await;
    populate_topic(&topic_name, 4, &value_fn, &key_fn, Some(2), None).await;
@@ -357,7 +357,7 @@ async fn test_consume_partition_order() {
async fn test_produce_consume_message_queue_nonempty_callback() {
    let _r = env_logger::try_init();

-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_produce_consume_message_queue_nonempty_callback");

    create_topic(&topic_name, 1).await;

22 changes: 14 additions & 8 deletions tests/test_low_producers.rs
@@ -191,7 +191,7 @@ where
#[test]
fn test_base_producer_queue_full() {
    let producer = base_producer(hashmap! { "queue.buffering.max.messages" => "10" });
-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_base_producer_queue_full");

    let results = (0..30)
        .map(|id| {
@@ -235,7 +235,7 @@ fn test_base_producer_timeout() {
            "bootstrap.servers" => "1.2.3.4"
        },
    );
-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_base_producer_timeout");

    let results_count = (0..10)
        .map(|id| {
@@ -346,7 +346,7 @@ fn test_base_producer_headers() {
        ids: ids_set.clone(),
    };
    let producer = base_producer_with_context(context, HashMap::new());
-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_base_producer_headers");

    let results_count = (0..10)
        .map(|id| {
@@ -387,7 +387,7 @@ fn test_base_producer_headers() {
fn test_threaded_producer_send() {
    let context = CollectingContext::new();
    let producer = threaded_producer_with_context(context.clone(), HashMap::new());
-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_threaded_producer_send");

    let results_count = (0..10)
        .map(|id| {
@@ -431,7 +431,7 @@ fn test_base_producer_opaque_arc() -> Result<(), Box<dyn Error>> {
    let shared_count = Arc::new(Mutex::new(0));
    let context = OpaqueArcContext {};
    let producer = base_producer_with_context(context, HashMap::new());
-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_base_producer_opaque_arc");

    let results_count = (0..10)
        .map(|_| {
@@ -482,7 +482,13 @@ fn test_register_custom_partitioner_linger_non_zero_key_null() {
    let producer = base_producer_with_context(context.clone(), config_overrides);

    producer
-        .send(BaseRecord::<(), str, usize>::with_opaque_to(&rand_test_topic(), 0).payload(""))
+        .send(
+            BaseRecord::<(), str, usize>::with_opaque_to(
+                &rand_test_topic("test_register_custom_partitioner_linger_non_zero_key_null"),
+                0,
+            )
+            .payload(""),
+        )
        .unwrap();
    producer.flush(Duration::from_secs(10)).unwrap();

@@ -499,7 +505,7 @@ fn test_register_custom_partitioner_linger_non_zero_key_null() {
fn test_custom_partitioner_base_producer() {
    let context = CollectingContext::new_with_custom_partitioner(FixedPartitioner::new(2));
    let producer = base_producer_with_context(context.clone(), HashMap::new());
-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_custom_partitioner_base_producer");

    let results_count = (0..10)
        .map(|id| {
@@ -527,7 +533,7 @@ fn test_custom_partitioner_base_producer() {
fn test_custom_partitioner_threaded_producer() {
    let context = CollectingContext::new_with_custom_partitioner(FixedPartitioner::new(2));
    let producer = threaded_producer_with_context(context.clone(), HashMap::new());
-    let topic_name = rand_test_topic();
+    let topic_name = rand_test_topic("test_custom_partitioner_threaded_producer");

    let results_count = (0..10)
        .map(|id| {
(Diffs for the remaining three changed files are not shown here.)