diff --git a/src/dataflow/src/source/kafka/metrics.rs b/src/dataflow/src/source/kafka/metrics.rs index 44203ef6b6818..168a804ea845b 100644 --- a/src/dataflow/src/source/kafka/metrics.rs +++ b/src/dataflow/src/source/kafka/metrics.rs @@ -31,13 +31,24 @@ impl KafkaPartitionMetrics { let pm = &base.partition_specific; Self { partition_offset_map: HashMap::from_iter(ids.iter().map(|id| { - let labels = &[topic.clone(), source_name.clone(), source_id.to_string(), format!("{}", id)]; - (*id, pm.partition_offset_max.get_delete_on_drop_gauge(labels.to_vec())) + let labels = &[ + topic.clone(), + source_name.clone(), + source_id.to_string(), + format!("{}", id), + ]; + ( + *id, + pm.partition_offset_max + .get_delete_on_drop_gauge(labels.to_vec()), + ) })), } } pub fn set_offset_max(&self, id: i32, offset: i64) { - self.partition_offset_map.get(&id).and_then(|g| Some(g.set(offset))); + self.partition_offset_map + .get(&id) + .and_then(|g| Some(g.set(offset))); } }