Add prometheus metric for per partition high watermark #11148

Merged · 11 commits · Mar 16, 2022
Changes from 3 commits
43 changes: 38 additions & 5 deletions src/dataflow/src/source/kafka.rs
@@ -35,8 +35,11 @@ use mz_repr::adt::jsonb::Jsonb;
use crate::logging::materialized::{Logger, MaterializedEvent};
use crate::source::{NextMessage, SourceMessage, SourceReader};

use self::metrics::KafkaPartitionMetrics;
use super::metrics::SourceBaseMetrics;

mod metrics;

/// Contains all information necessary to ingest data from Kafka
pub struct KafkaSourceReader {
/// Name of the topic that backs this source
@@ -69,6 +72,8 @@ pub struct KafkaSourceReader {
// Drop order is important here, we want the thread to be unparked after the `partition_info`
// Arc has been dropped, so that the unparked thread notices it and exits immediately
_metadata_thread_handle: UnparkOnDropHandle<()>,
/// A handle to the partition-specific metrics
partition_metrics: KafkaPartitionMetrics,
}

impl SourceReader for KafkaSourceReader {
@@ -87,7 +92,7 @@ impl SourceReader for KafkaSourceReader {
restored_offsets: Vec<(PartitionId, Option<MzOffset>)>,
_: SourceDataEncoding,
logger: Option<Logger>,
_: SourceBaseMetrics,
base_metrics: SourceBaseMetrics,
) -> Result<Self, anyhow::Error> {
let kc = match connector {
ExternalSourceConnector::Kafka(kc) => kc,
@@ -168,10 +173,10 @@ impl SourceReader for KafkaSourceReader {
.unwrap()
.unpark_on_drop()
};

let partition_ids = start_offsets.keys().copied().collect();
Ok(KafkaSourceReader {
topic_name: topic,
source_name,
topic_name: topic.clone(),
source_name: source_name.clone(),
id: source_id,
partition_consumers: VecDeque::new(),
consumer,
@@ -184,6 +189,13 @@ impl SourceReader for KafkaSourceReader {
last_stats: None,
partition_info,
_metadata_thread_handle: metadata_thread_handle,
partition_metrics: KafkaPartitionMetrics::new(
&base_metrics,
partition_ids,
topic,
source_name,
source_id,
),
})
}

@@ -390,7 +402,28 @@ impl KafkaSourceReader {
old: self.last_stats.take(),
new: Some(stats.clone()),
});
self.last_stats = Some(stats);
self.last_stats = Some(stats.clone());
}
if let Some(obj) = stats.as_ref().to_serde_json().as_object() {
// The high watermark is tracked per partition; fetch the per-partition map from the JSON stats at the path topics.$topic_name.partitions
if let Some(partitions) = obj
.get("topics")
.and_then(|v| v.get(&self.topic_name))
.and_then(|v| v.get("partitions"))
.and_then(|v| v.as_object())
{
partitions.iter().for_each(|i| {
if let Ok(id) = str::parse::<i32>(i.0) {
if let Some(offset) = i.1.get("hi_offset").and_then(|v| v.as_i64()) {
self.partition_metrics.set_offset_max(id, offset);
} else {
error!("unable to get hi_offset for {}", id);
}
} else {
error!("unable to get partition id from {:?}", i.0);
}
});
}
}
}
}
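For reference, a minimal sketch of the librdkafka statistics shape this traversal expects: per-partition objects nested under `topics.$topic.partitions`, each carrying a `hi_offset` field. The topic name, partition ids, and offsets below are hypothetical.

```rust
use serde_json::{json, Value};

fn main() {
    // Hypothetical excerpt of a librdkafka statistics payload.
    let stats: Value = json!({
        "topics": {
            "my-topic": {
                "partitions": {
                    "0": { "hi_offset": 42 },
                    "1": { "hi_offset": 17 }
                }
            }
        }
    });

    // The same traversal as in the diff above: topics.$topic_name.partitions,
    // then hi_offset per partition.
    if let Some(partitions) = stats
        .get("topics")
        .and_then(|v| v.get("my-topic"))
        .and_then(|v| v.get("partitions"))
        .and_then(|v| v.as_object())
    {
        for (id, partition) in partitions {
            if let (Ok(id), Some(offset)) = (
                id.parse::<i32>(),
                partition.get("hi_offset").and_then(|v| v.as_i64()),
            ) {
                println!("partition {id}: high watermark {offset}");
            }
        }
    }
}
```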
54 changes: 54 additions & 0 deletions src/dataflow/src/source/kafka/metrics.rs
@@ -0,0 +1,54 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::HashMap;

use prometheus::core::AtomicI64;

use mz_expr::SourceInstanceId;
use mz_ore::metrics::{DeleteOnDropGauge, GaugeVecExt};

use crate::source::metrics::SourceBaseMetrics;

pub(super) struct KafkaPartitionMetrics {
partition_offset_map: HashMap<i32, DeleteOnDropGauge<'static, AtomicI64, Vec<String>>>,
}

impl KafkaPartitionMetrics {
pub fn new(
base: &SourceBaseMetrics,
ids: Vec<i32>,
topic: String,
source_name: String,
source_id: SourceInstanceId,
) -> Self {
let pm = &base.partition_specific;
Self {
partition_offset_map: HashMap::from_iter(ids.iter().map(|id| {
let labels = &[
topic.clone(),
source_name.clone(),
source_id.to_string(),
format!("{}", id),
];
(
*id,
pm.partition_offset_max
.get_delete_on_drop_gauge(labels.to_vec()),
)
})),
}
}

pub fn set_offset_max(&self, id: i32, offset: i64) {
if let Some(gauge) = self.partition_offset_map.get(&id) {
gauge.set(offset);
}
}
}
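`DeleteOnDropGauge` is a Materialize wrapper; as a sketch of the pattern it automates — not the actual implementation — here is the equivalent written directly against the `prometheus` crate, with hypothetical label values. The labeled series is set while the handle is needed and removed when the partition is retired.

```rust
use prometheus::{IntGaugeVec, Opts, Registry};

fn main() -> Result<(), prometheus::Error> {
    let registry = Registry::new();
    let offset_max = IntGaugeVec::new(
        Opts::new(
            "mz_kafka_partition_offset_max",
            "High watermark offset on broker for partition",
        ),
        &["topic", "source_id", "source_instance", "partition_id"],
    )?;
    registry.register(Box::new(offset_max.clone()))?;

    // Hypothetical label values for one partition of one source.
    let labels = ["my-topic", "u1", "0", "0"];
    offset_max.with_label_values(&labels).set(42);

    // This removal on retirement is what DeleteOnDropGauge automates:
    // dropping the handle deletes the series, so stale partitions do
    // not linger on the metrics endpoint.
    offset_max.remove_label_values(&labels)?;
    Ok(())
}
```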
6 changes: 6 additions & 0 deletions src/dataflow/src/source/metrics.rs
@@ -110,6 +110,7 @@ pub(super) struct PartitionSpecificMetrics {
pub(super) offset_received: IntGaugeVec,
pub(super) closed_ts: UIntGaugeVec,
pub(super) messages_ingested: IntCounterVec,
pub(super) partition_offset_max: IntGaugeVec,
}

impl PartitionSpecificMetrics {
@@ -136,6 +137,11 @@ impl PartitionSpecificMetrics {
help: "The number of messages ingested per partition.",
var_labels: ["topic", "source_id", "source_instance", "partition_id"],
)),
partition_offset_max: registry.register(metric!(
name: "mz_kafka_partition_offset_max",
help: "High watermark offset on broker for partition",
var_labels: ["topic", "source_id", "source_instance", "partition_id"],
)),
}
}
}
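Once registered, the new gauge would surface on the Prometheus endpoint roughly as follows (label values here are hypothetical):

```
# HELP mz_kafka_partition_offset_max High watermark offset on broker for partition
# TYPE mz_kafka_partition_offset_max gauge
mz_kafka_partition_offset_max{topic="my-topic",source_id="u1",source_instance="0",partition_id="0"} 42
mz_kafka_partition_offset_max{topic="my-topic",source_id="u1",source_instance="0",partition_id="1"} 17
```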