Skip to content

Commit

Permalink
Prepare SparkMeasure 0.24
Browse files Browse the repository at this point in the history
  • Loading branch information
LucaCanali committed Mar 11, 2024
1 parent fa58795 commit faf31ee
Show file tree
Hide file tree
Showing 17 changed files with 102 additions and 79 deletions.
62 changes: 36 additions & 26 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ Choose the sparkMeasure version for your environment:

Examples:
* Spark with Scala 2.12:
- **Scala:** `bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.23`
- **Python:** `bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.23`
- **Scala:** `bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24`
- **Python:** `bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24`
- note: you also need `pip install sparkmeasure` to get the [Python wrapper API](https://pypi.org/project/sparkmeasure/)

* Spark with Scala 2.13:
- Scala: `bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.13:0.23`
- Python: `bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.13:0.23`
- Scala: `bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.13:0.24`
- Python: `bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.13:0.24`
- note: `pip install sparkmeasure` to get the Python wrapper API

* Spark 2.4 and 2.3 with Scala 2.11:
Expand All @@ -69,15 +69,15 @@ Examples:

* Where to get sparkMeasure:
* [sparkMeasure on Maven Central](https://mvnrepository.com/artifact/ch.cern.sparkmeasure/spark-measure)
* Jars in sparkMeasure's [release notes](https://github.com/LucaCanali/sparkMeasure/releases/tag/v0.23)
* Jars in sparkMeasure's [release notes](https://github.com/LucaCanali/sparkMeasure/releases/tag/v0.24)
* Bleeding edge jars as artifacts in [GitHub actions](https://github.com/LucaCanali/sparkMeasure/actions)
* Build jars from master using sbt: `sbt +package`

* Some practical examples of how to set the configuration to use sparkMeasure with Spark
* `--packages ch.cern.sparkmeasure:spark-measure_2.12:0.23`
* `--jars /path/to/spark-measure_2.12-0.23.jar`
* `--jars https://github.com/LucaCanali/sparkMeasure/releases/download/v0.23/spark-measure_2.12-0.23.jar`
* `--conf spark.driver.extraClassPath=/path/to/spark-measure_2.12-0.23.jar`
* `--packages ch.cern.sparkmeasure:spark-measure_2.12:0.24`
* `--jars /path/to/spark-measure_2.12-0.24.jar`
* `--jars https://github.com/LucaCanali/sparkMeasure/releases/download/v0.24/spark-measure_2.12-0.24.jar`
* `--conf spark.driver.extraClassPath=/path/to/spark-measure_2.12-0.24.jar`

---
### Examples of interactive use of sparkMeasure
Expand All @@ -94,39 +94,47 @@ Examples:
- Stage-level metrics from the command line:
```
# Scala CLI
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.23
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24
val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.runAndMeasure(spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show())
```
```
# Python CLI
pip install sparkmeasure
bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.23
bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24
from sparkmeasure import StageMetrics
stagemetrics = StageMetrics(spark)
stagemetrics.runandmeasure(globals(), 'spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show()')
```
The output should look like this:
```
+----------+
| count(1)|
+----------+
|1000000000|
+----------+
Time taken: 3833 ms
Scheduling mode = FIFO
Spark Context default degree of parallelism = 8
Aggregated Spark stage metrics:
numStages => 3
numTasks => 17
elapsedTime => 1291 (1 s)
stageDuration => 1058 (1 s)
executorRunTime => 2774 (3 s)
executorCpuTime => 2004 (2 s)
executorDeserializeTime => 2868 (3 s)
executorDeserializeCpuTime => 1051 (1 s)
resultSerializationTime => 5 (5 ms)
jvmGCTime => 88 (88 ms)
elapsedTime => 1112 (1 s)
stageDuration => 864 (0.9 s)
executorRunTime => 3358 (3 s)
executorCpuTime => 2168 (2 s)
executorDeserializeTime => 892 (0.9 s)
executorDeserializeCpuTime => 251 (0.3 s)
resultSerializationTime => 72 (72 ms)
jvmGCTime => 0 (0 ms)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 16 (16 ms)
resultSize => 16091 (15.0 KB)
shuffleWriteTime => 36 (36 ms)
resultSize => 16295 (15.9 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
Expand All @@ -145,10 +153,12 @@ shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 472 (472 Bytes)
shuffleRecordsWritten => 8
Average number of active tasks => 3.0
Stages and their duration:
Stage 0 duration => 593 (0.6 s)
Stage 1 duration => 416 (0.4 s)
Stage 3 duration => 49 (49 ms)
Stage 0 duration => 355 (0.4 s)
Stage 1 duration => 411 (0.4 s)
Stage 3 duration => 98 (98 ms)
```

- Stage metrics collection mode has an optional memory report command
Expand Down Expand Up @@ -176,15 +186,15 @@ Stage 3 OnHeapExecutionMemory maxVal bytes => 0 (0 Bytes)
- this is similar but slightly different from the example above as it collects metrics at the Task-level rather than Stage-level
```
# Scala CLI
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.23
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24
val taskMetrics = ch.cern.sparkmeasure.TaskMetrics(spark)
taskMetrics.runAndMeasure(spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show())
```
```
# Python CLI
pip install sparkmeasure
bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.23
bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24
from sparkmeasure import TaskMetrics
taskmetrics = TaskMetrics(spark)
Expand Down
4 changes: 2 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name := "spark-measure"

version := "0.24-SNAPSHOT"
version := "0.24"

scalaVersion := "2.12.18"
crossScalaVersions := Seq("2.12.18", "2.13.8")
Expand All @@ -9,7 +9,7 @@ licenses += ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0"))

// publishing to Sonatype Nexus repository and Maven
publishMavenStyle := true
isSnapshot := true
isSnapshot := false

libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.5.1"
libraryDependencies += "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.16.1"
Expand Down
6 changes: 3 additions & 3 deletions docs/Flight_recorder_mode_FileSink.md
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ This example collected metrics with Task granularity.
(note: source the Hadoop environment before running this)
```
bin/spark-submit --master yarn --deploy-mode cluster \
--packages ch.cern.sparkmeasure:spark-measure_2.12:0.23 \
--packages ch.cern.sparkmeasure:spark-measure_2.12:0.24 \
--conf spark.extraListeners=ch.cern.sparkmeasure.FlightRecorderTaskMetrics \
--conf spark.sparkmeasure.outputFormat=json_to_hadoop \
--conf spark.sparkmeasure.outputFilename="hdfs://myclustername/user/luca/test/myoutput_$(date +%s).json" \
Expand All @@ -96,7 +96,7 @@ Example, use spark-3.3.0, Kubernetes, Scala 2.12 and write output to S3:
bin/spark-submit --master k8s://https://XXX.XXX.XXX.XXX --deploy-mode client --conf spark.executor.instances=3 \
--conf spark.executor.cores=2 --executor-memory 6g --driver-memory 8g \
--conf spark.kubernetes.container.image=<registry-URL>/spark:v3.0.0_20190529_hadoop32 \
--packages org.apache.hadoop:hadoop-aws:3.3.2,ch.cern.sparkmeasure:spark-measure_2.12:0.23 \
--packages org.apache.hadoop:hadoop-aws:3.3.2,ch.cern.sparkmeasure:spark-measure_2.12:0.24 \
--conf spark.hadoop.fs.s3a.secret.key="YYY..." \
--conf spark.hadoop.fs.s3a.access.key="ZZZ..." \
--conf spark.hadoop.fs.s3a.endpoint="https://s3.cern.ch" \
Expand All @@ -115,7 +115,7 @@ To post-process the saved metrics you will need to deserialize objects saved by
This is an example of how to do that using the supplied helper object sparkmeasure.Utils

```
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.23
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24
val myMetrics = ch.cern.sparkmeasure.IOUtils.readSerializedStageMetricsJSON("/tmp/stageMetrics_flightRecorder")
// use ch.cern.sparkmeasure.IOUtils.readSerializedStageMetrics("/tmp/stageMetrics.serialized") for java serialization
Expand Down
6 changes: 4 additions & 2 deletions docs/Flight_recorder_mode_InfluxDBSink.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

Use sparkMeasure in flight recorder mode to instrument Spark applications without touching their code.
Flight recorder mode attaches a Spark Listener that collects the metrics while the application runs.
This describes how to sink Spark metrics to an InfluxDB instance.
This describes how to sink Spark metrics to an InfluxDB instance.
Note this is for InfluxDB version 1.x, for version 2.x some changes are needed.
You can use this also with VictoriaMetrics, ingesting the InfluxDB line protocol.

## InfluxDBSink and InfluxDBSinkExtended

Expand Down Expand Up @@ -85,7 +87,7 @@ bin/spark-shell \
--conf spark.sparkmeasure.influxdbURL="http://localhost:8086" \
--conf spark.extraListeners=ch.cern.sparkmeasure.InfluxDBSink,ch.cern.sparkmeasure.InfluxDBSinkExtended \
--conf spark.sparkmeasure.influxdbStagemetrics=true
--packages ch.cern.sparkmeasure:spark-measure_2.12:0.23
--packages ch.cern.sparkmeasure:spark-measure_2.12:0.24
// run a Spark job, this will produce metrics
spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show
Expand Down
2 changes: 1 addition & 1 deletion docs/Flight_recorder_mode_KafkaSink.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ bin/spark-shell \
--conf spark.extraListeners=ch.cern.sparkmeasure.KafkaSink \
--conf spark.sparkmeasure.kafkaBroker=my_kafka_server:9092 \
--conf spark.sparkmeasure.kafkaTopic=metrics
--packages ch.cern.sparkmeasure:spark-measure_2.12:0.23
--packages ch.cern.sparkmeasure:spark-measure_2.12:0.24
```

- Look at the metrics being written to Kafka:
Expand Down
8 changes: 4 additions & 4 deletions docs/Instrument_Python_code.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,10 @@ The details are discussed in the [examples for Python shell and notebook](docs/P

- This is how to run sparkMeasure using a packaged version in Maven Central
```
bin/spark-submit --packages ch.cern.sparkmeasure:spark-measure_2.12:0.23 your_python_code.py
bin/spark-submit --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24 your_python_code.py
// alternative: just download and use the jar (it is only needed in the driver) as in:
bin/spark-submit --conf spark.driver.extraClassPath=<path>/spark-measure_2.12-0.23.jar ...
bin/spark-submit --conf spark.driver.extraClassPath=<path>/spark-measure_2.12-0.24.jar ...
```

### Download and build sparkMeasure (optional)
Expand All @@ -73,8 +73,8 @@ The details are discussed in the [examples for Python shell and notebook](docs/P
pip install .
# Run as in one of these examples:
bin/spark-submit --jars path>/spark-measure_2.12-0.24-SNAPSHOT.jar ...
bin/spark-submit --jars path>/spark-measure_2.12-0.25-SNAPSHOT.jar ...
# alternative, set classpath for the driver (sparkmeasure code runs only in the driver)
bin/spark-submit --conf spark.driver.extraClassPath=<path>/spark-measure_2.12-0.24-SNAPSHOT.jar ...
bin/spark-submit --conf spark.driver.extraClassPath=<path>/spark-measure_2.12-0.25-SNAPSHOT.jar ...
```
8 changes: 4 additions & 4 deletions docs/Instrument_Scala_code.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ See details at: [Prometheus Pushgateway](Prometheus.md)

- This is how to run sparkMeasure using a packaged version in Maven Central
```
bin/spark-submit --packages ch.cern.sparkmeasure:spark-measure_2.12:0.23
bin/spark-submit --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24

// or just download and use the jar (it is only needed in the driver) as in:
bin/spark-submit --conf spark.driver.extraClassPath=<path>/spark-measure_2.12-0.23.jar ...
bin/spark-submit --conf spark.driver.extraClassPath=<path>/spark-measure_2.12-0.24.jar ...
```
- The alternative, see paragraph above, is to build a jar from master (See below).

Expand All @@ -88,8 +88,8 @@ See details at: [Prometheus Pushgateway](Prometheus.md)
ls -l target/scala-2.12/spark-measure*.jar # location of the compiled jar
# Run as in one of these examples:
bin/spark-submit --jars path>/spark-measure_2.12-0.23-SNAPSHOT.jar
bin/spark-submit --jars path>/spark-measure_2.12-0.25-SNAPSHOT.jar
# alternative, set classpath for the driver (it is only needed in the driver)
bin/spark-submit --conf spark.driver.extraClassPath=<path>/spark-measure_2.12-0.23-SNAPSHOT.jar ...
bin/spark-submit --conf spark.driver.extraClassPath=<path>/spark-measure_2.12-0.25-SNAPSHOT.jar ...
```
4 changes: 2 additions & 2 deletions docs/Notes_on_metrics_analysis.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
## Notes on analysis of Spark performance metrics collecteed with sparkMeasure
## Notes on analysis of Spark performance metrics collected using sparkMeasure

One of the key features of sparkMeasure is that it makes data easily accessible for analysis.
This is achieved by exporting the collected data into Spark DataFrames where they can be queries with Spark APIs and/or SQL.
In addition the metrics can be used for plotting and other visualizations, for example using Jupyter notebooks.
In addition ,the metrics can be used for plotting and other visualizations, for example using Jupyter notebooks.

Example of analysis of Task Metrics using a Jupyter notebook at: [SparkTaskMetricsAnalysisExample.ipynb](../examples/SparkTaskMetricsAnalysisExample.ipynb)

Expand Down
4 changes: 2 additions & 2 deletions docs/Prometheus.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ The Prometheus Pushgateway can accept collected data and keep it for Prometheus

**Parameters:**

* serverIPnPort: String with prometheus pushgateway hostIP:Port,
* serverIPnPort: String with Prometheus Pushgateway hostIP:Port,
* metricsJob: job name,
* labelName: metrics label name,
* labelValue: metrics label value
Expand All @@ -35,7 +35,7 @@ https://prometheus.io/docs/instrumenting/exposition_formats/

1. Measure metrics at the Stage level (example in Scala):
```
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.19
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24
val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.begin()
Expand Down
2 changes: 1 addition & 1 deletion docs/Python_shell_and_Jupyter.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ See also [README](../README.md) for an introduction to sparkMeasure and its arch
- Use PyPi to install the Python wrapper and take the jar from Maven central:
```
pip install sparkmeasure
bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.23
bin/pyspark --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24
```
- If you prefer to build from the latest development version:
```
Expand Down
46 changes: 28 additions & 18 deletions docs/Scala_shell_and_notebooks.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ See also [README](../README.md) for an introduction to sparkMeasure and its arch

- The alternative, see paragraph above, is to build a jar from master.
```
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.23
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24
// or just download and use the jar (it is only needed in the driver) as in:
bin/spark-shell --conf spark.driver.extraClassPath=<path>/spark-measure_2.12-0.23.jar
bin/spark-shell --conf spark.driver.extraClassPath=<path>/spark-measure_2.12-0.24.jar
```

### Download and build sparkMeasure (optional)
Expand All @@ -24,17 +24,17 @@ See also [README](../README.md) for an introduction to sparkMeasure and its arch
ls -l target/scala-2.12/spark-measure*.jar # location of the compiled jar
# Run as in one of these examples:
bin/spark-shell --jars <path>/spark-measure_2.12-0.23-SNAPSHOT.jar
bin/spark-shell --jars <path>/spark-measure_2.12-0.24-SNAPSHOT.jar
# alternative, set classpath for the driver (the jar is only needed in the driver)
bin/spark-shell --conf spark.driver.extraClassPath=<path>/spark-measure_2.11-0.23-SNAPSHOT.jar
bin/spark-shell --conf spark.driver.extraClassPath=<path>/spark-measure_2.11-0.24-SNAPSHOT.jar
```
### Example: collect and print stage metrics with sparkMeasure
1. Measure metrics at the Stage level, a basic example:
```
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.23
bin/spark-shell --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24
val stageMetrics = ch.cern.sparkmeasure.StageMetrics(spark)
stageMetrics.runAndMeasure(spark.sql("select count(*) from range(1000) cross join range(1000) cross join range(1000)").show)
Expand All @@ -45,23 +45,31 @@ See also [README](../README.md) for an introduction to sparkMeasure and its arch
Example output:
```
+----------+
| count(1)|
+----------+
|1000000000|
+----------+

Time taken: 3833 ms

Scheduling mode = FIFO
Spark Context default degree of parallelism = 8

Aggregated Spark stage metrics:
numStages => 3
numTasks => 17
elapsedTime => 1218 (1 s)
stageDuration => 994 (1.0 s)
executorRunTime => 2625 (3 s)
executorCpuTime => 2224 (2 s)
executorDeserializeTime => 2945 (3 s)
executorDeserializeCpuTime => 1153 (1 s)
resultSerializationTime => 8 (8 ms)
jvmGCTime => 80 (80 ms)
elapsedTime => 1112 (1 s)
stageDuration => 864 (0.9 s)
executorRunTime => 3358 (3 s)
executorCpuTime => 2168 (2 s)
executorDeserializeTime => 892 (0.9 s)
executorDeserializeCpuTime => 251 (0.3 s)
resultSerializationTime => 72 (72 ms)
jvmGCTime => 0 (0 ms)
shuffleFetchWaitTime => 0 (0 ms)
shuffleWriteTime => 30 (30 ms)
resultSize => 16220 (15.0 KB)
shuffleWriteTime => 36 (36 ms)
resultSize => 16295 (15.9 KB)
diskBytesSpilled => 0 (0 Bytes)
memoryBytesSpilled => 0 (0 Bytes)
peakExecutionMemory => 0
Expand All @@ -80,10 +88,12 @@ shuffleRemoteBytesReadToDisk => 0 (0 Bytes)
shuffleBytesWritten => 472 (472 Bytes)
shuffleRecordsWritten => 8

Average number of active tasks => 3.0

Stages and their duration:
Stage 0 duration => 573 (0.6 s)
Stage 1 duration => 354 (0.4 s)
Stage 3 duration => 67 (67 ms)
Stage 0 duration => 355 (0.4 s)
Stage 1 duration => 411 (0.4 s)
Stage 3 duration => 98 (98 ms)
```
- New in sparkMeasure v01: memory metrics report:
Expand Down
2 changes: 1 addition & 1 deletion examples/testSparkMeasureScala/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,5 @@ How to run the example:
# build the example jar
sbt package
bin/spark-submit --master local[*] --packages ch.cern.sparkmeasure:spark-measure_2.12:0.23 --class ch.cern.testSparkMeasure.testSparkMeasure <path_to_the_example_jar>/testsparkmeasurescala_2.12-0.1.jar
bin/spark-submit --master local[*] --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24 --class ch.cern.testSparkMeasure.testSparkMeasure <path_to_the_example_jar>/testsparkmeasurescala_2.12-0.1.jar
```
Loading

0 comments on commit faf31ee

Please sign in to comment.