Commit

Prepare version 0.18
LucaCanali committed Mar 18, 2022
1 parent 4a10dd4 commit d79938c
Showing 5 changed files with 38 additions and 39 deletions.
30 changes: 16 additions & 14 deletions README.md
@@ -5,24 +5,25 @@

### SparkMeasure is a tool for performance troubleshooting of Apache Spark jobs
SparkMeasure simplifies the collection and analysis of Spark performance metrics.
Use sparkMeasure for troubleshooting **interactive and batch** Spark workloads.
Use it also to collect metrics for long-term retention or as part of a **CI/CD** pipeline.
Use sparkMeasure to:
- troubleshoot **interactive** and **batch** Spark workloads
- collect metrics for long-term retention, or as part of a **CI/CD** pipeline
- sink metrics to external systems, such as InfluxDB or Apache Kafka
SparkMeasure is also intended as a working example of how to use Spark Listeners for collecting Spark task metrics data.
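As a minimal illustration of that mechanism (a hypothetical listener written for this note, not part of sparkMeasure), a custom Spark Listener can subscribe to stage-completion events and read the aggregated task metrics:
```
import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted}

// Print the aggregated executor run time of each completed stage
class MinimalStageListener extends SparkListener {
  override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
    val info = stageCompleted.stageInfo
    println(s"Stage ${info.stageId} completed, executorRunTime = ${info.taskMetrics.executorRunTime} ms")
  }
}

// Register on a live SparkContext, for example from spark-shell:
// spark.sparkContext.addSparkListener(new MinimalStageListener)
```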
* Main author and contact:
* Luca.Canali@cern.ch + credits to Viktor.Khristenko@cern.ch + thanks to PR contributors
* For Spark 2.x and 3.x
* Tested on Spark 2.4, 3.0, 3.1
* Luca.Canali@cern.ch - credits to Viktor Khristenko and to PR contributors
* Use with Spark 2.x and 3.x
* Note: for Spark 2.1 and 2.2, please use sparkMeasure version 0.16

### Getting started with sparkMeasure
* Note: sparkMeasure is available on [Maven Central](https://mvnrepository.com/artifact/ch.cern.sparkmeasure/spark-measure)
* Spark 3.x and 2.4 with scala 2.12:
- Scala: `bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17`
- Python: `bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17`
* Spark 3.x and 2.4 with Scala 2.12:
- Scala: `bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.18`
- Python: `bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.18`
- note: `pip install sparkmeasure` to get the Python wrapper API.
* Spark 2.x with Scala 2.11:
- Scala: `bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.11:0.17`
- Python: `bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.11:0.17`
- Scala: `bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.11:0.18`
- Python: `bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.11:0.18`
- note: `pip install sparkmeasure` to get the Python wrapper API.
* Bleeding edge: build sparkMeasure jar using sbt: `sbt +package` and use `--jars`
with the jar just built instead of using `--packages`.
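For the `--jars` route, a minimal sketch, assuming sbt's default output path for Scala 2.12 and version 0.18:
```
bin/spark-shell --jars target/scala-2.12/spark-measure_2.12-0.18.jar
```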
@@ -39,15 +40,15 @@ SparkMeasure is also intended as a working example of how to use Spark Listeners
- CLI: spark-shell and PySpark
```
# Scala CLI, Spark 3.x
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.18
val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.runAndMeasure(spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show())
```
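Besides `runAndMeasure`, the same `StageMetrics` object exposes explicit `begin()`, `end()`, and `printReport()` methods (defined in stagemetrics.scala, see the diff below); a minimal sketch of instrumenting a block of code explicitly:
```
# Scala CLI, explicit instrumentation of a code block
val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.begin()
spark.sql("select count(*) from range(1000) cross join range(1000)").show()
stageMetrics.end()
stageMetrics.printReport()
```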
```
# Python CLI, Spark 3.x
pip install sparkmeasure
bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17
bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.18
from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)
@@ -92,7 +93,7 @@ shuffleRecordsWritten => 8
- CLI: spark-shell, measure workload metrics aggregating from raw task metrics
```
# Scala CLI, Spark 3.x
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.17
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.18
val taskMetrics = ch.cern.sparkmeasure.TaskMetrics(spark)
taskMetrics.runAndMeasure(spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show())
@@ -114,13 +115,14 @@ from `jupyter-notebook`. Works with Python and Scala kernels.
or you can opt to write metrics in near-realtime to an InfluxDB instance.
- **[Flight Recorder mode with file output](docs/Flight_recorder_mode.md)**
- **[Flight Recorder mode with DB write output](docs/Flight_recorder_DBwrite.md)**
- Support for sinking metrics to Apache Kafka
* Additional info:
- Presentations at Spark Summit:
- [Performance Troubleshooting Using Apache Spark Metrics](https://databricks.com/session_eu19/performance-troubleshooting-using-apache-spark-metrics)
- [Apache Spark Performance Troubleshooting at Scale, Challenges, Tools, and Methodologies](https://spark-summit.org/eu-2017/events/apache-spark-performance-troubleshooting-at-scale-challenges-tools-and-methodologies/)
- Blogs:
- [2018: SparkMeasure, a tool for performance troubleshooting of Apache Spark workloads](https://db-blog.web.cern.ch/blog/luca-canali/2018-08-sparkmeasure-tool-performance-troubleshooting-apache-spark-workloads),
- [2017 blog post](http://db-blog.web.cern.ch/blog/luca-canali/2017-03-measuring-apache-spark-workload-metrics-performance-troubleshooting)
- [2017: SparkMeasure blog post](http://db-blog.web.cern.ch/blog/luca-canali/2017-03-measuring-apache-spark-workload-metrics-performance-troubleshooting)
- [TODO list and known issues](docs/TODO_and_issues.md)

### Architecture diagram
14 changes: 7 additions & 7 deletions build.sbt
@@ -1,21 +1,21 @@
name := "spark-measure"

version := "0.18-SNAPSHOT"
version := "0.18"

scalaVersion := "2.12.10"
crossScalaVersions := Seq("2.11.12", "2.12.10")

licenses += ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0"))

isSnapshot := true
isSnapshot := false

libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.8"
libraryDependencies += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.13.0"
libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.26"
libraryDependencies += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.13.2"
libraryDependencies += "org.slf4j" % "slf4j-api" % "1.7.36"
libraryDependencies += "org.influxdb" % "influxdb-java" % "2.14"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "2.0.1"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.7" % "test"
libraryDependencies += "com.github.tomakehurst" % "wiremock" % "2.23.2" % "test"
libraryDependencies += "org.apache.kafka" % "kafka-clients" % "3.0.1"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.9" % "test"
libraryDependencies += "com.github.tomakehurst" % "wiremock" % "2.27.2" % "test"

// publishing to Sonatype Nexus repository and Maven
publishMavenStyle := true
2 changes: 1 addition & 1 deletion project/build.properties
@@ -1 +1 @@
sbt.version = 1.5.5
sbt.version = 1.6.2
2 changes: 1 addition & 1 deletion src/main/scala/ch/cern/sparkmeasure/kafkasink.scala
@@ -29,7 +29,7 @@ import scala.util.Try
* example: --conf spark.sparkmeasure.kafkaTopic=sparkmeasure-stageinfo
*
* This code depends on "kafka clients", you may need to add the dependency:
* --packages org.apache.kafka:kafka-clients:2.0.1
* --packages org.apache.kafka:kafka-clients:3.0.1
*
* Output: each message contains the name; it is acknowledged as the metrics name as well.
* Note: the amount of data generated is relatively small in most applications: O(number_of_stages)
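*
* Usage sketch: the values are placeholders, and the listener class / kafkaBroker parameter
* names are assumptions based on this sink's configuration (see the docs for the exact names):
* bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.18,org.apache.kafka:kafka-clients:3.0.1 \
*   --conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSink \
*   --conf spark.sparkmeasure.kafkaBroker=kafka.example.com:9092 \
*   --conf spark.sparkmeasure.kafkaTopic=sparkmeasure-stageinfo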
29 changes: 13 additions & 16 deletions src/main/scala/ch/cern/sparkmeasure/stagemetrics.scala
@@ -1,14 +1,11 @@
package ch.cern.sparkmeasure

//import java.lang.Long
import java.util.Properties
import java.util.LinkedHashMap
import java.util.Map

import collection.JavaConversions._

import org.apache.spark.scheduler._
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.collection.mutable.ListBuffer
import org.slf4j.LoggerFactory

@@ -26,8 +23,8 @@ import org.slf4j.LoggerFactory
*
*/

// contains the list of task metrics and other measurements of interest at the Stage level, as a case class
// Note, remoteBytesReadToDisk is not there for backward compatibility, as it has been introduced in Spark 2.3.0
// contains the list of task metrics and other measurements of interest at the Stage level,
// packaged into a case class
case class StageVals (jobId: Int, jobGroup:String, stageId: Int, name: String,
submissionTime: Long, completionTime: Long, stageDuration: Long, numTasks: Int,
executorRunTime: Long, executorCpuTime: Long,
@@ -92,7 +89,7 @@ class StageInfoRecorderListener extends SparkListener {
)
stageMetricsData += currentStage

/** Collect data from accumulators, with additional care to keep only numerical values */
// Collect data from accumulators, with additional care to keep only numerical values
stageInfo.accumulables.foreach(acc => try {
val value = acc._2.value.getOrElse(0L).asInstanceOf[Long]
val name = acc._2.name.getOrElse("")
@@ -112,11 +109,11 @@ case class StageMetrics(sparkSession: SparkSession) {

lazy val logger = LoggerFactory.getLogger(this.getClass.getName)

/** This inserts the custom Spark Listener into the live Spark Context */
// This inserts the custom Spark Listener into the live Spark Context
val listenerStage = new StageInfoRecorderListener
sparkSession.sparkContext.addSparkListener(listenerStage)

/** Variables used to store the start and end time of the period of interest for the metrics report */
// Variables used to store the start and end time of the period of interest for the metrics report
var beginSnapshot: Long = 0L
var endSnapshot: Long = 0L

@@ -131,7 +128,7 @@ case class StageMetrics(sparkSession: SparkSession) {
endSnapshot
}

/** Move data recorded from the custom listener into a DataFrame and register it as a view for easier processing */
// Move data recorded from the custom listener into a DataFrame and register it as a view for easier processing
def createStageMetricsDF(nameTempView: String = "PerfStageMetrics"): DataFrame = {
import sparkSession.implicits._
val resultDF = listenerStage.stageMetricsData.toDF
@@ -193,7 +190,7 @@ case class StageMetrics(sparkSession: SparkSession) {
resultMap
}

/** Custom aggregations and post-processing of metrics data */
// Custom aggregations and post-processing of metrics data
def report(): String = {
val nameTempView = "PerfStageMetrics"
createStageMetricsDF(nameTempView)
@@ -225,7 +222,7 @@ case class StageMetrics(sparkSession: SparkSession) {
println(report())
}

/** for internal metrics sum all the values, for the accumulables compute max value for each accId and name */
// For internal metrics, sum all the values; for the accumulables, compute the max value for each accId and name
def reportAccumulables(): String = {
import sparkSession.implicits._

@@ -316,19 +313,19 @@

/** Shortcut to run and measure the metrics for Spark execution, built after spark.time() */
def runAndMeasure[T](f: => T): T = {
this.begin()
begin()
val startTime = System.nanoTime()
val ret = f
val endTime = System.nanoTime()
this.end()
end()
println(s"Time taken: ${(endTime - startTime) / 1000000} ms")
printReport()
ret
}
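// Usage sketch, mirrors the README example:
// val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
// stageMetrics.runAndMeasure(spark.sql("select count(*) from range(1000) cross join range(1000)").show())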

/** Helper method to save data, we expect to have small amounts of data so collapsing to 1 partition seems OK */
// Helper method to save data; we expect small amounts of data, so collapsing to 1 partition seems OK
def saveData(df: DataFrame, fileName: String, fileFormat: String = "json", saveMode: String = "default") = {
df.repartition(1).write.format(fileFormat).mode(saveMode).save(fileName)
df.coalesce(1).write.format(fileFormat).mode(saveMode).save(fileName)
logger.warn(s"Stage metric data saved into $fileName using format=$fileFormat")
}
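// Usage sketch, illustrative only, the output path is a placeholder:
// saveData(createStageMetricsDF(), "/tmp/stagemetrics_report", "json")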

