Skip to content

Commit

Permalink
Propagate spans from daphne-server all the way to durable objects
Browse files Browse the repository at this point in the history
  • Loading branch information
mendess committed Jul 3, 2024
1 parent fbdde3b commit 24327d6
Show file tree
Hide file tree
Showing 10 changed files with 213 additions and 65 deletions.
95 changes: 95 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ hpke-rs-rust-crypto = "0.2.0"
http = "0.2"
hyper = "0.14.29"
itertools = "0.12.1"
mappable-rc = "0.1.1"
matchit = "0.7.3"
opentelemetry = "0.23.0"
opentelemetry-http = "0.12.0"
p256 = { version = "0.13.2", features = ["ecdsa-core", "ecdsa", "pem"] }
paste = "1.0.15"
pin-project = "1.1.5"
Expand All @@ -75,6 +78,7 @@ tokio = { version = "1.38.0", features = ["macros", "rt-multi-thread"] }
tower = "0.4.13"
tracing = "0.1.40"
tracing-core = "0.1.32"
tracing-opentelemetry = "0.24.0"
tracing-subscriber = "0.3.18"
url = { version = "2.5.2", features = ["serde"] }
webpki = "0.22.4"
Expand Down
5 changes: 4 additions & 1 deletion crates/daphne-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ futures.workspace = true
hex.workspace = true
http.workspace = true
hyper.workspace = true
mappable-rc = "0.1.1"
mappable-rc.workspace = true
opentelemetry.workspace = true
opentelemetry-http.workspace = true
p256.workspace = true
prio.workspace = true
rayon.workspace = true
Expand All @@ -31,6 +33,7 @@ thiserror.workspace = true
tokio.workspace = true
tower.workspace = true
tracing.workspace = true
tracing-opentelemetry.workspace = true
url.workspace = true

[dev-dependencies]
Expand Down
4 changes: 0 additions & 4 deletions crates/daphne-server/src/roles/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ use daphne_service_utils::{auth::DaphneAuth, http_headers};
use tracing::{error, info};
use url::Url;

use crate::storage_proxy_connection::method_http_1_0_to_reqwest_0_11;

