From c6f016e2a9c20f3518b28e5ebf5b1c1744c9db45 Mon Sep 17 00:00:00 2001
From: Nick Harring
Date: Wed, 9 Mar 2022 09:40:51 -0800
Subject: [PATCH 01/11] initial addition of broker high watermark per partition

---
 src/dataflow/src/source/kafka.rs         | 51 +++++++++++++++++++++---
 src/dataflow/src/source/kafka/metrics.rs | 43 ++++++++++++++++++++
 src/dataflow/src/source/metrics.rs       |  6 +++
 3 files changed, 95 insertions(+), 5 deletions(-)
 create mode 100644 src/dataflow/src/source/kafka/metrics.rs

diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs
index d487ac73813de..f6ab3ef4a8f32 100644
--- a/src/dataflow/src/source/kafka.rs
+++ b/src/dataflow/src/source/kafka.rs
@@ -35,8 +35,11 @@ use mz_repr::adt::jsonb::Jsonb;
 use crate::logging::materialized::{Logger, MaterializedEvent};
 use crate::source::{NextMessage, SourceMessage, SourceReader};
 
+use self::metrics::KafkaPartitionMetrics;
 use super::metrics::SourceBaseMetrics;
 
+mod metrics;
+
 /// Contains all information necessary to ingest data from Kafka
 pub struct KafkaSourceReader {
     /// Name of the topic on which this source is backed on
@@ -69,6 +72,16 @@ pub struct KafkaSourceReader {
     // Drop order is important here, we want the thread to be unparked after the `partition_info`
     // Arc has been dropped, so that the unpacked thread notices it and exits immediately
     _metadata_thread_handle: UnparkOnDropHandle<()>,
+    /// A handle to the partition specific metrics
+    partition_metrics: KafkaPartitionMetrics,
+}
+
+struct UnparkOnDrop(JoinHandle);
+
+impl Drop for UnparkOnDrop {
+    fn drop(&mut self) {
+        self.0.thread().unpark();
+    }
 }
 
 impl SourceReader for KafkaSourceReader {
@@ -87,7 +100,7 @@ impl SourceReader for KafkaSourceReader {
         restored_offsets: Vec<(PartitionId, Option)>,
         _: SourceDataEncoding,
         logger: Option,
-        _: SourceBaseMetrics,
+        base_metrics: SourceBaseMetrics,
     ) -> Result {
         let kc = match connector {
             ExternalSourceConnector::Kafka(kc) => kc,
@@ -168,10 +181,10 @@ impl SourceReader for KafkaSourceReader {
                 .unwrap()
                 .unpark_on_drop()
         };
-
+        let partition_ids = start_offsets.keys().map(|i| *i).collect();
         Ok(KafkaSourceReader {
-            topic_name: topic,
-            source_name,
+            topic_name: topic.clone(),
+            source_name: source_name.clone(),
             id: source_id,
             partition_consumers: VecDeque::new(),
             consumer,
@@ -184,6 +197,13 @@ impl SourceReader for KafkaSourceReader {
             last_stats: None,
             partition_info,
             _metadata_thread_handle: metadata_thread_handle,
+            partition_metrics: KafkaPartitionMetrics::new(
+                &base_metrics,
+                partition_ids,
+                topic,
+                source_name,
+                source_id,
+            ),
         })
     }
 
@@ -390,7 +410,28 @@ impl KafkaSourceReader {
                 old: self.last_stats.take(),
                 new: Some(stats.clone()),
             });
-            self.last_stats = Some(stats);
+            self.last_stats = Some(stats.clone());
+        }
+        if let Some(obj) = stats.as_ref().to_serde_json().as_object() {
+            // The high watermark is per partition, get the per partition map from the json stats with the path topics.$topic_name.partitions
+            if let Some(partitions) = obj
+                .get("topics")
+                .and_then(|v| v.get(self.topic_name.clone()))
+                .and_then(|v| v.get("partitions"))
+                .and_then(|v| v.as_object())
+            {
+                partitions.iter().for_each(|i| {
+                    if let Ok(id) = str::parse::(i.0) {
+                        if let Some(offset) = i.1.as_i64() {
+                            self.partition_metrics.set_offset_max(id, offset);
+                        } else {
+                            error!("unable to get hi_offset for {}", id);
+                        }
+                    } else {
+                        error!("unable to get partition id from {:?}", i.0);
+                    }
+                });
+            }
         }
     }
 }
diff --git a/src/dataflow/src/source/kafka/metrics.rs b/src/dataflow/src/source/kafka/metrics.rs
new file mode
100644 index 0000000000000..44203ef6b6818 --- /dev/null +++ b/src/dataflow/src/source/kafka/metrics.rs @@ -0,0 +1,43 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +use std::collections::HashMap; + +use prometheus::core::AtomicI64; + +use mz_expr::SourceInstanceId; +use mz_ore::metrics::{DeleteOnDropGauge, GaugeVecExt}; + +use crate::source::metrics::SourceBaseMetrics; + +pub(super) struct KafkaPartitionMetrics { + partition_offset_map: HashMap>>, +} + +impl KafkaPartitionMetrics { + pub fn new( + base: &SourceBaseMetrics, + ids: Vec, + topic: String, + source_name: String, + source_id: SourceInstanceId, + ) -> Self { + let pm = &base.partition_specific; + Self { + partition_offset_map: HashMap::from_iter(ids.iter().map(|id| { + let labels = &[topic.clone(), source_name.clone(), source_id.to_string(), format!("{}", id)]; + (*id, pm.partition_offset_max.get_delete_on_drop_gauge(labels.to_vec())) + })), + } + } + + pub fn set_offset_max(&self, id: i32, offset: i64) { + self.partition_offset_map.get(&id).and_then(|g| Some(g.set(offset))); + } +} diff --git a/src/dataflow/src/source/metrics.rs b/src/dataflow/src/source/metrics.rs index debb0043d69ea..fba33cfd69f46 100644 --- a/src/dataflow/src/source/metrics.rs +++ b/src/dataflow/src/source/metrics.rs @@ -110,6 +110,7 @@ pub(super) struct PartitionSpecificMetrics { pub(super) offset_received: IntGaugeVec, pub(super) closed_ts: UIntGaugeVec, pub(super) messages_ingested: IntCounterVec, + pub(super) partition_offset_max: IntGaugeVec, } impl PartitionSpecificMetrics { @@ -136,6 +137,11 @@ impl PartitionSpecificMetrics { help: "The number of messages ingested per partition.", var_labels: ["topic", "source_id", "source_instance", "partition_id"], )), + partition_offset_max: registry.register(metric!( + name: "mz_kafka_partition_offset_max", + help: "High watermark offset on broker for partition", + var_labels: ["topic", "source_id", "source_instance", "partition_id"], + )), } } } From e610ee24532823878e51882c80bbb8a8a92683e2 Mon Sep 17 00:00:00 2001 From: Nick Harring Date: Wed, 9 Mar 2022 14:46:16 -0800 Subject: [PATCH 02/11] lint --- src/dataflow/src/source/kafka/metrics.rs | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/dataflow/src/source/kafka/metrics.rs b/src/dataflow/src/source/kafka/metrics.rs index 44203ef6b6818..168a804ea845b 100644 --- a/src/dataflow/src/source/kafka/metrics.rs +++ b/src/dataflow/src/source/kafka/metrics.rs @@ -31,13 +31,24 @@ impl KafkaPartitionMetrics { let pm = &base.partition_specific; Self { partition_offset_map: HashMap::from_iter(ids.iter().map(|id| { - let labels = &[topic.clone(), source_name.clone(), source_id.to_string(), format!("{}", id)]; - (*id, pm.partition_offset_max.get_delete_on_drop_gauge(labels.to_vec())) + let labels = &[ + topic.clone(), + source_name.clone(), + source_id.to_string(), + format!("{}", id), + ]; + ( + *id, + pm.partition_offset_max + .get_delete_on_drop_gauge(labels.to_vec()), + ) })), } } pub fn set_offset_max(&self, id: i32, offset: i64) { - self.partition_offset_map.get(&id).and_then(|g| Some(g.set(offset))); + self.partition_offset_map + .get(&id) + .and_then(|g| Some(g.set(offset))); } } From 
8d28582d45e0421acb4de968d163d104dc9b9c0a Mon Sep 17 00:00:00 2001 From: Nick Harring Date: Mon, 14 Mar 2022 09:22:38 -0700 Subject: [PATCH 03/11] fix rebase --- src/dataflow/src/source/kafka.rs | 8 -------- 1 file changed, 8 deletions(-) diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs index f6ab3ef4a8f32..4950ada7457d0 100644 --- a/src/dataflow/src/source/kafka.rs +++ b/src/dataflow/src/source/kafka.rs @@ -76,14 +76,6 @@ pub struct KafkaSourceReader { partition_metrics: KafkaPartitionMetrics, } -struct UnparkOnDrop(JoinHandle); - -impl Drop for UnparkOnDrop { - fn drop(&mut self) { - self.0.thread().unpark(); - } -} - impl SourceReader for KafkaSourceReader { type Key = Option>; type Value = Option>; From 5c5f32a388230242f84e33b7e9ae5c93fbfdc4c0 Mon Sep 17 00:00:00 2001 From: Nick Harring Date: Mon, 14 Mar 2022 15:31:02 -0700 Subject: [PATCH 04/11] clean up deserialization --- src/dataflow/src/source/kafka.rs | 53 +++++++++++++----------- src/dataflow/src/source/kafka/metrics.rs | 25 +++++++---- 2 files changed, 46 insertions(+), 32 deletions(-) diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs index 4950ada7457d0..626d1f5ef19bc 100644 --- a/src/dataflow/src/source/kafka.rs +++ b/src/dataflow/src/source/kafka.rs @@ -7,18 +7,19 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use std::collections::{BTreeMap, HashMap, VecDeque}; -use std::sync::{Arc, Mutex}; -use std::thread; -use std::time::Duration; - use mz_dataflow_types::sources::AwsExternalId; use rdkafka::consumer::base_consumer::PartitionQueue; use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext}; use rdkafka::error::KafkaError; use rdkafka::message::BorrowedMessage; +use rdkafka::statistics::Statistics; use rdkafka::topic_partition_list::Offset; use rdkafka::{ClientConfig, ClientContext, Message, TopicPartitionList}; +use serde::ser::StdError; +use std::collections::{BTreeMap, HashMap, VecDeque}; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::Duration; use timely::scheduling::activate::SyncActivator; use tracing::{debug, error, info, warn}; use uuid::Uuid; @@ -190,7 +191,7 @@ impl SourceReader for KafkaSourceReader { partition_info, _metadata_thread_handle: metadata_thread_handle, partition_metrics: KafkaPartitionMetrics::new( - &base_metrics, + &base_metrics.partition_specific, partition_ids, topic, source_name, @@ -404,26 +405,30 @@ impl KafkaSourceReader { }); self.last_stats = Some(stats.clone()); } - if let Some(obj) = stats.as_ref().to_serde_json().as_object() { - // The high watermark is per partition, get the per partition map from the json stats with the path topics.$topic_name.partitions - if let Some(partitions) = obj - .get("topics") - .and_then(|v| v.get(self.topic_name.clone())) - .and_then(|v| v.get("partitions")) - .and_then(|v| v.as_object()) - { - partitions.iter().for_each(|i| { - if let Ok(id) = str::parse::(i.0) { - if let Some(offset) = i.1.as_i64() { - self.partition_metrics.set_offset_max(id, offset); - } else { - error!("unable to get hi_offset for {}", id); - } - } else { - error!("unable to get partition id from {:?}", i.0); - } + let mut writer = vec![]; + let write_res = stats.as_ref().to_writer(&mut writer); + // If the write failed log an error and continue to the next loop iteration + if let Err(e) = write_res { + error!("error serializing rdkafka stats: {}", e); + continue; + } + + // This temp var is required to satisfy the type checker + let 
res: Result = + serde_json::from_reader(writer.as_slice()); + match res { + Ok(statistics) => { + statistics.topics.get(&self.topic_name).and_then(|topic| { + topic.partitions.iter().for_each(|(id, partition)| { + self.partition_metrics + .set_offset_max(*id, partition.hi_offset); + }); + Some(()) }); } + Err(e) => { + error!("{}", e); + } } } } diff --git a/src/dataflow/src/source/kafka/metrics.rs b/src/dataflow/src/source/kafka/metrics.rs index 168a804ea845b..127148f3585d9 100644 --- a/src/dataflow/src/source/kafka/metrics.rs +++ b/src/dataflow/src/source/kafka/metrics.rs @@ -12,23 +12,25 @@ use std::collections::HashMap; use prometheus::core::AtomicI64; use mz_expr::SourceInstanceId; -use mz_ore::metrics::{DeleteOnDropGauge, GaugeVecExt}; +use mz_ore::iter::IteratorExt; +use mz_ore::metrics::{DeleteOnDropGauge, GaugeVecExt, IntGaugeVec}; -use crate::source::metrics::SourceBaseMetrics; +use crate::source::metrics::PartitionSpecificMetrics; pub(super) struct KafkaPartitionMetrics { + labels: Vec, + // gauge_vec: &'a IntGaugeVec, partition_offset_map: HashMap>>, } impl KafkaPartitionMetrics { pub fn new( - base: &SourceBaseMetrics, + metrics: &PartitionSpecificMetrics, ids: Vec, topic: String, source_name: String, source_id: SourceInstanceId, ) -> Self { - let pm = &base.partition_specific; Self { partition_offset_map: HashMap::from_iter(ids.iter().map(|id| { let labels = &[ @@ -39,16 +41,23 @@ impl KafkaPartitionMetrics { ]; ( *id, - pm.partition_offset_max + metrics + .partition_offset_max .get_delete_on_drop_gauge(labels.to_vec()), ) })), + labels: vec![topic.clone(), source_name.clone(), source_id.to_string()], + // gauge_vec: &metrics.partition_offset_max, } } - pub fn set_offset_max(&self, id: i32, offset: i64) { + pub fn set_offset_max(&mut self, id: i32, offset: i64) { self.partition_offset_map - .get(&id) - .and_then(|g| Some(g.set(offset))); + .entry(id) + // .or_insert_with_key(|id| { + // let labels = self.labels.iter().cloned().chain_one(format!("{}", id)).collect(); + // self.metrics.partition_offset_max.get_delete_on_drop_gauge(labels) + // }) + .and_modify(|gauge| gauge.set(offset)); } } From f790a7ad3ecbff4d0fb55ab715a4964b44f7e887 Mon Sep 17 00:00:00 2001 From: Nick Harring Date: Mon, 14 Mar 2022 15:55:40 -0700 Subject: [PATCH 05/11] use cursor instead of raw vec --- src/dataflow/src/source/kafka.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs index 626d1f5ef19bc..1d3d488bc0c92 100644 --- a/src/dataflow/src/source/kafka.rs +++ b/src/dataflow/src/source/kafka.rs @@ -405,8 +405,8 @@ impl KafkaSourceReader { }); self.last_stats = Some(stats.clone()); } - let mut writer = vec![]; - let write_res = stats.as_ref().to_writer(&mut writer); + let mut cursor = std::io::Cursor::new(vec![]); + let write_res = stats.as_ref().to_writer(&mut cursor); // If the write failed log an error and continue to the next loop iteration if let Err(e) = write_res { error!("error serializing rdkafka stats: {}", e); @@ -415,7 +415,7 @@ impl KafkaSourceReader { // This temp var is required to satisfy the type checker let res: Result = - serde_json::from_reader(writer.as_slice()); + serde_json::from_reader(cursor); match res { Ok(statistics) => { statistics.topics.get(&self.topic_name).and_then(|topic| { From d737730ac7a67757d126e49c098655e14bdd4371 Mon Sep 17 00:00:00 2001 From: Nick Harring Date: Mon, 14 Mar 2022 15:59:30 -0700 Subject: [PATCH 06/11] Lint --- src/dataflow/src/source/kafka.rs | 5 ++--- 
src/dataflow/src/source/kafka/metrics.rs | 8 ++++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs index 1d3d488bc0c92..5709c069ca304 100644 --- a/src/dataflow/src/source/kafka.rs +++ b/src/dataflow/src/source/kafka.rs @@ -15,7 +15,7 @@ use rdkafka::message::BorrowedMessage; use rdkafka::statistics::Statistics; use rdkafka::topic_partition_list::Offset; use rdkafka::{ClientConfig, ClientContext, Message, TopicPartitionList}; -use serde::ser::StdError; + use std::collections::{BTreeMap, HashMap, VecDeque}; use std::sync::{Arc, Mutex}; use std::thread; @@ -414,8 +414,7 @@ impl KafkaSourceReader { } // This temp var is required to satisfy the type checker - let res: Result = - serde_json::from_reader(cursor); + let res: Result = serde_json::from_reader(cursor); match res { Ok(statistics) => { statistics.topics.get(&self.topic_name).and_then(|topic| { diff --git a/src/dataflow/src/source/kafka/metrics.rs b/src/dataflow/src/source/kafka/metrics.rs index 127148f3585d9..1576154ba5a30 100644 --- a/src/dataflow/src/source/kafka/metrics.rs +++ b/src/dataflow/src/source/kafka/metrics.rs @@ -12,13 +12,13 @@ use std::collections::HashMap; use prometheus::core::AtomicI64; use mz_expr::SourceInstanceId; -use mz_ore::iter::IteratorExt; -use mz_ore::metrics::{DeleteOnDropGauge, GaugeVecExt, IntGaugeVec}; + +use mz_ore::metrics::{DeleteOnDropGauge, GaugeVecExt}; use crate::source::metrics::PartitionSpecificMetrics; pub(super) struct KafkaPartitionMetrics { - labels: Vec, + // labels: Vec, // gauge_vec: &'a IntGaugeVec, partition_offset_map: HashMap>>, } @@ -46,7 +46,7 @@ impl KafkaPartitionMetrics { .get_delete_on_drop_gauge(labels.to_vec()), ) })), - labels: vec![topic.clone(), source_name.clone(), source_id.to_string()], + // labels: vec![topic.clone(), source_name.clone(), source_id.to_string()], // gauge_vec: &metrics.partition_offset_max, } } From 06a9c4c3666c8addcfb50fd7557512e0398e4fad Mon Sep 17 00:00:00 2001 From: Nick Harring Date: Mon, 14 Mar 2022 17:37:21 -0700 Subject: [PATCH 07/11] address feedback --- src/dataflow/src/source/kafka.rs | 11 ++--------- src/dataflow/src/source/kafka/metrics.rs | 25 ++++++++++++------------ 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs index 5709c069ca304..da9cc689c1521 100644 --- a/src/dataflow/src/source/kafka.rs +++ b/src/dataflow/src/source/kafka.rs @@ -191,7 +191,7 @@ impl SourceReader for KafkaSourceReader { partition_info, _metadata_thread_handle: metadata_thread_handle, partition_metrics: KafkaPartitionMetrics::new( - &base_metrics.partition_specific, + base_metrics, partition_ids, topic, source_name, @@ -405,16 +405,9 @@ impl KafkaSourceReader { }); self.last_stats = Some(stats.clone()); } - let mut cursor = std::io::Cursor::new(vec![]); - let write_res = stats.as_ref().to_writer(&mut cursor); - // If the write failed log an error and continue to the next loop iteration - if let Err(e) = write_res { - error!("error serializing rdkafka stats: {}", e); - continue; - } // This temp var is required to satisfy the type checker - let res: Result = serde_json::from_reader(cursor); + let res: Result = serde_json::from_str(&stats.to_string()); match res { Ok(statistics) => { statistics.topics.get(&self.topic_name).and_then(|topic| { diff --git a/src/dataflow/src/source/kafka/metrics.rs b/src/dataflow/src/source/kafka/metrics.rs index 1576154ba5a30..09ac62afd9d79 100644 --- 
a/src/dataflow/src/source/kafka/metrics.rs +++ b/src/dataflow/src/source/kafka/metrics.rs @@ -14,23 +14,24 @@ use prometheus::core::AtomicI64; use mz_expr::SourceInstanceId; use mz_ore::metrics::{DeleteOnDropGauge, GaugeVecExt}; - -use crate::source::metrics::PartitionSpecificMetrics; +use mz_ore::iter::IteratorExt; +use crate::source::metrics::SourceBaseMetrics; pub(super) struct KafkaPartitionMetrics { - // labels: Vec, - // gauge_vec: &'a IntGaugeVec, + labels: Vec, + base_metrics: SourceBaseMetrics, partition_offset_map: HashMap>>, } impl KafkaPartitionMetrics { pub fn new( - metrics: &PartitionSpecificMetrics, + base_metrics: SourceBaseMetrics, ids: Vec, topic: String, source_name: String, source_id: SourceInstanceId, ) -> Self { + let metrics = &base_metrics.partition_specific; Self { partition_offset_map: HashMap::from_iter(ids.iter().map(|id| { let labels = &[ @@ -46,18 +47,18 @@ impl KafkaPartitionMetrics { .get_delete_on_drop_gauge(labels.to_vec()), ) })), - // labels: vec![topic.clone(), source_name.clone(), source_id.to_string()], - // gauge_vec: &metrics.partition_offset_max, + labels: vec![topic.clone(), source_name.clone(), source_id.to_string()], + base_metrics, } } pub fn set_offset_max(&mut self, id: i32, offset: i64) { self.partition_offset_map .entry(id) - // .or_insert_with_key(|id| { - // let labels = self.labels.iter().cloned().chain_one(format!("{}", id)).collect(); - // self.metrics.partition_offset_max.get_delete_on_drop_gauge(labels) - // }) - .and_modify(|gauge| gauge.set(offset)); + .or_insert_with_key(|id| { + let labels = self.labels.iter().cloned().chain_one(format!("{}", id)).collect(); + self.base_metrics.partition_specific.partition_offset_max.get_delete_on_drop_gauge(labels) + }) + .set(offset); } } From 91b2ea2c18bb76503556e6ccf83514b147d831ad Mon Sep 17 00:00:00 2001 From: Nick Harring Date: Tue, 15 Mar 2022 15:16:53 -0700 Subject: [PATCH 08/11] add tests, fix issues found by tests --- src/dataflow/src/source/kafka.rs | 21 +++---- src/dataflow/src/source/kafka/metrics.rs | 10 ++++ .../kafka-dynamic-partition-metrics.td | 57 +++++++++++++++++++ test/metrics/kafka-partition-metrics.td | 46 +++++++++++++++ test/metrics/mzcompose | 14 +++++ test/metrics/mzcompose.py | 46 +++++++++++++++ 6 files changed, 184 insertions(+), 10 deletions(-) create mode 100644 test/metrics/kafka-dynamic-partition-metrics.td create mode 100644 test/metrics/kafka-partition-metrics.td create mode 100755 test/metrics/mzcompose create mode 100644 test/metrics/mzcompose.py diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs index da9cc689c1521..2f83fe3c4567f 100644 --- a/src/dataflow/src/source/kafka.rs +++ b/src/dataflow/src/source/kafka.rs @@ -406,17 +406,18 @@ impl KafkaSourceReader { self.last_stats = Some(stats.clone()); } - // This temp var is required to satisfy the type checker - let res: Result = serde_json::from_str(&stats.to_string()); - match res { + match serde_json::from_str::(&stats.to_string()) { Ok(statistics) => { - statistics.topics.get(&self.topic_name).and_then(|topic| { - topic.partitions.iter().for_each(|(id, partition)| { - self.partition_metrics - .set_offset_max(*id, partition.hi_offset); - }); - Some(()) - }); + let topic = statistics.topics.get(&self.topic_name); + match topic { + Some(topic) => { + for (id, partition) in &topic.partitions { + self.partition_metrics + .set_offset_max(*id, partition.hi_offset); + } + } + None => error!("No stats found for topic: {}", &self.topic_name), + } } Err(e) => { error!("{}", e); diff 
--git a/src/dataflow/src/source/kafka/metrics.rs b/src/dataflow/src/source/kafka/metrics.rs index 09ac62afd9d79..089e1169036dd 100644 --- a/src/dataflow/src/source/kafka/metrics.rs +++ b/src/dataflow/src/source/kafka/metrics.rs @@ -15,6 +15,7 @@ use mz_expr::SourceInstanceId; use mz_ore::metrics::{DeleteOnDropGauge, GaugeVecExt}; use mz_ore::iter::IteratorExt; +use tracing::error; use crate::source::metrics::SourceBaseMetrics; pub(super) struct KafkaPartitionMetrics { @@ -53,6 +54,15 @@ impl KafkaPartitionMetrics { } pub fn set_offset_max(&mut self, id: i32, offset: i64) { + // Valid partition ids start at 0, librdkafka uses -1 as a sentinel for unassigned partitions + if id < 0 { + return; + } + // This offset value is another librdkafka sentinel indicating it got an invalid + if offset == -1001 { + error!("Got invalid high watermark for partition {}", id); + return; + } self.partition_offset_map .entry(id) .or_insert_with_key(|id| { diff --git a/test/metrics/kafka-dynamic-partition-metrics.td b/test/metrics/kafka-dynamic-partition-metrics.td new file mode 100644 index 0000000000000..e2e965fccd0a7 --- /dev/null +++ b/test/metrics/kafka-dynamic-partition-metrics.td @@ -0,0 +1,57 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +# +# Test per partition kafka metrics on initial and dynamically added partitions +# + +$ kafka-create-topic topic=input_csv + +> CREATE MATERIALIZED SOURCE input_csv (first, second) + FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-input_csv-${testdrive.seed}' WITH (topic_metadata_refresh_interval_ms=5000) + FORMAT CSV WITH 2 COLUMNS; + +$ kafka-ingest format=bytes topic=input_csv +1,2 +2,3 + +> SELECT * from input_csv; +first second mz_offset +---------------------- +1 2 1 +2 3 2 + +> SELECT labels->>'partition_id' AS partition_id, max(value) as value FROM mz_metrics WHERE metric='mz_kafka_partition_offset_max' group by partition_id; +partition_id value +------------------ +0 2 + +$ kafka-add-partitions topic=input_csv total-partitions=3 + +$ kafka-ingest format=bytes topic=input_csv partition=1 +3,4 +4,5 +$ kafka-ingest format=bytes topic=input_csv partition=2 +5,6 + +> SELECT * from input_csv; +first second mz_offset +---------------------- +1 2 1 +2 3 2 +3 4 1 +4 5 2 +5 6 1 + +> SELECT labels->>'partition_id' AS partition_id, max(value) as value FROM mz_metrics WHERE metric='mz_kafka_partition_offset_max' group by partition_id; +partition_id value +------------------ +0 2 +1 2 +2 1 \ No newline at end of file diff --git a/test/metrics/kafka-partition-metrics.td b/test/metrics/kafka-partition-metrics.td new file mode 100644 index 0000000000000..b9e2fd4735b94 --- /dev/null +++ b/test/metrics/kafka-partition-metrics.td @@ -0,0 +1,46 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. 
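The deserialization strategy the later patches settle on is to round-trip the librdkafka statistics payload through serde_json into rdkafka's typed `Statistics` struct and read `hi_offset` per partition. A short standalone sketch of that lookup follows; the function name and error handling are illustrative rather than part of the patch, and it assumes the `Statistics`/`Topic`/`Partition` types from rdkafka's statistics module:

```rust
use std::collections::HashMap;

use rdkafka::statistics::Statistics;

/// Extract the broker high watermark per partition for one topic from the raw
/// statistics JSON that librdkafka emits. librdkafka reports -1001 until a
/// real watermark is known, so callers still need to filter that sentinel.
fn partition_high_watermarks(
    stats_json: &str,
    topic: &str,
) -> Result<HashMap<i32, i64>, serde_json::Error> {
    let stats: Statistics = serde_json::from_str(stats_json)?;
    Ok(stats
        .topics
        .get(topic)
        .map(|t| {
            t.partitions
                .iter()
                .map(|(id, p)| (*id, p.hi_offset))
                .collect()
        })
        .unwrap_or_default())
}
```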
+ +# +# Test per partition kafka metrics on initial and dynamically added partitions +# + +$ kafka-create-topic topic=input_csv partitions=3 + +> CREATE MATERIALIZED SOURCE input_csv (first, second) + FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-input_csv-${testdrive.seed}' WITH (topic_metadata_refresh_interval_ms=5000) + FORMAT CSV WITH 2 COLUMNS; + +$ kafka-ingest format=bytes topic=input_csv partition=0 +1,2 +2,3 + +$ kafka-ingest format=bytes topic=input_csv partition=1 +3,4 +4,5 + +$ kafka-ingest format=bytes topic=input_csv partition=2 +5,6 + + +> SELECT * from input_csv; +first second mz_offset +---------------------- +1 2 1 +2 3 2 +3 4 1 +4 5 2 +5 6 1 + +> SELECT labels->>'partition_id' AS partition_id, max(value) as value FROM mz_metrics WHERE metric='mz_kafka_partition_offset_max' group by partition_id; +partition_id value +------------------ +0 2 +1 2 +2 1 diff --git a/test/metrics/mzcompose b/test/metrics/mzcompose new file mode 100755 index 0000000000000..1f866645dabc8 --- /dev/null +++ b/test/metrics/mzcompose @@ -0,0 +1,14 @@ +#!/usr/bin/env bash + +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. +# +# mzcompose — runs Docker Compose with Materialize customizations. + +exec "$(dirname "$0")"/../../bin/pyactivate -m materialize.cli.mzcompose "$@" diff --git a/test/metrics/mzcompose.py b/test/metrics/mzcompose.py new file mode 100644 index 0000000000000..4ed7b0d00560f --- /dev/null +++ b/test/metrics/mzcompose.py @@ -0,0 +1,46 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. 
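The `KafkaPartitionMetrics` helper these tests exercise reduces to a small pattern: register one labeled gauge vector up front, lazily create a child gauge per partition as partitions appear, and skip librdkafka's sentinel values. A rough standalone equivalent using the plain `prometheus` crate is sketched below; the types are an assumption for illustration, since the real code uses mz_ore's delete-on-drop gauge wrappers so that per-partition gauges are unregistered when the source is dropped:

```rust
use std::collections::HashMap;

use prometheus::{IntGauge, IntGaugeVec, Opts, Registry};

/// Per-partition high watermark gauges, created lazily as partitions appear.
struct PartitionOffsetGauges {
    vec: IntGaugeVec,
    base_labels: [String; 3], // topic, source_id, source_instance
    by_partition: HashMap<i32, IntGauge>,
}

impl PartitionOffsetGauges {
    fn new(
        registry: &Registry,
        topic: &str,
        source_id: &str,
        source_instance: &str,
    ) -> prometheus::Result<Self> {
        let vec = IntGaugeVec::new(
            Opts::new(
                "mz_kafka_partition_offset_max",
                "High watermark offset on broker for partition",
            ),
            &["topic", "source_id", "source_instance", "partition_id"],
        )?;
        registry.register(Box::new(vec.clone()))?;
        Ok(Self {
            vec,
            base_labels: [topic.into(), source_id.into(), source_instance.into()],
            by_partition: HashMap::new(),
        })
    }

    fn set_offset_max(&mut self, partition: i32, offset: i64) {
        // librdkafka sentinels: partition -1 means "unassigned", offset -1001
        // means the broker has not reported a watermark yet; skip both.
        if partition < 0 || offset == -1001 {
            return;
        }
        if !self.by_partition.contains_key(&partition) {
            // Create the child gauge for this partition on first sight.
            let gauge = self.vec.with_label_values(&[
                self.base_labels[0].as_str(),
                self.base_labels[1].as_str(),
                self.base_labels[2].as_str(),
                partition.to_string().as_str(),
            ]);
            self.by_partition.insert(partition, gauge);
        }
        if let Some(gauge) = self.by_partition.get(&partition) {
            gauge.set(offset);
        }
    }
}
```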
+ +from materialize.mzcompose import Composition, WorkflowArgumentParser +from materialize.mzcompose.services import ( + Kafka, + Materialized, + SchemaRegistry, + Testdrive, + Zookeeper, +) + +SERVICES = [ + Materialized(), + Testdrive(), + Zookeeper(), + Kafka(), + SchemaRegistry(), +] + + +def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None: + """Run testdrive against various sources to validate expected metrics behavior""" + parser.add_argument( + "files", + nargs="*", + default=["*.td"], + help="run against the specified files", + ) + + args = parser.parse_args() + c.start_and_wait_for_tcp( + services=[ + "zookeeper", + "kafka", + "schema-registry", + "materialized", + ] + ) + c.run("testdrive-svc", *args.files) From 2b6aa93452ad96b889e6560c21343da9a2a8dfa4 Mon Sep 17 00:00:00 2001 From: Nick Harring Date: Tue, 15 Mar 2022 15:30:12 -0700 Subject: [PATCH 09/11] lint --- src/dataflow/src/source/kafka/metrics.rs | 16 ++++++++++++---- test/metrics/kafka-dynamic-partition-metrics.td | 2 +- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/dataflow/src/source/kafka/metrics.rs b/src/dataflow/src/source/kafka/metrics.rs index 089e1169036dd..494157a4cfc70 100644 --- a/src/dataflow/src/source/kafka/metrics.rs +++ b/src/dataflow/src/source/kafka/metrics.rs @@ -13,10 +13,10 @@ use prometheus::core::AtomicI64; use mz_expr::SourceInstanceId; -use mz_ore::metrics::{DeleteOnDropGauge, GaugeVecExt}; +use crate::source::metrics::SourceBaseMetrics; use mz_ore::iter::IteratorExt; +use mz_ore::metrics::{DeleteOnDropGauge, GaugeVecExt}; use tracing::error; -use crate::source::metrics::SourceBaseMetrics; pub(super) struct KafkaPartitionMetrics { labels: Vec, @@ -66,8 +66,16 @@ impl KafkaPartitionMetrics { self.partition_offset_map .entry(id) .or_insert_with_key(|id| { - let labels = self.labels.iter().cloned().chain_one(format!("{}", id)).collect(); - self.base_metrics.partition_specific.partition_offset_max.get_delete_on_drop_gauge(labels) + self.base_metrics + .partition_specific + .partition_offset_max + .get_delete_on_drop_gauge( + self.labels + .iter() + .cloned() + .chain_one(format!("{}", id)) + .collect(), + ) }) .set(offset); } diff --git a/test/metrics/kafka-dynamic-partition-metrics.td b/test/metrics/kafka-dynamic-partition-metrics.td index e2e965fccd0a7..7e1da7d6a3261 100644 --- a/test/metrics/kafka-dynamic-partition-metrics.td +++ b/test/metrics/kafka-dynamic-partition-metrics.td @@ -54,4 +54,4 @@ partition_id value ------------------ 0 2 1 2 -2 1 \ No newline at end of file +2 1 From 1a8706d342d399ac0e3ec0fed504b49661c9b2f7 Mon Sep 17 00:00:00 2001 From: Nick Harring Date: Wed, 16 Mar 2022 09:31:52 -0700 Subject: [PATCH 10/11] feedback --- ci/test/pipeline.template.yml | 8 ++++ src/dataflow/src/source/kafka.rs | 19 ++++---- src/dataflow/src/source/kafka/metrics.rs | 10 ++-- .../kafka-dynamic-partition-metrics.td | 25 +++++----- test/metrics/kafka-partition-metrics.td | 46 ------------------- 5 files changed, 36 insertions(+), 72 deletions(-) delete mode 100644 test/metrics/kafka-partition-metrics.td diff --git a/ci/test/pipeline.template.yml b/ci/test/pipeline.template.yml index 11e1b874941fd..1860bc9eb710f 100644 --- a/ci/test/pipeline.template.yml +++ b/ci/test/pipeline.template.yml @@ -357,6 +357,14 @@ steps: - ./ci/plugins/mzcompose: composition: kafka-exactly-once + - id: metrics + label: mz_metrics tests + depends_on: build-x86_64 + artifact_paths: junit_mzcompose_*.xml + plugins: + - ./ci/plugins/mzcompose: + composition: metrics + - id: 
lang-csharp label: ":csharp: tests" depends_on: build-x86_64 diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs index 2f83fe3c4567f..d455de9b4de9e 100644 --- a/src/dataflow/src/source/kafka.rs +++ b/src/dataflow/src/source/kafka.rs @@ -7,7 +7,11 @@ // the Business Source License, use of this software will be governed // by the Apache License, Version 2.0. -use mz_dataflow_types::sources::AwsExternalId; +use std::collections::{BTreeMap, HashMap, VecDeque}; +use std::sync::{Arc, Mutex}; +use std::thread; +use std::time::Duration; + use rdkafka::consumer::base_consumer::PartitionQueue; use rdkafka::consumer::{BaseConsumer, Consumer, ConsumerContext}; use rdkafka::error::KafkaError; @@ -15,17 +19,12 @@ use rdkafka::message::BorrowedMessage; use rdkafka::statistics::Statistics; use rdkafka::topic_partition_list::Offset; use rdkafka::{ClientConfig, ClientContext, Message, TopicPartitionList}; - -use std::collections::{BTreeMap, HashMap, VecDeque}; -use std::sync::{Arc, Mutex}; -use std::thread; -use std::time::Duration; use timely::scheduling::activate::SyncActivator; use tracing::{debug, error, info, warn}; use uuid::Uuid; use mz_dataflow_types::sources::{ - encoding::SourceDataEncoding, ExternalSourceConnector, KafkaOffset, KafkaSourceConnector, + encoding::SourceDataEncoding, AwsExternalId, ExternalSourceConnector, KafkaOffset, KafkaSourceConnector, MzOffset, }; use mz_expr::{PartitionId, SourceInstanceId}; @@ -36,8 +35,8 @@ use mz_repr::adt::jsonb::Jsonb; use crate::logging::materialized::{Logger, MaterializedEvent}; use crate::source::{NextMessage, SourceMessage, SourceReader}; -use self::metrics::KafkaPartitionMetrics; use super::metrics::SourceBaseMetrics; +use self::metrics::KafkaPartitionMetrics; mod metrics; @@ -174,7 +173,7 @@ impl SourceReader for KafkaSourceReader { .unwrap() .unpark_on_drop() }; - let partition_ids = start_offsets.keys().map(|i| *i).collect(); + let partition_ids = start_offsets.keys().copied().collect(); Ok(KafkaSourceReader { topic_name: topic.clone(), source_name: source_name.clone(), @@ -420,7 +419,7 @@ impl KafkaSourceReader { } } Err(e) => { - error!("{}", e); + error!("failed decoding librdkafka statistics JSON: {}", e); } } } diff --git a/src/dataflow/src/source/kafka/metrics.rs b/src/dataflow/src/source/kafka/metrics.rs index 494157a4cfc70..edbfffd90ed92 100644 --- a/src/dataflow/src/source/kafka/metrics.rs +++ b/src/dataflow/src/source/kafka/metrics.rs @@ -10,14 +10,13 @@ use std::collections::HashMap; use prometheus::core::AtomicI64; +use tracing::debug; use mz_expr::SourceInstanceId; - -use crate::source::metrics::SourceBaseMetrics; use mz_ore::iter::IteratorExt; use mz_ore::metrics::{DeleteOnDropGauge, GaugeVecExt}; -use tracing::error; +use crate::source::metrics::SourceBaseMetrics; pub(super) struct KafkaPartitionMetrics { labels: Vec, base_metrics: SourceBaseMetrics, @@ -58,9 +57,10 @@ impl KafkaPartitionMetrics { if id < 0 { return; } - // This offset value is another librdkafka sentinel indicating it got an invalid + // This offset value is another librdkafka sentinel indicating it got an invalid high watermark from the broker if offset == -1001 { - error!("Got invalid high watermark for partition {}", id); + // TODO(nharring-adjacent): This is potentially spammy so its at debug but it would be better as info with sampling + debug!("Got invalid high watermark for partition {}", id); return; } self.partition_offset_map diff --git a/test/metrics/kafka-dynamic-partition-metrics.td 
b/test/metrics/kafka-dynamic-partition-metrics.td index 7e1da7d6a3261..acaf832def151 100644 --- a/test/metrics/kafka-dynamic-partition-metrics.td +++ b/test/metrics/kafka-dynamic-partition-metrics.td @@ -11,40 +11,42 @@ # Test per partition kafka metrics on initial and dynamically added partitions # -$ kafka-create-topic topic=input_csv +$ kafka-create-topic topic=input_csv partitions=2 > CREATE MATERIALIZED SOURCE input_csv (first, second) FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-input_csv-${testdrive.seed}' WITH (topic_metadata_refresh_interval_ms=5000) FORMAT CSV WITH 2 COLUMNS; -$ kafka-ingest format=bytes topic=input_csv +$ kafka-ingest format=bytes topic=input_csv partition=0 1,2 +$ kafka-ingest format=bytes topic=input_csv partition=1 2,3 > SELECT * from input_csv; first second mz_offset ---------------------- 1 2 1 -2 3 2 +2 3 1 > SELECT labels->>'partition_id' AS partition_id, max(value) as value FROM mz_metrics WHERE metric='mz_kafka_partition_offset_max' group by partition_id; partition_id value ------------------ -0 2 +0 1 +1 1 -$ kafka-add-partitions topic=input_csv total-partitions=3 +$ kafka-add-partitions topic=input_csv total-partitions=4 -$ kafka-ingest format=bytes topic=input_csv partition=1 +$ kafka-ingest format=bytes topic=input_csv partition=2 3,4 4,5 -$ kafka-ingest format=bytes topic=input_csv partition=2 +$ kafka-ingest format=bytes topic=input_csv partition=3 5,6 > SELECT * from input_csv; first second mz_offset ---------------------- 1 2 1 -2 3 2 +2 3 1 3 4 1 4 5 2 5 6 1 @@ -52,6 +54,7 @@ first second mz_offset > SELECT labels->>'partition_id' AS partition_id, max(value) as value FROM mz_metrics WHERE metric='mz_kafka_partition_offset_max' group by partition_id; partition_id value ------------------ -0 2 -1 2 -2 1 +0 1 +1 1 +2 2 +3 1 diff --git a/test/metrics/kafka-partition-metrics.td b/test/metrics/kafka-partition-metrics.td deleted file mode 100644 index b9e2fd4735b94..0000000000000 --- a/test/metrics/kafka-partition-metrics.td +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright Materialize, Inc. and contributors. All rights reserved. -# -# Use of this software is governed by the Business Source License -# included in the LICENSE file at the root of this repository. -# -# As of the Change Date specified in that file, in accordance with -# the Business Source License, use of this software will be governed -# by the Apache License, Version 2.0. 
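For background the series assumes rather than shows: librdkafka only emits statistics when `statistics.interval.ms` is configured, and the rdkafka crate delivers each decoded payload through the `ClientContext::stats` callback on the client's context. A minimal standalone consumer wired up that way is sketched below; it is illustrative only and separate from the plumbing the dataflow source uses internally:

```rust
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{BaseConsumer, ConsumerContext};
use rdkafka::error::KafkaResult;
use rdkafka::statistics::Statistics;
use rdkafka::ClientContext;

/// Receives the decoded statistics payload every `statistics.interval.ms`.
struct StatsContext;

impl ClientContext for StatsContext {
    fn stats(&self, statistics: Statistics) {
        for (topic, t) in &statistics.topics {
            for (partition, p) in &t.partitions {
                println!("{}[{}] hi_offset={}", topic, partition, p.hi_offset);
            }
        }
    }
}

impl ConsumerContext for StatsContext {}

fn build_consumer(brokers: &str) -> KafkaResult<BaseConsumer<StatsContext>> {
    ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("group.id", "example-group")
        // Without this setting librdkafka never produces statistics at all.
        .set("statistics.interval.ms", "1000")
        .create_with_context(StatsContext)
}
```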
- -# -# Test per partition kafka metrics on initial and dynamically added partitions -# - -$ kafka-create-topic topic=input_csv partitions=3 - -> CREATE MATERIALIZED SOURCE input_csv (first, second) - FROM KAFKA BROKER '${testdrive.kafka-addr}' TOPIC 'testdrive-input_csv-${testdrive.seed}' WITH (topic_metadata_refresh_interval_ms=5000) - FORMAT CSV WITH 2 COLUMNS; - -$ kafka-ingest format=bytes topic=input_csv partition=0 -1,2 -2,3 - -$ kafka-ingest format=bytes topic=input_csv partition=1 -3,4 -4,5 - -$ kafka-ingest format=bytes topic=input_csv partition=2 -5,6 - - -> SELECT * from input_csv; -first second mz_offset ----------------------- -1 2 1 -2 3 2 -3 4 1 -4 5 2 -5 6 1 - -> SELECT labels->>'partition_id' AS partition_id, max(value) as value FROM mz_metrics WHERE metric='mz_kafka_partition_offset_max' group by partition_id; -partition_id value ------------------- -0 2 -1 2 -2 1 From 52e12b60d8b2f555f43d430a7d605e79cd4f7f0f Mon Sep 17 00:00:00 2001 From: Nick Harring Date: Wed, 16 Mar 2022 11:31:43 -0700 Subject: [PATCH 11/11] lint --- src/dataflow/src/source/kafka.rs | 6 +++--- src/dataflow/src/source/kafka/metrics.rs | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/dataflow/src/source/kafka.rs b/src/dataflow/src/source/kafka.rs index d455de9b4de9e..83ddcba09e395 100644 --- a/src/dataflow/src/source/kafka.rs +++ b/src/dataflow/src/source/kafka.rs @@ -24,8 +24,8 @@ use tracing::{debug, error, info, warn}; use uuid::Uuid; use mz_dataflow_types::sources::{ - encoding::SourceDataEncoding, AwsExternalId, ExternalSourceConnector, KafkaOffset, KafkaSourceConnector, - MzOffset, + encoding::SourceDataEncoding, AwsExternalId, ExternalSourceConnector, KafkaOffset, + KafkaSourceConnector, MzOffset, }; use mz_expr::{PartitionId, SourceInstanceId}; use mz_kafka_util::{client::MzClientContext, KafkaAddrs}; @@ -35,8 +35,8 @@ use mz_repr::adt::jsonb::Jsonb; use crate::logging::materialized::{Logger, MaterializedEvent}; use crate::source::{NextMessage, SourceMessage, SourceReader}; -use super::metrics::SourceBaseMetrics; use self::metrics::KafkaPartitionMetrics; +use super::metrics::SourceBaseMetrics; mod metrics; diff --git a/src/dataflow/src/source/kafka/metrics.rs b/src/dataflow/src/source/kafka/metrics.rs index edbfffd90ed92..854483827fc5c 100644 --- a/src/dataflow/src/source/kafka/metrics.rs +++ b/src/dataflow/src/source/kafka/metrics.rs @@ -59,7 +59,7 @@ impl KafkaPartitionMetrics { } // This offset value is another librdkafka sentinel indicating it got an invalid high watermark from the broker if offset == -1001 { - // TODO(nharring-adjacent): This is potentially spammy so its at debug but it would be better as info with sampling + // TODO(nharring-adjacent): This is potentially spammy so its at debug but it would be better as info with sampling debug!("Got invalid high watermark for partition {}", id); return; }