diff --git a/tests/test_high_consumers.rs b/tests/test_high_consumers.rs index 97ca4f5a0..548ccb94d 100644 --- a/tests/test_high_consumers.rs +++ b/tests/test_high_consumers.rs @@ -4,7 +4,7 @@ use std::collections::HashMap; use std::error::Error; use std::sync::Arc; -use futures::future::{self, FutureExt}; +use futures::future; use futures::stream::StreamExt; use maplit::hashmap; use rdkafka_sys::RDKafkaErrorCode; @@ -491,7 +491,7 @@ async fn test_consumer_commit_metadata() -> Result<(), Box> { Ok(()) } -#[tokio::test] +#[tokio::test(flavor = "multi_thread")] async fn test_consume_partition_order() { let _r = env_logger::try_init(); @@ -545,8 +545,8 @@ async fn test_consume_partition_order() { let partition1 = consumer.split_partition_queue(&topic_name, 1).unwrap(); let mut i = 0; - while i < 12 { - if let Some(m) = consumer.recv().now_or_never() { + while i < 5 { + if let Ok(m) = time::timeout(Duration::from_millis(1000), consumer.recv()).await { // retry on transient errors until we get a message let m = match m { Err(KafkaError::MessageConsumption( @@ -564,9 +564,11 @@ async fn test_consume_partition_order() { let partition: i32 = m.partition(); assert!(partition == 0 || partition == 2); i += 1; + } else { + panic!("Timeout receiving message"); } - if let Some(m) = partition1.recv().now_or_never() { + if let Ok(m) = time::timeout(Duration::from_millis(1000), partition1.recv()).await { // retry on transient errors until we get a message let m = match m { Err(KafkaError::MessageConsumption( @@ -583,6 +585,8 @@ async fn test_consume_partition_order() { }; assert_eq!(m.partition(), 1); i += 1; + } else { + panic!("Timeout receiving message"); } } }