chore: add VectorSink::Sink (#4846)
Signed-off-by: Kirill Fomichev <fanatid@ya.ru>
fanatid authored Nov 6, 2020
1 parent aade9e6 commit c526860
Showing 3 changed files with 237 additions and 145 deletions.
186 changes: 116 additions & 70 deletions src/sinks/kafka.rs
@@ -7,27 +7,34 @@ use crate::{
     sinks::util::encoding::{EncodingConfig, EncodingConfigWithDefault, EncodingConfiguration},
     template::{Template, TemplateError},
 };
-use futures::{compat::Compat, FutureExt};
-use futures01::{
-    future as future01, stream::FuturesUnordered, Async, AsyncSink, Future, Poll, Sink, StartSend,
-    Stream,
+use futures::{
+    channel::oneshot::Canceled, future::BoxFuture, ready, stream::FuturesUnordered, FutureExt,
+    Sink, Stream, TryFutureExt,
 };
 use rdkafka::{
     consumer::{BaseConsumer, Consumer},
+    error::{KafkaError, RDKafkaError},
     producer::{DeliveryFuture, FutureProducer, FutureRecord},
+    ClientConfig,
 };
 use serde::{Deserialize, Serialize};
 use snafu::{ResultExt, Snafu};
-use std::collections::{HashMap, HashSet};
-use std::convert::TryFrom;
-use std::time::Duration;
+use std::{
+    collections::{HashMap, HashSet},
+    convert::TryFrom,
+    pin::Pin,
+    sync::Arc,
+    task::{Context, Poll},
+};
+use tokio::time::{delay_for, Duration};

-type MetadataFuture<F, M> = future01::Join<F, future01::FutureResult<M, <F as Future>::Error>>;
+// Maximum number of futures blocked by [send_result](https://docs.rs/rdkafka/0.24.0/rdkafka/producer/future_producer/struct.FutureProducer.html#method.send_result)
+const SEND_RESULT_LIMIT: usize = 5;

 #[derive(Debug, Snafu)]
 enum BuildError {
     #[snafu(display("creating kafka producer failed: {}", source))]
-    KafkaCreateFailed { source: rdkafka::error::KafkaError },
+    KafkaCreateFailed { source: KafkaError },
     #[snafu(display("invalid topic template: {}", source))]
     TopicTemplate { source: TemplateError },
 }
@@ -67,11 +74,14 @@ pub enum Encoding {
 }

 pub struct KafkaSink {
-    producer: FutureProducer,
+    producer: Arc<FutureProducer>,
     topic: Template,
     key_field: Option<String>,
     encoding: EncodingConfig<Encoding>,
-    in_flight: FuturesUnordered<MetadataFuture<Compat<DeliveryFuture>, usize>>,
+    delivery_fut: FuturesUnordered<BoxFuture<'static, (usize, Result<DeliveryFuture, KafkaError>)>>,
+    in_flight: FuturesUnordered<
+        BoxFuture<'static, (usize, Result<Result<(i32, i64), KafkaError>, Canceled>)>,
+    >,

     acker: Acker,
     seq_head: usize,
@@ -104,7 +114,7 @@ impl SinkConfig for KafkaSinkConfig {
     ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
         let sink = KafkaSink::new(self.clone(), cx.acker())?;
         let hc = healthcheck(self.clone()).boxed();
-        Ok((super::VectorSink::Futures01Sink(Box::new(sink)), hc))
+        Ok((super::VectorSink::Sink(Box::new(sink)), hc))
     }

     fn input_type(&self) -> DataType {
@@ -117,8 +127,8 @@ impl SinkConfig for KafkaSinkConfig {
 }

 impl KafkaSinkConfig {
-    fn to_rdkafka(&self) -> crate::Result<rdkafka::ClientConfig> {
-        let mut client_config = rdkafka::ClientConfig::new();
+    fn to_rdkafka(&self) -> crate::Result<ClientConfig> {
+        let mut client_config = ClientConfig::new();
         client_config
             .set("bootstrap.servers", &self.bootstrap_servers)
             .set("compression.codec", &to_string(self.compression))
@@ -141,95 +151,133 @@ impl KafkaSink {
     fn new(config: KafkaSinkConfig, acker: Acker) -> crate::Result<Self> {
         let producer = config.to_rdkafka()?.create().context(KafkaCreateFailed)?;
         Ok(KafkaSink {
-            producer,
+            producer: Arc::new(producer),
             topic: Template::try_from(config.topic).context(TopicTemplate)?,
             key_field: config.key_field,
             encoding: config.encoding.into(),
+            delivery_fut: FuturesUnordered::new(),
             in_flight: FuturesUnordered::new(),
             acker,
             seq_head: 0,
             seq_tail: 0,
             pending_acks: HashSet::new(),
         })
     }
+
+    fn poll_delivery_fut(&mut self, cx: &mut Context<'_>) -> Poll<()> {
+        while !self.delivery_fut.is_empty() {
+            let result = Pin::new(&mut self.delivery_fut).poll_next(cx);
+            let (seqno, result) = ready!(result).expect("`delivery_fut` is endless stream");
+            self.in_flight.push(Box::pin(async move {
+                let result = match result {
+                    Ok(fut) => {
+                        fut.map_ok(|result| result.map_err(|(error, _owned_message)| error))
+                            .await
+                    }
+                    Err(error) => Ok(Err(error)),
+                };
+
+                (seqno, result)
+            }));
+        }
+
+        Poll::Ready(())
+    }
 }

-impl Sink for KafkaSink {
-    type SinkItem = Event;
-    type SinkError = ();
+impl Sink<Event> for KafkaSink {
+    type Error = ();
+
+    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        match self.poll_delivery_fut(cx) {
+            Poll::Pending if self.delivery_fut.len() >= SEND_RESULT_LIMIT => Poll::Pending,
+            _ => Poll::Ready(Ok(())),
+        }
+    }
+
+    fn start_send(mut self: Pin<&mut Self>, item: Event) -> Result<(), Self::Error> {
+        assert!(
+            self.delivery_fut.len() < SEND_RESULT_LIMIT,
+            "Expected `poll_ready` to be called first."
+        );

-    fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
         let topic = self.topic.render_string(&item).map_err(|missing_keys| {
             error!(message = "Missing keys for topic.", missing_keys = ?missing_keys);
         })?;

         let (key, body) = encode_event(item.clone(), &self.key_field, &self.encoding);

-        let mut record = FutureRecord::to(&topic).key(&key).payload(&body[..]);
+        let seqno = self.seq_head;
+        self.seq_head += 1;

-        if let Some(Value::Timestamp(timestamp)) = item.as_log().get(log_schema().timestamp_key()) {
-            record = record.timestamp(timestamp.timestamp_millis());
-        }
+        let producer = Arc::clone(&self.producer);
+        self.delivery_fut.push(Box::pin(async move {
+            let mut record = FutureRecord::to(&topic).key(&key).payload(&body[..]);
+            if let Some(Value::Timestamp(timestamp)) =
+                item.as_log().get(log_schema().timestamp_key())
+            {
+                record = record.timestamp(timestamp.timestamp_millis());
+            }

-        debug!(message = "Sending event.", count = 1);
-        let future = match self.producer.send_result(record) {
-            Ok(f) => f,
-            Err((error, record)) => {
-                // Docs suggest this will only happen when the producer queue is full, so let's
-                // treat it as we do full buffers in other sinks
-                debug!(message = "The rdkafka queue full.", %error);
-                self.poll_complete()?;
-
-                match self.producer.send_result(record) {
-                    Ok(f) => f,
-                    Err((error, _record)) => {
-                        debug!(message = "The rdkafka queue still full.", %error);
-                        return Ok(AsyncSink::NotReady(item));
+            let result = loop {
+                debug!(message = "Sending event.", count = 1);
+                match producer.send_result(record) {
+                    Ok(future) => break Ok(future),
+                    // Try again if queue is full.
+                    // See item 4 on GitHub: https://github.com/timberio/vector/pull/101#issue-257150924
+                    // https://docs.rs/rdkafka/0.24.0/src/rdkafka/producer/future_producer.rs.html#296
+                    Err((error, future_record))
+                        if error == KafkaError::MessageProduction(RDKafkaError::QueueFull) =>
+                    {
+                        debug!(message = "The rdkafka queue full.", %error, %seqno, rate_limit_secs = 1);
+                        record = future_record;
+                        delay_for(Duration::from_millis(10)).await;
                     }
+                    Err((error, _)) => break Err(error),
                 }
-            }
-        };
+            };

-        let seqno = self.seq_head;
-        self.seq_head += 1;
+            (seqno, result)
+        }));

-        self.in_flight
-            .push(Compat::new(future).join(future01::ok(seqno)));
-        Ok(AsyncSink::Ready)
+        Ok(())
     }

-    fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
-        loop {
-            match self.in_flight.poll() {
-                // nothing ready yet
-                Ok(Async::NotReady) => return Ok(Async::NotReady),
+    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        ready!(self.poll_delivery_fut(cx));

-                // nothing in flight
-                Ok(Async::Ready(None)) => return Ok(Async::Ready(())),
-
-                // request finished, check for success
-                Ok(Async::Ready(Some((result, seqno)))) => {
+        let this = Pin::into_inner(self);
+        while !this.in_flight.is_empty() {
+            match ready!(Pin::new(&mut this.in_flight).poll_next(cx)) {
+                Some((seqno, Ok(result))) => {
                     match result {
-                        Ok((partition, offset)) => trace!(
-                            message = "Produced message.", parition = ?partition, offset = ?offset
-                        ),
-                        Err((error, _msg)) => error!(message = "Kafka error.", %error),
+                        Ok((partition, offset)) => {
+                            trace!(message = "Produced message.", ?partition, ?offset)
+                        }
+                        Err(error) => error!(message = "Kafka error.", %error),
                     };

-                    self.pending_acks.insert(seqno);
+                    this.pending_acks.insert(seqno);

                     let mut num_to_ack = 0;
-                    while self.pending_acks.remove(&self.seq_tail) {
+                    while this.pending_acks.remove(&this.seq_tail) {
                         num_to_ack += 1;
-                        self.seq_tail += 1
+                        this.seq_tail += 1
                     }
-                    self.acker.ack(num_to_ack);
+                    this.acker.ack(num_to_ack);
                 }

-                // request got canceled (according to docs)
-                Err(error) => error!(message = "Delivery future canceled.", %error),
+                Some((_, Err(Canceled))) => {
+                    error!(message = "Request canceled.");
+                    return Poll::Ready(Err(()));
+                }
+                None => break,
             }
         }
+
+        Poll::Ready(Ok(()))
+    }
+
+    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+        self.poll_flush(cx)
     }
 }
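One subtlety in the hunk above: Kafka deliveries complete out of order, but the acker must be fed acknowledgements in order. The seq_head/seq_tail/pending_acks bookkeeping holds completed sequence numbers in a set and only acks the contiguous prefix. A self-contained sketch of just that bookkeeping (the AckTracker type is illustrative, not part of this commit):

use std::collections::HashSet;

// In-order acknowledgement of out-of-order completions, mirroring
// KafkaSink's seq_tail/pending_acks fields.
struct AckTracker {
    seq_tail: u64,
    pending_acks: HashSet<u64>,
}

impl AckTracker {
    // Record that `seqno` completed; return how many contiguous events,
    // starting at seq_tail, may now be acked downstream.
    fn complete(&mut self, seqno: u64) -> u64 {
        self.pending_acks.insert(seqno);
        let mut num_to_ack = 0;
        while self.pending_acks.remove(&self.seq_tail) {
            num_to_ack += 1;
            self.seq_tail += 1;
        }
        num_to_ack
    }
}

fn main() {
    let mut tracker = AckTracker { seq_tail: 0, pending_acks: HashSet::new() };
    assert_eq!(tracker.complete(1), 0); // seqno 0 still in flight: hold the ack
    assert_eq!(tracker.complete(2), 0);
    assert_eq!(tracker.complete(0), 3); // gap filled: ack 0, 1 and 2 together
}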

@@ -366,7 +414,7 @@ mod integration_test {
         test_util::{random_lines_with_stream, random_string, wait_for},
         tls::TlsOptions,
     };
-    use futures::{compat::Sink01CompatExt, future, SinkExt, StreamExt};
+    use futures::{future, StreamExt};
     use rdkafka::{
         consumer::{BaseConsumer, Consumer},
         Message, Offset, TopicPartitionList,
@@ -499,9 +547,7 @@

         let num_events = 1000;
         let (input, events) = random_lines_with_stream(100, num_events);
-        let mut events = events.map(Ok);
-
-        let _ = sink.sink_compat().send_all(&mut events).await.unwrap();
+        events.map(Ok).forward(sink).await.unwrap();

         // read back everything from the beginning
         let mut client_config = rdkafka::ClientConfig::new();
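The substance of the kafka.rs change is the migration from the futures 0.1 Sink trait, where start_send signals backpressure by returning AsyncSink::NotReady, to the futures 0.3 Sink<Item>, where the caller must first get Poll::Ready from poll_ready and start_send is then expected to succeed. A minimal, self-contained sketch of that contract, assuming an illustrative BoundedSink type and LIMIT constant that are not part of this commit:

use futures::{executor::block_on, ready, stream, Sink, StreamExt};
use std::{
    collections::VecDeque,
    pin::Pin,
    task::{Context, Poll},
};

// Cap on buffered items, mirroring how KafkaSink caps `delivery_fut`
// at SEND_RESULT_LIMIT.
const LIMIT: usize = 5;

struct BoundedSink {
    queue: VecDeque<String>,
}

impl Sink<String> for BoundedSink {
    type Error = ();

    // Backpressure now lives here: callers wait for Ready before sending.
    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
        if self.queue.len() >= LIMIT {
            // Make room synchronously in this toy; the real sink returns
            // Pending and relies on its delivery futures to wake the task.
            ready!(self.as_mut().poll_flush(cx))?;
        }
        Poll::Ready(Ok(()))
    }

    // Must only be called after poll_ready returned Ready(Ok(())).
    fn start_send(mut self: Pin<&mut Self>, item: String) -> Result<(), ()> {
        assert!(self.queue.len() < LIMIT, "Expected `poll_ready` to be called first.");
        self.queue.push_back(item);
        Ok(())
    }

    fn poll_flush(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
        // Stand-in for waiting on broker acknowledgements.
        while let Some(item) = self.queue.pop_front() {
            println!("delivered: {}", item);
        }
        Poll::Ready(Ok(()))
    }

    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), ()>> {
        self.poll_flush(cx)
    }
}

fn main() {
    let sink = BoundedSink { queue: VecDeque::new() };
    let input = stream::iter((0..12).map(|i| Ok(format!("event {}", i))));
    // Driven the same way VectorSink::run drives the new variant:
    // a stream of Results forwarded into the sink.
    block_on(input.forward(sink)).unwrap();
}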
12 changes: 7 additions & 5 deletions src/sinks/mod.rs
@@ -2,9 +2,9 @@ use crate::Event;
 use futures::{
     compat::{Compat, Future01CompatExt},
     future::BoxFuture,
-    StreamExt, TryFutureExt,
+    Sink, Stream, StreamExt, TryFutureExt,
 };
-use futures01::Stream;
+use futures01::{Sink as Sink01, Stream as Stream01};
 use snafu::Snafu;
 use std::fmt;
@@ -70,7 +70,8 @@ pub mod statsd;
 pub mod vector;

 pub enum VectorSink {
-    Futures01Sink(Box<dyn futures01::Sink<SinkItem = Event, SinkError = ()> + Send + 'static>),
+    Futures01Sink(Box<dyn Sink01<SinkItem = Event, SinkError = ()> + Send + 'static>),
+    Sink(Box<dyn Sink<Event, Error = ()> + Send + Unpin>),
     Stream(Box<dyn util::StreamSink + Send>),
 }

@@ -99,7 +100,7 @@ pub enum HealthcheckError {
 impl VectorSink {
     pub async fn run<S>(mut self, input: S) -> Result<(), ()>
     where
-        S: futures::Stream<Item = Event> + Send + 'static,
+        S: Stream<Item = Event> + Send + 'static,
     {
         match self {
             Self::Futures01Sink(sink) => {
@@ -108,13 +109,14 @@ impl VectorSink {
                 let inner = Compat::new(Box::pin(input.map(Ok)));
                 inner.forward(sink).compat().map_ok(|_| ()).await
             }
+            Self::Sink(sink) => input.map(Ok).forward(sink).await,
             Self::Stream(ref mut s) => s.run(Box::pin(input)).await,
         }
     }

     pub fn into_futures01sink(
         self,
-    ) -> Box<dyn futures01::Sink<SinkItem = Event, SinkError = ()> + Send + 'static> {
+    ) -> Box<dyn Sink01<SinkItem = Event, SinkError = ()> + Send + 'static> {
         match self {
             Self::Futures01Sink(sink) => sink,
             _ => panic!("Failed type coercion, {:?} is not a Futures01Sink", self),
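With the new variant in place, running a futures 0.3 sink needs no compat shim: the run() arm added above is just input.map(Ok).forward(sink).await. A minimal sketch of driving such a boxed sink, using futures::sink::drain as a stand-in for a real sink like KafkaSink:

use futures::{executor::block_on, sink, stream, Sink, SinkExt, StreamExt};

fn main() {
    // The same boxed shape VectorSink::Sink stores.
    let sink: Box<dyn Sink<String, Error = ()> + Send + Unpin> =
        Box::new(sink::drain().sink_map_err(|_| ()));

    let input = stream::iter(vec!["a".to_string(), "b".to_string()]);

    // The expression the new run() arm evaluates to.
    block_on(input.map(Ok).forward(sink)).unwrap();
}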
[diff of the third changed file did not load]
