Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove redudant PromStoreProtocolHandler::write #3553

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 3 additions & 51 deletions src/frontend/src/instance/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use api::prom_store::remote::read_request::ResponseType;
use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest};
use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse};
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
Expand Down Expand Up @@ -46,7 +46,6 @@ use crate::error::{
TableNotFoundSnafu,
};
use crate::instance::Instance;
use crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES;

const SAMPLES_RESPONSE_TYPE: i32 = ResponseType::Samples as i32;

Expand Down Expand Up @@ -163,43 +162,6 @@ impl Instance {
#[async_trait]
impl PromStoreProtocolHandler for Instance {
async fn write(
&self,
request: WriteRequest,
ctx: QueryContextRef,
with_metric_engine: bool,
) -> ServerResult<Output> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::PromStoreWrite)
.context(AuthSnafu)?;
let interceptor_ref = self
.plugins
.get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_write(&request, ctx.clone())?;

let (requests, samples) = prom_store::to_grpc_row_insert_requests(&request)?;
let output = if with_metric_engine {
let physical_table = ctx
.extension(PHYSICAL_TABLE_PARAM)
.unwrap_or(GREPTIME_PHYSICAL_TABLE)
.to_string();
self.handle_metric_row_inserts(requests, ctx.clone(), physical_table.to_string())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?
} else {
self.handle_row_inserts(requests, ctx.clone())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?
};

PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok(output)
}

