Skip to content
This repository has been archived by the owner on May 17, 2022. It is now read-only.

Commit

Permalink
Fixed byte/str issue #9 in serverextension
Browse files Browse the repository at this point in the history
Removed stray print statements in Scala SparkListener.
  • Loading branch information
krishnan-r committed May 29, 2018
1 parent 7025e23 commit 5707371
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 25 deletions.
42 changes: 21 additions & 21 deletions extension/scalalistener/CustomListener.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,34 +30,34 @@ import java.io._
*/
class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {

println("SPARKLISTENER: Started SparkListener for Jupyter Notebook")
println("SPARKMONITOR_LISTENER: Started SparkListener for Jupyter Notebook")
val port = scala.util.Properties.envOrElse("SPARKMONITOR_KERNEL_PORT", "ERRORNOTFOUND")
println("SPARKLISTENER: Port obtained from environment: " + port)
println("SPARKMONITOR_LISTENER: Port obtained from environment: " + port)
var socket: Socket = null
var out: OutputStreamWriter = null
// Open the socket to the kernel. The kernel is the server already waiting for connections.
try {
socket = new Socket("localhost", port.toInt)
out = new OutputStreamWriter(socket.getOutputStream())
} catch {
case exception: Throwable => println("\nSPARKLISTENER: Exception creating socket:" + exception + "\n")
case exception: Throwable => println("\nSPARKMONITOR_LISTENER: Exception creating socket:" + exception + "\n")
}

/** Send a string message to the kernel using the open socket.*/
def send(msg: String): Unit = {
try {
//println("\nSPARKLISTENER: --------------Sending Message:------------------\n"+msg+
// "\nSPARKLISTENER: -------------------------------------------------\n") // Uncomment to see all events
//println("\nSPARKMONITOR_LISTENER: --------------Sending Message:------------------\n"+msg+
// "\nSPARKMONITOR_LISTENER: -------------------------------------------------\n") // Uncomment to see all events
out.write(msg + ";EOD:")
out.flush()
} catch {
case exception: Throwable => println("\nSPARKLISTENER: Exception sending socket message:" + exception + "\n")
case exception: Throwable => println("\nSPARKMONITOR_LISTENER: Exception sending socket message:" + exception + "\n")
}
}

/** Close the socket connection to the kernel.*/
def closeConnection(): Unit = {
println("SPARKLISTNER: Closing Connection")
println("SPARKMONITOR_LISTENER: Closing Connection")
out.close()
socket.close()
}
Expand Down Expand Up @@ -112,7 +112,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
override def onApplicationStart(appStarted: SparkListenerApplicationStart): Unit = {
startTime = appStarted.time
appId = appStarted.appId.getOrElse("null")
println("SPARKLISTENER Application Started: " + appId + " ...Start Time: " + appStarted.time)
println("SPARKMONITOR_LISTENER: Application Started: " + appId + " ...Start Time: " + appStarted.time)
val json = ("msgtype" -> "sparkApplicationStart") ~
("startTime" -> startTime) ~
("appId" -> appId) ~
Expand All @@ -129,7 +129,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
* Closes the socket connection to the kernel.
*/
override def onApplicationEnd(appEnded: SparkListenerApplicationEnd): Unit = {
println("SPARKLISTENER Application ending...End Time: " + appEnded.time)
println("SPARKMONITOR_LISTENER: Application ending...End Time: " + appEnded.time)
endTime = appEnded.time
val json = ("msgtype" -> "sparkApplicationEnd") ~
("endTime" -> endTime)
Expand Down Expand Up @@ -195,7 +195,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
stageIdToData.getOrElseUpdate((stageInfo.stageId, stageInfo.attemptId), new StageUIData)
}
val name = jobStart.properties.getProperty("callSite.short", "null")
println("Num Executors" + numExecutors.toInt)
// println("Num Executors" + numExecutors.toInt)
val json = ("msgtype" -> "sparkJobStart") ~
("jobGroup" -> jobGroup.getOrElse("null")) ~
("jobId" -> jobStart.jobId) ~
Expand All @@ -208,14 +208,14 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
("appId" -> appId) ~
("numExecutors" -> numExecutors) ~
("name" -> name)
println("SPARKLISTENER JobStart: \n" + pretty(render(json)) + "\n")
// println("SPARKMONITOR_LISTENER: JobStart: \n" + pretty(render(json)) + "\n")
send(pretty(render(json)))
}

/** Called when a job ends. */
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
val jobData = activeJobs.remove(jobEnd.jobId).getOrElse {
println("SPARKLISTENER:Job completed for unknown job: " + jobEnd.jobId)
println("SPARKMONITOR_LISTENER: Job completed for unknown job: " + jobEnd.jobId)
new JobUIData(jobId = jobEnd.jobId)
}
jobData.completionTime = Option(jobEnd.time).filter(_ >= 0)
Expand Down Expand Up @@ -266,7 +266,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
val stage = stageCompleted.stageInfo
stageIdToInfo(stage.stageId) = stage
val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), {
println("SPARKLISTENER: Stage completed for unknown stage " + stage.stageId)
println("SPARKMONITOR_LISTENER: Stage completed for unknown stage " + stage.stageId)
new StageUIData
})
var status = "UNKNOWN"
Expand Down Expand Up @@ -306,7 +306,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
("numTasks" -> stage.numTasks) ~
("status" -> status)

