diff --git a/src/frontend/src/instance.rs b/src/frontend/src/instance.rs index 5b3a036493d3..3ac00dd77591 100644 --- a/src/frontend/src/instance.rs +++ b/src/frontend/src/instance.rs @@ -59,7 +59,7 @@ use servers::query_handler::sql::SqlQueryHandler; use servers::query_handler::{ InfluxdbLineProtocolHandler, OpentsdbProtocolHandler, PrometheusProtocolHandler, ScriptHandler, }; -use session::context::{QueryContext, QueryContextRef}; +use session::context::QueryContextRef; use snafu::prelude::*; use sql::dialect::GenericDialect; use sql::parser::ParserContext; @@ -496,13 +496,16 @@ impl SqlQueryHandler for Instance { } } - async fn do_promql_query(&self, query: &PromQuery, _: QueryContextRef) -> Vec> { - let result = - PromHandler::do_query(self, query) - .await - .with_context(|_| ExecutePromqlSnafu { - query: format!("{query:?}"), - }); + async fn do_promql_query( + &self, + query: &PromQuery, + query_ctx: QueryContextRef, + ) -> Vec> { + let result = PromHandler::do_query(self, query, query_ctx) + .await + .with_context(|_| ExecutePromqlSnafu { + query: format!("{query:?}"), + }); vec![result] } @@ -538,12 +541,16 @@ impl SqlQueryHandler for Instance { #[async_trait] impl PromHandler for Instance { - async fn do_query(&self, query: &PromQuery) -> server_error::Result { + async fn do_query( + &self, + query: &PromQuery, + query_ctx: QueryContextRef, + ) -> server_error::Result { let stmt = QueryLanguageParser::parse_promql(query).with_context(|_| ParsePromQLSnafu { query: query.clone(), })?; self.statement_executor - .execute_stmt(stmt, QueryContext::arc()) + .execute_stmt(stmt, query_ctx) .await .map_err(BoxedError::new) .with_context(|_| ExecuteQuerySnafu { diff --git a/src/servers/src/prom.rs b/src/servers/src/prom.rs index 053bbf3c62e1..7b9e59564cc1 100644 --- a/src/servers/src/prom.rs +++ b/src/servers/src/prom.rs @@ -21,6 +21,7 @@ use async_trait::async_trait; use axum::body::BoxBody; use axum::extract::{Query, State}; use axum::{routing, Form, Json, Router}; +use common_catalog::consts::DEFAULT_SCHEMA_NAME; use common_error::prelude::ErrorExt; use common_error::status_code::StatusCode; use common_query::Output; @@ -38,6 +39,7 @@ use promql_parser::parser::{ use query::parser::PromQuery; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +use session::context::{QueryContext, QueryContextRef}; use snafu::{ensure, OptionExt, ResultExt}; use tokio::sync::oneshot::Sender; use tokio::sync::{oneshot, Mutex}; @@ -59,7 +61,7 @@ pub type PromHandlerRef = Arc; #[async_trait] pub trait PromHandler { - async fn do_query(&self, query: &PromQuery) -> Result; + async fn do_query(&self, query: &PromQuery, query_ctx: QueryContextRef) -> Result; } /// PromServer represents PrometheusServer which handles the compliance with prometheus HTTP API @@ -372,6 +374,7 @@ pub struct RangeQuery { end: Option, step: Option, timeout: Option, + db: Option, } #[axum_macros::debug_handler] @@ -386,7 +389,13 @@ pub async fn range_query( end: params.end.or(form_params.end).unwrap_or_default(), step: params.step.or(form_params.step).unwrap_or_default(), }; - let result = handler.do_query(&prom_query).await; + + let db = ¶ms.db.unwrap_or(DEFAULT_SCHEMA_NAME.to_string()); + let (catalog, schema) = super::parse_catalog_and_schema_from_client_database_name(db); + + let query_ctx = QueryContext::with(catalog, schema); + + let result = handler.do_query(&prom_query, Arc::new(query_ctx)).await; let metric_name = retrieve_metric_name(&prom_query.query).unwrap_or_default(); PromJsonResponse::from_query_result(result, metric_name).await }