diff --git a/tests/test_low_consumers.rs b/tests/test_low_consumers.rs index 75f147173..e1ce16bdf 100644 --- a/tests/test_low_consumers.rs +++ b/tests/test_low_consumers.rs @@ -1,6 +1,7 @@ //! Test data consumption using low level consumers. use std::collections::HashMap; +use std::convert::TryInto; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::thread; @@ -90,6 +91,67 @@ async fn test_produce_consume_seek() { } } +// Seeking should allow replaying messages and skipping messages. +#[tokio::test] +async fn test_produce_consume_seek_partitions() { + let _r = env_logger::try_init(); + + let topic_name = rand_test_topic(); + populate_topic(&topic_name, 30, &value_fn, &key_fn, None, None).await; + + let consumer = create_base_consumer(&rand_test_group(), None); + consumer.subscribe(&[topic_name.as_str()]).unwrap(); + + let mut partition_offset_map = HashMap::new(); + for message in consumer.iter().take(30) { + match message { + Ok(m) => { + let offset = partition_offset_map.entry(m.partition()).or_insert(0); + assert_eq!(m.offset(), *offset); + *offset += 1; + } + Err(e) => panic!("Error receiving message: {:?}", e), + } + } + + let mut tpl = TopicPartitionList::new(); + tpl.add_partition_offset(&topic_name, 0, Offset::Beginning) + .unwrap(); + tpl.add_partition_offset(&topic_name, 1, Offset::End) + .unwrap(); + tpl.add_partition_offset(&topic_name, 2, Offset::Offset(2)) + .unwrap(); + + let r_tpl = consumer.seek_partitions(tpl, None).unwrap(); + assert_eq!(r_tpl.elements().len(), 3); + for tpe in r_tpl.elements().iter() { + assert!(tpe.error().is_ok()); + } + + let msg_cnt_p0 = partition_offset_map.get(&0).unwrap(); + let msg_cnt_p2 = partition_offset_map.get(&2).unwrap(); + let total_msgs_to_read = msg_cnt_p0 + (msg_cnt_p2 - 2); + let mut poffset_map = HashMap::new(); + for message in consumer.iter().take(total_msgs_to_read.try_into().unwrap()) { + match message { + Ok(m) => { + let offset = poffset_map.entry(m.partition()).or_insert(0); + if m.partition() == 0 { + assert_eq!(m.offset(), *offset); + } else if m.partition() == 2 { + assert_eq!(m.offset(), *offset + 2); + } else if m.partition() == 1 { + panic!("Unexpected message from partition 1") + } + *offset += 1; + } + Err(e) => panic!("Error receiving message: {:?}", e), + } + } + assert_eq!(msg_cnt_p0, poffset_map.get(&0).unwrap()); + assert_eq!(msg_cnt_p2 - 2, *poffset_map.get(&2).unwrap()); +} + // All produced messages should be consumed. #[tokio::test] async fn test_produce_consume_iter() {