
[SPARK-26285][CORE] accumulator metrics sources for LongAccumulator and DoubleAccumulator

## What changes were proposed in this pull request?

This PR implements metric sources for LongAccumulator and DoubleAccumulator, such that a user can register these accumulators easily and have their values be reported by the driver's metric namespace.
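For illustration, a minimal sketch of the intended usage on the driver (assuming a running `SparkContext` named `sc`; the accumulator name here is made up):

```scala
import org.apache.spark.metrics.source.LongAccumulatorSource

val acc = sc.longAccumulator("records-processed")  // illustrative name
LongAccumulatorSource.register(sc, Map("records-processed" -> acc))
// the driver's metrics namespace then exposes the gauge as
// [spark.metrics.namespace].driver.AccumulatorSource.records-processed
```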

## How was this patch tested?

Unit tests, and manual tests.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes apache#23242 from abellina/SPARK-26285_accumulator_source.

Lead-authored-by: Alessandro Bellina <abellina@yahoo-inc.com>
Co-authored-by: Alessandro Bellina <abellina@oath.com>
Co-authored-by: Alessandro Bellina <abellina@gmail.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
3 people authored and jackylee-ch committed Feb 18, 2019
1 parent 795b2b2 commit bbea25d
Showing 3 changed files with 257 additions and 0 deletions.
core/src/main/scala/org/apache/spark/metrics/source/AccumulatorSource.scala
@@ -0,0 +1,89 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.metrics.source

import com.codahale.metrics.{Gauge, MetricRegistry}

import org.apache.spark.SparkContext
import org.apache.spark.annotation.Experimental
import org.apache.spark.util.{AccumulatorV2, DoubleAccumulator, LongAccumulator}

/**
 * AccumulatorSource is a Spark metric source that reports the current value
 * of the accumulator as a gauge.
 *
 * It is restricted to the LongAccumulator and the DoubleAccumulator, as those
 * are the built-in numerical accumulators in Spark; it excludes the
 * CollectionAccumulator, as that is a list of values (hard to report to a
 * metrics system).
 */
private[spark] class AccumulatorSource extends Source {
  private val registry = new MetricRegistry
  protected def register[T](accumulators: Map[String, AccumulatorV2[_, T]]): Unit = {
    accumulators.foreach {
      case (name, accumulator) =>
        val gauge = new Gauge[T] {
          override def getValue: T = accumulator.value
        }
        registry.register(MetricRegistry.name(name), gauge)
    }
  }

  override def sourceName: String = "AccumulatorSource"
  override def metricRegistry: MetricRegistry = registry
}

@Experimental
class LongAccumulatorSource extends AccumulatorSource

@Experimental
class DoubleAccumulatorSource extends AccumulatorSource

/**
 * :: Experimental ::
 * Metrics source specifically for LongAccumulators. Accumulators
 * are only valid on the driver side, so these metrics are reported
 * only by the driver.
 * Register LongAccumulators using:
 * LongAccumulatorSource.register(sc, Map("name" -> longAccumulator))
 */
@Experimental
object LongAccumulatorSource {
  def register(sc: SparkContext, accumulators: Map[String, LongAccumulator]): Unit = {
    val source = new LongAccumulatorSource
    source.register(accumulators)
    sc.env.metricsSystem.registerSource(source)
  }
}

/**
 * :: Experimental ::
 * Metrics source specifically for DoubleAccumulators. Accumulators
 * are only valid on the driver side, so these metrics are reported
 * only by the driver.
 * Register DoubleAccumulators using:
 * DoubleAccumulatorSource.register(sc, Map("name" -> doubleAccumulator))
 */
@Experimental
object DoubleAccumulatorSource {
  def register(sc: SparkContext, accumulators: Map[String, DoubleAccumulator]): Unit = {
    val source = new DoubleAccumulatorSource
    source.register(accumulators)
    sc.env.metricsSystem.registerSource(source)
  }
}
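A note on the mechanism: each gauge holds a reference to its accumulator and calls `value` only when the metrics system polls it, so reported values stay current with no extra bookkeeping. A self-contained sketch of that Dropwizard gauge pattern (illustrative names, not part of this change):

```scala
import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.util.LongAccumulator

val registry = new MetricRegistry
val acc = new LongAccumulator
// the gauge reads the accumulator lazily, at report time
registry.register("my-metric", new Gauge[java.lang.Long] {
  override def getValue: java.lang.Long = acc.value
})
acc.add(42)
assert(registry.getGauges().get("my-metric").getValue == 42L)
```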
core/src/test/scala/org/apache/spark/metrics/source/AccumulatorSourceSuite.scala
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.metrics.source

import com.codahale.metrics.MetricRegistry
import org.mockito.ArgumentCaptor
import org.mockito.Mockito.{mock, times, verify, when}

import org.apache.spark.{SparkContext, SparkEnv, SparkFunSuite}
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{DoubleAccumulator, LongAccumulator}

class AccumulatorSourceSuite extends SparkFunSuite {
  test("accumulators register against the metric system's registry") {
    val acc1 = new LongAccumulator()
    val acc2 = new LongAccumulator()
    val mockContext = mock(classOf[SparkContext])
    val mockEnvironment = mock(classOf[SparkEnv])
    val mockMetricSystem = mock(classOf[MetricsSystem])
    when(mockEnvironment.metricsSystem).thenReturn(mockMetricSystem)
    when(mockContext.env).thenReturn(mockEnvironment)
    val accs = Map(
      "my-accumulator-1" -> acc1,
      "my-accumulator-2" -> acc2)
    LongAccumulatorSource.register(mockContext, accs)
    val captor = ArgumentCaptor.forClass(classOf[AccumulatorSource])
    verify(mockMetricSystem, times(1)).registerSource(captor.capture())
    val source = captor.getValue()
    val gauges = source.metricRegistry.getGauges()
    assert(gauges.size == 2)
    assert(gauges.firstKey == "my-accumulator-1")
    assert(gauges.lastKey == "my-accumulator-2")
  }

  test("the accumulator's value property is checked when the gauge's value is requested") {
    val acc1 = new LongAccumulator()
    acc1.add(123)
    val acc2 = new LongAccumulator()
    acc2.add(456)
    val mockContext = mock(classOf[SparkContext])
    val mockEnvironment = mock(classOf[SparkEnv])
    val mockMetricSystem = mock(classOf[MetricsSystem])
    when(mockEnvironment.metricsSystem).thenReturn(mockMetricSystem)
    when(mockContext.env).thenReturn(mockEnvironment)
    val accs = Map(
      "my-accumulator-1" -> acc1,
      "my-accumulator-2" -> acc2)
    LongAccumulatorSource.register(mockContext, accs)
    val captor = ArgumentCaptor.forClass(classOf[AccumulatorSource])
    verify(mockMetricSystem, times(1)).registerSource(captor.capture())
    val source = captor.getValue()
    val gauges = source.metricRegistry.getGauges()
    assert(gauges.get("my-accumulator-1").getValue() == 123)
    assert(gauges.get("my-accumulator-2").getValue() == 456)
  }

  test("the double accumulator's value property is checked when the gauge's value is requested") {
    val acc1 = new DoubleAccumulator()
    acc1.add(123.123)
    val acc2 = new DoubleAccumulator()
    acc2.add(456.456)
    val mockContext = mock(classOf[SparkContext])
    val mockEnvironment = mock(classOf[SparkEnv])
    val mockMetricSystem = mock(classOf[MetricsSystem])
    when(mockEnvironment.metricsSystem).thenReturn(mockMetricSystem)
    when(mockContext.env).thenReturn(mockEnvironment)
    val accs = Map(
      "my-accumulator-1" -> acc1,
      "my-accumulator-2" -> acc2)
    DoubleAccumulatorSource.register(mockContext, accs)
    val captor = ArgumentCaptor.forClass(classOf[AccumulatorSource])
    verify(mockMetricSystem, times(1)).registerSource(captor.capture())
    val source = captor.getValue()
    val gauges = source.metricRegistry.getGauges()
    assert(gauges.get("my-accumulator-1").getValue() == 123.123)
    assert(gauges.get("my-accumulator-2").getValue() == 456.456)
  }
}
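For context on the first test's `firstKey`/`lastKey` assertions: Dropwizard's `MetricRegistry.getGauges()` returns a `SortedMap` keyed by metric name, so registration order does not matter. A minimal illustration (names are made up):

```scala
import com.codahale.metrics.{Gauge, MetricRegistry}

val registry = new MetricRegistry
registry.register("b-metric", new Gauge[Long] { override def getValue: Long = 2L })
registry.register("a-metric", new Gauge[Long] { override def getValue: Long = 1L })
val gauges = registry.getGauges()  // sorted by name
assert(gauges.firstKey == "a-metric")
assert(gauges.lastKey == "b-metric")
```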
examples/src/main/scala/org/apache/spark/examples/AccumulatorMetricsTest.scala
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// scalastyle:off println
package org.apache.spark.examples

import org.apache.spark.metrics.source.{DoubleAccumulatorSource, LongAccumulatorSource}
import org.apache.spark.sql.SparkSession

/**
 * Usage: AccumulatorMetricsTest [numElem]
 *
 * This example shows how to register accumulators against the accumulator source.
 * A simple RDD is created, and while iterating over it the accumulators are incremented.
 *
 * The only argument, numElem, sets the number of elements in the collection to parallelize.
 *
 * The result is output to stdout in the driver with the values of the accumulators.
 * The long accumulator should equal numElem; the double accumulator should be
 * roughly 1.1 x numElem (within double precision). This example also sets up a
 * ConsoleSink (metrics) instance, so registered codahale metrics (like the
 * accumulator source) are reported to stdout as well.
 */
object AccumulatorMetricsTest {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .config("spark.metrics.conf.*.sink.console.class",
        "org.apache.spark.metrics.sink.ConsoleSink")
      .getOrCreate()

    val sc = spark.sparkContext

    val acc = sc.longAccumulator("my-long-metric")
    // register the accumulator; the metric system will report it as
    // [spark.metrics.namespace].[execId|driver].AccumulatorSource.my-long-metric
    LongAccumulatorSource.register(sc, Map("my-long-metric" -> acc))

    val acc2 = sc.doubleAccumulator("my-double-metric")
    // register the accumulator; the metric system will report it as
    // [spark.metrics.namespace].[execId|driver].AccumulatorSource.my-double-metric
    DoubleAccumulatorSource.register(sc, Map("my-double-metric" -> acc2))

    val num = if (args.length > 0) args(0).toInt else 1000000

    val startTime = System.nanoTime

    sc.parallelize(1 to num).foreach { _ =>
      acc.add(1)
      acc2.add(1.1)
    }

    // Print a footer with test time and accumulator values
    println("Test took %.0f milliseconds".format((System.nanoTime - startTime) / 1E6))
    println("Accumulator values:")
    println("*** Long accumulator (my-long-metric): " + acc.value)
    println("*** Double accumulator (my-double-metric): " + acc2.value)

    spark.stop()
  }
}
// scalastyle:on println
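To try the example, Spark's run-example launcher should work; a plausible invocation from a standard Spark distribution (paths assumed):

```
./bin/run-example AccumulatorMetricsTest 1000000
```

With the ConsoleSink configured above, the console reporter periodically prints the registered gauges alongside the example's own stdout footer.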
