diff --git a/Cargo.lock b/Cargo.lock index f2a2e534c4774..55f2fd3a6f92f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6321,15 +6321,6 @@ dependencies = [ "libc", ] -[[package]] -name = "num_enum" -version = "0.5.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1f646caf906c20226733ed5b1374287eb97e3c2a5c227ce668c1f2ce20ae57c9" -dependencies = [ - "num_enum_derive 0.5.11", -] - [[package]] name = "num_enum" version = "0.6.1" @@ -6348,18 +6339,6 @@ dependencies = [ "num_enum_derive 0.7.3", ] -[[package]] -name = "num_enum_derive" -version = "0.5.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dcbff9bc912032c62bf65ef1d5aea88983b420f4f839db1e9b0c281a25c9c799" -dependencies = [ - "proc-macro-crate 1.3.1", - "proc-macro2 1.0.92", - "quote 1.0.37", - "syn 1.0.109", -] - [[package]] name = "num_enum_derive" version = "0.6.1" @@ -7904,9 +7883,9 @@ dependencies = [ [[package]] name = "rdkafka" -version = "0.35.0" +version = "0.37.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f16c17f411935214a5870e40aff9291f8b40a73e97bf8de29e5959c473d5ef33" +checksum = "14b52c81ac3cac39c9639b95c20452076e74b8d9a71bc6fc4d83407af2ea6fff" dependencies = [ "futures-channel", "futures-util", @@ -7922,15 +7901,15 @@ dependencies = [ [[package]] name = "rdkafka-sys" -version = "4.7.0+2.3.0" +version = "4.8.0+2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "55e0d2f9ba6253f6ec72385e453294f8618e9e15c2c6aba2a5c01ccf9622d615" +checksum = "ced38182dc436b3d9df0c77976f37a67134df26b050df1f0006688e46fc4c8be" dependencies = [ "cmake", "curl-sys", "libc", "libz-sys", - "num_enum 0.5.11", + "num_enum 0.7.3", "openssl-sys", "pkg-config", "sasl2-sys", diff --git a/Cargo.toml b/Cargo.toml index 12bb8fe919ca6..5df9dce099a04 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -350,7 +350,7 @@ postgres-openssl = { version = "0.5.0", default-features = false, features = ["r pulsar = { version = "6.3.0", default-features = false, features = ["tokio-runtime", "auth-oauth2", "flate2", "lz4", "snap", "zstd"], optional = true } rand.workspace = true rand_distr = { version = "0.4.3", default-features = false } -rdkafka = { version = "0.35.0", default-features = false, features = ["curl-static", "tokio", "libz", "ssl", "zstd"], optional = true } +rdkafka = { version = "0.37.0", default-features = false, features = ["curl-static", "tokio", "libz", "ssl", "zstd"], optional = true } redis = { version = "0.24.0", default-features = false, features = ["connection-manager", "tokio-comp", "tokio-native-tls-comp"], optional = true } regex = { version = "1.11.1", default-features = false, features = ["std", "perf"] } roaring = { version = "0.10.7", default-features = false, features = ["std"], optional = true } diff --git a/src/sources/kafka.rs b/src/sources/kafka.rs index bfa35e11924e1..7d0c1cdf26904 100644 --- a/src/sources/kafka.rs +++ b/src/sources/kafka.rs @@ -16,8 +16,8 @@ use futures::{Stream, StreamExt}; use futures_util::future::OptionFuture; use rdkafka::{ consumer::{ - stream_consumer::StreamPartitionQueue, CommitMode, Consumer, ConsumerContext, Rebalance, - StreamConsumer, + stream_consumer::StreamPartitionQueue, BaseConsumer, CommitMode, Consumer, ConsumerContext, + Rebalance, StreamConsumer, }, error::KafkaError, message::{BorrowedMessage, Headers as _, Message}, @@ -1369,15 +1369,11 @@ impl ClientContext for KafkaSourceContext { } impl ConsumerContext for KafkaSourceContext { - fn pre_rebalance(&self, rebalance: &Rebalance) { + fn pre_rebalance(&self, _base_consumer: &BaseConsumer, rebalance: &Rebalance) { match rebalance { Rebalance::Assign(tpl) => self.consume_partitions(tpl), Rebalance::Revoke(tpl) => { - // TODO workaround for https://github.com/fede1024/rust-rdkafka/issues/681 - if tpl.capacity() == 0 { - return; - } self.revoke_partitions(tpl); self.commit_consumer_state(); }