Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add an OpenTelemetry receiver source. #167

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "lib/otel-protos/proto"]
path = lib/otel-protos/proto
url = https://github.com/open-telemetry/opentelemetry-proto.git
41 changes: 41 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"lib/datadog-protos",
"lib/ddsketch-agent",
"lib/memory-accounting",
"lib/otel-protos",
"lib/process-memory",
"lib/saluki-api",
"lib/saluki-app",
Expand Down Expand Up @@ -31,6 +32,7 @@ repository = "https://github.com/DataDog/saluki"
datadog-protos = { path = "lib/datadog-protos" }
ddsketch-agent = { path = "lib/ddsketch-agent" }
memory-accounting = { path = "lib/memory-accounting" }
otel-protos = { path = "lib/otel-protos" }
process-memory = { path = "lib/process-memory" }
saluki-api = { path = "lib/saluki-api" }
saluki-app = { path = "lib/saluki-app" }
Expand Down Expand Up @@ -122,6 +124,7 @@ smallvec = { version = "1.13", default-features = false }
windows-sys = { version = "0.52", default-features = false }
cgroupfs = { version = "0.8", default-features = false }
rustls-native-certs = { version = "0.7", default-features = false }
hashbrown = { version = "0.14.5", default-features = false }

[patch.crates-io]
# Git dependency for `containerd-client` to:
Expand Down
1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ tokio-rustls,https://github.com/rustls/tokio-rustls,MIT OR Apache-2.0,The tokio-
toml,https://github.com/toml-rs/toml,MIT OR Apache-2.0,Alex Crichton <alex@alexcrichton.com>
toml_edit,https://github.com/toml-rs/toml,MIT OR Apache-2.0,"Andronik Ordian <write@reusable.software>, Ed Page <eopage@gmail.com>"
tonic,https://github.com/hyperium/tonic,MIT,Lucio Franco <luciofranco14@gmail.com>
tonic-web,https://github.com/hyperium/tonic,MIT,Juan Alvarez <alce@me.com>
tower,https://github.com/tower-rs/tower,MIT,Tower Maintainers <team@tower-rs.com>
tower-http,https://github.com/tower-rs/tower-http,MIT,Tower Maintainers <team@tower-rs.com>
tracing,https://github.com/tokio-rs/tracing,MIT,"Eliza Weisman <eliza@buoyant.io>, Tokio Contributors <team@tokio.rs>"
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ test: ## Runs all unit tests
test-docs: check-rust-build-tools
test-docs: ## Runs all doctests
@echo "[*] Running doctests..."
cargo test --workspace --exclude datadog-protos --doc
cargo test --workspace --exclude datadog-protos --exclude otel-protos --doc

.PHONY: test-miri
test-miri: check-rust-build-tools ensure-rust-miri
Expand Down
16 changes: 16 additions & 0 deletions lib/otel-protos/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "otel-protos"
version = "0.1.0"
edition = { workspace = true }
license = { workspace = true }
repository = { workspace = true }

[dependencies]
bytes = { workspace = true }
prost = { workspace = true, features = ["derive", "std"] }
protobuf = { workspace = true }
tonic = { workspace = true, features = ["codegen", "prost"] }

[build-dependencies]
protobuf-codegen = { workspace = true }
tonic-build = { workspace = true, features = ["prost"] }
20 changes: 20 additions & 0 deletions lib/otel-protos/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
fn main() {
// Always rerun if the build script itself changes.
println!("cargo:rerun-if-changed=build.rs");
println!("cargo:rerun-if-changed=proto/opentelemetry/proto");

// Handle code generation for gRPC service definitions.
tonic_build::configure()
.build_server(true)
.build_client(false)
.include_file("mod.rs")
.compile(
&[
"proto/opentelemetry/proto/collector/logs/v1/logs_service.proto",
"proto/opentelemetry/proto/collector/metrics/v1/metrics_service.proto",
"proto/opentelemetry/proto/collector/trace/v1/trace_service.proto",
],
&["proto"],
)
.expect("failed to build gRPC service definitions for DCA")
}
1 change: 1 addition & 0 deletions lib/otel-protos/proto
Submodule proto added at 9833c4
12 changes: 12 additions & 0 deletions lib/otel-protos/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
//! OpenTelemetry Protocol Buffers definitions.
//!
//! This crate contains generated code based on the Protocol Buffers definitions used by
//! OpenTelemetry.
#![deny(warnings)]
#![allow(clippy::enum_variant_names)]
#![allow(clippy::doc_lazy_continuation)]
mod include {
include!(concat!(env!("OUT_DIR"), "/mod.rs"));
}

pub use self::include::opentelemetry::proto::*;
5 changes: 5 additions & 0 deletions lib/saluki-components/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ bytesize = { workspace = true }
datadog-protos = { workspace = true }
ddsketch-agent = { workspace = true }
futures = { workspace = true }
hashbrown = { workspace = true }
http = { workspace = true }
http-body = { workspace = true }
http-body-util = { workspace = true }
Expand All @@ -27,6 +28,7 @@ memory-accounting = { workspace = true }
metrics = { workspace = true }
metrics-util = { workspace = true, features = ["handles", "recency", "registry"] }
nom = { workspace = true }
otel-protos = { workspace = true }
paste = { workspace = true }
pin-project = { workspace = true }
protobuf = { workspace = true }
Expand All @@ -42,10 +44,13 @@ saluki-io = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
slab = { workspace = true }
smallvec = { workspace = true }
snafu = { workspace = true }
stringtheory = { workspace = true }
tokio = { workspace = true, features = ["io-util", "macros", "net", "rt", "rt-multi-thread", "signal", "sync"] }
tokio-util = { workspace = true }
tonic = { workspace = true }
tonic-web = { version = "0.12", default-features = false }
tower = { workspace = true, features = ["timeout", "util"] }
tracing = { workspace = true }
url = { workspace = true }
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
use std::{io, time::Duration};
use std::io;

