diff --git a/src/sinks/kafka/service.rs b/src/sinks/kafka/service.rs index 0f1d122b7750c..607e7cd5fd4ea 100644 --- a/src/sinks/kafka/service.rs +++ b/src/sinks/kafka/service.rs @@ -1,11 +1,18 @@ -use std::task::{Context, Poll}; +use std::{ + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, + task::{Context, Poll}, + time::Duration, +}; use bytes::Bytes; use rdkafka::{ error::KafkaError, message::OwnedHeaders, producer::{FutureProducer, FutureRecord}, - util::Timeout, + types::RDKafkaErrorCode, }; use crate::{kafka::KafkaStatisticsContext, sinks::prelude::*}; @@ -59,16 +66,38 @@ impl MetaDescriptive for KafkaRequest { } } +/// BlockedRecordState manages state for a record blocked from being enqueued on the producer. +struct BlockedRecordState { + records_blocked: Arc, +} + +impl BlockedRecordState { + fn new(records_blocked: Arc) -> Self { + records_blocked.fetch_add(1, Ordering::Relaxed); + Self { records_blocked } + } +} + +impl Drop for BlockedRecordState { + fn drop(&mut self) { + self.records_blocked.fetch_sub(1, Ordering::Relaxed); + } +} + #[derive(Clone)] pub struct KafkaService { kafka_producer: FutureProducer, + + /// The number of records blocked from being enqueued on the producer. + records_blocked: Arc, } impl KafkaService { - pub(crate) const fn new( - kafka_producer: FutureProducer, - ) -> KafkaService { - KafkaService { kafka_producer } + pub(crate) fn new(kafka_producer: FutureProducer) -> KafkaService { + KafkaService { + kafka_producer, + records_blocked: Arc::new(AtomicUsize::new(0)), + } } } @@ -78,13 +107,21 @@ impl Service for KafkaService { type Future = BoxFuture<'static, Result>; fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) + // The Kafka service is at capacity if any records are currently blocked from being enqueued + // on the producer. + if self.records_blocked.load(Ordering::Relaxed) > 0 { + Poll::Pending + } else { + Poll::Ready(Ok(())) + } } fn call(&mut self, request: KafkaRequest) -> Self::Future { let this = self.clone(); Box::pin(async move { + let raw_byte_size = + request.body.len() + request.metadata.key.as_ref().map_or(0, |x| x.len()); let event_byte_size = request .request_metadata .into_events_estimated_json_encoded_byte_size(); @@ -101,17 +138,39 @@ impl Service for KafkaService { record = record.headers(headers); } - // rdkafka will internally retry forever if the queue is full - match this.kafka_producer.send(record, Timeout::Never).await { - Ok((_partition, _offset)) => { - let raw_byte_size = - request.body.len() + request.metadata.key.map_or(0, |x| x.len()); - Ok(KafkaResponse { - event_byte_size, - raw_byte_size, - }) - } - Err((kafka_err, _original_record)) => Err(kafka_err), + // Manually poll [FutureProducer::send_result] instead of [FutureProducer::send] to track + // records that fail to be enqueued on the producer. + let mut blocked_state: Option = None; + loop { + match this.kafka_producer.send_result(record) { + // Record was successfully enqueued on the producer. + Ok(fut) => { + // Drop the blocked state (if any), as the producer is no longer blocked. + drop(blocked_state.take()); + return fut + .await + .expect("producer unexpectedly dropped") + .map(|_| KafkaResponse { + event_byte_size, + raw_byte_size, + }) + .map_err(|(err, _)| err); + } + // Producer queue is full. + Err(( + KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), + original_record, + )) => { + if blocked_state.is_none() { + blocked_state = + Some(BlockedRecordState::new(Arc::clone(&this.records_blocked))); + } + record = original_record; + tokio::time::sleep(Duration::from_millis(100)).await; + } + // A different error occurred. + Err((err, _)) => return Err(err), + }; } }) } diff --git a/src/sinks/kafka/sink.rs b/src/sinks/kafka/sink.rs index 141c32f7cb3b7..db4395db15799 100644 --- a/src/sinks/kafka/sink.rs +++ b/src/sinks/kafka/sink.rs @@ -6,7 +6,6 @@ use rdkafka::{ }; use snafu::{ResultExt, Snafu}; use tokio::time::Duration; -use tower::limit::ConcurrencyLimit; use vrl::path::OwnedTargetPath; use super::config::{KafkaRole, KafkaSinkConfig}; @@ -62,11 +61,6 @@ impl KafkaSink { } async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { - // rdkafka will internally retry forever, so we need some limit to prevent this from overflowing. - // 64 should be plenty concurrency here, as a rdkafka send operation does not block until its underlying - // buffer is full. - let service = ConcurrencyLimit::new(self.service.clone(), 64); - let request_builder = KafkaRequestBuilder { key_field: self.key_field, headers_key: self.headers_key, @@ -100,8 +94,7 @@ impl KafkaSink { Ok(req) => Some(req), } }) - .into_driver(service) - .protocol("kafka") + .into_driver(self.service) .protocol("kafka") .run() .await