From 296551f9335f0c1de67bc24a6bf9a4df46d95913 Mon Sep 17 00:00:00 2001 From: Price Hiller Date: Mon, 16 Dec 2024 13:51:19 -0600 Subject: [PATCH] fix(kafka sink): retry messages that result in kafka policy violations Problem: Some messages were getting dropped by Vector due to Kafka throwing `PolicyViolation` errors. These should be retried as a policy can be as simple as a more aggressive rate limit. ---------- Solution: Retry any messages that had the `RDKafkaErrorCode::PolicyViolation` error. ---------- Note: A dynamic back off may be better, as there may be a rate limit out there that still needs more than 100ms to back off on requests. ---------- See the original issue at https://github.com/vectordotdev/vector/issues/22026 Closes https://github.com/vectordotdev/vector/issues/22026 --- changelog.d/22026-retry-kafka-policy-violations.fix.md | 3 +++ src/sinks/kafka/service.rs | 7 +++++-- 2 files changed, 8 insertions(+), 2 deletions(-) create mode 100644 changelog.d/22026-retry-kafka-policy-violations.fix.md diff --git a/changelog.d/22026-retry-kafka-policy-violations.fix.md b/changelog.d/22026-retry-kafka-policy-violations.fix.md new file mode 100644 index 0000000000000..56bc556034812 --- /dev/null +++ b/changelog.d/22026-retry-kafka-policy-violations.fix.md @@ -0,0 +1,3 @@ +Retry Kafka messages that error with `RDKafkaErrorCode::PolicyViolation` so messages are not lost. + +authors: PriceHiller diff --git a/src/sinks/kafka/service.rs b/src/sinks/kafka/service.rs index 087795864b594..94017e22e7f36 100644 --- a/src/sinks/kafka/service.rs +++ b/src/sinks/kafka/service.rs @@ -159,9 +159,12 @@ impl Service for KafkaService { }) .map_err(|(err, _)| err); } - // Producer queue is full. 
+ // Producer queue is full, or a policy has been violated; in both cases the + // request should be retried. Err(( - KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), + KafkaError::MessageProduction( + RDKafkaErrorCode::QueueFull | RDKafkaErrorCode::PolicyViolation, + ), original_record, )) => { if blocked_state.is_none() {