diff --git a/changelog.d/22026-retry-kafka-policy-violations.fix.md b/changelog.d/22026-retry-kafka-policy-violations.fix.md new file mode 100644 index 0000000000000..56bc556034812 --- /dev/null +++ b/changelog.d/22026-retry-kafka-policy-violations.fix.md @@ -0,0 +1,3 @@ +Retry Kafka messages that error with `RDKafkaErrorCode::PolicyViolation` so messages are not lost. + +authors: PriceHiller diff --git a/src/sinks/kafka/service.rs b/src/sinks/kafka/service.rs index 087795864b594..94017e22e7f36 100644 --- a/src/sinks/kafka/service.rs +++ b/src/sinks/kafka/service.rs @@ -159,9 +159,12 @@ impl Service for KafkaService { }) .map_err(|(err, _)| err); } - // Producer queue is full. + // Producer queue is full or a policy has been violated and the request should + // be retried Err(( - KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), + KafkaError::MessageProduction( + RDKafkaErrorCode::QueueFull | RDKafkaErrorCode::PolicyViolation, + ), original_record, )) => { if blocked_state.is_none() {