From 9674a71ae06196004f16e99969fc9a95e0e3a303 Mon Sep 17 00:00:00 2001 From: Michael Borst Date: Mon, 15 May 2023 12:54:36 +0200 Subject: [PATCH] Pass KafkaError to rebalance hooks It is easier to build generic error handling on top of error codes than on top of the librdkafka-defined string representations intended for human consumption. --- changelog.md | 3 +++ src/consumer/mod.rs | 12 ++++++------ src/error.rs | 6 ++++++ 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/changelog.md b/changelog.md index 5576e8371..1fb2db620 100644 --- a/changelog.md +++ b/changelog.md @@ -4,6 +4,9 @@ See also the [rdkafka-sys changelog](rdkafka-sys/changelog.md). ## Unreleased +* **Breaking change.** Pass `KafkaError` to rebalance hooks instead of human-readable string + representation. + ## 0.30.0 (2023-05-12) * Support for unassigning static partitions by passing `null` to `rdsys::rd_kafka_assign` and expose the diff --git a/src/consumer/mod.rs b/src/consumer/mod.rs index b62a3b577..1b8aa8940 100644 --- a/src/consumer/mod.rs +++ b/src/consumer/mod.rs @@ -8,13 +8,13 @@ use rdkafka_sys as rdsys; use rdkafka_sys::types::*; use crate::client::{Client, ClientContext, NativeClient}; -use crate::error::KafkaResult; +use crate::error::{KafkaError, KafkaResult}; use crate::groups::GroupList; use crate::log::{error, trace}; use crate::message::BorrowedMessage; use crate::metadata::Metadata; use crate::topic_partition_list::{Offset, TopicPartitionList}; -use crate::util::{cstr_to_owned, KafkaDrop, NativePtr, Timeout}; +use crate::util::{KafkaDrop, NativePtr, Timeout}; pub mod base_consumer; pub mod stream_consumer; @@ -33,7 +33,7 @@ pub enum Rebalance<'a> { /// A new partition revocation is received. Revoke(&'a TopicPartitionList), /// Unexpected error from Kafka. - Error(String), + Error(KafkaError), } /// Consumer-specific context. @@ -59,9 +59,9 @@ pub trait ConsumerContext: ClientContext { RDKafkaRespErr::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS => Rebalance::Assign(tpl), RDKafkaRespErr::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS => Rebalance::Revoke(tpl), _ => { - let error = unsafe { cstr_to_owned(rdsys::rd_kafka_err2str(err)) }; - error!("Error rebalancing: {}", error); - Rebalance::Error(error) + let error_code: RDKafkaErrorCode = err.into(); + error!("Error rebalancing: {}", error_code); + Rebalance::Error(KafkaError::Rebalance(error_code)) } }; diff --git a/src/error.rs b/src/error.rs index 7ee4b26d1..c3f046dad 100644 --- a/src/error.rs +++ b/src/error.rs @@ -169,6 +169,8 @@ pub enum KafkaError { PartitionEOF(i32), /// Pause/Resume failed. PauseResume(String), + /// Rebalance failed. + Rebalance(RDKafkaErrorCode), /// Seeking a partition failed. Seek(String), /// Setting partition offset failed. @@ -223,6 +225,7 @@ impl fmt::Debug for KafkaError { KafkaError::PauseResume(ref err) => { write!(f, "KafkaError (Pause/resume error: {})", err) } + KafkaError::Rebalance(ref err) => write!(f, "KafkaError (Rebalance error: {})", err), KafkaError::Seek(ref err) => write!(f, "KafkaError (Seek error: {})", err), KafkaError::SetPartitionOffset(err) => { write!(f, "KafkaError (Set partition offset error: {})", err) @@ -262,6 +265,7 @@ impl fmt::Display for KafkaError { KafkaError::OffsetFetch(err) => write!(f, "Offset fetch error: {}", err), KafkaError::PartitionEOF(part_n) => write!(f, "Partition EOF: {}", part_n), KafkaError::PauseResume(ref err) => write!(f, "Pause/resume error: {}", err), + KafkaError::Rebalance(ref err) => write!(f, "Rebalance error: {}", err), KafkaError::Seek(ref err) => write!(f, "Seek error: {}", err), KafkaError::SetPartitionOffset(err) => write!(f, "Set partition offset error: {}", err), KafkaError::StoreOffset(err) => write!(f, "Store offset error: {}", err), @@ -291,6 +295,7 @@ impl Error for KafkaError { KafkaError::OffsetFetch(err) => Some(err), KafkaError::PartitionEOF(_) => None, KafkaError::PauseResume(_) => None, + KafkaError::Rebalance(err) => Some(err), KafkaError::Seek(_) => None, KafkaError::SetPartitionOffset(err) => Some(err), KafkaError::StoreOffset(err) => Some(err), @@ -328,6 +333,7 @@ impl KafkaError { KafkaError::OffsetFetch(err) => Some(*err), KafkaError::PartitionEOF(_) => None, KafkaError::PauseResume(_) => None, + KafkaError::Rebalance(err) => Some(*err), KafkaError::Seek(_) => None, KafkaError::SetPartitionOffset(err) => Some(*err), KafkaError::StoreOffset(err) => Some(*err),