From 1892bff7d596f95978cdbd8174af7fdafea77440 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?D=C3=A1vid=20Szak=C3=A1llas?=
Date: Sat, 23 Jan 2021 02:31:23 +0100
Subject: [PATCH] apply metric filter to correct registry

---
 .../spark/metrics/sink/PrometheusSink.scala | 59 +++++++++++--------
 1 file changed, 35 insertions(+), 24 deletions(-)

diff --git a/src/main/scala/com/banzaicloud/spark/metrics/sink/PrometheusSink.scala b/src/main/scala/com/banzaicloud/spark/metrics/sink/PrometheusSink.scala
index bbfa57b..3143630 100644
--- a/src/main/scala/com/banzaicloud/spark/metrics/sink/PrometheusSink.scala
+++ b/src/main/scala/com/banzaicloud/spark/metrics/sink/PrometheusSink.scala
@@ -21,7 +21,6 @@ import java.net.{InetAddress, URI, URL, UnknownHostException}
 import java.util
 import java.util.Properties
 import java.util.concurrent.TimeUnit
-
 import com.banzaicloud.spark.metrics.NameDecorator.Replace
 import com.banzaicloud.spark.metrics.PushTimestampDecorator.PushTimestampProvider
 import com.banzaicloud.spark.metrics.{DeduplicatedCollectorRegistry, SparkDropwizardExports, SparkJmxExports}
@@ -31,6 +30,7 @@ import io.prometheus.client.exporter.PushGateway
 import io.prometheus.jmx.JmxCollector
 import org.apache.spark.internal.Logging
 
+import java.util.function.BiConsumer
 import scala.collection.JavaConverters._
 import scala.collection.immutable.ListMap
 import scala.collection.immutable
@@ -51,14 +51,14 @@ abstract class PrometheusSink(property: Properties,
                               pushGatewayBuilder: URL => PushGateway
                              ) extends Logging {
   import sinkConfig._
-
+  private val lbv = raw"(.+)\s*=\s*(.*)".r
 
-  protected class Reporter(registry: MetricRegistry, metricFilter: MetricFilter)
+  protected object Reporter
     extends ScheduledReporter(
-      registry,
+      new MetricRegistry,
       "prometheus-reporter",
-      metricFilter,
+      MetricFilter.ALL,
       TimeUnit.SECONDS,
       TimeUnit.MILLISECONDS) {
@@ -95,6 +95,33 @@ abstract class PrometheusSink(property: Properties,
         customGroupKey(role, executorId, groupKey)
       }.getOrElse(defaultGroupKey(role, executorId, appName, instance, labelsMap))
 
+      val pushRegistry: CollectorRegistry = new DeduplicatedCollectorRegistry()
+
+      if (enableDropwizardCollector) {
+        val filteredRegistry = new MetricRegistry
+
+        filteredRegistry.registerAll(new MetricSet {
+          override def getMetrics: util.Map[String, Metric] = {
+            val metrics = new util.HashMap[String, Metric]()
+            registry.getMetrics.forEach(new BiConsumer[String, Metric] {
+              override def accept(t: String, u: Metric): Unit = {
+                if (metricsFilter.matches(t, u)) {
+                  metrics.put(t, u)
+                }
+              }
+            })
+            metrics
+          }
+        })
+
+        val sparkMetricExports = new SparkDropwizardExports(filteredRegistry, replace, labelsMap, pushTimestamp)
+        pushRegistry.register(sparkMetricExports)
+      }
+
+      if (enableJmxCollector) {
+        pushRegistry.register(jmxMetrics)
+      }
+
       pushGateway.pushAdd(pushRegistry, job, groupingKey.asJava)
     }
   }
@@ -204,8 +231,6 @@ abstract class PrometheusSink(property: Properties,
       key -> value
     }.toMap
 
-  val pushRegistry: CollectorRegistry = new DeduplicatedCollectorRegistry()
-
   private val pushTimestamp = if (enableTimestamp) Some(PushTimestampProvider()) else None
 
   private val replace = metricsNameCaptureRegex.map(Replace(_, metricsNameReplacement))
@@ -226,30 +251,16 @@ abstract class PrometheusSink(property: Properties,
     }
     .getOrElse(MetricFilter.ALL)
 
-  val reporter = new Reporter(registry, metricsFilter)
-
   def start(): Unit = {
-    if (enableDropwizardCollector) {
-      sparkMetricExports.register(pushRegistry)
-    }
-    if (enableJmxCollector) {
-      jmxMetrics.register(pushRegistry)
-    }
-    reporter.start(pollPeriod, pollUnit)
+    Reporter.start(pollPeriod, pollUnit)
   }
 
   def stop(): Unit = {
-    reporter.stop()
-    if (enableDropwizardCollector) {
-      pushRegistry.unregister(sparkMetricExports)
-    }
-    if (enableJmxCollector) {
-      pushRegistry.unregister(jmxMetrics)
-    }
+    Reporter.stop()
   }
 
   def report(): Unit = {
-    reporter.report()
+    Reporter.report()
   }
 
   private def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) {
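
Why moving the filter matters, with a sketch: before this patch the filter was handed to
the ScheduledReporter, but the sink's overridden report() never reads the metrics the
reporter samples from its own registry; it pushes pushRegistry, whose
SparkDropwizardExports wrapped the full source registry, so the filter never took effect.
The patch applies the filter where the export actually reads, by snapshotting the source
registry through a filtering MetricSet. The standalone sketch below shows the same
technique with plain Dropwizard metrics; the helper name filteredView and the demo metric
names are illustrative, not taken from the patch.

import java.util
import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry, MetricSet}
import scala.collection.JavaConverters._

object FilteredRegistryDemo {
  // Copy the metrics of `source` that pass `filter` into a fresh registry.
  // registerAll copies the MetricSet's entries at call time, so this is a
  // point-in-time snapshot of the source registry.
  def filteredView(source: MetricRegistry, filter: MetricFilter): MetricRegistry = {
    val filtered = new MetricRegistry
    filtered.registerAll(new MetricSet {
      override def getMetrics: util.Map[String, Metric] = {
        val metrics = new util.HashMap[String, Metric]()
        source.getMetrics.asScala.foreach { case (name, metric) =>
          if (filter.matches(name, metric)) metrics.put(name, metric)
        }
        metrics
      }
    })
    filtered
  }

  def main(args: Array[String]): Unit = {
    val source = new MetricRegistry
    source.counter("app.requests").inc(3)
    source.counter("jvm.gc.count").inc()

    // Keep only metrics whose name starts with "app."
    val appOnly = filteredView(source, new MetricFilter {
      override def matches(name: String, metric: Metric): Boolean =
        name.startsWith("app.")
    })

    println(appOnly.getMetrics.keySet().asScala) // Set(app.requests)
  }
}

Because the filtered registry is a snapshot rather than a live view, it has to be rebuilt
at export time; that is consistent with the patch constructing filteredRegistry inside
report() on every push instead of once at startup.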
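
On the Reporter change: once filtering happens inside report(), the ScheduledReporter
base class is retained only for its polling thread, which is why the patch turns Reporter
into an object backed by a fresh, always-empty MetricRegistry and MetricFilter.ALL. A
minimal standalone version of that scheduler-only pattern follows; the object name and
the println standing in for the push logic are illustrative.

import java.util
import java.util.concurrent.TimeUnit
import com.codahale.metrics._

// ScheduledReporter used purely as a timer: the backing registry is empty,
// so the five sampled maps handed to report(...) are always empty and the
// real work runs against state captured elsewhere.
object SchedulerOnlyReporter extends ScheduledReporter(
    new MetricRegistry,  // dummy registry, never exported
    "scheduler-only",
    MetricFilter.ALL,    // nothing to filter; filtering happens downstream
    TimeUnit.SECONDS,
    TimeUnit.MILLISECONDS) {

  override def report(gauges: util.SortedMap[String, Gauge[_]],
                      counters: util.SortedMap[String, Counter],
                      histograms: util.SortedMap[String, Histogram],
                      meters: util.SortedMap[String, Meter],
                      timers: util.SortedMap[String, Timer]): Unit = {
    println("tick") // stand-in for the real push
  }
}

object SchedulerOnlyDemo {
  def main(args: Array[String]): Unit = {
    SchedulerOnlyReporter.start(1, TimeUnit.SECONDS) // report() every second
    Thread.sleep(3500)                               // observe a few ticks
    SchedulerOnlyReporter.stop()
  }
}

Here start(1, TimeUnit.SECONDS) makes the base class invoke report(...) once per second
with maps sampled from the empty dummy registry; everything useful happens against state
held elsewhere, exactly as in the sink's report().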