WIP: Switch the chunk processing to use a trait impl instead of dyn #534

Closed
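
The diffs below swap the per-chunk callback — previously a closure passed (and at some call sites boxed) as a dyn hook — for a TracerPayloadChunkProcessor trait. The trait definition itself lives in trace-utils and is not part of this diff, so the following is only a sketch reconstructed from the changed call sites; the signatures are assumptions taken from the hunks below.

use datadog_trace_protobuf::pb;

/// Hook invoked once per trace chunk while chunks are collected into a payload.
pub trait TracerPayloadChunkProcessor {
    fn process(&mut self, chunk: &mut pb::TraceChunk, root_span_index: usize);
}

/// No-op implementation, used where callers previously passed `|_chunk, _root_span_index| {}`.
pub struct DefaultTracerPayloadChunkProcessor;

impl TracerPayloadChunkProcessor for DefaultTracerPayloadChunkProcessor {
    fn process(&mut self, _chunk: &mut pb::TraceChunk, _root_span_index: usize) {}
}

Call sites now pass `&mut DefaultTracerPayloadChunkProcessor`, or a stateful implementor such as the mini agent's ChunkProcessor further down, instead of a closure.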
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 2 additions & 1 deletion data-pipeline/src/trace_exporter.rs
@@ -11,6 +11,7 @@ use hyper::{Body, Client, Method, Uri};
use log::error;
use std::{borrow::Borrow, collections::HashMap, str::FromStr};
use tokio::runtime::Runtime;
use datadog_trace_utils::tracer_payload;

/// TraceExporterInputFormat represents the format of the input traces.
/// The input format can be either Proxy or V0.4, where V0.4 is the default.
@@ -226,7 +227,7 @@ impl TraceExporter {
let tracer_payload = trace_utils::collect_trace_chunks(
traces,
&header_tags,
|_chunk, _root_span_index| {},
&mut tracer_payload::DefaultTracerPayloadChunkProcessor,
self.endpoint.api_key.is_some(),
TraceEncoding::V07,
);
51 changes: 26 additions & 25 deletions sidecar/src/service/sidecar_server.rs
@@ -16,9 +16,8 @@ use datadog_ipc::platform::{AsyncChannel, ShmHandle};
use datadog_ipc::tarpc;
use datadog_ipc::tarpc::context::Context;
use datadog_ipc::transport::Transport;
use datadog_trace_protobuf::pb;
use datadog_trace_utils::trace_utils;
use datadog_trace_utils::trace_utils::SendData;
use datadog_trace_utils::tracer_payload;
use datadog_trace_utils::tracer_payload::TraceEncoding;
use ddcommon::Endpoint;
use ddtelemetry::worker::{
@@ -88,6 +87,13 @@ pub struct SidecarServer {
pub submitted_payloads: Arc<AtomicU64>,
}

// struct SidecarTracerPayloadChunkProcessor;
//
// impl tracer_payload::TracerPayloadChunkProcessor for SidecarTracerPayloadChunkProcessor {
// fn process(&mut self, _chunk: &mut datadog_trace_protobuf::pb::TraceChunk, _index: usize) {
// }
// }

impl SidecarServer {
/// Accepts a new connection and starts processing requests.
///
@@ -230,7 +236,6 @@ impl SidecarServer {
) -> Option<AppInstance> {
let rt_info = self.get_runtime(instance_id);

// let (app_future, completer) = rt_info.get_app(service_name, env_name);
let manual_app_future = rt_info.get_app(service_name, env_name);

if manual_app_future.completer.is_none() {
@@ -254,7 +259,6 @@
.unwrap_or_else(ddtelemetry::config::Config::from_env);
config.restartable = true;

// TODO: APMSP-1076 - log errors
let instance_option = match builder.spawn_with_config(config.clone()).await {
Ok((handle, worker_join)) => {
info!("spawning telemetry worker {config:?}");
@@ -297,30 +301,27 @@
};

let size = data.len();
let traces: Vec<Vec<pb::Span>> = match rmp_serde::from_slice(data) {
Ok(res) => res,
Err(err) => {
error!("Error deserializing trace from request body: {err}");
return;
}
};

if traces.is_empty() {
error!("No traces deserialized from the request body.");
return;
}

let payload = trace_utils::collect_trace_chunks(
traces,
match tracer_payload::TracerPayloadParams::new(
data,
&headers,
|_chunk, _root_span_index| {},
&mut tracer_payload::DefaultTracerPayloadChunkProcessor,
target.api_key.is_some(),
TraceEncoding::V04,
);

// send trace payload to our trace flusher
let data = SendData::new(size, payload, headers, target);
self.trace_flusher.enqueue(data);
)
.try_into()
{
Ok(payload) => {
let data = SendData::new(size, payload, headers, target);
self.trace_flusher.enqueue(data);
}
Err(e) => {
error!(
"Failed to collect trace chunks from msgpack with error {:?}",
e
)
}
}
}

async fn compute_stats(&self) -> SidecarStats {
@@ -802,4 +803,4 @@ async fn session_interceptor(
}
}

// TODO: APMSP-1079 - Unit tests are sparse for the sidecar server. We should add more.
// TODO: APMSP-1079 - Unit tests are sparse for the sidecar server. We should add more.
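
Because the rendered hunk above interleaves the removed rmp_serde block with the new code, here is the resulting trace-submission path reconstructed from it — a sketch of the post-change flow inside the same method, not a verbatim copy of the final file:

let size = data.len();

// Deserialization and chunk collection now happen inside TracerPayloadParams::try_into(),
// with the no-op processor standing in for the old empty closure.
match tracer_payload::TracerPayloadParams::new(
    data,
    &headers,
    &mut tracer_payload::DefaultTracerPayloadChunkProcessor,
    target.api_key.is_some(),
    TraceEncoding::V04,
)
.try_into()
{
    Ok(payload) => {
        // send trace payload to our trace flusher
        let data = SendData::new(size, payload, headers, target);
        self.trace_flusher.enqueue(data);
    }
    Err(e) => {
        error!("Failed to collect trace chunks from msgpack with error {:?}", e)
    }
}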
4 changes: 3 additions & 1 deletion tools/docker/Dockerfile.build
@@ -132,6 +132,8 @@ RUN echo \
tools/src/bin/dedup_headers.rs \
trace-normalization/benches/normalization_utils.rs \
trace-obfuscation/benches/trace_obfuscation.rs \
trace-utils/benches/deserialization.rs \
trace-utils/benches/main.rs \
| xargs -n 1 sh -c 'mkdir -p $(dirname $1); touch $1; echo $1' create_stubs

# cache dependencies
@@ -157,4 +159,4 @@ RUN ./build-profiling-ffi.sh /build/output

FROM scratch as ffi_build_output

COPY --from=ffi_build /build/output/ ./
COPY --from=ffi_build /build/output/ ./
45 changes: 30 additions & 15 deletions trace-mini-agent/src/trace_processor.rs
@@ -9,9 +9,11 @@ use log::info;
use tokio::sync::mpsc::Sender;

use datadog_trace_obfuscation::obfuscate::obfuscate_span;
use datadog_trace_protobuf::pb;
use datadog_trace_utils::trace_utils::SendData;
use datadog_trace_utils::trace_utils::{self};
use datadog_trace_utils::tracer_payload::TraceEncoding;
use datadog_trace_utils::tracer_payload::TracerPayloadChunkProcessor;

use crate::{
config::Config,
@@ -31,9 +33,33 @@ pub trait TraceProcessor {
) -> http::Result<Response<Body>>;
}

struct ChunkProcessor {
config: Arc<Config>,
mini_agent_metadata: Arc<trace_utils::MiniAgentMetadata>,
}

impl TracerPayloadChunkProcessor for ChunkProcessor {
fn process(&mut self, chunk: &mut pb::TraceChunk, root_span_index: usize) {
trace_utils::set_serverless_root_span_tags(
&mut chunk.spans[root_span_index],
self.config.function_name.clone(),
&self.config.env_type,
);
for span in chunk.spans.iter_mut() {
trace_utils::enrich_span_with_mini_agent_metadata(span, &self.mini_agent_metadata);
trace_utils::enrich_span_with_azure_metadata(
span,
self.config.mini_agent_version.as_str(),
);
obfuscate_span(span, &self.config.obfuscation_config);
}
}
}
#[derive(Clone)]
pub struct ServerlessTraceProcessor {}
#[async_trait]
impl TraceProcessor for ServerlessTraceProcessor {
async fn process_traces(
@@ -71,20 +97,9 @@ impl TraceProcessor for ServerlessTraceProcessor {
let payload = trace_utils::collect_trace_chunks(
traces,
&tracer_header_tags,
|chunk, root_span_index| {
trace_utils::set_serverless_root_span_tags(
&mut chunk.spans[root_span_index],
config.function_name.clone(),
&config.env_type,
);
for span in chunk.spans.iter_mut() {
trace_utils::enrich_span_with_mini_agent_metadata(span, &mini_agent_metadata);
trace_utils::enrich_span_with_azure_metadata(
span,
config.mini_agent_version.as_str(),
);
obfuscate_span(span, &config.obfuscation_config);
}
&mut ChunkProcessor {
config: config.clone(),
mini_agent_metadata: mini_agent_metadata.clone(),
},
true, // In mini agent, we always send agentless
TraceEncoding::V07,
@@ -304,4 +319,4 @@ mod tests {

assert_eq!(expected_tracer_payload, received_payload.unwrap());
}
}
}
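
The same reading applies to the mini agent hunk above: the interleaved diff replaces the inline closure body with the stateful ChunkProcessor defined at the top of the file, so the post-change call site reads (reconstructed, as a sketch):

let payload = trace_utils::collect_trace_chunks(
    traces,
    &tracer_header_tags,
    // ChunkProcessor carries the config and metadata the old closure used to capture.
    &mut ChunkProcessor {
        config: config.clone(),
        mini_agent_metadata: mini_agent_metadata.clone(),
    },
    true, // In mini agent, we always send agentless
    TraceEncoding::V07,
);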
13 changes: 9 additions & 4 deletions trace-utils/Cargo.toml
@@ -1,14 +1,18 @@
[package]
name = "datadog-trace-utils"
authors = ["David Lee <david.lee@datadoghq.com>"]
edition.workspace = true
version.workspace = true
rust-version.workspace = true
license.workspace = true
autobenches = false

[lib]
bench = false

[[bench]]
name = "main"
harness = false

[dependencies]
anyhow = "1.0"
hyper = { version = "0.14", features = ["client", "server"] }
@@ -34,9 +38,10 @@ testcontainers = { version = "0.17.0", optional = true }
cargo_metadata = { version = "0.18.1", optional = true }

[dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
serde_json = "1.0"
criterion = "0.5.1"
httpmock = { version = "0.7.0"}
serde_json = "1.0"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }

[features]
test-utils = ["httpmock", "testcontainers", "cargo_metadata"]
test-utils = ["httpmock", "testcontainers", "cargo_metadata"]
63 changes: 63 additions & 0 deletions trace-utils/benches/deserialization.rs
@@ -0,0 +1,63 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use criterion::{criterion_group, Criterion};
use datadog_trace_utils::tracer_header_tags::TracerHeaderTags;
use datadog_trace_utils::tracer_payload::{
TraceEncoding, TracerPayloadCollection, TracerPayloadParams,
};
use serde_json::json;

pub fn deserialize_msgpack_to_internal(c: &mut Criterion) {
let span_data1 = json!([{
"service": "test-service",
"name": "test-service-name",
"resource": "test-service-resource",
"trace_id": 111,
"span_id": 222,
"parent_id": 100,
"start": 1,
"duration": 5,
"error": 0,
"meta": {},
"metrics": {},
}]);

let span_data2 = json!([{
"service": "test-service",
"name": "test-service-name",
"resource": "test-service-resource",
"trace_id": 111,
"span_id": 333,
"parent_id": 100,
"start": 1,
"duration": 5,
"error": 1,
"meta": {},
"metrics": {},
}]);

let data =
rmp_serde::to_vec(&vec![span_data1, span_data2]).expect("Failed to serialize test spans.");
let tracer_header_tags = &TracerHeaderTags::default();

c.bench_function(
"benching deserializing traces from msgpack to their internal representation ",
|b| {
b.iter(|| {
let params = TracerPayloadParams::new(
&data,
tracer_header_tags,
Box::new(|_chunk, _root_span_index| {}),
false,
TraceEncoding::V04,
);

let result: Result<TracerPayloadCollection, _> = params.try_into();
assert!(result.is_ok())
})
},
);
}

criterion_group!(benches, deserialize_msgpack_to_internal);
8 changes: 8 additions & 0 deletions trace-utils/benches/main.rs
@@ -0,0 +1,8 @@
// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

use criterion::criterion_main;

mod deserialization;

criterion_main!(deserialization::benches);