async fn write_fast(
&self,
request: RowInsertRequests,
ctx: QueryContextRef,
Expand Down Expand Up @@ -316,14 +278,13 @@ impl ExportMetricHandler {
impl PromStoreProtocolHandler for ExportMetricHandler {
async fn write(
&self,
request: WriteRequest,
request: RowInsertRequests,
ctx: QueryContextRef,
_: bool,
) -> ServerResult<Output> {
let (requests, _) = prom_store::to_grpc_row_insert_requests(&request)?;
self.inserter
.handle_metric_row_inserts(
requests,
request,
ctx,
&self.statement_executor,
GREPTIME_PHYSICAL_TABLE.to_string(),
Expand All @@ -333,15 +294,6 @@ impl PromStoreProtocolHandler for ExportMetricHandler {
.context(error::ExecuteGrpcQuerySnafu)
}

async fn write_fast(
&self,
_request: RowInsertRequests,
_ctx: QueryContextRef,
_with_metric_engine: bool,
) -> ServerResult<Output> {
unimplemented!()
}

async fn read(
&self,
_request: ReadRequest,
Expand Down
7 changes: 0 additions & 7 deletions src/frontend/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,6 @@ lazy_static! {
.with_label_values(&["insert"]);
pub static ref EXECUTE_SCRIPT_ELAPSED: Histogram = HANDLE_SCRIPT_ELAPSED
.with_label_values(&["execute"]);

/// The samples count of Prometheus remote write.
pub static ref PROM_STORE_REMOTE_WRITE_SAMPLES: IntCounter = register_int_counter!(
"greptime_frontend_prometheus_remote_write_samples",
"frontend prometheus remote write samples"
)
.unwrap();
pub static ref OTLP_METRICS_ROWS: IntCounter = register_int_counter!(
"greptime_frontend_otlp_metrics_rows",
"frontend otlp metrics rows"
Expand Down
15 changes: 13 additions & 2 deletions src/servers/src/export_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use snafu::{ensure, ResultExt};
use tokio::time::{self, Interval};

use crate::error::{InvalidExportMetricsConfigSnafu, Result, SendPromRemoteRequestSnafu};
use crate::prom_store::snappy_compress;
use crate::prom_store::{snappy_compress, to_grpc_row_insert_requests};
use crate::query_handler::PromStoreProtocolHandlerRef;

/// Use to export the metrics generated by greptimedb, encoded to Prometheus [RemoteWrite format](https://prometheus.io/docs/concepts/remote_write_spec/),
Expand Down Expand Up @@ -256,8 +256,19 @@ pub async fn write_system_metric_by_handler(
filter.as_ref(),
Timestamp::current_millis().value(),
);
if let Err(e) = handler.write(request, ctx.clone(), false).await {

let (requests, samples) = match to_grpc_row_insert_requests(&request) {
Ok((requests, samples)) => (requests, samples),
Err(e) => {
error!(e; "Failed to convert gathered metrics to RowInsertRequests");
continue;
}
};

if let Err(e) = handler.write(requests, ctx.clone(), false).await {
error!("report export metrics by handler failed, error {}", e);
} else {
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
}
}
}
Expand Down
32 changes: 17 additions & 15 deletions src/servers/src/http/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::sync::Arc;

use api::prom_store::remote::{ReadRequest, WriteRequest};
use api::prom_store::remote::ReadRequest;
use api::v1::RowInsertRequests;
use axum::extract::{Query, RawBody, State};
use axum::http::{header, HeaderValue, StatusCode};
Expand Down Expand Up @@ -75,13 +75,14 @@ pub async fn route_write_without_metric_engine(
.with_label_values(&[db.as_str()])
.start_timer();

let request = decode_remote_write_request(body).await?;
let (request, samples) = decode_remote_write_request(body).await?;
// reject if physical table is specified when metric engine is disabled
if params.physical_table.is_some() {
return UnexpectedPhysicalTableSnafu {}.fail();
}

let output = handler.write(request, query_ctx, false).await?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok((
StatusCode::NO_CONTENT,
write_cost_header_map(output.meta.cost),
Expand All @@ -104,15 +105,16 @@ pub async fn remote_write(
.with_label_values(&[db.as_str()])
.start_timer();

let request = decode_remote_write_request_to_row_inserts(body).await?;
let (request, samples) = decode_remote_write_request_to_row_inserts(body).await?;

if let Some(physical_table) = params.physical_table {
let mut new_query_ctx = query_ctx.as_ref().clone();
new_query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table);
query_ctx = Arc::new(new_query_ctx);
}

let output = handler.write_fast(request, query_ctx, true).await?;
let output = handler.write(request, query_ctx, true).await?;
crate::metrics::PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok((
StatusCode::NO_CONTENT,
write_cost_header_map(output.meta.cost),
Expand Down Expand Up @@ -159,7 +161,9 @@ pub async fn remote_read(
handler.read(request, query_ctx).await
}

async fn decode_remote_write_request_to_row_inserts(body: Body) -> Result<RowInsertRequests> {
async fn decode_remote_write_request_to_row_inserts(
body: Body,
) -> Result<(RowInsertRequests, usize)> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
let body = hyper::body::to_bytes(body)
.await
Expand All @@ -171,24 +175,22 @@ async fn decode_remote_write_request_to_row_inserts(body: Body) -> Result<RowIns
request
.merge(buf)
.context(error::DecodePromRemoteRequestSnafu)?;
let (requests, samples) = request.as_row_insert_requests();
crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES.observe(samples as f64);
Ok(requests)
Ok(request.as_row_insert_requests())
}

async fn decode_remote_write_request(body: Body) -> Result<WriteRequest> {
async fn decode_remote_write_request(body: Body) -> Result<(RowInsertRequests, usize)> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
let body = hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)?;

let buf = snappy_decompress(&body[..])?;

let request = WriteRequest::decode(&buf[..]).context(error::DecodePromRemoteRequestSnafu)?;
crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES
.observe(request.timeseries.len() as f64);
let buf = Bytes::from(snappy_decompress(&body[..])?);

Ok(request)
let mut request = PromWriteRequest::default();
request
.merge(buf)
.context(error::DecodePromRemoteRequestSnafu)?;
Ok(request.as_row_insert_requests())
}

async fn decode_remote_read_request(body: Body) -> Result<ReadRequest> {
Expand Down
7 changes: 4 additions & 3 deletions src/servers/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,10 @@ lazy_static! {
/// Duration to convert prometheus write request to gRPC request.
pub static ref METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED: Histogram = METRIC_HTTP_PROM_STORE_CODEC_ELAPSED
.with_label_values(&["convert"]);
pub static ref METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES: Histogram = register_histogram!(
"greptime_servers_http_prometheus_decode_num_series",
"servers http prometheus decode num series",
/// The samples count of Prometheus remote write.
pub static ref PROM_STORE_REMOTE_WRITE_SAMPLES: IntCounter = register_int_counter!(
"greptime_servers_prometheus_remote_write_samples",
"frontend prometheus remote write samples"
)
.unwrap();
/// Http prometheus read duration per database.
Expand Down
5 changes: 4 additions & 1 deletion src/servers/src/proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@ pub struct PromLabel {
}

impl Clear for PromLabel {
fn clear(&mut self) {}
fn clear(&mut self) {
self.name.clear();
self.value.clear();
}
}

impl PromLabel {
Expand Down
10 changes: 1 addition & 9 deletions src/servers/src/query_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub mod sql;
use std::collections::HashMap;
use std::sync::Arc;

use api::prom_store::remote::{ReadRequest, WriteRequest};
use api::prom_store::remote::ReadRequest;
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use common_query::Output;
Expand Down Expand Up @@ -90,14 +90,6 @@ pub struct PromStoreResponse {
pub trait PromStoreProtocolHandler {
/// Handling prometheus remote write requests
async fn write(
&self,
request: WriteRequest,
ctx: QueryContextRef,
with_metric_engine: bool,
) -> Result<Output>;

/// Handling prometheus remote write requests
async fn write_fast(
&self,
request: RowInsertRequests,
ctx: QueryContextRef,
Expand Down
11 changes: 1 addition & 10 deletions src/servers/tests/http/prom_store_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,7 @@ impl GrpcQueryHandler for DummyInstance {

#[async_trait]
impl PromStoreProtocolHandler for DummyInstance {
async fn write(&self, request: WriteRequest, ctx: QueryContextRef, _: bool) -> Result<Output> {
let _ = self
.tx
.send((ctx.current_schema().to_owned(), request.encode_to_vec()))
.await;

Ok(Output::new_with_affected_rows(0))
}

async fn write_fast(
async fn write(
&self,
_request: RowInsertRequests,
_ctx: QueryContextRef,
Expand Down
4 changes: 3 additions & 1 deletion tests-integration/src/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ mod tests {
use prost::Message;
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::prom_store;
use servers::prom_store::to_grpc_row_insert_requests;
use servers::query_handler::sql::SqlQueryHandler;
use servers::query_handler::PromStoreProtocolHandler;
use session::context::QueryContext;
Expand Down Expand Up @@ -107,8 +108,9 @@ mod tests {
.unwrap()
.is_ok());

let (row_inserts, _) = to_grpc_row_insert_requests(&write_request).unwrap();
instance
.write(write_request, ctx.clone(), true)
.write(row_inserts, ctx.clone(), true)
.await
.unwrap();

Expand Down