Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
tobz committed Aug 29, 2024
1 parent 649f7e9 commit abd8868
Show file tree
Hide file tree
Showing 11 changed files with 245 additions and 54 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions LICENSE-3rdparty.csv
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ tokio-rustls,https://github.com/rustls/tokio-rustls,MIT OR Apache-2.0,The tokio-
toml,https://github.com/toml-rs/toml,MIT OR Apache-2.0,Alex Crichton <alex@alexcrichton.com>
toml_edit,https://github.com/toml-rs/toml,MIT OR Apache-2.0,"Andronik Ordian <write@reusable.software>, Ed Page <eopage@gmail.com>"
tonic,https://github.com/hyperium/tonic,MIT,Lucio Franco <luciofranco14@gmail.com>
tonic-web,https://github.com/hyperium/tonic,MIT,Juan Alvarez <alce@me.com>
tower,https://github.com/tower-rs/tower,MIT,Tower Maintainers <team@tower-rs.com>
tower-http,https://github.com/tower-rs/tower-http,MIT,Tower Maintainers <team@tower-rs.com>
tracing,https://github.com/tokio-rs/tracing,MIT,"Eliza Weisman <eliza@buoyant.io>, Tokio Contributors <team@tokio.rs>"
Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ test: ## Runs all unit tests
test-docs: check-rust-build-tools
test-docs: ## Runs all doctests
@echo "[*] Running doctests..."
cargo test --workspace --exclude datadog-protos --doc
cargo test --workspace --exclude datadog-protos --exclude otel-protos --doc

