fix(kafka sink): Make KafkaService return Poll::Pending when producer queue is full (vectordotdev#18770)

* fix(kafka sink): set concurrency limits equal to kafka producer queue limits

* use send_result to better track state

* nits

* clippy
dsmith3197 authored Oct 24, 2023
1 parent 78934c2 commit a1863e6
Showing 2 changed files with 78 additions and 26 deletions.
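The fix replaces a fixed concurrency cap with genuine backpressure: the service counts records that librdkafka refuses to enqueue, and `poll_ready` reports `Poll::Pending` while that count is non-zero, so the stream driver stops handing it new requests. The shape of that pattern, as a minimal self-contained sketch (illustrative names and types, not Vector's actual ones):

```rust
use std::{
    future::Future,
    pin::Pin,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
    task::{Context, Poll},
};

use tower::Service;

/// Hypothetical service that refuses new requests while any in-flight
/// request is blocked on a full downstream queue.
#[derive(Clone)]
struct GatedService {
    blocked: Arc<AtomicUsize>,
}

impl Service<Vec<u8>> for GatedService {
    type Response = ();
    type Error = std::io::Error;
    type Future = Pin<Box<dyn Future<Output = Result<(), std::io::Error>> + Send>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        // At capacity while any record is blocked; the caller polls again
        // once one of its in-flight futures makes progress.
        if self.blocked.load(Ordering::Relaxed) > 0 {
            Poll::Pending
        } else {
            Poll::Ready(Ok(()))
        }
    }

    fn call(&mut self, _record: Vec<u8>) -> Self::Future {
        let blocked = Arc::clone(&self.blocked);
        Box::pin(async move {
            // On a queue-full error we would bump `blocked` (and release it
            // once the record is finally enqueued), which is what flips
            // poll_ready() above to Pending.
            let _ = blocked;
            Ok(())
        })
    }
}
```

One subtlety the sketch shares with the commit: `poll_ready` returns `Pending` without registering the waker, which appears to rely on the driver task being re-woken by its in-flight request futures (here, the 100 ms retry sleep in `call`).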
95 changes: 77 additions & 18 deletions src/sinks/kafka/service.rs
@@ -1,11 +1,18 @@
-use std::task::{Context, Poll};
+use std::{
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
+    task::{Context, Poll},
+    time::Duration,
+};
 
 use bytes::Bytes;
 use rdkafka::{
     error::KafkaError,
     message::OwnedHeaders,
     producer::{FutureProducer, FutureRecord},
-    util::Timeout,
+    types::RDKafkaErrorCode,
 };
 
 use crate::{kafka::KafkaStatisticsContext, sinks::prelude::*};
@@ -59,16 +66,38 @@ impl MetaDescriptive for KafkaRequest {
     }
 }
 
+/// BlockedRecordState manages state for a record blocked from being enqueued on the producer.
+struct BlockedRecordState {
+    records_blocked: Arc<AtomicUsize>,
+}
+
+impl BlockedRecordState {
+    fn new(records_blocked: Arc<AtomicUsize>) -> Self {
+        records_blocked.fetch_add(1, Ordering::Relaxed);
+        Self { records_blocked }
+    }
+}
+
+impl Drop for BlockedRecordState {
+    fn drop(&mut self) {
+        self.records_blocked.fetch_sub(1, Ordering::Relaxed);
+    }
+}
+
 #[derive(Clone)]
 pub struct KafkaService {
     kafka_producer: FutureProducer<KafkaStatisticsContext>,
+
+    /// The number of records blocked from being enqueued on the producer.
+    records_blocked: Arc<AtomicUsize>,
 }
 
 impl KafkaService {
-    pub(crate) const fn new(
-        kafka_producer: FutureProducer<KafkaStatisticsContext>,
-    ) -> KafkaService {
-        KafkaService { kafka_producer }
+    pub(crate) fn new(kafka_producer: FutureProducer<KafkaStatisticsContext>) -> KafkaService {
+        KafkaService {
+            kafka_producer,
+            records_blocked: Arc::new(AtomicUsize::new(0)),
+        }
     }
 }
 
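The new `BlockedRecordState` is an RAII counter: it increments the shared count on construction and decrements it on `Drop`, so the count stays correct even if the request future is dropped mid-retry. The same idiom in isolation (a hypothetical `CountGuard`, not from the codebase):

```rust
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

/// Guard that keeps a shared counter incremented for its own lifetime.
struct CountGuard(Arc<AtomicUsize>);

impl CountGuard {
    fn new(counter: Arc<AtomicUsize>) -> Self {
        counter.fetch_add(1, Ordering::Relaxed);
        Self(counter)
    }
}

impl Drop for CountGuard {
    fn drop(&mut self) {
        self.0.fetch_sub(1, Ordering::Relaxed);
    }
}

fn main() {
    let counter = Arc::new(AtomicUsize::new(0));
    {
        let _guard = CountGuard::new(Arc::clone(&counter));
        assert_eq!(counter.load(Ordering::Relaxed), 1);
    } // Guard dropped here; the decrement happens even on early return or panic.
    assert_eq!(counter.load(Ordering::Relaxed), 0);
}
```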
@@ -78,13 +107,21 @@ impl Service<KafkaRequest> for KafkaService {
     type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
 
     fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
-        Poll::Ready(Ok(()))
+        // The Kafka service is at capacity if any records are currently blocked from being enqueued
+        // on the producer.
+        if self.records_blocked.load(Ordering::Relaxed) > 0 {
+            Poll::Pending
+        } else {
+            Poll::Ready(Ok(()))
+        }
     }
 
     fn call(&mut self, request: KafkaRequest) -> Self::Future {
         let this = self.clone();
 
         Box::pin(async move {
+            let raw_byte_size =
+                request.body.len() + request.metadata.key.as_ref().map_or(0, |x| x.len());
             let event_byte_size = request
                 .request_metadata
                 .into_events_estimated_json_encoded_byte_size();
@@ -101,17 +138,39 @@ impl Service<KafkaRequest> for KafkaService {
                 record = record.headers(headers);
             }
 
-            // rdkafka will internally retry forever if the queue is full
-            match this.kafka_producer.send(record, Timeout::Never).await {
-                Ok((_partition, _offset)) => {
-                    let raw_byte_size =
-                        request.body.len() + request.metadata.key.map_or(0, |x| x.len());
-                    Ok(KafkaResponse {
-                        event_byte_size,
-                        raw_byte_size,
-                    })
-                }
-                Err((kafka_err, _original_record)) => Err(kafka_err),
-            }
+            // Manually poll [FutureProducer::send_result] instead of [FutureProducer::send] to track
+            // records that fail to be enqueued on the producer.
+            let mut blocked_state: Option<BlockedRecordState> = None;
+            loop {
+                match this.kafka_producer.send_result(record) {
+                    // Record was successfully enqueued on the producer.
+                    Ok(fut) => {
+                        // Drop the blocked state (if any), as the producer is no longer blocked.
+                        drop(blocked_state.take());
+                        return fut
+                            .await
+                            .expect("producer unexpectedly dropped")
+                            .map(|_| KafkaResponse {
+                                event_byte_size,
+                                raw_byte_size,
+                            })
+                            .map_err(|(err, _)| err);
+                    }
+                    // Producer queue is full.
+                    Err((
+                        KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull),
+                        original_record,
+                    )) => {
+                        if blocked_state.is_none() {
+                            blocked_state =
+                                Some(BlockedRecordState::new(Arc::clone(&this.records_blocked)));
+                        }
+                        record = original_record;
+                        tokio::time::sleep(Duration::from_millis(100)).await;
+                    }
+                    // A different error occurred.
+                    Err((err, _)) => return Err(err),
+                };
+            }
         })
     }
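The switch from `send(record, Timeout::Never)` to `send_result` is what makes the queue-full condition observable: `send` with an infinite timeout retries enqueueing internally and never surfaces the full queue, while `send_result` fails fast with `RDKafkaErrorCode::QueueFull` and hands the record back for a later retry. A rough way to see that behavior, with a deliberately tiny local queue (broker address, topic, and sizes are illustrative):

```rust
use rdkafka::{
    config::ClientConfig,
    error::KafkaError,
    producer::{FutureProducer, FutureRecord},
    types::RDKafkaErrorCode,
};

fn main() {
    // A producer whose local queue holds at most one message, so the
    // queue-full path is easy to hit.
    let producer: FutureProducer = ClientConfig::new()
        .set("bootstrap.servers", "localhost:9092")
        .set("queue.buffering.max.messages", "1")
        .create()
        .expect("producer creation failed");

    for i in 0..3 {
        let payload = format!("message {i}");
        match producer.send_result(FutureRecord::<(), _>::to("demo-topic").payload(&payload)) {
            // Enqueued; awaiting the returned future yields the delivery result.
            Ok(_delivery_future) => println!("enqueued {i}"),
            // Queue full: the original record comes back so it can be retried later.
            Err((KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull), _record)) => {
                println!("queue full at {i}; would back off and retry");
            }
            Err((err, _record)) => panic!("unexpected error: {err}"),
        }
    }
}
```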
9 changes: 1 addition & 8 deletions src/sinks/kafka/sink.rs
@@ -6,7 +6,6 @@ use rdkafka::{
 };
 use snafu::{ResultExt, Snafu};
 use tokio::time::Duration;
-use tower::limit::ConcurrencyLimit;
 use vrl::path::OwnedTargetPath;
 
 use super::config::{KafkaRole, KafkaSinkConfig};
@@ -62,11 +61,6 @@ impl KafkaSink {
     }
 
     async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
-        // rdkafka will internally retry forever, so we need some limit to prevent this from overflowing.
-        // 64 should be plenty concurrency here, as a rdkafka send operation does not block until its underlying
-        // buffer is full.
-        let service = ConcurrencyLimit::new(self.service.clone(), 64);
-
         let request_builder = KafkaRequestBuilder {
             key_field: self.key_field,
             headers_key: self.headers_key,
@@ -100,8 +94,7 @@ impl KafkaSink {
                     Ok(req) => Some(req),
                 }
             })
-            .into_driver(service)
-            .protocol("kafka")
+            .into_driver(self.service)
+            .protocol("kafka")
             .run()
             .await
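With backpressure now expressed through `poll_ready`, the `ConcurrencyLimit::new(self.service.clone(), 64)` wrapper and its guessed limit become unnecessary: the driver already waits for service readiness before dispatching each request. Roughly, that interaction looks like this (a simplified sketch, not Vector's actual driver):

```rust
use tower::{Service, ServiceExt};

/// Simplified shape of what a stream driver does with the sink service:
/// wait until the service reports readiness, then dispatch one request.
async fn drive_one<S, R>(service: &mut S, request: R) -> Result<S::Response, S::Error>
where
    S: Service<R>,
{
    // With the queue-full gate in poll_ready, this await is where
    // backpressure is applied; no ConcurrencyLimit wrapper is needed.
    service.ready().await?.call(request).await
}
```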