#[async_trait]
impl DapAuthorizedSender<DaphneAuth> for crate::App {
async fn authorize(
Expand Down Expand Up @@ -147,8 +145,6 @@ impl crate::App {
) -> Result<DapResponse, DapError> {
use reqwest::header::{self, HeaderMap, HeaderName, HeaderValue};

let method = method_http_1_0_to_reqwest_0_11(method);

let content_type = req
.media_type
.and_then(|mt| mt.as_str_for_version(req.version))
Expand Down
35 changes: 22 additions & 13 deletions crates/daphne-server/src/storage_proxy_connection/kv/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use tracing::{info_span, Instrument};

use crate::StorageProxyConfig;

use super::{status_http_1_0_to_reqwest_0_11, Error};
use super::Error;
pub(crate) use cache::Cache;

pub(crate) struct Kv<'h> {
Expand Down Expand Up @@ -228,13 +228,16 @@ impl<'h> Kv<'h> {
prefix = std::any::type_name::<P>()
);
async {
let resp = self
let mut req = self
.http
.get(self.config.url.join(&key).unwrap())
.bearer_auth(&self.config.auth_token)
.send()
.await?;
if resp.status() == status_http_1_0_to_reqwest_0_11(StatusCode::NOT_FOUND) {
.build()?;
super::add_tracing_headers(&mut req);

let resp = self.http.execute(req).await?;

if resp.status() == StatusCode::NOT_FOUND {
if opt.cache_not_found {
self.cache.write().await.put::<P>(key, None);
}
Expand Down Expand Up @@ -263,13 +266,16 @@ impl<'h> Kv<'h> {
{
let key = Self::to_key::<P>(key);
tracing::debug!(key, "PUT");
self.http
let mut req = self
.http
.post(self.config.url.join(&key).unwrap())
.bearer_auth(&self.config.auth_token)
.body(serde_json::to_vec(&value).unwrap())
.send()
.await?
.error_for_status()?;
.build()?;

super::add_tracing_headers(&mut req);

self.http.execute(req).await?.error_for_status()?;
self.cache.write().await.put::<P>(key, Some(value.into()));
Ok(())
}
Expand All @@ -294,15 +300,18 @@ impl<'h> Kv<'h> {
let key = Self::to_key::<P>(key);

tracing::debug!(key, "PUT if not exists");
let response = self
let mut req = self
.http
.put(self.config.url.join(&key).unwrap())
.bearer_auth(&self.config.auth_token)
.body(serde_json::to_vec(&value).unwrap())
.send()
.await?;
.build()?;

super::add_tracing_headers(&mut req);

let response = self.http.execute(req).await?;

if response.status() == status_http_1_0_to_reqwest_0_11(StatusCode::CONFLICT) {
if response.status() == StatusCode::CONFLICT {
Ok(Some(value))
} else {
response.error_for_status()?;
Expand Down
54 changes: 16 additions & 38 deletions crates/daphne-server/src/storage_proxy_connection/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,14 +9,17 @@ pub(crate) mod kv;

use std::fmt::Debug;

use axum::http::{Method, StatusCode};
use axum::http::StatusCode;
use daphne_service_utils::durable_requests::{
bindings::{DurableMethod, DurableRequestPayload, DurableRequestPayloadExt},
DurableRequest, ObjectIdFrom, DO_PATH_PREFIX,
};
use opentelemetry_http::HeaderInjector;
use serde::de::DeserializeOwned;

pub(crate) use kv::Kv;
use tracing::Span;
use tracing_opentelemetry::OpenTelemetrySpanExt;

use crate::StorageProxyConfig;

Expand Down Expand Up @@ -77,20 +80,23 @@ impl<'d, B: DurableMethod + Debug, P: AsRef<[u8]>> RequestBuilder<'d, B, P> {
.url
.join(&format!("{DO_PATH_PREFIX}{}", self.path.to_uri()))
.unwrap();
let resp = self
let mut req = self
.durable
.http
.post(url)
.body(self.request.into_bytes())
.bearer_auth(&self.durable.config.auth_token)
.send()
.await?;
.build()?;

add_tracing_headers(&mut req);

let resp = self.durable.http.execute(req).await?;

if resp.status().is_success() {
Ok(resp.json().await?)
} else {
Err(Error::Http {
status: status_reqwest_0_11_to_http_1_0(resp.status()),
status: resp.status(),
body: resp.text().await?,
})
}
Expand Down Expand Up @@ -147,37 +153,9 @@ impl<'w> Do<'w> {
}
}

/// this is needed while [reqwest#2039](https://github.com/seanmonstar/reqwest/issues/2039) isn't
/// completed.
///
/// This is because axum is using http 1.0 and reqwest is still in http 0.2
pub fn method_http_1_0_to_reqwest_0_11(method: Method) -> reqwest::Method {
match method {
Method::GET => reqwest::Method::GET,
Method::POST => reqwest::Method::POST,
Method::PUT => reqwest::Method::PUT,
Method::PATCH => reqwest::Method::PATCH,
Method::HEAD => reqwest::Method::HEAD,
Method::TRACE => reqwest::Method::TRACE,
Method::OPTIONS => reqwest::Method::OPTIONS,
Method::CONNECT => reqwest::Method::CONNECT,
Method::DELETE => reqwest::Method::DELETE,
_ => unreachable!(),
}
}

/// this is needed while [reqwest#2039](https://github.com/seanmonstar/reqwest/issues/2039) isn't
/// completed.
///
/// This is because axum is using http 1.0 and reqwest is still in http 0.2
pub fn status_http_1_0_to_reqwest_0_11(status: StatusCode) -> reqwest::StatusCode {
reqwest::StatusCode::from_u16(status.as_u16()).unwrap()
}

/// this is needed while [reqwest#2039](https://github.com/seanmonstar/reqwest/issues/2039) isn't
/// completed.
///
/// This is because axum is using http 1.0 and reqwest is still in http 0.2
pub fn status_reqwest_0_11_to_http_1_0(status: reqwest::StatusCode) -> StatusCode {
StatusCode::from_u16(status.as_u16()).unwrap()
pub fn add_tracing_headers(req: &mut reqwest::Request) {
let ctx = Span::current().context();
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(&ctx, &mut HeaderInjector(req.headers_mut()));
});
}
3 changes: 3 additions & 0 deletions crates/daphne-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ daphne = { path = "../daphne", features = ["prometheus"] }
futures = { workspace = true, optional = true }
getrandom = { workspace = true, features = ["js"] }
hex.workspace = true
opentelemetry-http.workspace = true
opentelemetry.workspace = true
prio.workspace = true
prometheus.workspace = true
rand.workspace = true
Expand All @@ -32,6 +34,7 @@ serde-wasm-bindgen.workspace = true
serde.workspace = true
serde_json.workspace = true
tracing-core.workspace = true
tracing-opentelemetry.workspace = true
tracing-subscriber = { workspace = true, features = ["env-filter", "json"]}
tracing.workspace = true
url.workspace = true
Expand Down
Loading

0 comments on commit 24327d6

Please sign in to comment.