diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 78b7d3c9dfe4..52b8adb5615a 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -31,7 +31,6 @@ import java.util.Optional; import java.util.Random; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -210,8 +209,6 @@ public class StreamingDataflowWorker { private final long clientId; private final MetricTrackingWindmillServerStub metricTrackingWindmillServer; - private final java.util.concurrent.ConcurrentLinkedQueue pendingMonitoringInfos = - new ConcurrentLinkedQueue<>(); // Map from stage name to StageInfo containing metrics container registry and per stage counters. private final ConcurrentMap stageInfoMap; @@ -1218,9 +1215,6 @@ private void process( LOG.error(e.toString()); } - Iterables.addAll( - this.pendingMonitoringInfos, executionState.workExecutor().extractMetricUpdates()); - commitCallbacks.putAll(executionState.context().flushState()); // Release the execution state for another thread to use.