From d50be23163bc17bd32d4f09f40e8fb2a6b2bbdba Mon Sep 17 00:00:00 2001 From: Jayaj Poudel Date: Thu, 14 Dec 2023 23:38:15 +0000 Subject: [PATCH] test commit #2 --- .../dataflow/worker/StreamingStepMetricsContainer.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java index 875a2d649ece2..f5233c7bbf957 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java @@ -56,8 +56,7 @@ public class StreamingStepMetricsContainer implements MetricsContainer { private MetricsMap counters = new MetricsMap<>(DeltaCounterCell::new); - private MetricsMap perWorkerCounters = - new MetricsMap<>(DeltaCounterCell::new); + private ConcurrentHashMap perWorkerCounters = new ConcurrentHashMap<>(); private MetricsMap gauges = new MetricsMap<>(GaugeCell::new); @@ -88,7 +87,7 @@ public Counter getCounter(MetricName metricName) { @Override public Counter getPerWorkerCounter(MetricName metricName) { if (enablePerWorkerMetrics) { - return perWorkerCounters.get(metricName); + return new RemoveSafeDeltaCounterCell(metricName, perWorkerCounters); } else { return MetricsContainer.super.getPerWorkerCounter(metricName); }