Skip to content

Commit

Permalink
fix(loki sink): use json size of unencoded event (#17572)
Browse files Browse the repository at this point in the history
Currently the `loki` sink is encoding the event and then taking the
estimated json size of that encoded event. This is wrong. All sinks
should take the estimated json size of the unencoded event.

There is an open question around whether we should be taking the json
size before or after the `only_fields` and `except_fields` are applied.
I'm currently trying to get an answer to that. Currently everything is
before.

The `loki` sink works a little bit different to the other stream based
sinks. Most sinks pass the event unencoded to the request builder. It is
at this point that the json size of the metrics are calculated. However,
`loki` encodes the event and passes the encoded value to the request
builder.

This PR changes it so it also passes the json size to the request
builder so it can use that value to calculate the metrics.

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
  • Loading branch information
StephenWakely authored Jun 2, 2023
1 parent 8a741d5 commit ee5a52c
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 17 deletions.
18 changes: 2 additions & 16 deletions src/sinks/loki/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,21 +139,6 @@ impl ByteSizeOf for LokiEvent {
}
}

/// This implementation approximates the `Serialize` implementation below, without any allocations.
impl EstimatedJsonEncodedSizeOf for LokiEvent {
fn estimated_json_encoded_size_of(&self) -> JsonSize {
static BRACKETS_SIZE: JsonSize = JsonSize::new(2);
static COLON_SIZE: JsonSize = JsonSize::new(1);
static QUOTES_SIZE: JsonSize = JsonSize::new(2);

BRACKETS_SIZE
+ QUOTES_SIZE
+ self.timestamp.estimated_json_encoded_size_of()
+ COLON_SIZE
+ self.event.estimated_json_encoded_size_of()
}
}

impl Serialize for LokiEvent {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
Expand All @@ -172,6 +157,7 @@ pub struct LokiRecord {
pub partition: PartitionKey,
pub labels: Labels,
pub event: LokiEvent,
pub json_byte_size: JsonSize,
pub finalizers: EventFinalizers,
}

Expand All @@ -187,7 +173,7 @@ impl ByteSizeOf for LokiRecord {

impl EstimatedJsonEncodedSizeOf for LokiRecord {
fn estimated_json_encoded_size_of(&self) -> JsonSize {
self.event.estimated_json_encoded_size_of()
self.json_byte_size
}
}

Expand Down
4 changes: 3 additions & 1 deletion src/sinks/loki/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use vector_core::{
partition::Partitioner,
sink::StreamSink,
stream::BatcherSettings,
ByteSizeOf,
ByteSizeOf, EstimatedJsonEncodedSizeOf,
};

use super::{
Expand Down Expand Up @@ -268,6 +268,7 @@ impl EventEncoder {
pub(super) fn encode_event(&mut self, mut event: Event) -> Option<LokiRecord> {
let tenant_id = self.key_partitioner.partition(&event);
let finalizers = event.take_finalizers();
let json_byte_size = event.estimated_json_encoded_size_of();
let mut labels = self.build_labels(&event);
self.remove_label_fields(&mut event);

Expand Down Expand Up @@ -302,6 +303,7 @@ impl EventEncoder {
},
partition,
finalizers,
json_byte_size,
})
}
}
Expand Down

0 comments on commit ee5a52c

Please sign in to comment.