Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(jaeger): better configuration pipeline. #748

Merged
merged 7 commits into from
Mar 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/actix-http/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use opentelemetry::{
};

fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
opentelemetry_jaeger::new_pipeline()
.with_collector_endpoint("http://127.0.0.1:14268/api/traces")
opentelemetry_jaeger::new_collector_pipeline()
.with_endpoint("http://127.0.0.1:14268/api/traces")
.with_service_name("trace-http-demo")
.install_batch(opentelemetry::runtime::TokioCurrentThread)
}
Expand Down
4 changes: 2 additions & 2 deletions examples/actix-udp/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use opentelemetry::{
};

fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
opentelemetry_jaeger::new_pipeline()
.with_agent_endpoint("localhost:6831")
opentelemetry_jaeger::new_agent_pipeline()
.with_endpoint("localhost:6831")
.with_service_name("trace-udp-demo")
.with_trace_config(opentelemetry::sdk::trace::config().with_resource(
opentelemetry::sdk::Resource::new(vec![
Expand Down
2 changes: 1 addition & 1 deletion examples/async/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn run(addr: &SocketAddr) -> io::Result<usize> {
}

fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
opentelemetry_jaeger::new_pipeline()
opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("trace-demo")
.install_batch(opentelemetry::runtime::Tokio)
}
Expand Down
2 changes: 1 addition & 1 deletion examples/basic/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use std::error::Error;
use std::time::Duration;

fn init_tracer() -> Result<sdktrace::Tracer, TraceError> {
opentelemetry_jaeger::new_pipeline()
opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("trace-demo")
.with_trace_config(Config::default().with_resource(Resource::new(vec![
KeyValue::new("service.name", "new_service"),
Expand Down
2 changes: 1 addition & 1 deletion examples/grpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub mod hello_world {

fn tracing_init() -> TraceResult<Tracer> {
global::set_text_map_propagator(TraceContextPropagator::new());
opentelemetry_jaeger::new_pipeline()
opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("grpc-client")
.install_simple()
}
Expand Down
2 changes: 1 addition & 1 deletion examples/grpc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl Greeter for MyGreeter {

fn tracing_init() -> Result<impl Tracer, TraceError> {
global::set_text_map_propagator(TraceContextPropagator::new());
opentelemetry_jaeger::new_pipeline()
opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("grpc-server")
.install_simple()
}
Expand Down
4 changes: 2 additions & 2 deletions examples/multiple-span-processors/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ use std::time::Duration;
fn init_tracer() -> Result<(), TraceError> {
// build a jaeger batch span processor
let jaeger_processor = BatchSpanProcessor::builder(
opentelemetry_jaeger::new_pipeline()
opentelemetry_jaeger::new_agent_pipeline()
.with_service_name("trace-demo")
.with_trace_config(
Config::default()
.with_resource(Resource::new(vec![KeyValue::new("exporter", "jaeger")])),
)
.init_async_exporter(opentelemetry::runtime::Tokio)?,
.build_async_agent_exporter(opentelemetry::runtime::Tokio)?,
opentelemetry::runtime::Tokio,
)
.build();
Expand Down
12 changes: 6 additions & 6 deletions opentelemetry-jaeger/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ use opentelemetry::trace::Tracer;

fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
global::set_text_map_propagator(opentelemetry_jaeger::Propagator::new());
let tracer = opentelemetry_jaeger::new_pipeline().install_simple()?;
let tracer = opentelemetry_jaeger::new_agent_pipeline().install_simple()?;

tracer.in_span("doing_work", |cx| {
// Traced app logic here...
Expand Down Expand Up @@ -76,7 +76,7 @@ opentelemetry-jaeger = { version = "*", features = ["rt-tokio"] }
```

```rust
let tracer = opentelemetry_jaeger::new_pipeline()
let tracer = opentelemetry_jaeger::new_agent_pipeline()
.install_batch(opentelemetry::runtime::Tokio)?;
```

Expand Down Expand Up @@ -120,11 +120,11 @@ Then you can use the [`with_collector_endpoint`] method to specify the endpoint:
use opentelemetry::trace::Tracer;

fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync + 'static>> {
let tracer = opentelemetry_jaeger::new_pipeline()
.with_collector_endpoint("http://localhost:14268/api/traces")
let tracer = opentelemetry_jaeger::new_collector_pipeline()
.with_endpoint("http://localhost:14268/api/traces")
// optionally set username and password as well.
.with_collector_username("username")
.with_collector_password("s3cr3t")
.with_username("username")
.with_password("s3cr3t")
.install_batch()?;

tracer.in_span("doing_work", |cx| {
Expand Down
9 changes: 2 additions & 7 deletions opentelemetry-jaeger/src/exporter/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,6 @@ use thrift::{
transport::{ReadHalf, TIoChannel, WriteHalf},
};

/// The max size of UDP packet we want to send, synced with jaeger-agent
const UDP_PACKET_MAX_LENGTH: usize = 65_000;

struct BufferClient {
buffer: ReadHalf<TBufferChannel>,
client: agent::AgentSyncClient<
Expand Down Expand Up @@ -47,10 +44,9 @@ impl AgentSyncClientUdp {
/// Create a new UDP agent client
pub(crate) fn new<T: ToSocketAddrs>(
host_port: T,
max_packet_size: Option<usize>,
max_packet_size: usize,
auto_split: bool,
) -> thrift::Result<Self> {
let max_packet_size = max_packet_size.unwrap_or(UDP_PACKET_MAX_LENGTH);
let (buffer, write) = TBufferChannel::with_capacity(max_packet_size).split()?;
let client = agent::AgentSyncClient::new(
TCompactInputProtocol::new(TNoopChannel),
Expand Down Expand Up @@ -106,11 +102,10 @@ impl<R: JaegerTraceRuntime> AgentAsyncClientUdp<R> {
/// Create a new UDP agent client
pub(crate) fn new<T: ToSocketAddrs>(
host_port: T,
max_packet_size: Option<usize>,
max_packet_size: usize,
runtime: R,
auto_split: bool,
) -> thrift::Result<Self> {
let max_packet_size = max_packet_size.unwrap_or(UDP_PACKET_MAX_LENGTH);
let (buffer, write) = TBufferChannel::with_capacity(max_packet_size).split()?;
let client = agent::AgentSyncClient::new(
TCompactInputProtocol::new(TNoopChannel),
Expand Down
58 changes: 33 additions & 25 deletions opentelemetry-jaeger/src/exporter/collector.rs
Original file line number Diff line number Diff line change
@@ -1,26 +1,14 @@
//! # HTTP Jaeger Collector Client
//!
#[cfg(feature = "collector_client")]
use http::Uri;
#[cfg(feature = "collector_client")]
use opentelemetry_http::{HttpClient, ResponseExt as _};
use std::sync::atomic::AtomicUsize;

/// `CollectorAsyncClientHttp` implements an async version of the
/// `TCollectorSyncClient` interface over HTTP
#[derive(Debug)]
pub(crate) struct CollectorAsyncClientHttp {
endpoint: Uri,
#[cfg(feature = "collector_client")]
client: Box<dyn HttpClient>,
#[cfg(all(feature = "wasm_collector_client", not(feature = "collector_client")))]
client: WasmHttpClient,
payload_size_estimate: AtomicUsize,
}

#[cfg(feature = "collector_client")]
pub(crate) use collector_client::AsyncHttpClient;
#[cfg(feature = "wasm_collector_client")]
#[derive(Debug)]
struct WasmHttpClient {
_auth: Option<String>,
}
pub(crate) use wasm_collector_client::WasmCollector;

#[cfg(feature = "collector_client")]
mod collector_client {
Expand All @@ -31,14 +19,23 @@ mod collector_client {
use std::sync::atomic::{AtomicUsize, Ordering};
use thrift::protocol::TBinaryOutputProtocol;

impl CollectorAsyncClientHttp {
/// `AsyncHttpClient` implements an async version of the
/// `TCollectorSyncClient` interface over HTTP
#[derive(Debug)]
pub(crate) struct AsyncHttpClient {
endpoint: Uri,
http_client: Box<dyn HttpClient>,
payload_size_estimate: AtomicUsize,
}

impl AsyncHttpClient {
/// Create a new HTTP collector client
pub(crate) fn new(endpoint: Uri, client: Box<dyn HttpClient>) -> Self {
let payload_size_estimate = AtomicUsize::new(512);

CollectorAsyncClientHttp {
AsyncHttpClient {
endpoint,
client,
http_client: client,
payload_size_estimate,
}
}
Expand Down Expand Up @@ -68,15 +65,14 @@ mod collector_client {
.expect("request should always be valid");

// Send request to collector
let _ = self.client.send(req).await?.error_for_status()?;
let _ = self.http_client.send(req).await?.error_for_status()?;
Ok(())
}
}
}

#[cfg(all(feature = "wasm_collector_client", not(feature = "collector_client")))]
#[cfg(feature = "wasm_collector_client")]
mod wasm_collector_client {
use super::*;
use crate::exporter::thrift::jaeger;
use futures_util::future;
use http::Uri;
Expand All @@ -91,7 +87,19 @@ mod wasm_collector_client {
use wasm_bindgen_futures::JsFuture;
use web_sys::{Request, RequestCredentials, RequestInit, RequestMode, Response};

impl CollectorAsyncClientHttp {
#[derive(Debug)]
pub(crate) struct WasmCollector {
endpoint: Uri,
payload_size_estimate: AtomicUsize,
client: WasmHttpClient,
}

#[derive(Debug, Default)]
struct WasmHttpClient {
auth: Option<String>,
}

impl WasmCollector {
/// Create a new HTTP collector client
pub(crate) fn new(
endpoint: Uri,
Expand All @@ -111,7 +119,7 @@ mod wasm_collector_client {

Ok(Self {
endpoint,
client: WasmHttpClient { _auth: auth },
client: WasmHttpClient { auth },
payload_size_estimate,
})
}
Expand Down
Loading