Skip to content

Commit

Permalink
Add test for seek_partitions
Browse files Browse the repository at this point in the history
  • Loading branch information
scanterog committed Jul 5, 2023
1 parent 7f266b1 commit 020700d
Showing 1 changed file with 62 additions and 0 deletions.
62 changes: 62 additions & 0 deletions tests/test_low_consumers.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Test data consumption using low level consumers.
use std::collections::HashMap;
use std::convert::TryInto;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread;
Expand Down Expand Up @@ -90,6 +91,67 @@ async fn test_produce_consume_seek() {
}
}

// Seeking should allow replaying messages and skipping messages.
#[tokio::test]
async fn test_produce_consume_seek_partitions() {
let _r = env_logger::try_init();

let topic_name = rand_test_topic();
populate_topic(&topic_name, 30, &value_fn, &key_fn, None, None).await;

let consumer = create_base_consumer(&rand_test_group(), None);
consumer.subscribe(&[topic_name.as_str()]).unwrap();

let mut partition_offset_map = HashMap::new();
for message in consumer.iter().take(30) {
match message {
Ok(m) => {
let offset = partition_offset_map.entry(m.partition()).or_insert(0);
assert_eq!(m.offset(), *offset);
*offset += 1;
}
Err(e) => panic!("Error receiving message: {:?}", e),
}
}

let mut tpl = TopicPartitionList::new();
tpl.add_partition_offset(&topic_name, 0, Offset::Beginning)
.unwrap();
tpl.add_partition_offset(&topic_name, 1, Offset::End)
.unwrap();
tpl.add_partition_offset(&topic_name, 2, Offset::Offset(2))
.unwrap();

let r_tpl = consumer.seek_partitions(tpl, None).unwrap();
assert_eq!(r_tpl.elements().len(), 3);
for tpe in r_tpl.elements().iter() {
assert!(tpe.error().is_ok());
}

let msg_cnt_p0 = partition_offset_map.get(&0).unwrap();
let msg_cnt_p2 = partition_offset_map.get(&2).unwrap();
let total_msgs_to_read = msg_cnt_p0 + (msg_cnt_p2 - 2);
let mut poffset_map = HashMap::new();
for message in consumer.iter().take(total_msgs_to_read.try_into().unwrap()) {
match message {
Ok(m) => {
let offset = poffset_map.entry(m.partition()).or_insert(0);
if m.partition() == 0 {
assert_eq!(m.offset(), *offset);
} else if m.partition() == 2 {
assert_eq!(m.offset(), *offset + 2);
} else if m.partition() == 1 {
panic!("Unexpected message from partition 1")
}
*offset += 1;
}
Err(e) => panic!("Error receiving message: {:?}", e),
}
}
assert_eq!(msg_cnt_p0, poffset_map.get(&0).unwrap());
assert_eq!(msg_cnt_p2 - 2, *poffset_map.get(&2).unwrap());
}

// All produced messages should be consumed.
#[tokio::test]
async fn test_produce_consume_iter() {
Expand Down

0 comments on commit 020700d

Please sign in to comment.