chore(observability): add tests to sinks for Data Volume tags #17853

Merged · 14 commits · Jul 28, 2023
87 changes: 79 additions & 8 deletions lib/vector-common/src/request_metadata.rs
@@ -1,4 +1,4 @@
-use std::ops::Add;
+use std::ops::{Add, AddAssign};
use std::{collections::HashMap, sync::Arc};

use crate::{
@@ -145,6 +145,22 @@ impl GroupedCountByteSize {
}
}
}

+    /// Returns `true` if we are the `Tagged` variant - keeping track of the byte sizes
+    /// grouped by their relevant tags.
+    #[must_use]
+    pub fn is_tagged(&self) -> bool {
+        match self {
+            GroupedCountByteSize::Tagged { .. } => true,
+            GroupedCountByteSize::Untagged { .. } => false,
+        }
+    }
+
+    /// Returns `true` if we are the `Untagged` variant - keeping a single count for all events.
+    #[must_use]
+    pub fn is_untagged(&self) -> bool {
+        !self.is_tagged()
+    }
}
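An aside on the two new predicates: they are plain variant tests, used by the `AddAssign` implementation below to decide whether `self` must be promoted from `Untagged` to `Tagged`. A condensed, runnable sketch of the same shape, with `String` keys and `usize` sizes standing in for the real `EventCountTags` and `CountByteSize` types (a simplification, not vector's actual definitions):

use std::collections::HashMap;

// Simplified stand-in for vector's GroupedCountByteSize.
enum GroupedCountByteSize {
    Tagged { sizes: HashMap<String, usize> },
    Untagged { size: usize },
}

impl GroupedCountByteSize {
    fn is_tagged(&self) -> bool {
        matches!(self, Self::Tagged { .. })
    }

    fn is_untagged(&self) -> bool {
        !self.is_tagged()
    }
}

fn main() {
    let untagged = GroupedCountByteSize::Untagged { size: 42 };
    assert!(untagged.is_untagged() && !untagged.is_tagged());

    let tagged = GroupedCountByteSize::Tagged { sizes: HashMap::new() };
    assert!(tagged.is_tagged() && !tagged.is_untagged());
}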

impl From<CountByteSize> for GroupedCountByteSize {
@@ -153,26 +169,81 @@ impl From<CountByteSize> for GroupedCountByteSize {
}
}

+impl AddAssign for GroupedCountByteSize {
+    fn add_assign(&mut self, mut rhs: Self) {
+        if self.is_untagged() && rhs.is_tagged() {
+            // First handle the case where we are untagged and assigning to a tagged value.
+            // We need to change `self` and so need to ensure our match doesn't take ownership of the object.
+            *self = match (&self, &mut rhs) {
+                (Self::Untagged { size }, Self::Tagged { sizes }) => {
+                    let mut sizes = std::mem::take(sizes);
+                    match sizes.get_mut(&EventCountTags::new_empty()) {
+                        Some(empty_size) => *empty_size += *size,
+                        None => {
+                            sizes.insert(EventCountTags::new_empty(), *size);
+                        }
+                    }
+
+                    Self::Tagged { sizes }
+                }
+                _ => {
+                    unreachable!()
+                }
+            };
+
+            return;
+        }
+
+        // For these cases, we know we won't have to change `self` so the match can take ownership.
+        match (self, rhs) {
+            (Self::Tagged { sizes: ref mut lhs }, Self::Tagged { sizes: rhs }) => {
+                for (key, value) in rhs {
+                    match lhs.get_mut(&key) {
+                        Some(size) => *size += value,
+                        None => {
+                            lhs.insert(key.clone(), value);
+                        }
+                    }
+                }
+            }
+
+            (Self::Untagged { size: lhs }, Self::Untagged { size: rhs }) => {
+                *lhs = *lhs + rhs;
+            }
+
+            (Self::Tagged { ref mut sizes }, Self::Untagged { size }) => {
+                match sizes.get_mut(&EventCountTags::new_empty()) {
+                    Some(empty_size) => *empty_size += size,
+                    None => {
+                        sizes.insert(EventCountTags::new_empty(), size);
+                    }
+                }
+            }
+            (Self::Untagged { .. }, Self::Tagged { .. }) => unreachable!(),
+        };
+    }
+}
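The subtle case in this `AddAssign` is `Untagged += Tagged`: `self` is promoted to `Tagged`, and the bytes it had already counted are folded in under the empty tag set (`EventCountTags::new_empty()`). A runnable sketch of just that rule, under the same simplifications as the sketch above (`String` keys, `usize` sizes, `""` standing in for the empty tag set):

use std::collections::HashMap;

const EMPTY_TAGS: &str = ""; // stand-in for EventCountTags::new_empty()

// Promote an untagged running total into a tagged map, as the
// `is_untagged() && is_tagged()` branch above does.
fn promote(untagged_total: usize, mut tagged: HashMap<String, usize>) -> HashMap<String, usize> {
    *tagged.entry(EMPTY_TAGS.to_string()).or_insert(0) += untagged_total;
    tagged
}

fn main() {
    let mut rhs = HashMap::new();
    rhs.insert("source-a".to_string(), 100);

    // 42 bytes were counted before any tags were known; after the merge they
    // live under the empty tag set alongside the tagged counts.
    let merged = promote(42, rhs);
    assert_eq!(merged[EMPTY_TAGS], 42);
    assert_eq!(merged["source-a"], 100);
}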

impl<'a> Add<&'a GroupedCountByteSize> for GroupedCountByteSize {
    type Output = GroupedCountByteSize;

    fn add(self, other: &'a Self::Output) -> Self::Output {
        match (self, other) {
-            (Self::Tagged { sizes: mut us }, Self::Tagged { sizes: them }) => {
-                for (key, value) in them {
-                    match us.get_mut(key) {
+            (Self::Tagged { sizes: mut lhs }, Self::Tagged { sizes: rhs }) => {
+                for (key, value) in rhs {
+                    match lhs.get_mut(key) {
                        Some(size) => *size += *value,
                        None => {
-                            us.insert(key.clone(), *value);
+                            lhs.insert(key.clone(), *value);
                        }
                    }
                }

-                Self::Tagged { sizes: us }
+                Self::Tagged { sizes: lhs }
            }

-            (Self::Untagged { size: us }, Self::Untagged { size: them }) => {
-                Self::Untagged { size: us + *them }
+            (Self::Untagged { size: lhs }, Self::Untagged { size: rhs }) => {
+                Self::Untagged { size: lhs + *rhs }
            }

            // The following two scenarios shouldn't really occur in practice, but are provided for completeness.
65 changes: 37 additions & 28 deletions src/sinks/datadog/metrics/integration_tests.rs
@@ -11,7 +11,10 @@ use crate::{
    config::SinkConfig,
    sinks::util::test::{build_test_server_status, load_sink},
    test_util::{
-        components::{assert_sink_compliance, SINK_TAGS},
+        components::{
+            assert_data_volume_sink_compliance, assert_sink_compliance, DATA_VOLUME_SINK_TAGS,
+            SINK_TAGS,
+        },
        map_event_batch_stream, next_addr,
    },
};
@@ -168,35 +171,41 @@ async fn smoke() {
}
}

-#[tokio::test]
-async fn real_endpoint() {
-    assert_sink_compliance(&SINK_TAGS, async {
-        let config = indoc! {r#"
+async fn run_sink() {
+    let config = indoc! {r#"
        default_api_key = "${TEST_DATADOG_API_KEY}"
        default_namespace = "fake.test.integration"
    "#};
-        let api_key = std::env::var("TEST_DATADOG_API_KEY").unwrap();
-        assert!(!api_key.is_empty(), "$TEST_DATADOG_API_KEY required");
-        let config = config.replace("${TEST_DATADOG_API_KEY}", &api_key);
-        let (config, cx) = load_sink::<DatadogMetricsConfig>(config.as_str()).unwrap();
-
-        let (sink, _) = config.build(cx).await.unwrap();
-        let (batch, receiver) = BatchNotifier::new_with_receiver();
-        let events: Vec<_> = (0..10)
-            .map(|index| {
-                Event::Metric(Metric::new(
-                    "counter",
-                    MetricKind::Absolute,
-                    MetricValue::Counter {
-                        value: index as f64,
-                    },
-                ))
-            })
-            .collect();
-        let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch));
+    let api_key = std::env::var("TEST_DATADOG_API_KEY").unwrap();
+    assert!(!api_key.is_empty(), "$TEST_DATADOG_API_KEY required");
+    let config = config.replace("${TEST_DATADOG_API_KEY}", &api_key);
+    let (config, cx) = load_sink::<DatadogMetricsConfig>(config.as_str()).unwrap();

-        sink.run(stream).await.unwrap();
-        assert_eq!(receiver.await, BatchStatus::Delivered);
-    })
-    .await;
+    let (sink, _) = config.build(cx).await.unwrap();
+    let (batch, receiver) = BatchNotifier::new_with_receiver();
+    let events: Vec<_> = (0..10)
+        .map(|index| {
+            Event::Metric(Metric::new(
+                "counter",
+                MetricKind::Absolute,
+                MetricValue::Counter {
+                    value: index as f64,
+                },
+            ))
+        })
+        .collect();
+    let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch));
+
+    sink.run(stream).await.unwrap();
+    assert_eq!(receiver.await, BatchStatus::Delivered);
}
+
+#[tokio::test]
+async fn real_endpoint() {
+    assert_sink_compliance(&SINK_TAGS, async { run_sink().await }).await;
+}
+
+#[tokio::test]
+async fn data_volume_tags() {
+    assert_data_volume_sink_compliance(&DATA_VOLUME_SINK_TAGS, async { run_sink().await }).await;
+}
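The change here is structural rather than behavioural: the body of the old `real_endpoint` test is extracted into a shared `run_sink` helper so that the new `data_volume_tags` test can drive an identical event stream under a different compliance harness. A minimal, self-contained model of that pattern (illustrative names only; the real helpers live in `test_util::components` and their signatures may differ):

// Shared driver: builds the sink and pushes events; each test differs only
// in which compliance assertion wraps it.
async fn run_sink_model() {
    // ... build the sink, send events, await delivery (elided) ...
}

async fn assert_compliance_model<F: std::future::Future>(expected_tags: &[&str], fut: F) {
    // ... install telemetry capture keyed by `expected_tags` (elided) ...
    fut.await;
    // ... assert the captured metrics carried those tags (elided) ...
    let _ = expected_tags;
}

#[tokio::test]
async fn compliance_model() {
    assert_compliance_model(&["protocol"], run_sink_model()).await;
}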
12 changes: 4 additions & 8 deletions src/sinks/datadog/traces/request_builder.rs
@@ -9,10 +9,7 @@ use bytes::Bytes;
use prost::Message;
use snafu::Snafu;
use vector_common::request_metadata::RequestMetadata;
use vector_core::{
event::{EventFinalizers, Finalizable},
EstimatedJsonEncodedSizeOf,
};
use vector_core::event::{EventFinalizers, Finalizable};

use super::{
apm_stats::{compute_apm_stats, Aggregator},
@@ -125,7 +122,6 @@ impl IncrementalRequestBuilder<(PartitionKey, Vec<Event>)> for DatadogTracesRequ
            .for_each(|r| match r {
                Ok((payload, mut processed)) => {
                    let uncompressed_size = payload.len();
-                    let json_size = processed.estimated_json_encoded_size_of();
                    let metadata = DDTracesMetadata {
                        api_key: key
                            .api_key
@@ -137,14 +133,14 @@
                        content_type: "application/x-protobuf".to_string(),
                    };

+                    // build RequestMetadata
+                    let builder = RequestMetadataBuilder::from_events(&processed);
+
                    let mut compressor = Compressor::from(self.compression);
                    match compressor.write_all(&payload) {
                        Ok(()) => {
                            let bytes = compressor.into_inner().freeze();

-                            // build RequestMetadata
-                            let builder =
-                                RequestMetadataBuilder::new(n, uncompressed_size, json_size);
                            let bytes_len = NonZeroUsize::new(bytes.len())
                                .expect("payload should never be zero length");
                            let request_metadata = builder.with_request_size(bytes_len);
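A plausible reading of this final change, consistent with the PR's goal: `RequestMetadataBuilder::from_events(&processed)` derives the byte-size accounting from the events themselves, so it can group sizes by tag, whereas the old `RequestMetadataBuilder::new(n, uncompressed_size, json_size)` accepted a single pre-computed JSON size, which is why `estimated_json_encoded_size_of()` is no longer called here. A sketch of the per-tag grouping idea under assumed simplified types (treating `source`/`service` as the volume tags is an assumption, not something this diff shows):

use std::collections::HashMap;

// Assumed simplified event: a (source, service) tag pair plus an estimated size.
struct EventModel {
    source: String,
    service: String,
    json_size: usize,
}

// Group estimated sizes by tag pair, the way a tagged builder would,
// instead of collapsing everything to one number up front.
fn grouped_sizes(events: &[EventModel]) -> HashMap<(String, String), usize> {
    let mut sizes: HashMap<(String, String), usize> = HashMap::new();
    for event in events {
        *sizes
            .entry((event.source.clone(), event.service.clone()))
            .or_insert(0) += event.json_size;
    }
    sizes
}

fn main() {
    let events = vec![
        EventModel { source: "file".into(), service: "api".into(), json_size: 10 },
        EventModel { source: "file".into(), service: "api".into(), json_size: 5 },
    ];
    let sizes = grouped_sizes(&events);
    assert_eq!(sizes[&("file".to_string(), "api".to_string())], 15);
}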