Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(kafka sink): Make KafkaService return Poll::Pending when producer queue is full #18770

Merged
merged 4 commits into from
Oct 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 77 additions & 18 deletions src/sinks/kafka/service.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
use std::task::{Context, Poll};
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
task::{Context, Poll},
time::Duration,
};

use bytes::Bytes;
use rdkafka::{
error::KafkaError,
message::OwnedHeaders,
producer::{FutureProducer, FutureRecord},
util::Timeout,
types::RDKafkaErrorCode,
};

use crate::{kafka::KafkaStatisticsContext, sinks::prelude::*};
Expand Down Expand Up @@ -59,16 +66,38 @@ impl MetaDescriptive for KafkaRequest {
}
}

/// Guard representing a single record that could not be enqueued on the producer.
///
/// Creating the guard increments the shared blocked-record counter and dropping it
/// decrements the counter again, so the count is restored on every exit path.
struct BlockedRecordState {
    /// Shared count of records currently blocked from being enqueued.
    records_blocked: Arc<AtomicUsize>,
}

impl BlockedRecordState {
    /// Registers one more blocked record on `records_blocked` and returns the guard
    /// that owns that registration.
    fn new(records_blocked: Arc<AtomicUsize>) -> Self {
        let guard = Self { records_blocked };
        guard.records_blocked.fetch_add(1, Ordering::Relaxed);
        guard
    }
}

impl Drop for BlockedRecordState {
    /// Releases this guard's registration by decrementing the shared counter.
    fn drop(&mut self) {
        let counter: &AtomicUsize = &self.records_blocked;
        counter.fetch_sub(1, Ordering::Relaxed);
    }
}

/// Service that holds the Kafka producer together with a counter of records that are
/// currently blocked from being enqueued on it.
///
/// The counter lives behind an `Arc`, so every clone of the service observes and updates
/// the same count.
#[derive(Clone)]
pub struct KafkaService {
/// Producer handle used to enqueue records.
kafka_producer: FutureProducer<KafkaStatisticsContext>,

/// The number of records blocked from being enqueued on the producer.
records_blocked: Arc<AtomicUsize>,
}

impl KafkaService {
// NOTE(review): two `new` signatures appear back to back here; this looks like the
// pre-change `const fn` constructor and its replacement interleaved by the diff
// rendering — confirm against the actual file.
pub(crate) const fn new(
kafka_producer: FutureProducer<KafkaStatisticsContext>,
) -> KafkaService {
KafkaService { kafka_producer }
// Builds a service around `kafka_producer` with the blocked-record counter starting at zero.
pub(crate) fn new(kafka_producer: FutureProducer<KafkaStatisticsContext>) -> KafkaService {
KafkaService {
kafka_producer,
// Fresh counter; clones of the returned service share it through the `Arc`.
records_blocked: Arc::new(AtomicUsize::new(0)),
}
}
}

Expand All @@ -78,13 +107,21 @@ impl Service<KafkaRequest> for KafkaService {
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

// Reports whether the service can accept another request: `Poll::Pending` while at least
// one record is blocked from being enqueued, `Poll::Ready(Ok(()))` otherwise.
//
// NOTE(review): `_cx` is unused, so no waker is registered before `Poll::Pending` is
// returned; presumably the caller re-polls on its own schedule — confirm.
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
// NOTE(review): the bare expression on the next line, followed directly by the `if`,
// looks like the pre-change body left in place by the diff rendering — confirm.
Poll::Ready(Ok(()))
// The Kafka service is at capacity if any records are currently blocked from being enqueued
// on the producer.
if self.records_blocked.load(Ordering::Relaxed) > 0 {
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
}

fn call(&mut self, request: KafkaRequest) -> Self::Future {
let this = self.clone();

Box::pin(async move {
let raw_byte_size =
request.body.len() + request.metadata.key.as_ref().map_or(0, |x| x.len());
let event_byte_size = request
.request_metadata
.into_events_estimated_json_encoded_byte_size();
Expand All @@ -101,17 +138,39 @@ impl Service<KafkaRequest> for KafkaService {
record = record.headers(headers);
}

// rdkafka will internally retry forever if the queue is full
match this.kafka_producer.send(record, Timeout::Never).await {
Ok((_partition, _offset)) => {
let raw_byte_size =
request.body.len() + request.metadata.key.map_or(0, |x| x.len());
Ok(KafkaResponse {
event_byte_size,
raw_byte_size,
})
}
Err((kafka_err, _original_record)) => Err(kafka_err),
// Manually poll [FutureProducer::send_result] instead of [FutureProducer::send] to track
// records that fail to be enqueued on the producer.
let mut blocked_state: Option<BlockedRecordState> = None;
loop {
match this.kafka_producer.send_result(record) {
// Record was successfully enqueued on the producer.
Ok(fut) => {
// Drop the blocked state (if any), as the producer is no longer blocked.
drop(blocked_state.take());
return fut
.await
.expect("producer unexpectedly dropped")
.map(|_| KafkaResponse {
event_byte_size,
raw_byte_size,
})
.map_err(|(err, _)| err);
}
// Producer queue is full.
Err((
KafkaError::MessageProduction(RDKafkaErrorCode::QueueFull),
original_record,
)) => {
if blocked_state.is_none() {
blocked_state =
Some(BlockedRecordState::new(Arc::clone(&this.records_blocked)));
}
record = original_record;
tokio::time::sleep(Duration::from_millis(100)).await;
}
// A different error occurred.
Err((err, _)) => return Err(err),
};
}
})
}
Expand Down
9 changes: 1 addition & 8 deletions src/sinks/kafka/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ use rdkafka::{
};
use snafu::{ResultExt, Snafu};
use tokio::time::Duration;
use tower::limit::ConcurrencyLimit;
use vrl::path::OwnedTargetPath;

use super::config::{KafkaRole, KafkaSinkConfig};
Expand Down Expand Up @@ -62,11 +61,6 @@ impl KafkaSink {
}

async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
// rdkafka will internally retry forever, so we need some limit to prevent this from overflowing.
// 64 should be plenty concurrency here, as a rdkafka send operation does not block until its underlying
// buffer is full.
let service = ConcurrencyLimit::new(self.service.clone(), 64);

let request_builder = KafkaRequestBuilder {
key_field: self.key_field,
headers_key: self.headers_key,
Expand Down Expand Up @@ -100,8 +94,7 @@ impl KafkaSink {
Ok(req) => Some(req),
}
})
.into_driver(service)
.protocol("kafka")
.into_driver(self.service)
.protocol("kafka")
.run()
.await
Expand Down
Loading