Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Instant Query for dataplane metrics server #501

Merged
merged 6 commits into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 46 additions & 1 deletion dataplane-webserver/src/metrics/expression_validator.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use promql_parser::parser::{Expr, VectorSelector};
use promql_parser::util::{walk_expr, ExprVisitor};

use crate::metrics::types::InstantQuery;
use crate::metrics::types::RangeQuery;
use actix_web::web::Query;
use actix_web::HttpResponse;
Expand Down Expand Up @@ -72,7 +73,7 @@ impl ExprVisitor for NamespaceVisitor {

// Returns the query if it's valid
// otherwise returns an error in the form of HttpResponse
pub fn check_query_only_accesses_namespace(
pub fn check_range_query_only_accesses_namespace(
range_query: &Query<RangeQuery>,
namespace: &String,
) -> Result<String, HttpResponse> {
Expand Down Expand Up @@ -115,3 +116,47 @@ pub fn check_query_only_accesses_namespace(
}
Ok(query)
}

pub fn check_query_only_accesses_namespace(
instant_query: &Query<InstantQuery>,
namespace: &String,
) -> Result<String, HttpResponse> {
// Get the query parameters
let query = instant_query.query.clone();

// Parse the query
let abstract_syntax_tree = match parser::parse(&query) {
Ok(ast) => ast,
Err(e) => {
error!("Query parse error: {}", e);
return Err(HttpResponse::UnprocessableEntity().json("Failed to parse PromQL query"));
}
};

// Recurse through all terms in the expression to find any terms that specify
// label matching, and make sure all of them specify the namespace label.
let mut visitor = NamespaceVisitor {
namespace: namespace.clone(),
};
let all_metrics_specify_namespace = walk_expr(&mut visitor, &abstract_syntax_tree);

// Check if we are performing an unauthorized query.
match all_metrics_specify_namespace {
Ok(true) => {
info!(
"Authorized request: namespace '{}', query '{}'",
namespace, query
);
}
_ => {
warn!(
"Unauthorized request: namespace '{}', query '{}'",
namespace, query
);
return Err(
HttpResponse::Forbidden().json("Must include namespace in all vector selectors")
);
}
}
Ok(query)
}
67 changes: 65 additions & 2 deletions dataplane-webserver/src/metrics/mod.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::config::Config;
use crate::metrics::types::RangeQuery;
use crate::metrics::types::{InstantQuery, RangeQuery};
use actix_web::http::StatusCode;
use actix_web::web::{Data, Query};
use actix_web::HttpResponse;
Expand All @@ -11,13 +11,76 @@ use std::time::{Duration, SystemTime};
pub mod expression_validator;
pub mod types;

pub async fn query_prometheus_instant(
cfg: Data<Config>,
http_client: Data<Client>,
instant_query: Query<InstantQuery>,
namespace: String,
) -> HttpResponse {
// Internal async block that returns Result
let result = async {
let query = match expression_validator::check_query_only_accesses_namespace(
&instant_query,
&namespace,
) {
Ok(value) => value,
Err(http_response) => return Err(http_response),
};

let time = instant_query.time.unwrap_or_else(|| {
SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Time went backwards")
.as_secs()
});

let timeout = format!("{}ms", cfg.prometheus_timeout_ms);

let query_url = format!("{}/api/v1/query", cfg.prometheus_url.trim_end_matches('/'));
let query_params = [
("query", &query),
("time", &time.to_string()),
("timeout", &timeout),
];

let response = http_client
.get(&query_url)
.query(&query_params)
.timeout(std::time::Duration::from_millis(
cfg.prometheus_timeout_ms as u64 + 500,
))
.send()
.await
.map_err(|e| {
error!("Failed to query Prometheus: {}", e);
HttpResponse::GatewayTimeout().json("Failed to query Prometheus")
})?;

let status_code = response.status();

let json_response: Value = response.json().await.map_err(|e| {
error!("Failed to parse Prometheus response: {}", e);
HttpResponse::InternalServerError().json("Failed to parse Prometheus response")
})?;

Ok(HttpResponse::Ok().json(json_response))
}
.await;

// Handle the result outside of the async block
match result {
Ok(response) => response,
Err(error_response) => error_response,
}
}

pub async fn query_prometheus(
cfg: Data<Config>,
http_client: Data<Client>,
range_query: Query<RangeQuery>,
namespace: String,
) -> HttpResponse {
let query = match expression_validator::check_query_only_accesses_namespace(
let query = match expression_validator::check_range_query_only_accesses_namespace(
&range_query.clone(),
&namespace,
) {
Expand Down
6 changes: 6 additions & 0 deletions dataplane-webserver/src/metrics/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,9 @@ pub struct RangeQuery {
pub end: Option<f64>,
pub step: Option<String>,
}

#[derive(Deserialize, Clone)]
pub struct InstantQuery {
pub query: String,
pub time: Option<u64>,
}
15 changes: 14 additions & 1 deletion dataplane-webserver/src/routes/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::{config, metrics};

use crate::metrics::types::RangeQuery;
use crate::metrics::types::{InstantQuery, RangeQuery};
use actix_web::{get, web, Error, HttpRequest, HttpResponse};

use reqwest::Client;
Expand Down Expand Up @@ -36,3 +36,16 @@ pub async fn query_range(

Ok(metrics::query_prometheus(cfg, http_client, range_query, namespace).await)
}

#[get("/query")]
pub async fn query(
cfg: web::Data<config::Config>,
http_client: web::Data<Client>,
instant_query: web::Query<InstantQuery>,
_req: HttpRequest,
path: web::Path<(String,)>,
) -> Result<HttpResponse, Error> {
let (namespace,) = path.into_inner();

Ok(metrics::query_prometheus_instant(cfg, http_client, instant_query, namespace).await)
}
41 changes: 41 additions & 0 deletions dataplane-webserver/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,21 @@ mod tests {
return query_url.trim_start_matches("http://localhost").to_string();
}

fn format_prometheus_instant_query(url: &str, query: &str) -> String {
let time = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("Failed to get UNIX time")
.as_secs()
.to_string();
let query_params = vec![("query", query), ("time", &time)];
let url = format!("http://localhost{}", url);
Url::parse_with_params(url.as_str(), &query_params)
.expect("Failed to format query parameters")
.to_string()
.trim_start_matches("http://localhost")
.to_string()
}

#[actix_web::test]
async fn test_metrics_query_range() {
let cfg = config::Config::default();
Expand Down Expand Up @@ -111,4 +126,30 @@ mod tests {
// It should be a client error if we try to request a namespace we do not own
assert!(resp.status().is_client_error());
}

#[actix_web::test]
async fn test_metrics_query_instant() {
let cfg = config::Config::default();
let http_client = reqwest::Client::builder()
.build()
.expect("Failed to create HTTP client");

let app = test::init_service(
App::new()
.app_data(web::Data::new(cfg.clone()))
.app_data(web::Data::new(http_client.clone()))
.service(web::scope("/{namespace}/metrics").service(metrics::query)),
)
.await;

let url = "/org-coredb-inst-control-plane-dev/metrics/query";
let query =
"sum(rate(http_requests_total{namespace=\"org-coredb-inst-control-plane-dev\"}[5m]))";
let query_url = format_prometheus_instant_query(url, query);
let req = test::TestRequest::get()
.uri(query_url.as_str())
.to_request();
let resp = test::call_service(&app, req).await;
assert!(resp.status().is_success());
}
}
Loading