
Commit

Add support for Prometheus Pushgateway in Flight Recorder mode.
LucaCanali committed Mar 6, 2024
1 parent 5ee6fe9 commit 4a6f525
Showing 6 changed files with 316 additions and 17 deletions.
25 changes: 13 additions & 12 deletions README.md
@@ -22,7 +22,7 @@ and spark-shell/pyspark environments.
- **Batch Job Analysis:** With Flight Recorder mode sparkMeasure records and analyzes batch job metrics
for thorough inspection.
- **Monitoring Capabilities:** Seamlessly integrates with external systems like InfluxDB, Apache Kafka,
and Prometheus Gateway for extensive monitoring.
and Prometheus Pushgateway for extensive monitoring.
- **Educational Tool:** Serves as a practical example of implementing Spark Listeners for the collection
of detailed Spark task metrics.
- **Language Compatibility:** Fully supports Scala, Java, and Python, making it versatile for a wide range
@@ -216,12 +216,13 @@ SparkMeasure is one tool for many different use cases, languages, and environments
* **Flight Recorder mode**:
SparkMeasure in flight recorder mode will collect metrics transparently, without any need for you
to change your code.
* Metrics can be saved to a file, locally or to a Hadoop-compliant filesystem
* or you can write metrics in near-realtime to an InfluxDB instance or to Apache Kafka
* Metrics can be saved to a file, locally, or to a Hadoop-compliant filesystem
  * or you can write metrics in near real-time to the following sinks: InfluxDB, Apache Kafka, and Prometheus Pushgateway (see the example after this list)
* More details:
- **[Flight Recorder mode with file sink](docs/Flight_recorder_mode_FileSink.md)**
- **[Flight Recorder mode with InfluxDB sink](docs/Flight_recorder_mode_InfluxDBSink.md)**
- **[Flight Recorder mode with Apache Kafka sink](docs/Flight_recorder_mode_KafkaSink.md)**
- **[Flight Recorder mode with Prometheus Pushgateway sink](docs/Flight_recorder_mode_PrometheusPushgatewaySink.md)**
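For example, a minimal sketch of launching spark-shell in flight recorder mode with the Pushgateway sink (the endpoint and package version are example values; see the linked docs for details):
```
bin/spark-shell \
  --conf spark.extraListeners=ch.cern.sparkmeasure.PushGatewaySink \
  --conf spark.sparkmeasure.pushgateway=localhost:9091 \
  --packages ch.cern.sparkmeasure:spark-measure_2.12:0.24
```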


* **Additional documentation and examples**:
@@ -249,10 +250,10 @@ SparkMeasure is one tool for many different use cases, languages, and environments
* metrics collection and processing can be at the Stage-level or Task-level. The user chooses which mode to use with the API.
  * metrics can be buffered into memory for real-time reporting, or they can be dumped to an external
system in the "flight recorder mode".
* supported external systems are File Systems supported by the Hadoop API, InfluxDB, and Apache Kafka.
  * supported external systems are File Systems supported by the Hadoop API, InfluxDB, Apache Kafka, and Prometheus Pushgateway.
* Metrics are flattened and collected into local memory structures in the driver (ListBuffer of a custom case class).
* sparkMeasure in flight recorder mode with InfluxDB sink and Apache Kafka do not buffer,
but rather write the collected metrics directly
  * sparkMeasure in flight recorder mode with the InfluxDB, Apache Kafka, or Prometheus Pushgateway sink does not buffer,
    but rather writes the collected metrics directly
* Metrics processing:
* metrics can be aggregated into a report showing the cumulative values for each metric
* aggregated metrics can also be returned as a Scala Map or Python dictionary
@@ -288,7 +289,7 @@ SparkMeasure is one tool for many different use cases, languages, and environments
in notebooks and in application code for Scala, Java, and Python.
- sparkMeasure collects data in a flat structure, which makes it natural to use Spark SQL for
workload data analysis.
- sparkMeasure can sink metrics data into external systems (Filesystem, InfluxDB, Apache Kafka)
- sparkMeasure can sink metrics data into external systems (Filesystem, InfluxDB, Apache Kafka, Prometheus Pushgateway)

- What are known limitations and gotchas?
- sparkMeasure does not collect all the data available in the EventLog
@@ -299,11 +300,11 @@ SparkMeasure is one tool for many different use cases, languages, and environments
- Metrics are collected on the driver, which could become a bottleneck. This is an issue affecting tools
based on Spark ListenerBus instrumentation, such as the Spark WebUI.
In addition, note that sparkMeasure in the current version buffers all data in the driver memory.
The notable exception is when using the Flight recorder mode with InfluxDB and
Apache Kafka sink, in this case metrics are directly sent to InfluxDB/Kafka
The notable exception is when using the Flight recorder mode with the InfluxDB, Apache Kafka,
or Prometheus Pushgateway sink; in this case metrics are sent directly to the configured sink.
- Task metrics values are collected by sparkMeasure only for successfully executed tasks. Note that
resources used by failed tasks are not collected in the current version. The notable exception is
with the Flight recorder mode with InfluxDB sink and with Apache Kafka.
with the Flight recorder mode using the InfluxDB, Apache Kafka, or Prometheus Pushgateway sink.
- sparkMeasure collects and processes data in order of stage and/or task completion. This means that
the metrics data is not available in real-time, but rather with a delay that depends on the workload
and the size of the data. Moreover, performance data of jobs executing at the same time can be mixed.
@@ -320,8 +321,8 @@ SparkMeasure is one tool for many different use cases, languages, and environments
- How can I save/sink the collected metrics?
- You can print metrics data and reports to standard output or save them to files, using
a locally mounted filesystem or a Hadoop compliant filesystem (including HDFS).
Additionally, you can sink metrics to external systems (such as Prometheus).
The Flight Recorder mode can sink to InfluxDB and Apache Kafka.
Additionally, you can sink metrics to external systems (such as the Prometheus Pushgateway).
The Flight Recorder mode can sink to InfluxDB, Apache Kafka, or Prometheus Pushgateway.

- How can I process metrics data?
- You can use Spark to read the saved metrics data and perform further post-processing and analysis.
60 changes: 60 additions & 0 deletions docs/Flight_recorder_mode_PrometheusPushgatewaySink.md
@@ -0,0 +1,60 @@
# SparkMeasure Flight Recorder mode - Prometheus Pushgateway Sink

Use sparkMeasure in flight recorder mode to instrument Spark applications without touching their code.
Flight recorder mode attaches a Spark Listener that collects the metrics while the application runs.
This document describes how to sink Spark metrics to a Prometheus Pushgateway.

## PushGatewaySink

**PushGatewaySink** is a class that extends the SparkListener infrastructure.
It collects and writes Spark metrics and application info in near real-time to a Prometheus Pushgateway instance
provided by the user. Use this mode to monitor Spark execution workload.
Note: the amount of data generated is relatively small in most applications: O(number_of_stages).

How to use: attach the PushGatewaySink to a Spark Context using the listener infrastructure. Example:
- `--conf spark.extraListeners=ch.cern.sparkmeasure.PushGatewaySink`

Configuration for the sink is handled with Spark configuration parameters.
Note: you can add configuration using the --conf option when using spark-submit,
or use the .config method when allocating the Spark Session in Scala/Python/Java.
Configurations:
```
Option 1 (recommended) Start the listener for PushGatewaySink:
--conf spark.extraListeners=ch.cern.sparkmeasure.PushGatewaySink
Configuration - PushGatewaySink parameters:
--conf spark.sparkmeasure.pushgateway=SERVER:PORT
Example: --conf spark.sparkmeasure.pushgateway=localhost:9091
--conf spark.sparkmeasure.pushgateway.jobname=JOBNAME // default value is pushgateway
 Example: --conf spark.sparkmeasure.pushgateway.jobname=myjob1
```
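For instance, a minimal sketch of setting the same parameters with the .config method when building the Spark Session (the endpoint and job name are example values):
```scala
import org.apache.spark.sql.SparkSession

// Attach PushGatewaySink programmatically; this is equivalent to
// passing the --conf options above on the spark-submit command line.
// localhost:9091 and myjob1 are example values, adjust as needed.
val spark = SparkSession.builder()
  .appName("MyApp")
  .config("spark.extraListeners", "ch.cern.sparkmeasure.PushGatewaySink")
  .config("spark.sparkmeasure.pushgateway", "localhost:9091")
  .config("spark.sparkmeasure.pushgateway.jobname", "myjob1")
  .getOrCreate()
```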

## Use case

- The use case for this sink is to extend Spark monitoring by writing execution metrics into Prometheus via the Pushgateway,
  as Prometheus has a pull-based architecture. You'll need to configure Prometheus to pull metrics from the Pushgateway.
  You'll also need to set up a performance dashboard using the metrics collected by Prometheus.
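A minimal prometheus.yml fragment along these lines configures Prometheus to scrape the Pushgateway (a sketch with example values; honor_labels preserves the job label set at push time):
```
# Example scrape config fragment for prometheus.yml (example values)
scrape_configs:
  - job_name: 'pushgateway'
    honor_labels: true
    static_configs:
      - targets: ['localhost:9091']
```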


## Example of how to use Prometheus PushGatewaySink

- Start the Prometheus Pushgateway
- Download and start the Pushgateway, from the [Prometheus download page](https://prometheus.io/download/)

- Start Spark with sparkMeasure and attach the PushGatewaySink listener
  - Note: make sure there is no firewall blocking connectivity between the driver and
the Pushgateway
```
Examples:
bin/spark-shell \
--conf spark.extraListeners=ch.cern.sparkmeasure.PushGatewaySink \
--conf spark.sparkmeasure.pushgateway=localhost:9091 \
--packages ch.cern.sparkmeasure:spark-measure_2.12:0.24
```

- Look at the metrics being written to the Pushgateway
- Use the Web UI to look at the metrics being written to the Pushgateway
  - Open a web browser and go to the Web UI, for example: http://localhost:9091/metrics
- You should see the metrics being written to the Pushgateway as jobs are run in Spark
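- Alternatively, a quick check from the command line (a sketch assuming the default Pushgateway port):
```
curl -s http://localhost:9091/metrics
```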
24 changes: 21 additions & 3 deletions docs/Reference_SparkMeasure_API_and_Configs.md
@@ -400,7 +400,7 @@ spark.sparkmeasure.influxdbEnableBatch, boolean, default true
This code depends on "influxdb.java", you may need to add the dependency explicitly:
--packages org.influxdb:influxdb-java:2.14
Note currently we need to use version 2.14 as newer versions generate jar conflicts (tested up to Spark 3.3.0)
Note currently we need to use version 2.14 as newer versions generate jar conflicts
```

## KafkaSink and KafkaSinkExtended
@@ -428,8 +428,25 @@ Configuration - KafkaSink parameters:
Example: --conf spark.sparkmeasure.kafkaTopic=sparkmeasure-stageinfo
Note: the topic will be created if it does not yet exist
This code depends on "kafka-clients", you may need to add the dependency explicitly:
--packages org.apache.kafka:kafka-clients:3.2.1
This code depends on "kafka-clients", you may need to add the dependency explicitly, example:
--packages org.apache.kafka:kafka-clients:3.7.0
```

## PushGatewaySink
```
class PushGatewaySink(conf: SparkConf) extends SparkListener
**PushGatewaySink** is a class that extends the SparkListener infrastructure.
It collects and writes Spark metrics and application info in near real-time to a Prometheus Pushgateway instance
provided by the user. Use this mode to monitor Spark execution workload.
Note: the amount of data generated is relatively small in most applications: O(number_of_stages)
* How to use: attach the PushGatewaySink to a Spark Context using the listener infrastructure. Example:
* --conf spark.extraListeners=ch.cern.sparkmeasure.PushGatewaySink
Configuration - PushGatewaySink parameters:
--conf spark.sparkmeasure.pushgateway=SERVER:PORT
Example: --conf spark.sparkmeasure.pushgateway=localhost:9091
--conf spark.sparkmeasure.pushgateway.jobname=JOBNAME // default is pushgateway
```
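A minimal sketch of the same parameters set programmatically on a SparkConf (example values, e.g. when constructing a context in tests):
```scala
import org.apache.spark.SparkConf

// Same PushGatewaySink parameters as above, set on a SparkConf.
// localhost:9091 and myjob1 are example values.
val conf = new SparkConf()
  .set("spark.extraListeners", "ch.cern.sparkmeasure.PushGatewaySink")
  .set("spark.sparkmeasure.pushgateway", "localhost:9091")
  .set("spark.sparkmeasure.pushgateway.jobname", "myjob1")
```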

## IOUtils
@@ -480,4 +497,5 @@ def parseInfluxDBCredentials(conf: SparkConf, logger: Logger) : (String,String)
def parseInfluxDBName(conf: SparkConf, logger: Logger) : String
def parseInfluxDBStagemetrics(conf: SparkConf, logger: Logger) : Boolean
def parseKafkaConfig(conf: SparkConf, logger: Logger) : (String,String)
def parsePushGatewayConfig(conf: SparkConf, logger: Logger): (String, String)
```
2 changes: 1 addition & 1 deletion src/main/scala/ch/cern/sparkmeasure/PushGateway.scala
@@ -144,7 +144,7 @@ case class PushGateway(serverIPnPort: String, metricsJob: String) {
val responseCode = connection.getResponseCode()
val responseMessage = connection.getResponseMessage()
connection.disconnect();
if (responseCode != 202) // 202 Accepted, 400 Bad Request
if (responseCode != 200 && responseCode != 202) // success: 200 OK (Pushgateway v1.0+) or 202 Accepted (older versions)
logger.error(s"Data sent error, url: '$urlFull', response: $responseCode '$responseMessage'")
} catch {
case ste: java.net.SocketTimeoutException =>
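For context, a minimal self-contained sketch (an assumption, not the repository's actual implementation) of what such a push amounts to: an HTTP POST of text-format metrics to the Pushgateway's /metrics/job/&lt;jobname&gt; endpoint. Older Pushgateway versions reply 202 Accepted while v1.0+ replies 200 OK, which is why the check above accepts both codes:
```scala
import java.net.{HttpURLConnection, URL}
import java.nio.charset.StandardCharsets

// Hypothetical helper: POST text-format metrics to a Pushgateway.
// serverIPnPort and jobName mirror the names used in PushGateway.scala.
def pushMetrics(serverIPnPort: String, jobName: String, payload: String): Unit = {
  val url = new URL(s"http://$serverIPnPort/metrics/job/$jobName")
  val connection = url.openConnection().asInstanceOf[HttpURLConnection]
  connection.setRequestMethod("POST")
  connection.setDoOutput(true)
  connection.setRequestProperty("Content-Type", "text/plain; version=0.0.4")
  val out = connection.getOutputStream
  out.write(payload.getBytes(StandardCharsets.UTF_8))
  out.close()
  val responseCode = connection.getResponseCode()
  connection.disconnect()
  // 200 (Pushgateway v1.0+) or 202 (older versions) means the push succeeded
  if (responseCode != 200 && responseCode != 202)
    println(s"Push failed with HTTP $responseCode")
}
```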