fix(datadog_logs sink): abort serialization and split batch when payload is too large (vectordotdev#19189)

* add failing test

Signed-off-by: Luke Steensen <luke.steensen@gmail.com>

* abort and split batch serialization when too large

Signed-off-by: Luke Steensen <luke.steensen@gmail.com>

* clippy

Signed-off-by: Luke Steensen <luke.steensen@gmail.com>

* Update src/sinks/datadog/logs/sink.rs

Co-authored-by: Doug Smith <dsmith3197@users.noreply.github.com>

* do not double count byte size when splitting

Signed-off-by: Luke Steensen <luke.steensen@gmail.com>

* emit dropped event

Signed-off-by: Luke Steensen <luke.steensen@gmail.com>

* add changelog entry

Signed-off-by: Luke Steensen <luke.steensen@gmail.com>

* Update changelog.d/OPW-86.fix.md

Co-authored-by: neuronull <neuronull@pm.me>

* rename changelog fragment

Signed-off-by: Luke Steensen <luke.steensen@gmail.com>

* use Doug's idea

Signed-off-by: Luke Steensen <luke.steensen@gmail.com>

* remove unnecessary clone

Signed-off-by: Luke Steensen <luke.steensen@gmail.com>

---------

Signed-off-by: Luke Steensen <luke.steensen@gmail.com>
Co-authored-by: Doug Smith <dsmith3197@users.noreply.github.com>
Co-authored-by: neuronull <neuronull@pm.me>
3 people authored and AndrooTheChen committed Sep 23, 2024
1 parent 375b3eb commit e9be29a
Showing 3 changed files with 205 additions and 137 deletions.
2 changes: 2 additions & 0 deletions changelog.d/datadog_logs_batching.fix.md
@@ -0,0 +1,2 @@
Fixed an issue where the `datadog_logs` sink could produce a request larger than the allowed API
limit.
292 changes: 155 additions & 137 deletions src/sinks/datadog/logs/sink.rs
@@ -1,14 +1,15 @@
use std::{fmt::Debug, io, sync::Arc};
use std::{collections::VecDeque, fmt::Debug, io, sync::Arc};

use bytes::Bytes;
use snafu::Snafu;
use vector_lib::codecs::{encoding::Framer, CharacterDelimitedEncoder, JsonSerializerConfig};
use vector_lib::lookup::event_path;
use vector_lib::{
internal_event::{ComponentEventsDropped, UNINTENTIONAL},
lookup::event_path,
};

use super::{config::MAX_PAYLOAD_BYTES, service::LogApiRequest};
use crate::sinks::{
prelude::*,
util::{encoding::Encoder as _, Compressor},
util::{http::HttpJsonBatchSizer, Compressor},
};
#[derive(Default)]
struct EventPartitioner;
@@ -24,7 +25,7 @@ impl Partitioner for EventPartitioner {

#[derive(Debug)]
pub struct LogSinkBuilder<S> {
encoding: JsonEncoding,
transformer: Transformer,
service: S,
batch_settings: BatcherSettings,
compression: Option<Compression>,
@@ -41,7 +42,7 @@ impl<S> LogSinkBuilder<S> {
protocol: String,
) -> Self {
Self {
encoding: JsonEncoding::new(transformer),
transformer,
service,
default_api_key,
batch_settings,
@@ -58,7 +59,7 @@ impl<S> LogSinkBuilder<S> {
pub fn build(self) -> LogSink<S> {
LogSink {
default_api_key: self.default_api_key,
encoding: self.encoding,
transformer: self.transformer,
service: self.service,
batch_settings: self.batch_settings,
compression: self.compression.unwrap_or_default(),
@@ -78,7 +79,7 @@ pub struct LogSink<S> {
/// The API service
service: S,
/// The encoding of payloads
encoding: JsonEncoding,
transformer: Transformer,
/// The compression technique to use when building the request body
compression: Compression,
/// Batch settings: timeout, max events, max bytes, etc.
@@ -87,69 +88,38 @@ pub struct LogSink<S> {
protocol: String,
}

/// Customized encoding specific to the Datadog Logs sink, as the logs API only accepts JSON encoded
/// log lines, and requires some specific normalization of certain event fields.
#[derive(Clone, Debug)]
pub struct JsonEncoding {
encoder: (Transformer, Encoder<Framer>),
}
fn normalize_event(event: &mut Event) {
let log = event.as_mut_log();
let message_path = log
.message_path()
.expect("message is required (make sure the \"message\" semantic meaning is set)")
.clone();
log.rename_key(&message_path, event_path!("message"));

impl JsonEncoding {
pub fn new(transformer: Transformer) -> Self {
Self {
encoder: (
transformer,
Encoder::<Framer>::new(
CharacterDelimitedEncoder::new(b',').into(),
JsonSerializerConfig::default().build().into(),
),
),
}
if let Some(host_path) = log.host_path().cloned().as_ref() {
log.rename_key(host_path, event_path!("hostname"));
}
}

impl crate::sinks::util::encoding::Encoder<Vec<Event>> for JsonEncoding {
fn encode_input(
&self,
mut input: Vec<Event>,
writer: &mut dyn io::Write,
) -> io::Result<(usize, GroupedCountByteSize)> {
for event in input.iter_mut() {
let log = event.as_mut_log();
let message_path = log
.message_path()
.expect("message is required (make sure the \"message\" semantic meaning is set)")
.clone();
log.rename_key(&message_path, event_path!("message"));

if let Some(host_path) = log.host_path().cloned().as_ref() {
log.rename_key(host_path, event_path!("hostname"));
}

let message_path = log
.timestamp_path()
.expect(
"timestamp is required (make sure the \"timestamp\" semantic meaning is set)",
)
.clone();
if let Some(Value::Timestamp(ts)) = log.remove(&message_path) {
log.insert(
event_path!("timestamp"),
Value::Integer(ts.timestamp_millis()),
);
}
}

self.encoder.encode_input(input, writer)
let timestamp_path = log
.timestamp_path()
.expect("timestamp is required (make sure the \"timestamp\" semantic meaning is set)")
.clone();
if let Some(Value::Timestamp(ts)) = log.remove(&timestamp_path) {
log.insert(
event_path!("timestamp"),
Value::Integer(ts.timestamp_millis()),
);
}
}

#[derive(Debug, Snafu)]
pub enum RequestBuildError {
#[snafu(display("Encoded payload is greater than the max limit."))]
PayloadTooBig,
PayloadTooBig { events_that_fit: usize },
#[snafu(display("Failed to build payload with error: {}", error))]
Io { error: std::io::Error },
#[snafu(display("Failed to serialize payload with error: {}", error))]
Json { error: serde_json::Error },
}

impl From<io::Error> for RequestBuildError {
@@ -158,96 +128,134 @@ impl From<io::Error> for RequestBuildError {
}
}

impl From<serde_json::Error> for RequestBuildError {
fn from(error: serde_json::Error) -> RequestBuildError {
RequestBuildError::Json { error }
}
}

struct LogRequestBuilder {
default_api_key: Arc<str>,
encoding: JsonEncoding,
transformer: Transformer,
compression: Compression,
}

impl RequestBuilder<(Option<Arc<str>>, Vec<Event>)> for LogRequestBuilder {
type Metadata = (Arc<str>, EventFinalizers);
type Events = Vec<Event>;
type Encoder = JsonEncoding;
type Payload = Bytes;
type Request = LogApiRequest;
type Error = RequestBuildError;

fn compression(&self) -> Compression {
self.compression
}

fn encoder(&self) -> &Self::Encoder {
&self.encoding
}

fn split_input(
impl LogRequestBuilder {
fn build_request(
&self,
input: (Option<Arc<str>>, Vec<Event>),
) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
let (api_key, mut events) = input;
let finalizers = events.take_finalizers();
let api_key = api_key.unwrap_or_else(|| Arc::clone(&self.default_api_key));
let builder = RequestMetadataBuilder::from_events(&events);
events: Vec<Event>,
api_key: Arc<str>,
) -> Result<Vec<LogApiRequest>, RequestBuildError> {
// Transform events and pre-compute their estimated size.
let mut events_with_estimated_size: VecDeque<(Event, JsonSize)> = events
.into_iter()
.map(|mut event| {
normalize_event(&mut event);
self.transformer.transform(&mut event);
let estimated_json_size = event.estimated_json_encoded_size_of();
(event, estimated_json_size)
})
.collect();

// Construct requests respecting the max payload size.
let mut requests: Vec<LogApiRequest> = Vec::new();
while !events_with_estimated_size.is_empty() {
let (events_serialized, body, byte_size) =
serialize_with_capacity(&mut events_with_estimated_size)?;
if events_serialized.is_empty() {
// first event was too large for whole request
let _too_big = events_with_estimated_size.pop_front();
emit!(ComponentEventsDropped::<UNINTENTIONAL> {
count: 1,
reason: "Event too large to encode."
});
} else {
let request =
self.finish_request(body, events_serialized, byte_size, Arc::clone(&api_key))?;
requests.push(request);
}
}

((api_key, finalizers), builder, events)
Ok(requests)
}

fn encode_events(
fn finish_request(
&self,
events: Self::Events,
) -> Result<EncodeResult<Self::Payload>, Self::Error> {
// We need to first serialize the payload separately so that we can figure out how big it is
// before compression. The Datadog Logs API has a limit on uncompressed data, so we can't
// use the default implementation of this method.
//
// TODO: We should probably make `build_request` fallible itself, because then this override of `encode_events`
// wouldn't even need to exist, and we could handle it in `build_request` which is required by all implementors.
//
// On the flip side, it would mean that we'd potentially be compressing payloads that we would inevitably end up
// rejecting anyways, which is meh. This might be a signal that the true "right" fix is to actually switch this
// sink to incremental encoding and simply put up with suboptimal batch sizes if we need to end up splitting due
// to (un)compressed size limitations.
let mut buf = Vec::new();
buf: Vec<u8>,
mut events: Vec<Event>,
byte_size: GroupedCountByteSize,
api_key: Arc<str>,
) -> Result<LogApiRequest, RequestBuildError> {
let n_events = events.len();
let (uncompressed_size, byte_size) = self.encoder().encode_input(events, &mut buf)?;
if uncompressed_size > MAX_PAYLOAD_BYTES {
return Err(RequestBuildError::PayloadTooBig);
}
let uncompressed_size = buf.len();

// Now just compress it like normal.
let mut compressor = Compressor::from(self.compression);
write_all(&mut compressor, n_events, &buf)?;
let bytes = compressor.into_inner().freeze();

if self.compression.is_compressed() {
Ok(EncodeResult::compressed(
bytes,
uncompressed_size,
byte_size,
))
let finalizers = events.take_finalizers();
let request_metadata_builder = RequestMetadataBuilder::from_events(&events);

let payload = if self.compression.is_compressed() {
EncodeResult::compressed(bytes, uncompressed_size, byte_size)
} else {
Ok(EncodeResult::uncompressed(bytes, byte_size))
}
}
EncodeResult::uncompressed(bytes, byte_size)
};

fn build_request(
&self,
dd_metadata: Self::Metadata,
metadata: RequestMetadata,
payload: EncodeResult<Self::Payload>,
) -> Self::Request {
let (api_key, finalizers) = dd_metadata;
let uncompressed_size = payload.uncompressed_byte_size;

LogApiRequest {
Ok::<_, RequestBuildError>(LogApiRequest {
api_key,
finalizers,
compression: self.compression,
metadata: request_metadata_builder.build(&payload),
uncompressed_size: payload.uncompressed_byte_size,
body: payload.into_payload(),
finalizers,
uncompressed_size,
metadata,
})
}
}

/// Serialize events into a buffer as a JSON array that has a maximum size of
/// `MAX_PAYLOAD_BYTES`.
///
/// Returns the serialized events, the buffer, and the byte size of the events.
/// Events that are not serialized remain in the `events` parameter.
fn serialize_with_capacity(
events: &mut VecDeque<(Event, JsonSize)>,
) -> Result<(Vec<Event>, Vec<u8>, GroupedCountByteSize), io::Error> {
// Compute estimated size, accounting for the size of the brackets and commas.
let total_estimated =
events.iter().map(|(_, size)| size.get()).sum::<usize>() + events.len() * 2;

// Initialize state.
let mut buf = Vec::with_capacity(total_estimated);
let mut byte_size = telemetry().create_request_count_byte_size();
let mut events_serialized = Vec::with_capacity(events.len());

// Write entries until the buffer is full.
buf.push(b'[');
let mut first = true;
while let Some((event, estimated_json_size)) = events.pop_front() {
// Track the existing length of the buffer so we can truncate it if we need to.
let existing_len = buf.len();
if first {
first = false;
} else {
buf.push(b',');
}
serde_json::to_writer(&mut buf, event.as_log())?;
// If the buffer is too big, truncate it and break out of the loop.
if buf.len() >= MAX_PAYLOAD_BYTES {
events.push_front((event, estimated_json_size));
buf.truncate(existing_len);
break;
}
// Otherwise, track the size of the event and continue.
byte_size.add_event(&event, estimated_json_size);
events_serialized.push(event);
}
buf.push(b']');

Ok((events_serialized, buf, byte_size))
}
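
As an aside (not part of this commit), the approach above can be shown as a small self-contained sketch: greedily serialize JSON values into a bracketed payload until a byte budget is hit, push the overflowing item back onto the queue for the next request, and drop any single item that can never fit on its own. The names and the 1 KiB budget below are illustrative only, and the sketch assumes just the serde_json crate rather than this sink's types.

use std::collections::VecDeque;

// Illustrative budget only; the sink's real limit is MAX_PAYLOAD_BYTES from its config.
const BUDGET_BYTES: usize = 1024;

/// Serialize items from the front of `queue` into a JSON array until the budget
/// is reached. Items that were not serialized stay in `queue`; any single item
/// too large to fit even in an empty payload is returned in `dropped`.
fn serialize_with_budget(
    queue: &mut VecDeque<serde_json::Value>,
) -> serde_json::Result<(Vec<serde_json::Value>, Vec<u8>, Vec<serde_json::Value>)> {
    let mut buf = vec![b'['];
    let mut serialized = Vec::new();
    let mut dropped = Vec::new();
    let mut first = true;

    while let Some(item) = queue.pop_front() {
        // Track the existing length so a partial write can be rolled back.
        let len_before = buf.len();
        if !first {
            buf.push(b',');
        }
        serde_json::to_writer(&mut buf, &item)?;
        if buf.len() >= BUDGET_BYTES {
            // Roll back the partial write.
            buf.truncate(len_before);
            if serialized.is_empty() {
                // This item can never fit on its own: drop it and keep going.
                dropped.push(item);
            } else {
                // Leave it for the next request and finish this one.
                queue.push_front(item);
                break;
            }
        } else {
            first = false;
            serialized.push(item);
        }
    }
    buf.push(b']');

    Ok((serialized, buf, dropped))
}

fn main() -> serde_json::Result<()> {
    let mut queue: VecDeque<_> = (0..200)
        .map(|i| serde_json::json!({ "message": format!("event number {i}"), "hostname": "web-1" }))
        .collect();

    // Drain the queue into as many requests as the budget requires.
    while !queue.is_empty() {
        let (batch, payload, dropped) = serialize_with_budget(&mut queue)?;
        println!(
            "request: {} events, {} bytes, {} dropped",
            batch.len(),
            payload.len(),
            dropped.len()
        );
    }
    Ok(())
}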

impl<S> LogSink<S>
@@ -262,26 +270,36 @@ where

let partitioner = EventPartitioner;
let batch_settings = self.batch_settings;
let builder = Arc::new(LogRequestBuilder {
default_api_key,
transformer: self.transformer,
compression: self.compression,
});

let input = input.batched_partitioned(partitioner, || batch_settings.as_byte_size_config());
let input = input.batched_partitioned(partitioner, || {
batch_settings.as_item_size_config(HttpJsonBatchSizer)
});
input
.request_builder(
default_request_builder_concurrency_limit(),
LogRequestBuilder {
default_api_key,
encoding: self.encoding,
compression: self.compression,
},
)
.concurrent_map(default_request_builder_concurrency_limit(), move |input| {
let builder = Arc::clone(&builder);

Box::pin(async move {
let (api_key, events) = input;
let api_key = api_key.unwrap_or_else(|| Arc::clone(&builder.default_api_key));

builder.build_request(events, api_key)
})
})
.filter_map(|request| async move {
match request {
Err(error) => {
emit!(SinkRequestBuildError { error });
None
}
Ok(req) => Some(req),
Ok(reqs) => Some(futures::stream::iter(reqs)),
}
})
.flatten()
.into_driver(self.service)
.protocol(self.protocol)
.run()
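
Also as an aside (not from the commit itself), the change to `run_inner` means one partitioned batch can now yield several requests. The sketch below shows only that stream shape, assuming the futures and tokio crates and placeholder names throughout: build per-batch request vectors with bounded concurrency, drop batches whose build failed, and flatten the vectors back into a single stream of requests.

use futures::{stream, StreamExt};

#[derive(Debug)]
struct Request(usize);

#[tokio::main]
async fn main() {
    // Each inner Vec stands in for one partitioned batch of events.
    let batches: Vec<Vec<usize>> = vec![vec![1, 2, 3], vec![], vec![4, 5]];

    let requests: Vec<Request> = stream::iter(batches)
        // Build the (possibly several) requests for each batch; an error drops the batch.
        .map(|batch| async move {
            if batch.is_empty() {
                Err("request build failed")
            } else {
                Ok(batch.into_iter().map(Request).collect::<Vec<_>>())
            }
        })
        .buffered(4) // bounded concurrency, like the sink's request-builder limit
        .filter_map(|result| async move { result.ok().map(stream::iter) })
        .flatten()
        .collect()
        .await;

    // Two batches build successfully here, producing five requests in total.
    println!("{requests:?}");
}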