use datadog_protos::metrics::{self as proto, Resource};
use http::{Method, Request, Uri};
use protobuf::CodedOutputStream;
use saluki_core::pooling::ObjectPool;
use saluki_env::time::get_unix_timestamp;
use saluki_event::metric::*;
use saluki_io::{
buf::{ChunkedBuffer, ChunkedBufferObjectPool, ReadWriteIoBuffer},
Expand Down Expand Up @@ -85,12 +84,11 @@ impl MetricsEndpoint {

/// Gets the endpoint for the given metric.
pub fn from_metric(metric: &Metric) -> Self {
match metric.value() {
MetricValue::Counter { .. }
| MetricValue::Rate { .. }
| MetricValue::Gauge { .. }
| MetricValue::Set { .. } => Self::Series,
MetricValue::Distribution { .. } => Self::Sketches,
match metric.values() {
MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => {
Self::Series
}
MetricValues::Distribution(..) => Self::Sketches,
}
}

Expand Down Expand Up @@ -159,7 +157,7 @@ where
let endpoint = MetricsEndpoint::from_metric(&metric);
if endpoint != self.endpoint {
return Err(RequestBuilderError::InvalidMetricForEndpoint {
metric_type: metric.value().as_str(),
metric_type: metric.values().as_str(),
endpoint: self.endpoint,
});
}
Expand Down Expand Up @@ -333,12 +331,11 @@ impl EncodedMetric {
}

fn encode_single_metric(metric: &Metric) -> EncodedMetric {
match metric.value() {
MetricValue::Counter { .. }
| MetricValue::Rate { .. }
| MetricValue::Gauge { .. }
| MetricValue::Set { .. } => EncodedMetric::Series(encode_series_metric(metric)),
MetricValue::Distribution { .. } => EncodedMetric::Sketch(encode_sketch_metric(metric)),
match metric.values() {
MetricValues::Counter(..) | MetricValues::Rate(..) | MetricValues::Gauge(..) | MetricValues::Set(..) => {
EncodedMetric::Series(encode_series_metric(metric))
}
MetricValues::Distribution(..) => EncodedMetric::Sketch(encode_sketch_metric(metric)),
}
}

Expand Down Expand Up @@ -390,24 +387,32 @@ fn encode_series_metric(metric: &Metric) -> proto::MetricSeries {
}
}

let (metric_type, metric_value, maybe_interval) = match metric.value() {
MetricValue::Counter { value } => (proto::MetricType::COUNT, *value, None),
MetricValue::Rate { value, interval } => (proto::MetricType::RATE, *value, Some(duration_to_secs(*interval))),
MetricValue::Gauge { value } => (proto::MetricType::GAUGE, *value, None),
MetricValue::Set { values } => (proto::MetricType::GAUGE, values.len() as f64, None),
let (metric_type, points, maybe_interval) = match metric.values() {
MetricValues::Counter(points) => (proto::MetricType::COUNT, points.into_iter(), None),
MetricValues::Rate(points, interval) => (proto::MetricType::RATE, points.into_iter(), Some(interval)),
MetricValues::Gauge(points) => (proto::MetricType::GAUGE, points.into_iter(), None),
MetricValues::Set(points) => (proto::MetricType::GAUGE, points.into_iter(), None),
_ => unreachable!(),
};

series.set_type(metric_type);

let mut point = proto::MetricPoint::new();
point.set_value(metric_value);
point.set_timestamp(metric.metadata().timestamp().unwrap_or_else(get_unix_timestamp) as i64);
for (timestamp, value) in points {
// If this is a rate metric, scale our value by the interval, in seconds.
let value = maybe_interval
.map(|interval| value / interval.as_secs_f64())
.unwrap_or(value);
let timestamp = timestamp.map(|ts| ts.get()).unwrap_or(0) as i64;

series.mut_points().push(point);
let mut point = proto::MetricPoint::new();
point.set_value(value);
point.set_timestamp(timestamp);

series.mut_points().push(point);
}

if let Some(interval) = maybe_interval {
series.set_interval(interval);
series.set_interval(interval.as_secs() as i64);
}

series
Expand Down Expand Up @@ -448,15 +453,20 @@ fn encode_sketch_metric(metric: &Metric) -> proto::Sketch {
));
}

let ddsketch = match metric.value() {
MetricValue::Distribution { sketch: ddsketch } => ddsketch,
let sketches = match metric.values() {
MetricValues::Distribution(sketches) => sketches,
_ => unreachable!(),
};

let mut dogsketch = proto::Dogsketch::new();
dogsketch.set_ts(metric.metadata().timestamp().unwrap_or_else(get_unix_timestamp) as i64);
ddsketch.merge_to_dogsketch(&mut dogsketch);
sketch.mut_dogsketches().push(dogsketch);
for (timestamp, value) in sketches {
let timestamp = timestamp.map(|ts| ts.get()).unwrap_or(0) as i64;

let mut dogsketch = proto::Dogsketch::new();
dogsketch.set_ts(timestamp);
value.merge_to_dogsketch(&mut dogsketch);

sketch.mut_dogsketches().push(dogsketch);
}

sketch
}
Expand All @@ -472,7 +482,3 @@ fn origin_metadata_to_proto_metadata(product: u32, subproduct: u32, product_deta
proto_origin.set_origin_service(product_detail);
metadata
}

fn duration_to_secs(duration: Duration) -> i64 {
duration.as_secs_f64() as i64
}
Loading