Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Apply metric filter to correct registry #73

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.net.{InetAddress, URI, URL, UnknownHostException}
import java.util
import java.util.Properties
import java.util.concurrent.TimeUnit

import com.banzaicloud.spark.metrics.NameDecorator.Replace
import com.banzaicloud.spark.metrics.PushTimestampDecorator.PushTimestampProvider
import com.banzaicloud.spark.metrics.{DeduplicatedCollectorRegistry, SparkDropwizardExports, SparkJmxExports}
Expand All @@ -31,6 +30,7 @@ import io.prometheus.client.exporter.PushGateway
import io.prometheus.jmx.JmxCollector
import org.apache.spark.internal.Logging

import java.util.function.BiConsumer
import scala.collection.JavaConverters._
import scala.collection.immutable.ListMap
import scala.collection.immutable
Expand All @@ -51,14 +51,14 @@ abstract class PrometheusSink(property: Properties,
pushGatewayBuilder: URL => PushGateway
) extends Logging {
import sinkConfig._

private val lbv = raw"(.+)\s*=\s*(.*)".r

protected class Reporter(registry: MetricRegistry, metricFilter: MetricFilter)
protected object Reporter
extends ScheduledReporter(
registry,
new MetricRegistry,
"prometheus-reporter",
metricFilter,
MetricFilter.ALL,
TimeUnit.SECONDS,
TimeUnit.MILLISECONDS) {

Expand Down Expand Up @@ -95,6 +95,33 @@ abstract class PrometheusSink(property: Properties,
customGroupKey(role, executorId, groupKey)
}.getOrElse(defaultGroupKey(role, executorId, appName, instance, labelsMap))

val pushRegistry: CollectorRegistry = new DeduplicatedCollectorRegistry()

if (enableDropwizardCollector) {
val filteredRegistry = new MetricRegistry

filteredRegistry.registerAll(new MetricSet {
override def getMetrics: util.Map[String, Metric] = {
val metrics = new util.HashMap[String, Metric]()
registry.getMetrics.forEach(new BiConsumer[String, Metric] {
override def accept(t: String, u: Metric): Unit = {
if (metricsFilter.matches(t, u)) {
metrics.put(t, u)
}
}
})
metrics
}
})

val sparkMetricExports = new SparkDropwizardExports(filteredRegistry, replace, labelsMap, pushTimestamp)
pushRegistry.register(sparkMetricExports)
}

if (enableJmxCollector) {
pushRegistry.register(jmxMetrics)
}

pushGateway.pushAdd(pushRegistry, job, groupingKey.asJava)
}
}
Expand Down Expand Up @@ -204,8 +231,6 @@ abstract class PrometheusSink(property: Properties,
key -> value
}.toMap

val pushRegistry: CollectorRegistry = new DeduplicatedCollectorRegistry()

private val pushTimestamp = if (enableTimestamp) Some(PushTimestampProvider()) else None

private val replace = metricsNameCaptureRegex.map(Replace(_, metricsNameReplacement))
Expand All @@ -226,30 +251,16 @@ abstract class PrometheusSink(property: Properties,
}
.getOrElse(MetricFilter.ALL)

val reporter = new Reporter(registry, metricsFilter)

def start(): Unit = {
if (enableDropwizardCollector) {
sparkMetricExports.register(pushRegistry)
}
if (enableJmxCollector) {
jmxMetrics.register(pushRegistry)
}
reporter.start(pollPeriod, pollUnit)
Reporter.start(pollPeriod, pollUnit)
}

def stop(): Unit = {
reporter.stop()
if (enableDropwizardCollector) {
pushRegistry.unregister(sparkMetricExports)
}
if (enableJmxCollector) {
pushRegistry.unregister(jmxMetrics)
}
Reporter.stop()
}

def report(): Unit = {
reporter.report()
Reporter.report()
}

private def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) {
Expand Down