Skip to content

Commit

Permalink
Add benchmark for SpanConcentrator (#603)
Browse files Browse the repository at this point in the history
* Add benchmark for concentrator

* Disable autobenches

* Change bench name
  • Loading branch information
VianneyRuhlmann committed Sep 4, 2024
1 parent 081589b commit c035537
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 4 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 7 additions & 0 deletions data-pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ rust-version.workspace = true
edition.workspace = true
version.workspace = true
license.workspace = true
autobenches = false

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

Expand All @@ -26,5 +27,11 @@ datadog-ddsketch = { path = "../ddsketch"}
[lib]
bench = false

# Single Criterion bench target. `harness = false` disables the default
# libtest harness so that `criterion_main!` in benches/main.rs can provide
# the `main` function itself.
[[bench]]
name = "main"
harness = false
path = "benches/main.rs"

# Benchmark-only dependencies; not pulled in by regular builds of the crate.
[dev-dependencies]
criterion = "0.5.1"
rand = "0.8.5"
7 changes: 7 additions & 0 deletions data-pipeline/benches/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Benchmark entry point for the `data-pipeline` crate.
//!
//! Registered in Cargo.toml as the `main` bench target with `harness = false`,
//! so `criterion_main!` expands to the `main` function that runs every group
//! listed here.
use criterion::criterion_main;

mod span_concentrator_bench;

criterion_main!(span_concentrator_bench::benches);
72 changes: 72 additions & 0 deletions data-pipeline/benches/span_concentrator_bench.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0
use std::{
collections::HashMap,
time::{self, Duration, SystemTime},
};

use criterion::{criterion_group, Criterion};
use data_pipeline::span_concentrator::SpanConcentrator;
use datadog_trace_protobuf::pb;

/// Returns the start timestamp, in nanoseconds since the Unix epoch, of the
/// `n`-th 10-second stats bucket counted forward from `now`.
///
/// Panics if `now` precedes the Unix epoch (never the case for the benchmark,
/// which derives `now` from `SystemTime::now()`).
fn get_bucket_start(now: SystemTime, n: u64) -> i64 {
    let since_epoch = now.duration_since(time::UNIX_EPOCH).unwrap();
    let bucket_offset = Duration::from_secs(10 * n);
    (since_epoch + bucket_offset).as_nanos() as i64
}

/// Builds one synthetic `pb::Span` fixture for the benchmark.
///
/// Every span is tagged `_dd.measured`; span id 1 is additionally flagged
/// `_dd.top_level`. All spans carry a `db_name` meta tag and every third span
/// an extra `bucket_s3` tag — the same keys the benchmark registers with the
/// concentrator, so tag aggregation is exercised.
fn get_span(now: SystemTime, trace_id: u64, span_id: u64) -> pb::Span {
    let mut metrics: HashMap<String, f64> = HashMap::new();
    metrics.insert("_dd.measured".to_string(), 1.0);
    if span_id == 1 {
        metrics.insert("_dd.top_level".to_string(), 1.0);
    }

    let mut meta: HashMap<String, String> = HashMap::new();
    meta.insert("db_name".to_string(), "postgres".to_string());
    if span_id % 3 == 0 {
        meta.insert("bucket_s3".to_string(), "aws_bucket".to_string());
    }

    pb::Span {
        trace_id,
        span_id,
        // span_id starts at 1 in the benchmark, so this never underflows.
        parent_id: span_id - 1,
        service: "test-service".to_string(),
        name: "test-name".to_string(),
        resource: format!("test-{trace_id}"),
        // Alternate between error and non-error spans.
        error: (span_id % 2) as i32,
        // One trace per bucket: the trace id selects the bucket.
        start: get_bucket_start(now, trace_id),
        // Modulo keeps the duration below the 10s bucket size.
        duration: span_id as i64 % Duration::from_secs(10).as_nanos() as i64,
        metrics,
        meta,
        ..Default::default()
    }
}

pub fn criterion_benchmark(c: &mut Criterion) {
let mut group = c.benchmark_group("concentrator");
let now = SystemTime::now() - Duration::from_secs(10 * 100);
let concentrator = SpanConcentrator::new(
Duration::from_secs(10),
now,
true,
true,
vec!["db_name".to_string(), "bucket_s3".to_string()],
);
let mut spans = vec![];
for trace_id in 1..100 {
for span_id in 1..100 {
spans.push(get_span(now, trace_id, span_id));
}
}
group.bench_function("add_spans_to_concentrator", |b| {
b.iter_batched_ref(
|| (concentrator.clone(), spans.clone()),
|data| {
let concentrator = &mut data.0;
let spans = &data.1;
for span in spans {
concentrator.add_span(span);
}
},
criterion::BatchSize::LargeInput,
);
});
}
criterion_group!(benches, criterion_benchmark);
2 changes: 1 addition & 1 deletion data-pipeline/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

mod span_concentrator;
pub mod span_concentrator;
pub mod trace_exporter;
4 changes: 2 additions & 2 deletions data-pipeline/src/span_concentrator/aggregation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ fn get_peer_tags(span: &pb::Span, peer_tag_keys: &[String]) -> Vec<Tag> {
}

/// The stats computed from a group of span with the same AggregationKey
#[derive(Debug, Default)]
#[derive(Debug, Default, Clone)]
pub(super) struct GroupedStats {
hits: u64,
errors: u64,
Expand Down Expand Up @@ -140,7 +140,7 @@ impl GroupedStats {

/// A time bucket used for stats aggregation. It stores a map of GroupedStats storing the stats of
/// spans aggregated on their AggregationKey.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub(super) struct StatsBucket {
data: HashMap<AggregationKey, GroupedStats>,
start: u64,
Expand Down
2 changes: 1 addition & 1 deletion data-pipeline/src/span_concentrator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ fn should_ignore_span(span: &pb::Span, compute_stats_by_span_kind: bool) -> bool
/// When the SpanConcentrator is flushed it keeps the `buffer_len` most recent buckets and remove
/// all older buckets returning their content. When using force flush all buckets are flushed
/// regardless of their age.
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct SpanConcentrator {
/// Size of the time buckets used for aggregation in nanos
bucket_size: u64,
Expand Down

0 comments on commit c035537

Please sign in to comment.