Skip to content

Commit

Permalink
Integration test
Browse files Browse the repository at this point in the history
Add Jaeger propagation
  • Loading branch information
bryn committed Apr 18, 2022
1 parent 55c15f5 commit 36015f6
Show file tree
Hide file tree
Showing 28 changed files with 530 additions and 376 deletions.
21 changes: 21 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,33 @@ commands:
command: |
sudo apt-get update
sudo apt-get install -y libssl-dev cmake
- run:
name: Download jaeger
command: |
curl -L https://github.com/jaegertracing/jaeger/releases/download/v1.33.0/jaeger-1.33.0-linux-amd64.tar.gz --output jaeger.tar.gz
tar -xf jaeger.tar.gz
mv jaeger-1.33.0-linux-amd64 jaeger
macos_install_baseline:
steps:
- run: echo "HOMEBREW_NO_AUTO_UPDATE=1" >> $BASH_ENV
- run: echo "export OPENSSL_ROOT_DIR=/usr/local/opt/openssl@1.1" >> $BASH_ENV
- run: test -e "$OPENSSL_ROOT_DIR"
- run: brew install cmake
- run:
name: Download jaeger
command: |
curl -L https://github.com/jaegertracing/jaeger/releases/download/v1.33.0/jaeger-1.33.0-darwin-amd64.tar.gz --output jaeger.tar.gz
tar -xf jaeger.tar.gz
mv jaeger-1.33.0-darwin-amd64 jaeger
windows_install_baseline:
steps:
- run:
name: Download jaeger
shell: bash.exe
command: |
curl -L https://github.com/jaegertracing/jaeger/releases/download/v1.33.0/jaeger-1.33.0-windows-amd64.tar.gz --output jaeger.tar.gz
tar -xf jaeger.tar.gz
mv jaeger-1.33.0-windows-amd64 jaeger
# This job sets up our nodejs dependencies,
# and makes sure everything is ready to run integration tests
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ apollo-spaceport/proto/reports.proto
#
# Prevent accidental commits of router.yaml in the root
/router.yaml
/jaeger/
4 changes: 2 additions & 2 deletions apollo-router-core/src/query_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ impl QueryCache {
}

