Skip to content

Commit

Permalink
Merge pull request #576 from mborst/provide-rich-error-in-rebalance
Browse files Browse the repository at this point in the history
Pass KafkaError to rebalance hooks
  • Loading branch information
davidblewett authored May 17, 2023
2 parents 72e16d0 + 9674a71 commit 0ab2770
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 6 deletions.
3 changes: 3 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md).

## Unreleased

* **Breaking change.** Pass `KafkaError` to rebalance hooks instead of human-readable string
representation.

## 0.30.0 (2023-05-12)

* Support for unassigning static partitions by passing `null` to `rdsys::rd_kafka_assign` and expose the
Expand Down
12 changes: 6 additions & 6 deletions src/consumer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,13 @@ use rdkafka_sys as rdsys;
use rdkafka_sys::types::*;

use crate::client::{Client, ClientContext, NativeClient};
use crate::error::KafkaResult;
use crate::error::{KafkaError, KafkaResult};
use crate::groups::GroupList;
use crate::log::{error, trace};
use crate::message::BorrowedMessage;
use crate::metadata::Metadata;
use crate::topic_partition_list::{Offset, TopicPartitionList};
use crate::util::{cstr_to_owned, KafkaDrop, NativePtr, Timeout};
use crate::util::{KafkaDrop, NativePtr, Timeout};

pub mod base_consumer;
pub mod stream_consumer;
Expand All @@ -33,7 +33,7 @@ pub enum Rebalance<'a> {
/// A new partition revocation is received.
Revoke(&'a TopicPartitionList),
/// Unexpected error from Kafka.
Error(String),
Error(KafkaError),
}

/// Consumer-specific context.
Expand All @@ -59,9 +59,9 @@ pub trait ConsumerContext: ClientContext {
RDKafkaRespErr::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS => Rebalance::Assign(tpl),
RDKafkaRespErr::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS => Rebalance::Revoke(tpl),
_ => {
let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(err)) };
error!("Error rebalancing: {}", error);
Rebalance::Error(error)
let error_code: RDKafkaErrorCode = err.into();
error!("Error rebalancing: {}", error_code);
Rebalance::Error(KafkaError::Rebalance(error_code))
}
};

Expand Down
6 changes: 6 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,8 @@ pub enum KafkaError {
PartitionEOF(i32),
/// Pause/Resume failed.
PauseResume(String),
/// Rebalance failed.
Rebalance(RDKafkaErrorCode),
/// Seeking a partition failed.
Seek(String),
/// Setting partition offset failed.
Expand Down Expand Up @@ -223,6 +225,7 @@ impl fmt::Debug for KafkaError {
KafkaError::PauseResume(ref err) => {
write!(f, "KafkaError (Pause/resume error: {})", err)
}
KafkaError::Rebalance(ref err) => write!(f, "KafkaError (Rebalance error: {})", err),
KafkaError::Seek(ref err) => write!(f, "KafkaError (Seek error: {})", err),
KafkaError::SetPartitionOffset(err) => {
write!(f, "KafkaError (Set partition offset error: {})", err)
Expand Down Expand Up @@ -262,6 +265,7 @@ impl fmt::Display for KafkaError {
KafkaError::OffsetFetch(err) => write!(f, "Offset fetch error: {}", err),
KafkaError::PartitionEOF(part_n) => write!(f, "Partition EOF: {}", part_n),
KafkaError::PauseResume(ref err) => write!(f, "Pause/resume error: {}", err),
KafkaError::Rebalance(ref err) => write!(f, "Rebalance error: {}", err),
KafkaError::Seek(ref err) => write!(f, "Seek error: {}", err),
KafkaError::SetPartitionOffset(err) => write!(f, "Set partition offset error: {}", err),
KafkaError::StoreOffset(err) => write!(f, "Store offset error: {}", err),
Expand Down Expand Up @@ -291,6 +295,7 @@ impl Error for KafkaError {
KafkaError::OffsetFetch(err) => Some(err),
KafkaError::PartitionEOF(_) => None,
KafkaError::PauseResume(_) => None,
KafkaError::Rebalance(err) => Some(err),
KafkaError::Seek(_) => None,
KafkaError::SetPartitionOffset(err) => Some(err),
KafkaError::StoreOffset(err) => Some(err),
Expand Down Expand Up @@ -328,6 +333,7 @@ impl KafkaError {
KafkaError::OffsetFetch(err) => Some(*err),
KafkaError::PartitionEOF(_) => None,
KafkaError::PauseResume(_) => None,
KafkaError::Rebalance(err) => Some(*err),
KafkaError::Seek(_) => None,
KafkaError::SetPartitionOffset(err) => Some(*err),
KafkaError::StoreOffset(err) => Some(*err),
Expand Down

0 comments on commit 0ab2770

Please sign in to comment.