diff --git a/.github/workflows/changes.yml b/.github/workflows/changes.yml
index 653d3395bdfda..f480bac6265e4 100644
--- a/.github/workflows/changes.yml
+++ b/.github/workflows/changes.yml
@@ -56,8 +56,14 @@ on:
         value: ${{ jobs.int_tests.outputs.clickhouse }}
       databend:
         value: ${{ jobs.int_tests.outputs.databend }}
-      datadog:
-        value: ${{ jobs.int_tests.outputs.datadog }}
+      datadog-agent:
+        value: ${{ jobs.int_tests.outputs.datadog-agent }}
+      datadog-logs:
+        value: ${{ jobs.int_tests.outputs.datadog-logs }}
+      datadog-metrics:
+        value: ${{ jobs.int_tests.outputs.datadog-metrics }}
+      datadog-traces:
+        value: ${{ jobs.int_tests.outputs.datadog-traces }}
       dnstap:
         value: ${{ jobs.int_tests.outputs.dnstap }}
       docker-logs:
@@ -189,7 +195,10 @@ jobs:
       azure: ${{ steps.filter.outputs.azure }}
       clickhouse: ${{ steps.filter.outputs.clickhouse }}
       databend: ${{ steps.filter.outputs.databend }}
-      datadog: ${{ steps.filter.outputs.datadog }}
+      datadog-agent: ${{ steps.filter.outputs.datadog-agent }}
+      datadog-logs: ${{ steps.filter.outputs.datadog-logs }}
+      datadog-metrics: ${{ steps.filter.outputs.datadog-metrics }}
+      datadog-traces: ${{ steps.filter.outputs.datadog-traces }}
       dnstap: ${{ steps.filter.outputs.dnstap }}
       docker-logs: ${{ steps.filter.outputs.docker-logs }}
       elasticsearch: ${{ steps.filter.outputs.elasticsearch }}
diff --git a/.github/workflows/integration.yml b/.github/workflows/integration.yml
index 01a2a7dabca3a..709f2091fe0b1 100644
--- a/.github/workflows/integration.yml
+++ b/.github/workflows/integration.yml
@@ -60,7 +60,10 @@ jobs:
         || needs.changes.outputs.azure == 'true'
         || needs.changes.outputs.clickhouse == 'true'
         || needs.changes.outputs.databend == 'true'
-        || needs.changes.outputs.datadog == 'true'
+        || needs.changes.outputs.datadog-agent == 'true'
+        || needs.changes.outputs.datadog-logs == 'true'
+        || needs.changes.outputs.datadog-metrics == 'true'
+        || needs.changes.outputs.datadog-traces == 'true'
        || needs.changes.outputs.dnstap == 'true'
        || needs.changes.outputs.docker-logs == 'true'
        || needs.changes.outputs.elasticsearch == 'true'
@@ -166,7 +169,7 @@ jobs:
           max_attempts: 3
           command: bash scripts/ci-integration-test.sh databend
 
-      - if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog == 'true') &&
+      - if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog-agent == 'true') &&
           (github.event_name != 'pull_request' || env.PR_HAS_ACCESS_TO_SECRETS == 'true')
         name: datadog-agent
         uses: nick-fields/retry@v2
@@ -175,7 +178,7 @@ jobs:
           max_attempts: 3
           command: bash scripts/ci-integration-test.sh datadog-agent
 
-      - if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog == 'true') &&
+      - if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog-logs == 'true') &&
           (github.event_name != 'pull_request' || env.PR_HAS_ACCESS_TO_SECRETS == 'true')
         name: datadog-logs
         uses: nick-fields/retry@v2
@@ -184,7 +187,7 @@ jobs:
           max_attempts: 3
           command: bash scripts/ci-integration-test.sh datadog-logs
 
-      - if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog == 'true') &&
+      - if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog-metrics == 'true') &&
           (github.event_name != 'pull_request' || env.PR_HAS_ACCESS_TO_SECRETS == 'true')
         name: datadog-metrics
         uses: nick-fields/retry@v2
@@ -193,7 +196,7 @@ jobs:
           max_attempts: 3
           command: bash scripts/ci-integration-test.sh datadog-metrics
 
-      - if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog == 'true') &&
+      - if: (github.event_name == 'merge_group' || needs.changes.outputs.all-int == 'true' || needs.changes.outputs.datadog-traces == 'true') &&
           (github.event_name != 'pull_request' || env.PR_HAS_ACCESS_TO_SECRETS == 'true')
         name: datadog-traces
         uses: nick-fields/retry@v2
diff --git a/scripts/integration/datadog-metrics/test.yaml b/scripts/integration/datadog-metrics/test.yaml
index 237008a0d2551..a45e55a9797f2 100644
--- a/scripts/integration/datadog-metrics/test.yaml
+++ b/scripts/integration/datadog-metrics/test.yaml
@@ -1,7 +1,7 @@
 features:
 - datadog-metrics-integration-tests
 
-test_filter: '::datadog::metrics::'
+test_filter: '::datadog::metrics::integration_tests'
 
 runner:
   env:
diff --git a/scripts/integration/datadog-traces/test.yaml b/scripts/integration/datadog-traces/test.yaml
index 31c4c0f97ef11..e518c8d3c1d93 100644
--- a/scripts/integration/datadog-traces/test.yaml
+++ b/scripts/integration/datadog-traces/test.yaml
@@ -19,6 +19,6 @@ matrix:
 paths:
 - "src/common/datadog.rs"
 - "src/internal_events/datadog_*"
-- "src/sinks/datadog/**"
+- "src/sinks/datadog/traces/**"
 - "src/sinks/util/**"
 - "scripts/integration/datadog-traces/**"
diff --git a/src/sinks/datadog/metrics/integration_tests.rs b/src/sinks/datadog/metrics/integration_tests.rs
index 458cc5be987f3..99645780b65ca 100644
--- a/src/sinks/datadog/metrics/integration_tests.rs
+++ b/src/sinks/datadog/metrics/integration_tests.rs
@@ -1,4 +1,5 @@
 use bytes::Bytes;
+use chrono::{SubsecRound, Utc};
 use flate2::read::ZlibDecoder;
 use futures::{channel::mpsc::Receiver, stream, StreamExt};
 use hyper::StatusCode;
@@ -22,28 +23,25 @@ use crate::{
     },
 };
 
-enum ApiStatus {
-    OK,
-    // Forbidden,
-}
+fn generate_metric_events() -> Vec<Event> {
+    let timestamp = Utc::now().trunc_subsecs(3);
+    let events: Vec<_> = (0..10)
+        .map(|index| {
+            let ts = timestamp + (std::time::Duration::from_secs(2) * index);
+            Event::Metric(
+                Metric::new(
+                    format!("counter_{}", thread_rng().gen::<u64>()),
+                    MetricKind::Incremental,
+                    MetricValue::Counter {
+                        value: index as f64,
+                    },
+                )
+                .with_timestamp(Some(ts)),
+            )
+        })
+        .collect();
 
-fn test_server(
-    addr: std::net::SocketAddr,
-    api_status: ApiStatus,
-) -> (
-    futures::channel::mpsc::Receiver<(http::request::Parts, Bytes)>,
-    stream_cancel::Trigger,
-    impl std::future::Future<Output = Result<(), ()>>,
-) {
-    let status = match api_status {
-        ApiStatus::OK => StatusCode::OK,
-        // ApiStatus::Forbidden => StatusCode::FORBIDDEN,
-    };
-
-    // NOTE: we pass `Trigger` out to the caller even though this suite never
-    // uses it as it's being dropped cancels the stream machinery here,
-    // indicating failures that might not be valid.
-    build_test_server_status(addr, status)
+    events
 }
 
 /// Starts a test sink with random metrics running into it
@@ -55,10 +53,7 @@ fn test_server(
 /// Testers may set `http_status` and `batch_status`. The first controls what
 /// status code faked HTTP responses will have, the second acts as a check on
 /// the `Receiver`'s status before being returned to the caller.
-async fn start_test(
-    api_status: ApiStatus,
-    batch_status: BatchStatus,
-) -> (Vec<Event>, Receiver<(http::request::Parts, Bytes)>) {
+async fn start_test() -> (Vec<Event>, Receiver<(http::request::Parts, Bytes)>) {
     let config = indoc! {r#"
         default_api_key = "atoken"
         default_namespace = "foo"
@@ -73,25 +68,18 @@ async fn start_test(
 
     let (sink, _) = config.build(cx).await.unwrap();
 
-    let (rx, _trigger, server) = test_server(addr, api_status);
+    let (rx, _trigger, server) = build_test_server_status(addr, StatusCode::OK);
     tokio::spawn(server);
 
-    let (batch, receiver) = BatchNotifier::new_with_receiver();
-    let events: Vec<_> = (0..10)
-        .map(|index| {
-            Event::Metric(Metric::new(
-                format!("counter_{}", thread_rng().gen::<u64>()),
-                MetricKind::Absolute,
-                MetricValue::Counter {
-                    value: index as f64,
-                },
-            ))
-        })
-        .collect();
+    let (batch, mut receiver) = BatchNotifier::new_with_receiver();
+
+    let events = generate_metric_events();
+
     let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch));
 
     sink.run(stream).await.unwrap();
-    assert_eq!(receiver.await, batch_status);
+
+    assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
 
     (events, rx)
 }
@@ -110,68 +98,96 @@ fn decompress_payload(payload: Vec<u8>) -> std::io::Result<Vec<u8>> {
 /// were delivered and then asserts that every message is able to be
 /// deserialized.
 async fn smoke() {
-    let (expected, rx) = start_test(ApiStatus::OK, BatchStatus::Delivered).await;
+    let (expected, rx) = start_test().await;
 
     let output = rx.take(expected.len()).collect::<Vec<_>>().await;
 
-    for val in output.iter() {
-        assert_eq!(
-            val.0.headers.get("Content-Type").unwrap(),
-            "application/json"
-        );
-        assert_eq!(val.0.headers.get("DD-API-KEY").unwrap(), "atoken");
-        assert!(val.0.headers.contains_key("DD-Agent-Payload"));
-
-        let compressed_payload = val.1.to_vec();
-        let payload = decompress_payload(compressed_payload).unwrap();
-        let payload = std::str::from_utf8(&payload).unwrap();
-        let payload: serde_json::Value = serde_json::from_str(payload).unwrap();
-
-        let series = payload
-            .as_object()
-            .unwrap()
-            .get("series")
-            .unwrap()
-            .as_array()
-            .unwrap();
-        assert!(!series.is_empty());
-
-        // check metrics are sorted by name, which helps HTTP compression
-        let metric_names: Vec<String> = series
-            .iter()
-            .map(|value| {
-                value
-                    .as_object()
-                    .unwrap()
-                    .get("metric")
-                    .unwrap()
-                    .as_str()
-                    .unwrap()
-                    .to_string()
-            })
-            .collect();
-        let mut sorted_names = metric_names.clone();
-        sorted_names.sort();
-        assert_eq!(metric_names, sorted_names);
-
-        let entry = series.first().unwrap().as_object().unwrap();
-        assert_eq!(
-            entry.get("metric").unwrap().as_str().unwrap(),
-            "foo.counter"
-        );
-        assert_eq!(entry.get("type").unwrap().as_str().unwrap(), "count");
-        let points = entry
-            .get("points")
-            .unwrap()
-            .as_array()
-            .unwrap()
-            .first()
-            .unwrap()
-            .as_array()
-            .unwrap();
-        assert_eq!(points.len(), 2);
-        assert_eq!(points.get(1).unwrap().as_f64().unwrap(), 1.0);
-    }
+    assert!(output.len() == 1, "Should have received a response");
+
+    let val = output.first().unwrap();
+
+    assert_eq!(
+        val.0.headers.get("Content-Type").unwrap(),
+        "application/json"
+    );
+    assert_eq!(val.0.headers.get("DD-API-KEY").unwrap(), "atoken");
+    assert!(val.0.headers.contains_key("DD-Agent-Payload"));
+
+    let compressed_payload = val.1.to_vec();
+    let payload = decompress_payload(compressed_payload).unwrap();
+    let payload = std::str::from_utf8(&payload).unwrap();
+    let payload: serde_json::Value = serde_json::from_str(payload).unwrap();
+
+    let series = payload
+        .as_object()
+        .unwrap()
+        .get("series")
+        .unwrap()
+        .as_array()
+        .unwrap();
+    assert!(!series.is_empty());
+
+    // check metrics are sorted by name, which helps HTTP compression
+    let metric_names: Vec<String> = series
+        .iter()
+        .map(|value| {
+            value
+                .as_object()
+                .unwrap()
+                .get("metric")
+                .unwrap()
+                .as_str()
+                .unwrap()
+                .to_string()
+        })
+        .collect();
+    let mut sorted_names = metric_names.clone();
+    sorted_names.sort();
+    assert_eq!(metric_names, sorted_names);
+
+    let entry = series.first().unwrap().as_object().unwrap();
+    assert!(entry
+        .get("metric")
+        .unwrap()
+        .as_str()
+        .unwrap()
+        .starts_with("foo.counter_"),);
+    assert_eq!(entry.get("type").unwrap().as_str().unwrap(), "count");
+    let points = entry
+        .get("points")
+        .unwrap()
+        .as_array()
+        .unwrap()
+        .first()
+        .unwrap()
+        .as_array()
+        .unwrap();
+    assert_eq!(points.len(), 2);
+
+    // validate that all values were received
+    let all_values: f64 = series
+        .iter()
+        .map(|entry| {
+            entry
+                .as_object()
+                .unwrap()
+                .get("points")
+                .unwrap()
+                .as_array()
+                .unwrap()
+                .first()
+                .unwrap()
+                .as_array()
+                .unwrap()
+                .get(1)
+                .unwrap()
+                .as_f64()
+                .unwrap()
+        })
+        .sum();
+
+    // the input values are [0..10)
+    assert_eq!(all_values, 45.0);
 }
 
 async fn run_sink() {
@@ -186,17 +202,9 @@ async fn run_sink() {
     let (sink, _) = config.build(cx).await.unwrap();
 
     let (batch, receiver) = BatchNotifier::new_with_receiver();
-    let events: Vec<_> = (0..10)
-        .map(|index| {
-            Event::Metric(Metric::new(
-                "counter",
-                MetricKind::Absolute,
-                MetricValue::Counter {
-                    value: index as f64,
-                },
-            ))
-        })
-        .collect();
+
+    let events = generate_metric_events();
+
     let stream = map_event_batch_stream(stream::iter(events.clone()), Some(batch));
 
     sink.run(stream).await.unwrap();
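A minimal sketch of the point shape the new `generate_metric_events` helper emits, assuming only the `chrono` and `rand` crates the patch itself imports (illustrative, not part of the patch; `chrono::Duration` stands in for the patch's `std::time::Duration` arithmetic, and the tuple type is a stand-in for Vector's `Event`/`Metric` types):

use chrono::{DateTime, SubsecRound, Utc};
use rand::{thread_rng, Rng};

fn main() {
    // Second-aligned base timestamp: Datadog series points carry
    // second-resolution timestamps, so subsecond noise is truncated.
    let base = Utc::now().trunc_subsecs(3);

    // Ten incremental counter points spaced two seconds apart, so that
    // sink-side aggregation cannot collapse them into a single point.
    let points: Vec<(String, DateTime<Utc>, f64)> = (0..10)
        .map(|index| {
            let ts = base + chrono::Duration::seconds(2 * index as i64);
            // The random suffix mirrors the test's `counter_{}` naming,
            // keeping metric names unique across runs.
            (
                format!("counter_{}", thread_rng().gen::<u64>()),
                ts,
                index as f64,
            )
        })
        .collect();

    // The values 0..10 sum to 45.0, matching the `all_values` assertion
    // added to the reworked `smoke` test.
    let total: f64 = points.iter().map(|(_, _, value)| *value).sum();
    assert_eq!(total, 45.0);
}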