Skip to content

Commit

Permalink
Propagate spans from daphne-server all the way to durable objects
Browse files Browse the repository at this point in the history
  • Loading branch information
mendess committed Aug 30, 2024
1 parent 0784a6a commit 7e70715
Show file tree
Hide file tree
Showing 9 changed files with 269 additions and 22 deletions.
155 changes: 153 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ hyper = "0.14.29"
itertools = "0.12.1"
mappable-rc = "0.1.1"
matchit = "0.7.3"
opentelemetry = "0.24.0"
opentelemetry-http = "0.13.0"
p256 = { version = "0.13.2", features = ["ecdsa-core", "ecdsa", "pem"] }
paste = "1.0.15"
pin-project = "1.1.5"
Expand All @@ -84,6 +86,7 @@ tower = "0.4.13"
tower-service = "0.3"
tracing = "0.1.40"
tracing-core = "0.1.32"
tracing-opentelemetry = "0.25.0"
tracing-subscriber = "0.3.18"
url = { version = "2.5.2", features = ["serde"] }
webpki = "0.22.4"
Expand Down
3 changes: 3 additions & 0 deletions crates/daphne-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ hex.workspace = true
http = "0.2" # held back to use http 0.2
hyper.workspace = true
mappable-rc.workspace = true
opentelemetry = "0.23.0" # held back to use http 0.2
opentelemetry-http = "0.12.0" # held back to use http 0.2
p256.workspace = true
prio.workspace = true
rayon.workspace = true
Expand All @@ -30,6 +32,7 @@ thiserror.workspace = true
tokio.workspace = true
tower.workspace = true
tracing.workspace = true
tracing-opentelemetry = "0.24.0" # held back to use http 0.2
url.workspace = true

[dependencies.reqwest]
Expand Down
32 changes: 23 additions & 9 deletions crates/daphne-server/src/storage_proxy_connection/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,12 +230,16 @@ impl<'h> Kv<'h> {
prefix = std::any::type_name::<P>()
);
async {
let resp = self
let mut req = self
.http
.get(self.config.url.join(&key).unwrap())
.bearer_auth(&self.config.auth_token)
.send()
.await?;
.build()?;

super::add_tracing_headers(&mut req);

let resp = self.http.execute(req).await?;

if resp.status() == StatusCode::NOT_FOUND {
if opt.cache_not_found {
self.cache.write().await.put::<P>(key, None);
Expand Down Expand Up @@ -276,13 +280,18 @@ impl<'h> Kv<'h> {
.http
.post(self.config.url.join(&key).unwrap())
.bearer_auth(&self.config.auth_token)
.body(serde_json::to_vec(&value).unwrap());
.body(serde_json::to_vec(&value).unwrap())
.build()?;

if let Some(expiration) = expiration {
request = request.header(STORAGE_PROXY_PUT_KV_EXPIRATION, expiration);
request
.headers_mut()
.insert(STORAGE_PROXY_PUT_KV_EXPIRATION, expiration.into());
}

request.send().await?.error_for_status()?;
super::add_tracing_headers(&mut request);

self.http.execute(request).await?.error_for_status()?;

self.cache.write().await.put::<P>(key, Some(value.into()));
Ok(())
Expand Down Expand Up @@ -337,13 +346,18 @@ impl<'h> Kv<'h> {
.http
.put(self.config.url.join(&key).unwrap())
.bearer_auth(&self.config.auth_token)
.body(serde_json::to_vec(&value).unwrap());
.body(serde_json::to_vec(&value).unwrap())
.build()?;

if let Some(expiration) = expiration {
request = request.header(STORAGE_PROXY_PUT_KV_EXPIRATION, expiration);
request
.headers_mut()
.insert(STORAGE_PROXY_PUT_KV_EXPIRATION, expiration.into());
}

let response = request.send().await?;
super::add_tracing_headers(&mut request);

let response = self.http.execute(request).await?;

if response.status() == StatusCode::CONFLICT {
Ok(Some(value))
Expand Down
19 changes: 16 additions & 3 deletions crates/daphne-server/src/storage_proxy_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@ use daphne_service_utils::durable_requests::{
bindings::{DurableMethod, DurableRequestPayload, DurableRequestPayloadExt},
DurableRequest, ObjectIdFrom, DO_PATH_PREFIX,
};
use opentelemetry_http::HeaderInjector;
use serde::de::DeserializeOwned;

pub(crate) use kv::Kv;
use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::StorageProxyConfig;

Expand Down Expand Up @@ -74,14 +77,17 @@ impl<'d, B: DurableMethod + Debug, P: AsRef<[u8]>> RequestBuilder<'d, B, P> {
.url
.join(&format!("{DO_PATH_PREFIX}{}", self.path.to_uri()))
.unwrap();
let resp = self
let mut req = self
.durable
.http
.post(url)
.body(self.request.into_bytes())
.bearer_auth(&self.durable.config.auth_token)
.send()
.await?;
.build()?;

add_tracing_headers(&mut req);

let resp = self.durable.http.execute(req).await?;

if resp.status().is_success() {
Ok(resp.json().await?)
Expand Down Expand Up @@ -144,3 +150,10 @@ impl<'w> Do<'w> {
}
}
}

pub fn add_tracing_headers(req: &mut reqwest::Request) {
let ctx = Span::current().context();
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(&ctx, &mut HeaderInjector(req.headers_mut()));
});
}
3 changes: 3 additions & 0 deletions crates/daphne-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ headers.workspace = true
hex.workspace = true
http-body-util.workspace = true
http.workspace = true
opentelemetry-http.workspace = true
opentelemetry.workspace = true
prio.workspace = true
prometheus.workspace = true
rand.workspace = true
Expand All @@ -39,6 +41,7 @@ serde-wasm-bindgen.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing-core.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber = { workspace = true, features = ["env-filter", "json"]}
tracing.workspace = true
url.workspace = true
Expand Down
Loading

0 comments on commit 7e70715

Please sign in to comment.