.PHONY: test-miri
test-miri: check-rust-build-tools ensure-rust-miri
Expand Down
25 changes: 1 addition & 24 deletions lib/otel-protos/build.rs
Original file line number Diff line number Diff line change
@@ -1,36 +1,13 @@
fn main() {
// Always rerun if the build script itself changes.
println!("cargo:rerun-if-changed=build.rs");

println!("cargo:rerun-if-changed=proto/opentelemetry/proto");

// Handle code generation for pure Protocol Buffers message types.
let codegen_customize = protobuf_codegen::Customize::default()
.tokio_bytes(true)
.tokio_bytes_for_string(true)
.generate_accessors(true)
.gen_mod_rs(true)
.lite_runtime(true);

protobuf_codegen::Codegen::new()
.protoc()
.includes(["proto"])
.inputs([
"proto/opentelemetry/proto/common/v1/common.proto",
"proto/opentelemetry/proto/resource/v1/resource.proto",
"proto/opentelemetry/proto/logs/v1/logs.proto",
"proto/opentelemetry/proto/metrics/v1/metrics.proto",
"proto/opentelemetry/proto/trace/v1/trace.proto",
])
.cargo_out_dir("protos")
.customize(codegen_customize)
.run_from_script();

// Handle code generation for gRPC service definitions.
tonic_build::configure()
.build_server(true)
.build_client(false)
.include_file("api.mod.rs")
.include_file("mod.rs")
.compile(
&[
"proto/opentelemetry/proto/collector/logs/v1/logs_service.proto",
Expand Down
15 changes: 3 additions & 12 deletions lib/otel-protos/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,9 @@
//! OpenTelemetry.
#![deny(warnings)]
#![allow(clippy::enum_variant_names)]
#![allow(clippy::doc_lazy_continuation)]
mod include {
include!(concat!(env!("OUT_DIR"), "/protos/mod.rs"));
include!(concat!(env!("OUT_DIR"), "/mod.rs"));
}

mod api {
include!(concat!(env!("OUT_DIR"), "/api.mod.rs"));
}

pub use self::include::logs;
pub use self::include::metrics;
pub use self::include::trace;

pub mod collector {
pub use super::api::opentelemetry::proto::collector::*;
}
pub use self::include::opentelemetry::proto::*;
200 changes: 187 additions & 13 deletions lib/saluki-components/src/sources/opentelemetry/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,37 @@
use std::sync::LazyLock;
use std::sync::{Arc, LazyLock};

use async_trait::async_trait;
use memory_accounting::{MemoryBounds, MemoryBoundsBuilder};
use otel_protos::{
collector::{
logs::v1::{logs_service_server::*, *},
metrics::v1::{metrics_service_server::*, *},
trace::v1::{trace_service_server::*, *},
},
logs::v1::{LogRecord, ResourceLogs},
metrics::v1::{Metric as OtelMetric, ResourceMetrics},
trace::v1::{ResourceSpans, Span as OtelSpan},
};
use saluki_config::GenericConfiguration;
use saluki_core::{components::sources::*, topology::OutputDefinition};
use saluki_core::{
components::sources::*,
pooling::ObjectPool as _,
spawn_traced,
topology::{shutdown::DynamicShutdownCoordinator, OutputDefinition},
};
use saluki_error::GenericError;
use saluki_event::DataType;
use saluki_event::{DataType, Event};
use saluki_io::net::{listener::ConnectionOrientedListener, GrpcListenAddress, ListenAddress};
use serde::Deserialize;
use tracing::info;
use tonic::{transport::Server, Request, Response, Status};
use tonic_web::GrpcWebLayer;
use tower::util::option_layer;
use tracing::{error, info};

fn default_listen_addresses() -> Vec<GrpcListenAddress> {
vec![GrpcListenAddress::Binary(
ListenAddress::Tcp(([127, 0, 0, 1], 8080).into()),
)]
vec![GrpcListenAddress::Binary(ListenAddress::Tcp(
([127, 0, 0, 1], 8080).into(),
))]
}

/// OpenTelemetry source.
Expand Down Expand Up @@ -44,13 +62,13 @@ impl SourceBuilder for OpenTelemetryConfiguration {
async fn build(&self) -> Result<Box<dyn Source + Send>, GenericError> {
let mut listeners = Vec::with_capacity(self.listen_addresses.len());
for listen_address in &self.listen_addresses {
let (service_type, address) = match listen_address {
GrpcListenAddress::Binary(addr) => ("grpc", addr),
GrpcListenAddress::Web(addr) => ("http", addr),
let (is_grpc_web, address) = match listen_address {
GrpcListenAddress::Binary(addr) => (false, addr),
GrpcListenAddress::Web(addr) => (true, addr),
};

let listener = ConnectionOrientedListener::from_listen_address(address.clone()).await?;
listeners.push((service_type, listener));
listeners.push((is_grpc_web, listener));
}

Ok(Box::new(OpenTelemetry { listeners }))
Expand All @@ -70,11 +88,13 @@ impl SourceBuilder for OpenTelemetryConfiguration {
}

impl MemoryBounds for OpenTelemetryConfiguration {
fn specify_bounds(&self, builder: &mut MemoryBoundsBuilder) {}
fn specify_bounds(&self, _builder: &mut MemoryBoundsBuilder) {
todo!()
}
}

pub struct OpenTelemetry {
listeners: Vec<(&'static str, ConnectionOrientedListener)>,
listeners: Vec<(bool, ConnectionOrientedListener)>,
}

#[async_trait]
Expand All @@ -84,14 +104,168 @@ impl Source for OpenTelemetry {
.take_shutdown_handle()
.expect("should never fail to take shutdown handle");

// For each listener, create a corresponding gRPC server (with the appropriate gRPC-Web support, if enabled) to
// go with it, which will accept logs, metrics, and traces. Each server gets spawned as a dedicated task.
let mut shutdown_coordinator = DynamicShutdownCoordinator::default();
let server_inner = Arc::new(ServerInner {
context: context.clone(),
});

for (is_grpc_web, listener) in self.listeners {
let shutdown_handle = shutdown_coordinator.register();
let server = Server::builder()
.layer(option_layer(is_grpc_web.then(GrpcWebLayer::new)))
.add_service(LogsServiceServer::from_arc(Arc::clone(&server_inner)))
.add_service(MetricsServiceServer::from_arc(Arc::clone(&server_inner)))
.add_service(TraceServiceServer::from_arc(Arc::clone(&server_inner)));

spawn_traced(async move {
if let Err(e) = server.serve_with_incoming_shutdown(listener, shutdown_handle).await {
error!(error = %e, "Error serving OpenTelemetry collector endpoint.");
}
});
}

info!("OpenTelemetry source started.");

// Wait for the global shutdown signal, then notify listeners to shutdown.
global_shutdown.await;

info!("Stopping OpenTelemetry source...");
shutdown_coordinator.shutdown().await;

info!("OpenTelemetry source stopped.");

Ok(())
}
}

/// Shared state backing the OTLP gRPC service implementations (logs, metrics, traces).
struct ServerInner {
    // Source context used to acquire event buffers and forward converted events downstream.
    context: SourceContext,
}

#[async_trait]
impl LogsService for ServerInner {
async fn export(
&self, request: Request<ExportLogsServiceRequest>,
) -> Result<Response<ExportLogsServiceResponse>, Status> {
// Convert each OTLP log into our internal representation, and write them to the event buffer.
let inner = request.into_inner();

let mut event_buffer = self.context.event_buffer_pool().acquire().await;
event_buffer.extend(inner.resource_logs.into_iter().flat_map(otel_logs_event_iter));
let logs_count = event_buffer.len();

// Try forwarding the event buffer, returning an error if we failed to do so.
match self.context.forwarder().forward_named("logs", event_buffer).await {
// TODO: If/when we make this bounded, we'll want to handle partial success here.
Ok(()) => Ok(Response::new(ExportLogsServiceResponse { partial_success: None })),
Err(e) => {
error!(%logs_count, error = %e, "Error forwarding logs.");
Err(Status::data_loss(format!(
"Failed to forward {} log(s) to downstream component(s).",
logs_count
)))
}
}
}
}

#[async_trait]
impl MetricsService for ServerInner {
async fn export(
&self, request: Request<ExportMetricsServiceRequest>,
) -> Result<Response<ExportMetricsServiceResponse>, Status> {
// Convert each OTLP metric into our internal representation, and write them to the event buffer.
let inner = request.into_inner();

let mut event_buffer = self.context.event_buffer_pool().acquire().await;
event_buffer.extend(inner.resource_metrics.into_iter().flat_map(otel_metrics_event_iter));
let metrics_count = event_buffer.len();

// Try forwarding the event buffer, returning an error if we failed to do so.
match self.context.forwarder().forward_named("metrics", event_buffer).await {
// TODO: If/when we make this bounded, we'll want to handle partial success here.
Ok(()) => Ok(Response::new(ExportMetricsServiceResponse { partial_success: None })),
Err(e) => {
error!(%metrics_count, error = %e, "Error forwarding metrics.");
Err(Status::data_loss(format!(
"Failed to forward {} metric(s) to downstream component(s).",
metrics_count
)))
}
}
}
}

#[async_trait]
impl TraceService for ServerInner {
async fn export(
&self, request: Request<ExportTraceServiceRequest>,
) -> Result<Response<ExportTraceServiceResponse>, Status> {
// Convert each OTLP trace into our internal representation, and write them to the event buffer.
let inner = request.into_inner();

let mut event_buffer = self.context.event_buffer_pool().acquire().await;
event_buffer.extend(inner.resource_spans.into_iter().flat_map(otel_traces_event_iter));
let traces_count = event_buffer.len();

// Try forwarding the event buffer, returning an error if we failed to do so.
match self.context.forwarder().forward_named("traces", event_buffer).await {
// TODO: If/when we make this bounded, we'll want to handle partial success here.
Ok(()) => Ok(Response::new(ExportTraceServiceResponse { partial_success: None })),
Err(e) => {
error!(%traces_count, error = %e, "Error forwarding traces.");
Err(Status::data_loss(format!(
"Failed to forward {} trace(s) to downstream component(s).",
traces_count
)))
}
}
}
}

/// Flattens a `ResourceLogs` payload into an iterator of internal events, converting
/// each contained log record via [`otel_log_to_event`].
fn otel_logs_event_iter(resource_logs: ResourceLogs) -> impl Iterator<Item = Event> {
    // TODO: We probably need/want to utilize the resource data, as well as the scope data, when generating the final
    // event, but we're mostly just trying to get the basic structure laid out here first.

    resource_logs
        .scope_logs
        .into_iter()
        .flat_map(|scope_logs| scope_logs.log_records.into_iter().map(otel_log_to_event))
}

/// Converts a single OTLP log record into an internal `Event`.
///
/// Not yet implemented: currently panics via `todo!()` if called.
fn otel_log_to_event(_log_record: LogRecord) -> Event {
    todo!()
}

/// Flattens a `ResourceMetrics` payload into an iterator of internal events, converting
/// each contained metric via [`otel_metric_to_event`] (one metric may yield several events).
fn otel_metrics_event_iter(resource_metrics: ResourceMetrics) -> impl Iterator<Item = Event> {
    // TODO: We probably need/want to utilize the resource data, as well as the scope data, when generating the final
    // event, but we're mostly just trying to get the basic structure laid out here first.

    resource_metrics
        .scope_metrics
        .into_iter()
        .flat_map(|scope_metrics| scope_metrics.metrics.into_iter().flat_map(otel_metric_to_event))
}

/// Converts a single OTLP metric into zero or more internal `Event`s.
///
/// Not yet implemented: currently panics via `todo!()` if called.
fn otel_metric_to_event(_metric: OtelMetric) -> Vec<Event> {
    todo!()
}

/// Flattens a `ResourceSpans` payload into an iterator of internal events, converting
/// each contained span via [`otel_trace_to_event`].
fn otel_traces_event_iter(resource_spans: ResourceSpans) -> impl Iterator<Item = Event> {
    // TODO: We probably need/want to utilize the resource data, as well as the scope data, when generating the final
    // event, but we're mostly just trying to get the basic structure laid out here first.

    resource_spans
        .scope_spans
        .into_iter()
        .flat_map(|scope_spans| scope_spans.spans.into_iter().map(otel_trace_to_event))
}

/// Converts a single OTLP span into an internal `Event`.
///
/// Not yet implemented: currently panics via `todo!()` if called.
fn otel_trace_to_event(_span: OtelSpan) -> Event {
    todo!()
}
3 changes: 1 addition & 2 deletions lib/saluki-event/src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,4 @@

/// A trace.
#[derive(Clone, Debug)]
pub struct Trace {
}
pub struct Trace {}
2 changes: 2 additions & 0 deletions lib/saluki-io/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ bitmask-enum = { workspace = true }
bytes = { workspace = true }
datadog-protos = { workspace = true }
ddsketch-agent = { workspace = true }
futures = { workspace = true }
http = { workspace = true }
http-body = { workspace = true }
hyper = { workspace = true, features = ["client", "server"] }
Expand Down Expand Up @@ -46,6 +47,7 @@ socket2 = { workspace = true }
stringtheory = { workspace = true }
tokio = { workspace = true, features = ["fs", "io-util", "macros", "net", "rt", "rt-multi-thread", "signal", "sync"] }
tokio-util = { workspace = true }
tonic = { workspace = true, features = ["transport"] }
tower = { workspace = true }
tracing = { workspace = true }
url = { workspace = true }
Expand Down
3 changes: 2 additions & 1 deletion lib/saluki-io/src/net/addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ impl<'a> TryFrom<&'a str> for GrpcListenAddress {
//
// If we have a host[:port], we use TCP. Otherwise, we use Unix domain sockets in stream mode.
let new_scheme = if url.host().is_some() { "tcp" } else { "unix" };
url.set_scheme(new_scheme).map_err(|_| format!("failed to set new scheme '{}'", new_scheme))?;
url.set_scheme(new_scheme)
.map_err(|_| format!("failed to set new scheme '{}'", new_scheme))?;

let listen_address = ListenAddress::try_from(url.as_str())?;

Expand Down
Loading

0 comments on commit abd8868

Please sign in to comment.