diff --git a/Cargo.lock b/Cargo.lock index 7d70a8526..ad2ebf9af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -805,6 +805,8 @@ dependencies = [ "http 0.2.12", "hyper 0.14.29", "mappable-rc", + "opentelemetry", + "opentelemetry-http", "p256", "paste", "prio", @@ -819,6 +821,7 @@ dependencies = [ "tokio", "tower", "tracing", + "tracing-opentelemetry", "tracing-subscriber", "url", "webpki", @@ -859,6 +862,8 @@ dependencies = [ "futures", "getrandom", "hex", + "opentelemetry", + "opentelemetry-http", "paste", "prio", "prometheus", @@ -871,6 +876,7 @@ dependencies = [ "serde_json", "tracing", "tracing-core", + "tracing-opentelemetry", "tracing-subscriber", "url", "worker", @@ -1273,6 +1279,12 @@ version = "0.29.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40ecd4077b5ae9fd2e9e169b102c6c330d0605168eb0e8bf79952b256dbefffd" +[[package]] +name = "glob" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" + [[package]] name = "group" version = "0.13.0" @@ -2007,6 +2019,61 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "opentelemetry" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b69a91d4893e713e06f724597ad630f1fa76057a5e1026c0ca67054a9032a76" +dependencies = [ + "futures-core", + "futures-sink", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", +] + +[[package]] +name = "opentelemetry-http" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0ba633e55c5ea6f431875ba55e71664f2fa5d3a90bd34ec9302eecc41c865dd" +dependencies = [ + "async-trait", + "bytes", + "http 0.2.12", + "opentelemetry", +] + +[[package]] +name = "opentelemetry_sdk" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae312d58eaa90a82d2e627fd86e075cf5230b3f11794e2ed74199ebbe572d4fd" +dependencies = [ + "async-trait", + "futures-channel", + "futures-executor", + "futures-util", + "glob", + "lazy_static", + "once_cell", + "opentelemetry", + "ordered-float", + "percent-encoding", + "rand", + "thiserror", +] + +[[package]] +name = "ordered-float" +version = "4.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a76df7075c7d4d01fdcb46c912dd17fba5b60c78ea480b475f2b6ab6f666584e" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-multimap" version = "0.4.3" @@ -3562,6 +3629,24 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-opentelemetry" +version = "0.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f68803492bf28ab40aeccaecc7021096bd256baf7ca77c3d425d89b35a7be4e4" +dependencies = [ + "js-sys", + "once_cell", + "opentelemetry", + "opentelemetry_sdk", + "smallvec", + "tracing", + "tracing-core", + "tracing-log", + "tracing-subscriber", + "web-time", +] + [[package]] name = "tracing-serde" version = "0.1.3" @@ -3838,6 +3923,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a6580f308b1fad9207618087a65c04e7a10bc77e02c8e84e9b00dd4b12fa0bb" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki" version = "0.22.4" diff --git a/Cargo.toml b/Cargo.toml index d29b06c6c..1f462377d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,7 +53,10 @@ hpke-rs-rust-crypto = "0.2.0" http = "0.2" hyper = "0.14.29" itertools = "0.12.1" +mappable-rc = "0.1.1" matchit = "0.7.3" +opentelemetry = "0.23.0" +opentelemetry-http = "0.12.0" p256 = { version = "0.13.2", features = ["ecdsa-core", "ecdsa", "pem"] } paste = "1.0.15" pin-project = "1.1.5" @@ -75,6 +78,7 @@ tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"] } tower = "0.4.13" tracing = "0.1.40" tracing-core = "0.1.32" +tracing-opentelemetry = "0.24.0" tracing-subscriber = "0.3.18" url = { version = "2.5.2", features = ["serde"] } webpki = "0.22.4" diff --git a/crates/daphne-server/Cargo.toml b/crates/daphne-server/Cargo.toml index 30a964c17..e34817bfd 100644 --- a/crates/daphne-server/Cargo.toml +++ b/crates/daphne-server/Cargo.toml @@ -20,7 +20,9 @@ futures.workspace = true hex.workspace = true http.workspace = true hyper.workspace = true -mappable-rc = "0.1.1" +mappable-rc.workspace = true +opentelemetry.workspace = true +opentelemetry-http.workspace = true p256.workspace = true prio.workspace = true rayon.workspace = true @@ -31,6 +33,7 @@ thiserror.workspace = true tokio.workspace = true tower.workspace = true tracing.workspace = true +tracing-opentelemetry.workspace = true url.workspace = true [dev-dependencies] diff --git a/crates/daphne-server/src/roles/leader.rs b/crates/daphne-server/src/roles/leader.rs index 2c0b6f35b..d053f74a5 100644 --- a/crates/daphne-server/src/roles/leader.rs +++ b/crates/daphne-server/src/roles/leader.rs @@ -19,8 +19,6 @@ use daphne_service_utils::{auth::DaphneAuth, http_headers}; use tracing::{error, info}; use url::Url; -use crate::storage_proxy_connection::method_http_1_0_to_reqwest_0_11; - #[async_trait] impl DapAuthorizedSender for crate::App { async fn authorize( @@ -147,8 +145,6 @@ impl crate::App { ) -> Result { use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue}; - let method = method_http_1_0_to_reqwest_0_11(method); - let content_type = req .media_type .and_then(|mt| mt.as_str_for_version(req.version)) diff --git a/crates/daphne-server/src/storage_proxy_connection/kv/mod.rs b/crates/daphne-server/src/storage_proxy_connection/kv/mod.rs index bfd9f86d2..64317f6ce 100644 --- a/crates/daphne-server/src/storage_proxy_connection/kv/mod.rs +++ b/crates/daphne-server/src/storage_proxy_connection/kv/mod.rs @@ -14,7 +14,7 @@ use tracing::{info_span, Instrument}; use crate::StorageProxyConfig; -use super::{status_http_1_0_to_reqwest_0_11, Error}; +use super::Error; pub(crate) use cache::Cache; pub(crate) struct Kv<'h> { @@ -228,13 +228,16 @@ impl<'h> Kv<'h> { prefix = std::any::type_name::

() ); async { - let resp = self + let mut req = self .http .get(self.config.url.join(&key).unwrap()) .bearer_auth(&self.config.auth_token) - .send() - .await?; - if resp.status() == status_http_1_0_to_reqwest_0_11(StatusCode::NOT_FOUND) { + .build()?; + super::add_tracing_headers(&mut req); + + let resp = self.http.execute(req).await?; + + if resp.status() == StatusCode::NOT_FOUND { if opt.cache_not_found { self.cache.write().await.put::

(key, None); } @@ -263,13 +266,16 @@ impl<'h> Kv<'h> { { let key = Self::to_key::

(key); tracing::debug!(key, "PUT"); - self.http + let mut req = self + .http .post(self.config.url.join(&key).unwrap()) .bearer_auth(&self.config.auth_token) .body(serde_json::to_vec(&value).unwrap()) - .send() - .await? - .error_for_status()?; + .build()?; + + super::add_tracing_headers(&mut req); + + self.http.execute(req).await?.error_for_status()?; self.cache.write().await.put::

(key, Some(value.into())); Ok(()) } @@ -294,15 +300,18 @@ impl<'h> Kv<'h> { let key = Self::to_key::

(key); tracing::debug!(key, "PUT if not exists"); - let response = self + let mut req = self .http .put(self.config.url.join(&key).unwrap()) .bearer_auth(&self.config.auth_token) .body(serde_json::to_vec(&value).unwrap()) - .send() - .await?; + .build()?; + + super::add_tracing_headers(&mut req); + + let response = self.http.execute(req).await?; - if response.status() == status_http_1_0_to_reqwest_0_11(StatusCode::CONFLICT) { + if response.status() == StatusCode::CONFLICT { Ok(Some(value)) } else { response.error_for_status()?; diff --git a/crates/daphne-server/src/storage_proxy_connection/mod.rs b/crates/daphne-server/src/storage_proxy_connection/mod.rs index f526b381d..93f426e68 100644 --- a/crates/daphne-server/src/storage_proxy_connection/mod.rs +++ b/crates/daphne-server/src/storage_proxy_connection/mod.rs @@ -9,14 +9,17 @@ pub(crate) mod kv; use std::fmt::Debug; -use axum::http::{Method, StatusCode}; +use axum::http::StatusCode; use daphne_service_utils::durable_requests::{ bindings::{DurableMethod, DurableRequestPayload, DurableRequestPayloadExt}, DurableRequest, ObjectIdFrom, DO_PATH_PREFIX, }; +use opentelemetry_http::HeaderInjector; use serde::de::DeserializeOwned; pub(crate) use kv::Kv; +use tracing::Span; +use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::StorageProxyConfig; @@ -77,20 +80,23 @@ impl<'d, B: DurableMethod + Debug, P: AsRef<[u8]>> RequestBuilder<'d, B, P> { .url .join(&format!("{DO_PATH_PREFIX}{}", self.path.to_uri())) .unwrap(); - let resp = self + let mut req = self .durable .http .post(url) .body(self.request.into_bytes()) .bearer_auth(&self.durable.config.auth_token) - .send() - .await?; + .build()?; + + add_tracing_headers(&mut req); + + let resp = self.durable.http.execute(req).await?; if resp.status().is_success() { Ok(resp.json().await?) } else { Err(Error::Http { - status: status_reqwest_0_11_to_http_1_0(resp.status()), + status: resp.status(), body: resp.text().await?, }) } @@ -147,37 +153,9 @@ impl<'w> Do<'w> { } } -/// this is needed while [reqwest#2039](https://github.com/seanmonstar/reqwest/issues/2039) isn't -/// completed. -/// -/// This is because axum is using http 1.0 and reqwest is still in http 0.2 -pub fn method_http_1_0_to_reqwest_0_11(method: Method) -> reqwest::Method { - match method { - Method::GET => reqwest::Method::GET, - Method::POST => reqwest::Method::POST, - Method::PUT => reqwest::Method::PUT, - Method::PATCH => reqwest::Method::PATCH, - Method::HEAD => reqwest::Method::HEAD, - Method::TRACE => reqwest::Method::TRACE, - Method::OPTIONS => reqwest::Method::OPTIONS, - Method::CONNECT => reqwest::Method::CONNECT, - Method::DELETE => reqwest::Method::DELETE, - _ => unreachable!(), - } -} - -/// this is needed while [reqwest#2039](https://github.com/seanmonstar/reqwest/issues/2039) isn't -/// completed. -/// -/// This is because axum is using http 1.0 and reqwest is still in http 0.2 -pub fn status_http_1_0_to_reqwest_0_11(status: StatusCode) -> reqwest::StatusCode { - reqwest::StatusCode::from_u16(status.as_u16()).unwrap() -} - -/// this is needed while [reqwest#2039](https://github.com/seanmonstar/reqwest/issues/2039) isn't -/// completed. -/// -/// This is because axum is using http 1.0 and reqwest is still in http 0.2 -pub fn status_reqwest_0_11_to_http_1_0(status: reqwest::StatusCode) -> StatusCode { - StatusCode::from_u16(status.as_u16()).unwrap() +pub fn add_tracing_headers(req: &mut reqwest::Request) { + let ctx = Span::current().context(); + opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.inject_context(&ctx, &mut HeaderInjector(req.headers_mut())); + }); } diff --git a/crates/daphne-worker/Cargo.toml b/crates/daphne-worker/Cargo.toml index 46dfccff8..b4db36341 100644 --- a/crates/daphne-worker/Cargo.toml +++ b/crates/daphne-worker/Cargo.toml @@ -23,6 +23,8 @@ daphne = { path = "../daphne", features = ["prometheus"] } futures = { workspace = true, optional = true } getrandom = { workspace = true, features = ["js"] } hex.workspace = true +opentelemetry-http.workspace = true +opentelemetry.workspace = true prio.workspace = true prometheus.workspace = true rand.workspace = true @@ -32,6 +34,7 @@ serde-wasm-bindgen.workspace = true serde.workspace = true serde_json.workspace = true tracing-core.workspace = true +tracing-opentelemetry.workspace = true tracing-subscriber = { workspace = true, features = ["env-filter", "json"]} tracing.workspace = true url.workspace = true diff --git a/crates/daphne-worker/src/durable/mod.rs b/crates/daphne-worker/src/durable/mod.rs index cd3844457..0ef29d41a 100644 --- a/crates/daphne-worker/src/durable/mod.rs +++ b/crates/daphne-worker/src/durable/mod.rs @@ -23,12 +23,12 @@ pub(crate) mod aggregate_store; #[cfg(feature = "test-utils")] pub(crate) mod test_state_cleaner; -use crate::tracing_utils::shorten_paths; use daphne_service_utils::durable_requests::bindings::{ DurableMethod, DurableRequestPayload, DurableRequestPayloadExt, }; use serde::{Deserialize, Serialize}; use tracing::info_span; +use tracing_opentelemetry::OpenTelemetrySpanExt as _; use worker::{Env, Error, Request, Response, Result, ScheduledTime, State}; pub use aggregate_store::AggregateStore; @@ -227,9 +227,11 @@ where } fn create_span_from_request(req: &Request) -> tracing::Span { - let path = req.path(); - let span = info_span!("DO span", p = %shorten_paths(path.split('/')).display()); - span.in_scope(|| tracing::info!(path, "DO handling new request")); + let extractor = crate::tracing_utils::HeaderExtractor::new(req); + let remote_context = + opentelemetry::global::get_text_map_propagator(|propagator| propagator.extract(&extractor)); + let span = info_span!("DO span", path = req.path()); + span.set_parent(remote_context); span } diff --git a/crates/daphne-worker/src/storage_proxy/mod.rs b/crates/daphne-worker/src/storage_proxy/mod.rs index 4b65422e7..6f2db1265 100644 --- a/crates/daphne-worker/src/storage_proxy/mod.rs +++ b/crates/daphne-worker/src/storage_proxy/mod.rs @@ -78,7 +78,8 @@ use daphne_service_utils::durable_requests::{ DurableRequest, ObjectIdFrom, DO_PATH_PREFIX, KV_PATH_PREFIX, }; use prometheus::Registry; -use tracing::warn; +use tracing::{info_span, warn, Instrument}; +use tracing_opentelemetry::OpenTelemetrySpanExt; use url::Url; use worker::{js_sys::Uint8Array, Delay, Env, Request, RequestInit, Response}; @@ -132,12 +133,20 @@ pub async fn handle_request( env: &Env, registry: &Registry, ) -> worker::Result { + let span = info_span!("handle_request", path = req.path(), method = ?req.method()); + { + let extractor = crate::tracing_utils::HeaderExtractor::new(&req); + let remote_context = opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.extract(&extractor) + }); + span.set_parent(remote_context); + } + let mut ctx = RequestContext { metrics: Metrics::new(registry), req, env, }; - // make an async block in order to avoid early returning and skipping the push_metrics // statement let response = async { @@ -169,6 +178,7 @@ pub async fn handle_request( Response::error("invalid base path", 400) } } + .instrument(span) .await; match &response { @@ -194,11 +204,13 @@ async fn storage_purge(ctx: &RequestContext<'_>) -> worker::Result { }; let do_delete = async { - let req = Request::new_with_init( + let mut req = Request::new_with_init( &format!("https://fake-host{}", TestStateCleaner::DeleteAll.to_uri(),), RequestInit::new().with_method(worker::Method::Post), )?; + crate::tracing_utils::add_tracing_headers(&mut req); + ctx.env .durable_object(TestStateCleaner::BINDING)? .id_from_name(TestStateCleaner::NAME_STR)? @@ -319,7 +331,9 @@ async fn handle_do_request(ctx: &mut RequestContext<'_>, uri: &str) -> worker::R do_req.with_body(Some(buffer.into())); } let url = Url::parse("https://fake-host/").unwrap().join(uri).unwrap(); - let do_req = Request::new_with_init(url.as_str(), &do_req)?; + let mut do_req = Request::new_with_init(url.as_str(), &do_req)?; + + crate::tracing_utils::add_tracing_headers(&mut do_req); let obj = match &parsed_req.id { ObjectIdFrom::Name(name) => binding.id_from_name(name)?, diff --git a/crates/daphne-worker/src/tracing_utils/mod.rs b/crates/daphne-worker/src/tracing_utils/mod.rs index 4da95a478..9476ef176 100644 --- a/crates/daphne-worker/src/tracing_utils/mod.rs +++ b/crates/daphne-worker/src/tracing_utils/mod.rs @@ -6,8 +6,9 @@ mod workers_json_layer; use chrono::{SecondsFormat, Utc}; use std::{collections::HashMap, fmt::Result as FmtResult, io, path::PathBuf, str, sync::Once}; -use tracing::field::Visit; +use tracing::{field::Visit, Span}; use tracing_core::Field; +use tracing_opentelemetry::OpenTelemetrySpanExt; use tracing_subscriber::{ fmt, fmt::{format::Writer, time::FormatTime}, @@ -232,6 +233,49 @@ pub fn initialize_tracing(env: &Env) { }); } +/// Helper for extracting headers from HTTP Requests. This is used for OpenTelemetry context +/// propagation over HTTP. +/// See [this](https://github.com/open-telemetry/opentelemetry-rust/blob/main/examples/tracing-http-propagator/README.md) +/// for example usage. +pub struct HeaderExtractor(HashMap); + +impl HeaderExtractor { + pub fn new(req: &worker::Request) -> Self { + Self(req.headers().into_iter().collect()) + } +} + +impl opentelemetry::propagation::Extractor for HeaderExtractor { + fn get(&self, key: &str) -> Option<&str> { + self.0.get(key).map(|s| s.as_str()) + } + + fn keys(&self) -> Vec<&str> { + self.0.keys().map(|k| k.as_str()).collect::>() + } +} + +pub struct HeaderInjector<'a>(pub &'a mut worker::Headers); + +impl<'a> opentelemetry::propagation::Injector for HeaderInjector<'a> { + fn set(&mut self, key: &str, value: String) { + self.0.set(key, &value).expect("header name is invalid"); + } +} + +pub fn add_tracing_headers(req: &mut worker::Request) { + let ctx = Span::current().context(); + opentelemetry::global::get_text_map_propagator(|propagator| { + propagator.inject_context( + &ctx, + &mut HeaderInjector( + req.headers_mut() + .expect("this request was received and can't be sent"), + ), + ); + }); +} + #[cfg(test)] mod test { use std::path::PathBuf;