Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement pause/resume for Consumer #167

Merged
merged 2 commits into from
Oct 27, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions src/consumer/base_consumer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,27 @@ impl<C: ConsumerContext> Consumer<C> for BaseConsumer<C> {
) -> KafkaResult<GroupList> {
self.client.fetch_group_list(group, timeout)
}

fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
let ret_code =
unsafe { rdsys::rd_kafka_pause_partitions(self.client.native_ptr(), partitions.ptr()) };
if ret_code.is_error() {
let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
return Err(KafkaError::PauseResume(error));
};
Ok(())
}

fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
let ret_code = unsafe {
rdsys::rd_kafka_resume_partitions(self.client.native_ptr(), partitions.ptr())
};
if ret_code.is_error() {
let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(ret_code)) };
return Err(KafkaError::PauseResume(error));
};
Ok(())
}
}

impl<C: ConsumerContext> Drop for BaseConsumer<C> {
Expand Down
10 changes: 10 additions & 0 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,4 +274,14 @@ pub trait Consumer<C: ConsumerContext = DefaultConsumerContext> {
{
self.get_base_consumer().fetch_group_list(group, timeout)
}

/// Pause consumption for the provided list of partitions.
fn pause(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
self.get_base_consumer().pause(partitions)
}

/// Resume consumption for the provided list of partitions.
fn resume(&self, partitions: &TopicPartitionList) -> KafkaResult<()> {
self.get_base_consumer().resume(partitions)
}
}
8 changes: 8 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ pub enum KafkaError {
OffsetFetch(RDKafkaError),
/// End of partition reached.
PartitionEOF(i32),
/// Pause/Resume failed.
PauseResume(String),
/// Setting partition offset failed.
SetPartitionOffset(RDKafkaError),
/// Offset store failed.
Expand Down Expand Up @@ -108,6 +110,9 @@ impl fmt::Debug for KafkaError {
KafkaError::Nul(_) => write!(f, "FFI null error"),
KafkaError::OffsetFetch(err) => write!(f, "KafkaError (Offset fetch error: {})", err),
KafkaError::PartitionEOF(part_n) => write!(f, "KafkaError (Partition EOF: {})", part_n),
KafkaError::PauseResume(ref err) => {
write!(f, "KafkaError (Pause/resume error: {})", err)
}
KafkaError::SetPartitionOffset(err) => {
write!(f, "KafkaError (Set partition offset error: {})", err)
}
Expand Down Expand Up @@ -143,6 +148,7 @@ impl fmt::Display for KafkaError {
KafkaError::Nul(_) => write!(f, "FFI nul error"),
KafkaError::OffsetFetch(err) => write!(f, "Offset fetch error: {}", err),
KafkaError::PartitionEOF(part_n) => write!(f, "Partition EOF: {}", part_n),
KafkaError::PauseResume(ref err) => write!(f, "Pause/resume error: {}", err),
KafkaError::SetPartitionOffset(err) => write!(f, "Set partition offset error: {}", err),
KafkaError::StoreOffset(err) => write!(f, "Store offset error: {}", err),
KafkaError::Subscription(ref err) => write!(f, "Subscription error: {}", err),
Expand All @@ -168,6 +174,7 @@ impl error::Error for KafkaError {
KafkaError::Nul(_) => "FFI nul error",
KafkaError::OffsetFetch(_) => "Offset fetch error",
KafkaError::PartitionEOF(_) => "Partition EOF error",
KafkaError::PauseResume(_) => "Pause/resume error",
KafkaError::SetPartitionOffset(_) => "Set partition offset error",
KafkaError::StoreOffset(_) => "Store offset error",
KafkaError::Subscription(_) => "Subscription error",
Expand All @@ -192,6 +199,7 @@ impl error::Error for KafkaError {
KafkaError::Nul(_) => None,
KafkaError::OffsetFetch(ref err) => Some(err),
KafkaError::PartitionEOF(_) => None,
KafkaError::PauseResume(_) => None,
KafkaError::SetPartitionOffset(ref err) => Some(err),
KafkaError::StoreOffset(ref err) => Some(err),
KafkaError::Subscription(_) => None,
Expand Down
52 changes: 52 additions & 0 deletions tests/test_consumers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,3 +387,55 @@ fn test_consumer_store_offset_commit() {
position.add_partition_offset(&topic_name, 2, Offset::Offset(12));
assert_eq!(position, consumer.position().unwrap());
}

fn ensure_empty<C: ConsumerContext>(consumer: &BaseConsumer<C>, err_msg: &str) {
const MAX_TRY_TIME: Duration = Duration::from_secs(2);
let start = Instant::now();
while start.elapsed() < MAX_TRY_TIME {
assert!(consumer.poll(MAX_TRY_TIME).is_none(), "{}", err_msg);
}
}

#[test]
fn test_pause_resume_consumer_iter() {
const PAUSE_COUNT: i32 = 3;
const MESSAGE_COUNT: i32 = 300;
const MESSAGES_PER_PAUSE: i32 = MESSAGE_COUNT / PAUSE_COUNT;

let _r = env_logger::try_init();

let topic_name = rand_test_topic();
populate_topic(
&topic_name,
MESSAGE_COUNT,
&value_fn,
&key_fn,
Some(0),
None,
);
let group_id = rand_test_group();
let consumer = create_base_consumer(&group_id, None);
consumer.subscribe(&[topic_name.as_str()]).unwrap();

for _ in 0..PAUSE_COUNT {
let mut num_taken = 0;
for message in consumer.iter().take(MESSAGES_PER_PAUSE as usize) {
message.unwrap();
num_taken += 1;
}
assert_eq!(num_taken, MESSAGES_PER_PAUSE);

let partitions = consumer.assignment().unwrap();
assert!(partitions.count() > 0);
consumer.pause(&partitions).unwrap();

ensure_empty(
&consumer,
"Partition is paused - we should not receive anything",
);

consumer.resume(&partitions).unwrap();
}

ensure_empty(&consumer, "There should be no messages left");
}