println("SPARKLISTENER Stage Completed: \n" + pretty(render(json)) + "\n")
// println("SPARKMONITOR_LISTENER: Stage Completed: \n" + pretty(render(json)) + "\n")
send(pretty(render(json)))
}

Expand Down Expand Up @@ -342,7 +342,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
("parentIds" -> stage.parentIds) ~
("submissionTime" -> submissionTime) ~
("jobIds" -> jobIds)
println("SPARKLISTENER Stage Submitted: \n" + pretty(render(json)) + "\n")
// println("SPARKMONITOR_LISTENER Stage Submitted: \n" + pretty(render(json)) + "\n")
send(pretty(render(json)))
}

Expand All @@ -351,7 +351,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
val taskInfo = taskStart.taskInfo
if (taskInfo != null) {
val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), {
println("SPARKLISTENER: Task start for unknown stage " + taskStart.stageId)
println("SPARKMONITOR_LISTENER: Task start for unknown stage " + taskStart.stageId)
new StageUIData
})
stageData.numActiveTasks += 1
Expand Down Expand Up @@ -387,7 +387,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
("status" -> taskInfo.status) ~
("speculative" -> taskInfo.speculative)

//println("SPARKLISTENER Task Started: \n"+ pretty(render(json)) + "\n")
//println("SPARKMONITOR_LISTENER: Task Started: \n"+ pretty(render(json)) + "\n")
send(pretty(render(json)))
}

Expand All @@ -400,7 +400,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
var errorMessage: Option[String] = None
if (info != null && taskEnd.stageAttemptId != -1) {
val stageData = stageIdToData.getOrElseUpdate((taskEnd.stageId, taskEnd.stageAttemptId), {
println("SPARKLISTENER: Task end for unknown stage " + taskEnd.stageId)
println("SPARKMONITOR_LISTENER: Task end for unknown stage " + taskEnd.stageId)
new StageUIData
})
stageData.numActiveTasks -= 1
Expand Down Expand Up @@ -517,7 +517,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
("errorMessage" -> errorMessage) ~
("metrics" -> jsonMetrics)

println("SPARKLISTENER Task Ended: \n" + pretty(render(json)) + "\n")
// println("SPARKMONITOR_LISTENER: Task Ended: \n" + pretty(render(json)) + "\n")
send(pretty(render(json)))
}

Expand Down Expand Up @@ -573,7 +573,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
("numCores" -> executorAdded.executorInfo.totalCores) ~
("totalCores" -> totalCores) // Sending this as browser data can be lost during reloads

println("SPARKLISTENER Executor Added: \n" + pretty(render(json)) + "\n")
// println("SPARKMONITOR_LISTENER: Executor Added: \n" + pretty(render(json)) + "\n")
send(pretty(render(json)))
}

Expand All @@ -586,7 +586,7 @@ class JupyterSparkMonitorListener(conf: SparkConf) extends SparkListener {
("time" -> executorRemoved.time) ~
("totalCores" -> totalCores) // Sending this as browser data can be lost during reloads

println("SPARKLISTENER Executor Removed: \n" + pretty(render(json)) + "\n")
// println("SPARKMONITOR_LISTENER: Executor Removed: \n" + pretty(render(json)) + "\n")
send(pretty(render(json)))
}
}
Expand Down
7 changes: 3 additions & 4 deletions extension/sparkmonitor/serverextension.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ def get(self):
self.request.uri.index(proxy_root) + len(proxy_root) + 1):]
self.replace_path = self.request.uri[:self.request.uri.index(
proxy_root) + len(proxy_root)]
print("SPARKMONITOR_SERVER: Request_path " +
request_path + " \n Replace_path:" + self.replace_path)
# print("SPARKMONITOR_SERVER: Request_path " + request_path + " \n Replace_path:" + self.replace_path)
backendurl = url_path_join(url, request_path)
self.debug_url = url
self.backendurl = backendurl
Expand All @@ -64,8 +63,8 @@ def handle_response(self, response):
if "text/html" in content_type:
content = replace(response.body, self.replace_path)
elif "javascript" in content_type:
content = response.body.replace(
"location.origin", "location.origin +'" + self.replace_path + "' ")
body="location.origin +'" + self.replace_path + "' "
content = response.body.replace(b"location.origin",body.encode())
else:
# Probably binary response, send it directly.
content = response.body
Expand Down

0 comments on commit 5707371

Please sign in to comment.