Skip to content

Commit

Permalink
chore(loki sink): add namespaced timestamp integration test (#16473)
Browse files Browse the repository at this point in the history
* Add namespaced timestamp integration test

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

* Clippy

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>

---------

Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
  • Loading branch information
StephenWakely authored Feb 16, 2023
1 parent b7ec52a commit 45d760c
Showing 1 changed file with 83 additions and 9 deletions.
92 changes: 83 additions & 9 deletions src/sinks/loki/integration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,22 @@
use std::convert::TryFrom;
use std::sync::Arc;

use bytes::Bytes;
use chrono::{DateTime, Duration, Utc};
use futures::stream;
use lookup::owned_value_path;
use value::{kind::Collection, Kind};
use vector_common::encode_logfmt;
use vector_core::event::{BatchNotifier, BatchStatus, Event, LogEvent};
use vector_core::{
config::LogNamespace,
event::{BatchNotifier, BatchStatus, Event, LogEvent},
};

use super::config::{LokiConfig, OutOfOrderAction};
use crate::{
config::{log_schema, SinkConfig},
event::Value,
schema,
sinks::{util::test::load_sink, VectorSink},
template::Template,
test_util::{
Expand All @@ -21,19 +29,18 @@ fn loki_address() -> String {
std::env::var("LOKI_ADDRESS").unwrap_or_else(|_| "http://localhost:3100".into())
}

async fn build_sink(codec: &str) -> (uuid::Uuid, VectorSink) {
async fn build_sink(codec: &str, remove_timestamp: bool) -> (uuid::Uuid, VectorSink) {
let stream = uuid::Uuid::new_v4();

let config = format!(
r#"
endpoint = "{}"
labels = {{test_name = "placeholder"}}
encoding.codec = "{}"
remove_timestamp = false
encoding.codec = "{codec}"
remove_timestamp = {remove_timestamp}
tenant_id = "default"
"#,
loki_address(),
codec
);

let (mut config, cx) = load_sink::<LokiConfig>(&config).unwrap();
Expand Down Expand Up @@ -91,9 +98,44 @@ fn event_generator(index: usize) -> Event {
Event::Log(LogEvent::from(line_generator(index)))
}

/// A generator that creates a timestamp in the given field.
/// This field is given the meaning of `timestamp` to ensure that in the Vector namespace
/// we use this field as a timestamp.
fn namespaced_timestamp_generator(
timestamp_field: String,
timestamp: chrono::DateTime<Utc>,
) -> impl Fn(usize) -> Event {
move |index: usize| {
let mut log = LogEvent::default();
log.insert("message", line_generator(index));
log.insert(timestamp_field.as_str(), Value::from(timestamp));

// We need vector metadata for it to pick up that it is in the Vector namespace.
LogNamespace::Vector.insert_standard_vector_source_metadata(&mut log, "loki", Utc::now());

let schema = schema::Definition::new_with_default_metadata(
Kind::object(Collection::empty()),
[LogNamespace::Vector],
)
// Add a field that means the timestamp.
.with_event_field(
&owned_value_path!(&timestamp_field),
Kind::timestamp(),
Some("timestamp"),
);

let mut event = Event::from(log);
event
.metadata_mut()
.set_schema_definition(&Arc::new(schema));

event
}
}

#[tokio::test]
async fn text() {
let (stream, sink) = build_sink("text").await;
let (stream, sink) = build_sink("text", false).await;

let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (lines, events) = generate_lines_with_stream(line_generator, 10, Some(batch));
Expand All @@ -109,6 +151,38 @@ async fn text() {
}
}

#[tokio::test]
async fn namespaced_timestamp() {
let (stream, sink) = build_sink("json", true).await;

let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let timestamp = Utc::now() - Duration::minutes(5);
let (lines, events) = generate_events_with_stream(
namespaced_timestamp_generator("norknork".to_string(), timestamp),
10,
Some(batch),
);
run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await;
assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

tokio::time::sleep(tokio::time::Duration::new(1, 0)).await;

let (timestamps, outputs) = fetch_stream(stream.to_string(), "default").await;
assert_eq!(lines.len(), outputs.len());
for (i, output) in outputs.iter().enumerate() {
let output = serde_json::from_str::<serde_json::Value>(output).unwrap();
let output = output.as_object().unwrap();

// The sink should have removed the `norknork` field because we have configured
// it to remove the timestamp field.
assert!(!output.contains_key("norknork"));

// The timestamp of the event needs to be the timestamp set in the `norknork`
// field since that was given the meaning of `timestamp`.
assert_eq!(timestamp.timestamp_nanos(), timestamps[i]);
}
}

#[tokio::test]
async fn text_with_none_compression() {
let (stream, sink) = build_sink_with_compression("text", "none").await;
Expand Down Expand Up @@ -147,7 +221,7 @@ async fn text_with_gzip_compression() {

#[tokio::test]
async fn json() {
let (stream, sink) = build_sink("json").await;
let (stream, sink) = build_sink("json", false).await;

let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (lines, events) = generate_events_with_stream(event_generator, 10, Some(batch));
Expand All @@ -167,7 +241,7 @@ async fn json() {
// https://github.com/vectordotdev/vector/issues/7815
#[tokio::test]
async fn json_nested_fields() {
let (stream, sink) = build_sink("json").await;
let (stream, sink) = build_sink("json", false).await;

let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let generator = |idx| {
Expand All @@ -192,7 +266,7 @@ async fn json_nested_fields() {

#[tokio::test]
async fn logfmt() {
let (stream, sink) = build_sink("logfmt").await;
let (stream, sink) = build_sink("logfmt", false).await;

let (batch, mut receiver) = BatchNotifier::new_with_receiver();
let (lines, events) = generate_events_with_stream(event_generator, 10, Some(batch));
Expand Down

0 comments on commit 45d760c

Please sign in to comment.