Skip to content

Commit

Permalink
Merge pull request apache#24 from dashar/addLogging
Browse files Browse the repository at this point in the history
[YSPARK-1607] Add logging examples for driver and executor
  • Loading branch information
Dhruve Ashar authored and GitHub Enterprise committed May 6, 2020
2 parents 6f5bf96 + fdb93a8 commit 4519c3e
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 5 deletions.
7 changes: 7 additions & 0 deletions src/main/python/python_word_count.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
.appName("Python Word Count")\
.getOrCreate()

# Add log messages to spark using the same log4j logger.
# Note: This pattern works only for the driver.
jvmLogger = spark.sparkContext._jvm.org.apache.log4j
logger = jvmLogger.LogManager.getLogger('SparkStarter')
logger.info('Input : ' + sys.argv[1])
logger.info('Input : ' + sys.argv[2])

lines = spark.read.text(sys.argv[1]).rdd.map(lambda r: r[0])
counts = lines.flatMap(lambda x: x.split(' ')) \
.map(lambda x: (x, 1)) \
Expand Down
29 changes: 24 additions & 5 deletions src/main/scala/com/yahoo/spark/starter/ScalaWordCount.scala
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.yahoo.spark.starter

import org.apache.spark._
import org.apache.spark.sql.SparkSession

// Simple example of Word Count in Scala
object ScalaWordCount {
Expand All @@ -11,21 +11,40 @@ object ScalaWordCount {
System.exit(1)
}

val conf = new SparkConf().setAppName("Scala Word Count")
val sc = new SparkContext(conf)
val spark = SparkSession
.builder
.appName("Scala Word Count")
.getOrCreate()

// get the input file uri
val inputFilesUri = args(0)

// get the output file uri
val outputFilesUri = args(1)

// Get a logger on the Driver/AppMaster
val logger = SparkStarterUtil.logger

logger.info("Input : " + inputFilesUri)
logger.info("Output : " + outputFilesUri)

val textFile = spark.sparkContext.textFile(inputFilesUri)

val textFile = sc.textFile(inputFilesUri)
// How to log at the executor. This logic only serves as an example.
textFile.foreachPartition(iterator => {
val logger = SparkStarterUtil.logger
logger.info("Partition Size: " + iterator.size)
})

val counts = textFile.flatMap(line => line.split(" "))
.map(word => (word, 1))
.reduceByKey(_ + _)
counts.saveAsTextFile(outputFilesUri)

sc.stop()
spark.stop()
}
}

/**
 * Holder for a process-wide log4j logger, usable from both the driver and
 * executor code paths. Being a `lazy val` on an `object`, the logger is
 * created on first use inside whichever JVM touches it, so closures that
 * reference `SparkStarterUtil.logger` do not attempt to serialize a
 * non-serializable Logger instance.
 */
object SparkStarterUtil {
  /** Lazily-initialised log4j logger named "SparkStarter". */
  lazy val logger: org.apache.log4j.Logger =
    org.apache.log4j.Logger.getLogger("SparkStarter")
}

0 comments on commit 4519c3e

Please sign in to comment.