Skip to content

Commit

Permalink
chore(query): add connect duration metric for udf (#16407)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhang2014 authored Sep 6, 2024
1 parent 24e2749 commit dd21d7d
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 2 deletions.
11 changes: 11 additions & 0 deletions src/common/metrics/src/metrics/external_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,23 @@ use databend_common_base::runtime::metrics::FamilyHistogram;
use crate::VecLabels;

const METRIC_REQUEST_EXTERNAL_DURATION: &str = "external_request_duration";
const METRIC_CONNECT_EXTERNAL_DURATION: &str = "external_connect_duration";

static REQUEST_EXTERNAL_DURATION: LazyLock<FamilyHistogram<VecLabels>> =
LazyLock::new(|| register_histogram_family_in_seconds(METRIC_REQUEST_EXTERNAL_DURATION));

static CONNECT_EXTERNAL_DURATION: LazyLock<FamilyHistogram<VecLabels>> =
LazyLock::new(|| register_histogram_family_in_seconds(METRIC_CONNECT_EXTERNAL_DURATION));

const LABEL_FUNCTION_NAME: &str = "function_name";

pub fn record_connect_external_duration(function_name: String, duration: Duration) {
let labels = &vec![(LABEL_FUNCTION_NAME, function_name)];
CONNECT_EXTERNAL_DURATION
.get_or_create(labels)
.observe(duration.as_millis_f64());
}

pub fn record_request_external_duration(function_name: String, duration: Duration) {
let labels = &vec![(LABEL_FUNCTION_NAME, function_name)];
REQUEST_EXTERNAL_DURATION
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use databend_common_expression::BlockEntry;
use databend_common_expression::DataBlock;
use databend_common_expression::DataField;
use databend_common_expression::DataSchema;
use databend_common_metrics::external_server::record_connect_external_duration;
use databend_common_metrics::external_server::record_request_external_duration;
use databend_common_pipeline_transforms::processors::AsyncTransform;
use databend_common_sql::executor::physical_plans::UdfFunctionDesc;
Expand All @@ -56,7 +57,6 @@ impl TransformUdfServer {
let s = Self {
ctx,
funcs,

connect_timeout,
request_timeout,
request_bacth_rows,
Expand Down Expand Up @@ -111,11 +111,18 @@ impl TransformUdfServer {
.with_tenant(ctx.get_tenant().tenant_name())?
.with_func_name(&func.func_name)?
.with_query_id(&ctx.get_id())?;

let connect_duration = instant.elapsed();
record_connect_external_duration(func.func_name.clone(), connect_duration);

Profile::record_usize_profile(ProfileStatisticsName::ExternalServerRequestCount, 1);
let result_batch = client
.do_exchange(&func.func_name, input_batch.clone())
.await?;
record_request_external_duration(func.func_name.clone(), instant.elapsed());

let request_duration = instant.elapsed() - connect_duration;
record_request_external_duration(func.func_name.clone(), request_duration);

let schema = DataSchema::try_from(&(*result_batch.schema()))?;
let (result_block, result_schema) = DataBlock::from_record_batch(&schema, &result_batch)
.map_err(|err| {
Expand Down

0 comments on commit dd21d7d

Please sign in to comment.