/// Attempt to parse a string to a [`Query`] using cache if possible.
#[tracing::instrument(skip_all, level = "info" name = "get_query")]
pub async fn get_query(&self, query: impl AsRef<str>) -> Option<Arc<Query>> {
#[tracing::instrument(skip_all, level = "info" name = "parse_query")]
pub async fn parse_query(&self, query: impl AsRef<str>) -> Option<Arc<Query>> {
let key = query.as_ref().to_string();

match self.cm.get(key).await {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,6 @@ impl Service<QueryPlannerRequest> for BridgeQueryPlanner {

#[async_trait]
impl QueryPlanner for BridgeQueryPlanner {
#[tracing::instrument(skip_all, level = "info", name = "plan")]
async fn get(
&self,
query: String,
Expand Down
1 change: 0 additions & 1 deletion apollo-router-core/src/services/execution_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ impl Service<ExecutionRequest> for ExecutionService {
Poll::Ready(Ok(()))
}

#[tracing::instrument(skip_all, level = "info", name = "execute")]
fn call(&mut self, req: ExecutionRequest) -> Self::Future {
let this = self.clone();
let fut = async move {
Expand Down
2 changes: 1 addition & 1 deletion apollo-router-core/src/services/router_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ where
let body = context.request.body();
let variables = body.variables.clone();
let query = query_cache
.get_query(
.parse_query(
body.query
.as_ref()
.expect("apollo.ensure-query-is-present has checked this already; qed")
Expand Down
38 changes: 31 additions & 7 deletions apollo-router/src/plugins/telemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ use crate::plugins::telemetry::tracing::TracingConfigurator;
use crate::subscriber::replace_layer;
use ::tracing::{info_span, Span};
use apollo_router_core::{
http_compat, register_plugin, Handler, Plugin, ResponseBody, RouterRequest, RouterResponse,
http_compat, register_plugin, ExecutionRequest, ExecutionResponse, Handler, Plugin,
QueryPlannerRequest, QueryPlannerResponse, ResponseBody, RouterRequest, RouterResponse,
ServiceBuilderExt, SubgraphRequest, SubgraphResponse,
};
use apollo_spaceport::server::ReportSpaceport;
Expand Down Expand Up @@ -107,6 +108,7 @@ impl Drop for Telemetry {
opentelemetry::trace::TracerProvider::force_flush(&tracer_provider);
});
futures::executor::block_on(jh).expect("failed to flush tracer provider");
global::shutdown_tracer_provider();
});
});
}
Expand Down Expand Up @@ -268,14 +270,37 @@ impl Plugin for Telemetry {
.boxed()
}

fn query_planning_service(
&mut self,
service: BoxService<QueryPlannerRequest, QueryPlannerResponse, BoxError>,
) -> BoxService<QueryPlannerRequest, QueryPlannerResponse, BoxError> {
ServiceBuilder::new()
.instrument(move |_| info_span!("query_planning"))
.service(service)
.boxed()
}

fn execution_service(
&mut self,
service: BoxService<ExecutionRequest, ExecutionResponse, BoxError>,
) -> BoxService<ExecutionRequest, ExecutionResponse, BoxError> {
ServiceBuilder::new()
.instrument(move |_| info_span!("execution"))
.service(service)
.boxed()
}

fn subgraph_service(
&mut self,
name: &str,
service: BoxService<SubgraphRequest, SubgraphResponse, BoxError>,
) -> BoxService<SubgraphRequest, SubgraphResponse, BoxError> {
let metrics = BasicMetrics::new(&self.meter_provider);
let subgraph_attribute = KeyValue::new("subgraph", name.to_string());
service
let name = name.to_owned();
ServiceBuilder::new()
.instrument(move |_| info_span!("subgraph", name = name.as_str()))
.service(service)
.map_future(move |f| {
let metrics = metrics.clone();
let subgraph_attribute = subgraph_attribute.clone();
Expand Down Expand Up @@ -430,24 +455,23 @@ impl Telemetry {

move |request: &RouterRequest| {
let http_request = &request.context.request;
let headers = http_request.headers();
let query = http_request.body().query.clone().unwrap_or_default();
let operation_name = http_request
.body()
.operation_name
.clone()
.unwrap_or_else(|| "".to_string());
let client_name = http_request
.headers()
let client_name = headers
.get(&client_name_header)
.cloned()
.unwrap_or_else(|| HeaderValue::from_static(""));
let client_version = http_request
.headers()
let client_version = headers
.get(&client_version_header)
.cloned()
.unwrap_or_else(|| HeaderValue::from_static(""));
let span = info_span!(
"graphql_request",
"router",
query = query.as_str(),
operation_name = operation_name.as_str(),
client_name = client_name.to_str().unwrap_or_default(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -333,7 +333,7 @@ impl SpanExporter for Exporter {
/*
* Process the batch
*/
for span in batch.iter().filter(|span| span.name == "graphql_request") {
for span in batch.iter().filter(|span| span.name == "router") {
// We can't process a span if we don't have a query
if let Some(query) = span
.attributes
Expand Down
20 changes: 0 additions & 20 deletions apollo-router/src/plugins/telemetry/tracing/datadog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,23 +42,3 @@ impl TracingConfigurator for Config {
Ok(builder.with_batch_exporter(exporter, opentelemetry::runtime::Tokio))
}
}

#[cfg(test)]
mod tests {
use crate::plugins::telemetry::tracing::test::ExternalRouter;
use tower::BoxError;

// This test can be run manually from your IDE to help with testing otel
// It is set to ignore by default as Datadog may not be set up
#[ignore]
#[tokio::test(flavor = "multi_thread")]
async fn test_tracing() -> Result<(), BoxError> {
let tracer = opentelemetry_datadog::new_pipeline()
.with_service_name("my_app")
.install_batch(opentelemetry::runtime::Tokio)?;

let router = ExternalRouter::new(tracer, opentelemetry_datadog::DatadogPropagator::new());
router.run_query().await;
Ok(())
}
}
60 changes: 0 additions & 60 deletions apollo-router/src/plugins/telemetry/tracing/jaeger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,63 +76,3 @@ impl TracingConfigurator for Config {
Ok(builder.with_batch_exporter(exporter, opentelemetry::runtime::Tokio))
}
}

#[cfg(test)]
mod tests {
use crate::plugins::telemetry::tracing::test::ExternalRouter;
use jsonpath_lib::Selector;
use serde_json::{json, Value};
use std::collections::HashSet;
use std::time::Duration;
use tower::BoxError;

// This test can be run manually from your IDE to help with testing otel
// It is set to ignore by default as jaeger may not be set up
#[ignore]
#[tokio::test(flavor = "multi_thread")]
async fn test_tracing() -> Result<(), BoxError> {
let tracer = opentelemetry_jaeger::new_pipeline()
.with_service_name("my_app")
.install_batch(opentelemetry::runtime::Tokio)?;

let router = ExternalRouter::new(tracer, opentelemetry_jaeger::Propagator::new());
let id = router.run_query().await;

let tags = json!({ "unit_test": id });
let params = url::form_urlencoded::Serializer::new(String::new())
.append_pair("service", "my_app")
.append_pair("tags", &tags.to_string())
.finish();

let url = format!("http://localhost:16686/api/traces?{}", params);
for _ in 0..10 {
if find_valid_trace(&url).await.is_ok() {
return Ok(());
}
tokio::time::sleep(Duration::from_millis(1000)).await;
}
panic!("did not get full otel trace within 10 seconds");
}

async fn find_valid_trace(url: &str) -> Result<(), BoxError> {
let json: Value = reqwest::get(url).await?.json().await?;

let spans = Selector::new()
.str_path("$..serviceName")?
.value(&json)
.select()?
.into_iter()
.filter_map(|s| s.as_str())
.map(|s| s.to_string())
.collect::<HashSet<_>>();

tracing::info!("{:?}", spans);

if spans != HashSet::from(["my_app", "router", "products"].map(|s| s.into())) {
return Err(BoxError::from("incomplete traces"));
}
tracing::info!("{}", serde_json::to_string_pretty(&json)?);

Ok(())
}
}
99 changes: 0 additions & 99 deletions apollo-router/src/plugins/telemetry/tracing/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,102 +12,3 @@ pub mod zipkin;
pub trait TracingConfigurator {
fn apply(&self, builder: Builder, trace_config: &Trace) -> Result<Builder, BoxError>;
}

#[cfg(test)]
mod test {
use http::header::CONTENT_TYPE;
use http::{HeaderValue, Method, Request, Uri};
use opentelemetry::global;
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry::sdk::trace::Tracer;
use opentelemetry_http::HttpClient;
use std::process::{Child, Command, Stdio};
use std::time::Duration;
use tracing::info_span;
use tracing_core::LevelFilter;
use tracing_opentelemetry::OpenTelemetrySpanExt;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::{EnvFilter, Layer, Registry};
use uuid::Uuid;

pub struct ExternalRouter {
router: Child,
}

impl ExternalRouter {
pub fn new<P: TextMapPropagator + Send + Sync + 'static>(
tracer: Tracer,
propagator: P,
) -> Self {
let telemetry = tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_filter(LevelFilter::INFO);
let subscriber = Registry::default().with(telemetry).with(
tracing_subscriber::fmt::Layer::default()
.compact()
.with_filter(EnvFilter::from_default_env()),
);

tracing::subscriber::set_global_default(subscriber).unwrap();
global::set_text_map_propagator(propagator);

Self {
router: Command::new("target/debug/router")
.current_dir("..")
.args([
"--hr",
"--config",
"router.yaml",
"--supergraph",
"examples/graphql/local.graphql",
])
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
.expect("Router should start"),
}
}

pub async fn run_query(&self) -> String {
let client = reqwest::Client::new();
let id = Uuid::new_v4().to_string();
let span = info_span!("client_request", unit_test = id.as_str());
let _span_guard = span.enter();

for _i in 0..100 {
let mut request = Request::builder()
.method(Method::POST)
.header(CONTENT_TYPE, HeaderValue::from_static("application/json"))
.uri(Uri::from_static("http://localhost:4000"))
.body(
r#"{"query":"query {\n topProducts {\n name\n }\n}","variables":{}}"#
.into(),
)
.unwrap();

global::get_text_map_propagator(|propagator| {
propagator.inject_context(
&span.context(),
&mut opentelemetry_http::HeaderInjector(request.headers_mut()),
)
});
if let Ok(result) = client.send(request).await {
tracing::debug!(
"got {}",
String::from_utf8(result.body().to_vec()).unwrap_or_default()
);
return id;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
panic!("unable to send successful request to router")
}
}

impl Drop for ExternalRouter {
fn drop(&mut self) {
self.router.kill().expect("router could not be halted");
global::shutdown_tracer_provider();
}
}
}
22 changes: 0 additions & 22 deletions apollo-router/src/plugins/telemetry/tracing/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,3 @@ impl TracingConfigurator for super::super::otlp::Config {
))
}
}

#[cfg(test)]
mod tests {
use crate::plugins::telemetry::tracing::test::ExternalRouter;
use opentelemetry::sdk::propagation::TraceContextPropagator;
use tower::BoxError;

// This test can be run manually from your IDE to help with testing otel
// It is set to ignore by default as otlp may not be set up
#[ignore]
#[tokio::test(flavor = "multi_thread")]
async fn test_tracing() -> Result<(), BoxError> {
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(opentelemetry_otlp::new_exporter().http())
.install_batch(opentelemetry::runtime::Tokio)?;

let router = ExternalRouter::new(tracer, TraceContextPropagator::new());
router.run_query().await;
Ok(())
}
}
Loading

0 comments on commit 36015f6

Please sign in to comment.