Skip to content

Commit

Permalink
Merge pull request #167 from emilk/implement-pause-resume
Browse files Browse the repository at this point in the history
Implement pause/resume for Consumer
  • Loading branch information
benesch authored Oct 27, 2019
2 parents aea10ca + 0e5c747 commit 622e41e
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 0 deletions.
21 changes: 21 additions & 0 deletions src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,27 @@ impl<C: ConsumerContext> Consumer<C> for BaseConsumer<C> {
) -> KafkaResult<GroupList> {
self.client.fetch_group_list(group, timeout)
}

fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
let ret_code =
unsafe { rdsys::rd_kafka_pause_partitions(self.client.native_ptr(), partitions.ptr()) };
if ret_code.is_error() {
let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
return Err(KafkaError::PauseResume(error));
};
Ok(())
}

fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
let ret_code = unsafe {
rdsys::rd_kafka_resume_partitions(self.client.native_ptr(), partitions.ptr())
};
if ret_code.is_error() {
let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
return Err(KafkaError::PauseResume(error));
};
Ok(())
}
}

impl<C: ConsumerContext> Drop for BaseConsumer<C> {
Expand Down
10 changes: 10 additions & 0 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,4 +274,14 @@ pub trait Consumer<C: ConsumerContext = DefaultConsumerContext> {
{
self.get_base_consumer().fetch_group_list(group, timeout)
}

/// Pause consumption for the provided list of partitions.
fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
self.get_base_consumer().pause(partitions)
}

/// Resume consumption for the provided list of partitions.
fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
self.get_base_consumer().resume(partitions)
}
}
8 changes: 8 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ pub enum KafkaError {
OffsetFetch(RDKafkaError),
/// End of partition reached.
PartitionEOF(i32),
/// Pause/Resume failed.
PauseResume(String),
/// Setting partition offset failed.
SetPartitionOffset(RDKafkaError),
/// Offset store failed.
Expand Down Expand Up @@ -108,6 +110,9 @@ impl fmt::Debug for KafkaError {
KafkaError::Nul(_) => write!(f, "FFI null error"),
KafkaError::OffsetFetch(err) => write!(f, "KafkaError (Offset fetch error: {})", err),
KafkaError::PartitionEOF(part_n) => write!(f, "KafkaError (Partition EOF: {})", part_n),
KafkaError::PauseResume(ref err) => {
write!(f, "KafkaError (Pause/resume error: {})", err)
}
KafkaError::SetPartitionOffset(err) => {
write!(f, "KafkaError (Set partition offset error: {})", err)
}
Expand Down Expand Up @@ -143,6 +148,7 @@ impl fmt::Display for KafkaError {
KafkaError::Nul(_) => write!(f, "FFI nul error"),
KafkaError::OffsetFetch(err) => write!(f, "Offset fetch error: {}", err),
KafkaError::PartitionEOF(part_n) => write!(f, "Partition EOF: {}", part_n),
KafkaError::PauseResume(ref err) => write!(f, "Pause/resume error: {}", err),
KafkaError::SetPartitionOffset(err) => write!(f, "Set partition offset error: {}", err),
KafkaError::StoreOffset(err) => write!(f, "Store offset error: {}", err),
KafkaError::Subscription(ref err) => write!(f, "Subscription error: {}", err),
Expand All @@ -168,6 +174,7 @@ impl error::Error for KafkaError {
KafkaError::Nul(_) => "FFI nul error",
KafkaError::OffsetFetch(_) => "Offset fetch error",
KafkaError::PartitionEOF(_) => "Partition EOF error",
KafkaError::PauseResume(_) => "Pause/resume error",
KafkaError::SetPartitionOffset(_) => "Set partition offset error",
KafkaError::StoreOffset(_) => "Store offset error",
KafkaError::Subscription(_) => "Subscription error",
Expand All @@ -192,6 +199,7 @@ impl error::Error for KafkaError {
KafkaError::Nul(_) => None,
KafkaError::OffsetFetch(ref err) => Some(err),
KafkaError::PartitionEOF(_) => None,
KafkaError::PauseResume(_) => None,
KafkaError::SetPartitionOffset(ref err) => Some(err),
KafkaError::StoreOffset(ref err) => Some(err),
KafkaError::Subscription(_) => None,
Expand Down
52 changes: 52 additions & 0 deletions tests/test_consumers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,3 +387,55 @@ fn test_consumer_store_offset_commit() {
position.add_partition_offset(&topic_name, 2, Offset::Offset(12));
assert_eq!(position, consumer.position().unwrap());
}

fn ensure_empty<C: ConsumerContext>(consumer: &BaseConsumer<C>, err_msg: &str) {
const MAX_TRY_TIME: Duration = Duration::from_secs(2);
let start = Instant::now();
while start.elapsed() < MAX_TRY_TIME {
assert!(consumer.poll(MAX_TRY_TIME).is_none(), "{}", err_msg);
}
}

#[test]
fn test_pause_resume_consumer_iter() {
const PAUSE_COUNT: i32 = 3;
const MESSAGE_COUNT: i32 = 300;
const MESSAGES_PER_PAUSE: i32 = MESSAGE_COUNT / PAUSE_COUNT;

let _r = env_logger::try_init();

let topic_name = rand_test_topic();
populate_topic(
&topic_name,
MESSAGE_COUNT,
&value_fn,
&key_fn,
Some(0),
None,
);
let group_id = rand_test_group();
let consumer = create_base_consumer(&group_id, None);
consumer.subscribe(&[topic_name.as_str()]).unwrap();

for _ in 0..PAUSE_COUNT {
let mut num_taken = 0;
for message in consumer.iter().take(MESSAGES_PER_PAUSE as usize) {
message.unwrap();
num_taken += 1;
}
assert_eq!(num_taken, MESSAGES_PER_PAUSE);

let partitions = consumer.assignment().unwrap();
assert!(partitions.count() > 0);
consumer.pause(&partitions).unwrap();

ensure_empty(
&consumer,
"Partition is paused - we should not receive anything",
);

consumer.resume(&partitions).unwrap();
}

ensure_empty(&consumer, "There should be no messages left");
}

0 comments on commit 622e41e

Please sign in to comment.