fix(datadog_metrics sink): the integration tests weren't actually validating anything (#18754)

* fix(datadog_metrics sink): fix the integration tests which weren't actually validating anything

* fix workflows

* clippy

* fix filter for traces
neuronull authored Oct 4, 2023
1 parent 39b9298 commit afc166f
Showing 5 changed files with 139 additions and 119 deletions.
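The heart of the fix is in the metrics sink's integration test. The old smoke test looped over whatever responses the mock server had captured, so if no response ever arrived, the loop body — and every assertion inside it — simply never ran, and the test passed while validating nothing. The new test guards against this with an explicit response-count assertion before inspecting the payload. A minimal sketch of the failure mode (illustrative only, not part of the commit; the vector types are simplified to strings):

// Sketch of the old failure mode: with zero captured responses, the
// assertion loop is skipped entirely and the test "passes".
let output: Vec<(String, String)> = Vec::new(); // imagine no responses captured
for (content_type, body) in &output {
    assert_eq!(content_type.as_str(), "application/json"); // never executed
    assert!(!body.is_empty());                             // never executed
}
// The fixed test asserts the count first, so an empty stream fails loudly:
assert!(output.len() == 1, "Should have received a response");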
15 changes: 12 additions & 3 deletions .github/workflows/changes.yml
@@ -56,8 +56,14 @@ on:
value: ${{ jobs.int_tests.outputs.clickhouse }}
databend:
value: ${{ jobs.int_tests.outputs.databend }}
datadog:
value: ${{ jobs.int_tests.outputs.datadog }}
datadog-agent:
value: ${{ jobs.int_tests.outputs.datadog-agent }}
datadog-logs:
value: ${{ jobs.int_tests.outputs.datadog-logs }}
datadog-metrics:
value: ${{ jobs.int_tests.outputs.datadog-metrics }}
datadog-traces:
value: ${{ jobs.int_tests.outputs.datadog-traces }}
dnstap:
value: ${{ jobs.int_tests.outputs.dnstap }}
docker-logs:
@@ -189,7 +195,10 @@ jobs:
azure: ${{ steps.filter.outputs.azure }}
clickhouse: ${{ steps.filter.outputs.clickhouse }}
databend: ${{ steps.filter.outputs.databend }}
datadog: ${{ steps.filter.outputs.datadog }}
datadog-agent: ${{ steps.filter.outputs.datadog-agent }}
datadog-logs: ${{ steps.filter.outputs.datadog-logs }}
datadog-metrics: ${{ steps.filter.outputs.datadog-metrics }}
datadog-traces: ${{ steps.filter.outputs.datadog-traces }}
dnstap: ${{ steps.filter.outputs.dnstap }}
docker-logs: ${{ steps.filter.outputs.docker-logs }}
elasticsearch: ${{ steps.filter.outputs.elasticsearch }}
13 changes: 8 additions & 5 deletions .github/workflows/integration.yml
@@ -60,7 +60,10 @@ jobs:
|| needs.changes.outputs.azure == 'true'
|| needs.changes.outputs.clickhouse == 'true'
|| needs.changes.outputs.databend == 'true'
|| needs.changes.outputs.datadog == 'true'
|| needs.changes.outputs.datadog-agent == 'true'
|| needs.changes.outputs.datadog-logs == 'true'
|| needs.changes.outputs.datadog-metrics == 'true'
|| needs.changes.outputs.datadog-traces == 'true'
|| needs.changes.outputs.dnstap == 'true'
|| needs.changes.outputs.docker-logs == 'true'
|| needs.changes.outputs.elasticsearch == 'true'
@@ -166,7 +169,7 @@ jobs:
max_attempts: 3
command: bash scripts/ci-integration-test.sh databend

- if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog == 'true') &&
- if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog-agent == 'true') &&
(github.event_name != 'pull_request' || env.PR_HAS_ACCESS_TO_SECRETS == 'true')
name: datadog-agent
uses: nick-fields/retry@v2
@@ -175,7 +178,7 @@
max_attempts: 3
command: bash scripts/ci-integration-test.sh datadog-agent

- if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog == 'true') &&
- if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog-logs == 'true') &&
(github.event_name != 'pull_request' || env.PR_HAS_ACCESS_TO_SECRETS == 'true')
name: datadog-logs
uses: nick-fields/retry@v2
@@ -184,7 +187,7 @@
max_attempts: 3
command: bash scripts/ci-integration-test.sh datadog-logs

- if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog == 'true') &&
- if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog-metrics == 'true') &&
(github.event_name != 'pull_request' || env.PR_HAS_ACCESS_TO_SECRETS == 'true')
name: datadog-metrics
uses: nick-fields/retry@v2
@@ -193,7 +196,7 @@
max_attempts: 3
command: bash scripts/ci-integration-test.sh datadog-metrics

- if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog == 'true') &&
- if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog-traces == 'true') &&
(github.event_name != 'pull_request' || env.PR_HAS_ACCESS_TO_SECRETS == 'true')
name: datadog-traces
uses: nick-fields/retry@v2
2 changes: 1 addition & 1 deletion scripts/integration/datadog-metrics/test.yaml
@@ -1,7 +1,7 @@
features:
- datadog-metrics-integration-tests

test_filter: '::datadog::metrics::'
test_filter: '::datadog::metrics::integration_tests'

runner:
env:
2 changes: 1 addition & 1 deletion scripts/integration/datadog-traces/test.yaml
@@ -19,6 +19,6 @@ matrix:
paths:
- "src/common/datadog.rs"
- "src/internal_events/datadog_*"
- "src/sinks/datadog/**"
- "src/sinks/datadog/traces/**"
- "src/sinks/util/**"
- "scripts/integration/datadog-traces/**"
226 changes: 117 additions & 109 deletions src/sinks/datadog/metrics/integration_tests.rs
@@ -1,4 +1,5 @@
use bytes::Bytes;
use chrono::{SubsecRound, Utc};
use flate2::read::ZlibDecoder;
use futures::{channel::mpsc::Receiver, stream, StreamExt};
use hyper::StatusCode;
@@ -22,28 +23,25 @@ use crate::{
},
};

enum ApiStatus {
OK,
// Forbidden,
}
fn generate_metric_events() -> Vec<Event> {
let timestamp = Utc::now().trunc_subsecs(3);
let events: Vec<_> = (0..10)
.map(|index| {
let ts = timestamp + (std::time::Duration::from_secs(2) * index);
Event::Metric(
Metric::new(
format!("counter_{}", thread_rng().gen::<u32>()),
MetricKind::Incremental,
MetricValue::Counter {
value: index as f64,
},
)
.with_timestamp(Some(ts)),
)
})
.collect();

fn test_server(
addr: std::net::SocketAddr,
api_status: ApiStatus,
) -> (
futures::channel::mpsc::Receiver<(http::request::Parts, Bytes)>,
stream_cancel::Trigger,
impl std::future::Future<Output = Result<(), ()>>,
) {
let status = match api_status {
ApiStatus::OK => StatusCode::OK,
// ApiStatus::Forbidden => StatusCode::FORBIDDEN,
};

// NOTE: we pass `Trigger` out to the caller even though this suite never
// uses it, because dropping it cancels the stream machinery here,
// indicating failures that might not be valid.
build_test_server_status(addr, status)
events
}
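
// Editor's sketch (not part of the commit): a hypothetical helper,
// `expected_counter_sum`, that derives the expected total from the generated
// events instead of hard-coding it. With counter values 0.0..=9.0 it returns
// 45.0, matching the `assert_eq!(all_values, 45.0)` check in `smoke` below.
// Assumes `Metric::value()` exposes the metric's `MetricValue`.
fn expected_counter_sum(events: &[Event]) -> f64 {
    events
        .iter()
        .filter_map(|event| match event {
            // only metric events carrying a counter contribute to the sum
            Event::Metric(metric) => match metric.value() {
                MetricValue::Counter { value } => Some(*value),
                _ => None,
            },
            _ => None,
        })
        .sum()
}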

/// Starts a test sink with random metrics running into it
@@ -55,10 +53,7 @@ fn test_server(
/// Testers may set `http_status` and `batch_status`. The first controls what
/// status code faked HTTP responses will have, the second acts as a check on
/// the `Receiver`'s status before being returned to the caller.
async fn start_test(
api_status: ApiStatus,
batch_status: BatchStatus,
) -> (Vec<Event>, Receiver<(http::request::Parts, Bytes)>) {
async fn start_test() -> (Vec<Event>, Receiver<(http::request::Parts, Bytes)>) {
let config = indoc! {r#"
default_api_key = "atoken"
default_namespace = "foo"
@@ -73,25 +68,18 @@ async fn start_test(

let (sink, _) = config.build(cx).await.unwrap();

let (rx, _trigger, server) = test_server(addr, api_status);
let (rx, _trigger, server) = build_test_server_status(addr, StatusCode::OK);
tokio::spawn(server);

let (batch, receiver) = BatchNotifier::new_with_receiver();
let events: Vec<_> = (0..10)
.map(|index| {
Event::Metric(Metric::new(
format!("counter_{}", thread_rng().gen::<u32>()),
MetricKind::Absolute,
MetricValue::Counter {
value: index as f64,
},
))
})
.collect();
let (batch, mut receiver) = BatchNotifier::new_with_receiver();

let events = generate_metric_events();

let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch));

sink.run(stream).await.unwrap();
assert_eq!(receiver.await, batch_status);

assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

(events, rx)
}
@@ -110,68 +98,96 @@ fn decompress_payload(payload: Vec<u8>) -> std::io::Result<Vec<u8>> {
/// were delivered and then asserts that every message is able to be
/// deserialized.
async fn smoke() {
let (expected, rx) = start_test(ApiStatus::OK, BatchStatus::Delivered).await;
let (expected, rx) = start_test().await;

let output = rx.take(expected.len()).collect::<Vec<_>>().await;

for val in output.iter() {
assert_eq!(
val.0.headers.get("Content-Type").unwrap(),
"application/json"
);
assert_eq!(val.0.headers.get("DD-API-KEY").unwrap(), "atoken");
assert!(val.0.headers.contains_key("DD-Agent-Payload"));

let compressed_payload = val.1.to_vec();
let payload = decompress_payload(compressed_payload).unwrap();
let payload = std::str::from_utf8(&payload).unwrap();
let payload: serde_json::Value = serde_json::from_str(payload).unwrap();

let series = payload
.as_object()
.unwrap()
.get("series")
.unwrap()
.as_array()
.unwrap();
assert!(!series.is_empty());

// check metrics are sorted by name, which helps HTTP compression
let metric_names: Vec<String> = series
.iter()
.map(|value| {
value
.as_object()
.unwrap()
.get("metric")
.unwrap()
.as_str()
.unwrap()
.to_string()
})
.collect();
let mut sorted_names = metric_names.clone();
sorted_names.sort();
assert_eq!(metric_names, sorted_names);

let entry = series.first().unwrap().as_object().unwrap();
assert_eq!(
entry.get("metric").unwrap().as_str().unwrap(),
"foo.counter"
);
assert_eq!(entry.get("type").unwrap().as_str().unwrap(), "count");
let points = entry
.get("points")
.unwrap()
.as_array()
.unwrap()
.first()
.unwrap()
.as_array()
.unwrap();
assert_eq!(points.len(), 2);
assert_eq!(points.get(1).unwrap().as_f64().unwrap(), 1.0);
}
assert!(output.len() == 1, "Should have received a response");

let val = output.first().unwrap();

assert_eq!(
val.0.headers.get("Content-Type").unwrap(),
"application/json"
);
assert_eq!(val.0.headers.get("DD-API-KEY").unwrap(), "atoken");
assert!(val.0.headers.contains_key("DD-Agent-Payload"));

let compressed_payload = val.1.to_vec();
let payload = decompress_payload(compressed_payload).unwrap();
let payload = std::str::from_utf8(&payload).unwrap();
let payload: serde_json::Value = serde_json::from_str(payload).unwrap();

let series = payload
.as_object()
.unwrap()
.get("series")
.unwrap()
.as_array()
.unwrap();
assert!(!series.is_empty());

// check metrics are sorted by name, which helps HTTP compression
let metric_names: Vec<String> = series
.iter()
.map(|value| {
value
.as_object()
.unwrap()
.get("metric")
.unwrap()
.as_str()
.unwrap()
.to_string()
})
.collect();
let mut sorted_names = metric_names.clone();
sorted_names.sort();
assert_eq!(metric_names, sorted_names);

let entry = series.first().unwrap().as_object().unwrap();
assert!(entry
.get("metric")
.unwrap()
.as_str()
.unwrap()
.starts_with("foo.counter_"));
assert_eq!(entry.get("type").unwrap().as_str().unwrap(), "count");
let points = entry
.get("points")
.unwrap()
.as_array()
.unwrap()
.first()
.unwrap()
.as_array()
.unwrap();
assert_eq!(points.len(), 2);

// validate that all values were received
let all_values: f64 = series
.iter()
.map(|entry| {
entry
.as_object()
.unwrap()
.get("points")
.unwrap()
.as_array()
.unwrap()
.first()
.unwrap()
.as_array()
.unwrap()
.get(1)
.unwrap()
.as_f64()
.unwrap()
})
.sum();

// the input values are [0..10), which sum to 45
assert_eq!(all_values, 45.0);
}

async fn run_sink() {
@@ -186,17 +202,9 @@ async fn run_sink() {

let (sink, _) = config.build(cx).await.unwrap();
let (batch, receiver) = BatchNotifier::new_with_receiver();
let events: Vec<_> = (0..10)
.map(|index| {
Event::Metric(Metric::new(
"counter",
MetricKind::Absolute,
MetricValue::Counter {
value: index as f64,
},
))
})
.collect();

let events = generate_metric_events();

let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch));

sink.run(stream).await.unwrap();