diff --git a/src/consumer/base_consumer.rs b/src/consumer/base_consumer.rs
index 0150008fa..715641fef 100644
--- a/src/consumer/base_consumer.rs
+++ b/src/consumer/base_consumer.rs
@@ -400,6 +400,27 @@ impl Consumer for BaseConsumer {
     ) -> KafkaResult<GroupList> {
         self.client.fetch_group_list(group, timeout)
     }
+
+    fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
+        let ret_code =
+            unsafe { rdsys::rd_kafka_pause_partitions(self.client.native_ptr(), partitions.ptr()) };
+        if ret_code.is_error() {
+            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
+            return Err(KafkaError::PauseResume(error));
+        };
+        Ok(())
+    }
+
+    fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
+        let ret_code = unsafe {
+            rdsys::rd_kafka_resume_partitions(self.client.native_ptr(), partitions.ptr())
+        };
+        if ret_code.is_error() {
+            let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
+            return Err(KafkaError::PauseResume(error));
+        };
+        Ok(())
+    }
 }
 
 impl Drop for BaseConsumer {
diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs
index 9ac512ad5..65902753e 100644
--- a/src/consumer/mod.rs
+++ b/src/consumer/mod.rs
@@ -274,4 +274,14 @@ pub trait Consumer {
     {
         self.get_base_consumer().fetch_group_list(group, timeout)
     }
+
+    /// Pause consumption for the provided list of partitions.
+    fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
+        self.get_base_consumer().pause(partitions)
+    }
+
+    /// Resume consumption for the provided list of partitions.
+    fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
+        self.get_base_consumer().resume(partitions)
+    }
 }
diff --git a/src/error.rs b/src/error.rs
index 49dd4ea8c..127234266 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -62,6 +62,8 @@ pub enum KafkaError {
     OffsetFetch(RDKafkaError),
     /// End of partition reached.
     PartitionEOF(i32),
+    /// Pause/Resume failed.
+    PauseResume(String),
     /// Setting partition offset failed.
     SetPartitionOffset(RDKafkaError),
     /// Offset store failed.
@@ -108,6 +110,9 @@ impl fmt::Debug for KafkaError {
             KafkaError::Nul(_) => write!(f, "FFI null error"),
             KafkaError::OffsetFetch(err) => write!(f, "KafkaError (Offset fetch error: {})", err),
             KafkaError::PartitionEOF(part_n) => write!(f, "KafkaError (Partition EOF: {})", part_n),
+            KafkaError::PauseResume(ref err) => {
+                write!(f, "KafkaError (Pause/resume error: {})", err)
+            }
             KafkaError::SetPartitionOffset(err) => {
                 write!(f, "KafkaError (Set partition offset error: {})", err)
             }
@@ -143,6 +148,7 @@ impl fmt::Display for KafkaError {
             KafkaError::Nul(_) => write!(f, "FFI nul error"),
             KafkaError::OffsetFetch(err) => write!(f, "Offset fetch error: {}", err),
             KafkaError::PartitionEOF(part_n) => write!(f, "Partition EOF: {}", part_n),
+            KafkaError::PauseResume(ref err) => write!(f, "Pause/resume error: {}", err),
             KafkaError::SetPartitionOffset(err) => write!(f, "Set partition offset error: {}", err),
             KafkaError::StoreOffset(err) => write!(f, "Store offset error: {}", err),
             KafkaError::Subscription(ref err) => write!(f, "Subscription error: {}", err),
@@ -168,6 +174,7 @@ impl error::Error for KafkaError {
             KafkaError::Nul(_) => "FFI nul error",
             KafkaError::OffsetFetch(_) => "Offset fetch error",
             KafkaError::PartitionEOF(_) => "Partition EOF error",
+            KafkaError::PauseResume(_) => "Pause/resume error",
             KafkaError::SetPartitionOffset(_) => "Set partition offset error",
             KafkaError::StoreOffset(_) => "Store offset error",
             KafkaError::Subscription(_) => "Subscription error",
@@ -192,6 +199,7 @@ impl error::Error for KafkaError {
             KafkaError::Nul(_) => None,
             KafkaError::OffsetFetch(ref err) => Some(err),
             KafkaError::PartitionEOF(_) => None,
+            KafkaError::PauseResume(_) => None,
             KafkaError::SetPartitionOffset(ref err) => Some(err),
             KafkaError::StoreOffset(ref err) => Some(err),
             KafkaError::Subscription(_) => None,
diff --git a/tests/test_consumers.rs b/tests/test_consumers.rs
index d923b7f41..f8065a35a 100644
--- a/tests/test_consumers.rs
+++ b/tests/test_consumers.rs
@@ -387,3 +387,55 @@ fn test_consumer_store_offset_commit() {
     position.add_partition_offset(&topic_name, 2, Offset::Offset(12));
     assert_eq!(position, consumer.position().unwrap());
 }
+
+fn ensure_empty(consumer: &BaseConsumer, err_msg: &str) {
+    const MAX_TRY_TIME: Duration = Duration::from_secs(2);
+    let start = Instant::now();
+    while start.elapsed() < MAX_TRY_TIME {
+        assert!(consumer.poll(MAX_TRY_TIME).is_none(), "{}", err_msg);
+    }
+}
+
+#[test]
+fn test_pause_resume_consumer_iter() {
+    const PAUSE_COUNT: i32 = 3;
+    const MESSAGE_COUNT: i32 = 300;
+    const MESSAGES_PER_PAUSE: i32 = MESSAGE_COUNT / PAUSE_COUNT;
+
+    let _r = env_logger::try_init();
+
+    let topic_name = rand_test_topic();
+    populate_topic(
+        &topic_name,
+        MESSAGE_COUNT,
+        &value_fn,
+        &key_fn,
+        Some(0),
+        None,
+    );
+    let group_id = rand_test_group();
+    let consumer = create_base_consumer(&group_id, None);
+    consumer.subscribe(&[topic_name.as_str()]).unwrap();
+
+    for _ in 0..PAUSE_COUNT {
+        let mut num_taken = 0;
+        for message in consumer.iter().take(MESSAGES_PER_PAUSE as usize) {
+            message.unwrap();
+            num_taken += 1;
+        }
+        assert_eq!(num_taken, MESSAGES_PER_PAUSE);
+
+        let partitions = consumer.assignment().unwrap();
+        assert!(partitions.count() > 0);
+        consumer.pause(&partitions).unwrap();
+
+        ensure_empty(
+            &consumer,
+            "Partition is paused - we should not receive anything",
+        );
+
+        consumer.resume(&partitions).unwrap();
+    }
+
+    ensure_empty(&consumer, "There should be no messages left");
+}
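Usage sketch (not part of the diff): the snippet below illustrates how the new `pause()`/`resume()` methods added above might be called from application code, mirroring the pattern used in `test_pause_resume_consumer_iter`. The broker address, topic, group id, and the `pause_while_busy` helper are placeholders invented for this example, not part of the change.

```rust
use std::time::Duration;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;
use rdkafka::message::Message;

// Hypothetical helper: suspend fetching on every currently assigned partition,
// do some slow work, then resume delivery on the same partitions.
fn pause_while_busy(consumer: &BaseConsumer) -> KafkaResult<()> {
    // The current assignment is the natural argument for pause()/resume();
    // note that it may still be empty right after subscribe(), before the
    // first rebalance completes, in which case pausing is a no-op.
    let assignment = consumer.assignment()?;
    consumer.pause(&assignment)?;

    // ... perform slow processing while no new messages are fetched ...
    std::thread::sleep(Duration::from_secs(1));

    consumer.resume(&assignment)?;
    Ok(())
}

fn main() -> KafkaResult<()> {
    // Placeholder connection details.
    let consumer: BaseConsumer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("group.id", "pause-resume-example")
        .create()?;
    consumer.subscribe(&["example-topic"])?;

    // Poll a handful of messages, then pause/resume around a slow section.
    for _ in 0..10 {
        if let Some(result) = consumer.poll(Duration::from_millis(500)) {
            match result {
                Ok(message) => println!("received message at offset {}", message.offset()),
                Err(err) => eprintln!("poll error: {}", err),
            }
        }
    }
    pause_while_busy(&consumer)
}
```

Passing the current `assignment()` keeps the two calls symmetric: librdkafka only suspends fetching for the listed partitions, so resuming the same list restores the consumer to its prior state.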