Skip to content

Commit

Permalink
First cut of SparkRRunner
Browse files Browse the repository at this point in the history
  • Loading branch information
shivaram committed Feb 6, 2015
1 parent 08ff30b commit 7ca6512
Show file tree
Hide file tree
Showing 2 changed files with 91 additions and 5 deletions.
15 changes: 10 additions & 5 deletions pkg/R/sparkR.R
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,16 @@ sparkR.init <- function(
if (yarn_conf_dir != "") {
cp <- paste(cp, yarn_conf_dir, sep = ":")
}
launchBackend(classPath = cp,
mainClass = "edu.berkeley.cs.amplab.sparkr.SparkRBackend",
args = as.character(sparkRBackendPort),
javaOpts = paste("-Xmx", sparkMem, sep = ""))
Sys.sleep(2) # Wait for backend to come up
sparkRExistingPort <- Sys.getenv("SPARKR_BACKEND_PORT", "")
if (sparkRExistingPort != "") {
sparkRBackendPort <- sparkRExistingPort
} else {
launchBackend(classPath = cp,
mainClass = "edu.berkeley.cs.amplab.sparkr.SparkRBackend",
args = as.character(sparkRBackendPort),
javaOpts = paste("-Xmx", sparkMem, sep = ""))
Sys.sleep(2) # Wait for backend to come up
}
.sparkREnv$sparkRBackendPort <- sparkRBackendPort
connectBackend("localhost", sparkRBackendPort) # Connect to it

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package edu.berkeley.cs.amplab.sparkr

import java.io._
import java.net.URI

import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._

/**
 * Main class used to launch SparkR applications using spark-submit. It executes R as a
 * subprocess and then has it connect back to the JVM to access system properties etc.
 *
 * Usage: `SparkRRunner <R file> [script args...]`
 *
 * The backend port is read from the `SPARKR_BACKEND_PORT` environment variable (default
 * `12345`) and is exported under the same name to the R subprocess.
 */
object SparkRRunner {

  /** Upper bound (ms) on how long we wait for the remaining R output to be copied at exit. */
  private val OutputDrainTimeoutMs = 10000L

  /**
   * Starts a [[SparkRBackend]] on a separate thread, runs the given R script with `Rscript`,
   * forwards its combined stdout/stderr to this JVM's stdout, and exits the JVM with the
   * script's exit code.
   *
   * @param args `args(0)` is the R script to run; any remaining elements are passed to the
   *             script unchanged.
   * @throws NumberFormatException if `SPARKR_BACKEND_PORT` is set but is not an integer.
   */
  def main(args: Array[String]): Unit = {
    if (args.isEmpty) {
      // Fail with a readable message instead of an ArrayIndexOutOfBoundsException.
      System.err.println("Usage: SparkRRunner <R file> [script args...]")
      System.exit(1)
    }

    val rFile = args.head
    val otherArgs = args.tail

    // TODO: Can we get this from SparkConf ?
    val sparkRBackendPort = sys.env.getOrElse("SPARKR_BACKEND_PORT", "12345").toInt
    val rCommand = "Rscript"

    // TODO: Normalize path ?
    val rFileNormalized = rFile

    // Launch a SparkR backend server for the R process to connect to; this will let it see our
    // Java system properties etc.
    val sparkRBackend = new SparkRBackend()
    val sparkRBackendThread = new Thread("SparkR backend") {
      override def run(): Unit = {
        sparkRBackend.init(sparkRBackendPort)
        sparkRBackend.run()
      }
    }
    sparkRBackendThread.start()

    // NOTE(review): R is launched without waiting for init() to complete on the backend
    // thread. Presumably R start-up is slow enough for the backend to be listening first, but
    // nothing here enforces it -- confirm against SparkRBackend.

    val returnCode =
      try {
        // Launch R. The varargs constructor avoids relying on an implicit Seq -> java.util.List
        // conversion.
        val command = Seq(rCommand, rFileNormalized) ++ otherArgs
        val builder = new ProcessBuilder(command: _*)
        builder.environment().put("SPARKR_BACKEND_PORT", sparkRBackendPort.toString)
        builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize
        val process = builder.start()

        val redirect = new RedirectThread(process.getInputStream, System.out, "redirect output")
        redirect.start()

        val exitCode = process.waitFor()
        // The redirect thread is a daemon and System.exit() below would kill it mid-copy, so
        // give it a bounded amount of time to forward whatever R wrote just before exiting.
        redirect.join(OutputDrainTimeoutMs)
        exitCode
      } finally {
        // Close the backend even when R could not be started or the wait was interrupted, so
        // the non-daemon backend thread is not left running.
        sparkRBackend.close()
      }

    System.exit(returnCode)
  }

  /**
   * Daemon thread that copies everything readable from `in` to `out` until end of stream,
   * flushing after every chunk.
   *
   * @param in           stream to read from
   * @param out          stream to write to
   * @param name         name given to the thread
   * @param propagateEof when true, `out` is closed once copying stops (normally or on error)
   */
  private class RedirectThread(
      in: InputStream,
      out: OutputStream,
      name: String,
      propagateEof: Boolean = false)
    extends Thread(name) {

    setDaemon(true)

    override def run(): Unit = {
      // FIXME: We copy the stream on the level of bytes to avoid encoding problems.
      try {
        val buf = new Array[Byte](1024)
        var len = in.read(buf)
        while (len != -1) {
          out.write(buf, 0, len)
          out.flush()
          len = in.read(buf)
        }
      } finally {
        if (propagateEof) {
          out.close()
        }
      }
    }
  }
}

0 comments on commit 7ca6512

Please sign in to comment.