Merge pull request #109 from markhamstra/csd-1.5
Merged Apache bug fixes
markhamstra committed Oct 27, 2015
2 parents dc5064d + 432ae9e commit 7c9155e
Showing 36 changed files with 288 additions and 87 deletions.
27 changes: 13 additions & 14 deletions R/pkg/R/DataFrame.R
@@ -1510,18 +1510,17 @@ setMethod("except",
#' spark.sql.sources.default will be used.
#'
#' Additionally, mode is used to specify the behavior of the save operation when
#' data already exists in the data source. There are four modes:
#' append: Contents of this DataFrame are expected to be appended to existing data.
#' overwrite: Existing data is expected to be overwritten by the contents of
# this DataFrame.
#' error: An exception is expected to be thrown.
#' data already exists in the data source. There are four modes: \cr
#' append: Contents of this DataFrame are expected to be appended to existing data. \cr
#' overwrite: Existing data is expected to be overwritten by the contents of this DataFrame. \cr
#' error: An exception is expected to be thrown. \cr
#' ignore: The save operation is expected to not save the contents of the DataFrame
# and to not change the existing data.
#' and to not change the existing data. \cr
#'
#' @param df A SparkSQL DataFrame
#' @param path A name for the table
#' @param source A name for external data source
#' @param mode One of 'append', 'overwrite', 'error', 'ignore'
#' @param mode One of 'append', 'overwrite', 'error', 'ignore' save mode
#'
#' @rdname write.df
#' @name write.df
@@ -1534,6 +1533,7 @@ setMethod("except",
#' path <- "path/to/file.json"
#' df <- jsonFile(sqlContext, path)
#' write.df(df, "myfile", "parquet", "overwrite")
#' saveDF(df, parquetPath2, "parquet", mode = saveMode, mergeSchema = mergeSchema)
#' }
setMethod("write.df",
signature(df = "DataFrame", path = "character"),
@@ -1575,18 +1575,17 @@ setMethod("saveDF",
#' spark.sql.sources.default will be used.
#'
#' Additionally, mode is used to specify the behavior of the save operation when
#' data already exists in the data source. There are four modes:
#' append: Contents of this DataFrame are expected to be appended to existing data.
#' overwrite: Existing data is expected to be overwritten by the contents of
# this DataFrame.
#' error: An exception is expected to be thrown.
#' data already exists in the data source. There are four modes: \cr
#' append: Contents of this DataFrame are expected to be appended to existing data. \cr
#' overwrite: Existing data is expected to be overwritten by the contents of this DataFrame. \cr
#' error: An exception is expected to be thrown. \cr
#' ignore: The save operation is expected to not save the contents of the DataFrame
# and to not change the existing data.
#' and to not change the existing data. \cr
#'
#' @param df A SparkSQL DataFrame
#' @param tableName A name for the table
#' @param source A name for external data source
#' @param mode One of 'append', 'overwrite', 'error', 'ignore'
#' @param mode One of 'append', 'overwrite', 'error', 'ignore' save mode
#'
#' @rdname saveAsTable
#' @name saveAsTable
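The save modes described in the write.df and saveAsTable documentation above mirror SaveMode in the Scala DataFrameWriter API. A minimal Scala sketch of the four modes, using a made-up local path:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

object SaveModesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("save-modes").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val df = Seq(("a", 1), ("b", 2)).toDF("key", "value")
    val path = "/tmp/save-modes-demo"  // hypothetical output path

    // overwrite: existing data at the path is replaced by this DataFrame
    df.write.format("parquet").mode(SaveMode.Overwrite).save(path)
    // append: these rows are added to whatever is already at the path
    df.write.format("parquet").mode(SaveMode.Append).save(path)
    // ignore: nothing is written because the path already exists
    df.write.format("parquet").mode(SaveMode.Ignore).save(path)
    // error (the default): an exception would be thrown because the path already exists
    // df.write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)

    sc.stop()
  }
}
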
16 changes: 11 additions & 5 deletions R/pkg/R/SQLContext.R
@@ -444,14 +444,21 @@ dropTempTable <- function(sqlContext, tableName) {
#'
#' @param sqlContext SQLContext to use
#' @param path The path of files to load
#' @param source the name of external data source
#' @param source The name of external data source
#' @param schema The data schema defined in structType
#' @return DataFrame
#' @rdname read.df
#' @name read.df
#' @export
#' @examples
#'\dontrun{
#' sc <- sparkR.init()
#' sqlContext <- sparkRSQL.init(sc)
#' df <- read.df(sqlContext, "path/to/file.json", source = "json")
#' df1 <- read.df(sqlContext, "path/to/file.json", source = "json")
#' schema <- structType(structField("name", "string"),
#' structField("info", "map<string,double>"))
#' df2 <- read.df(sqlContext, mapTypeJsonPath, "json", schema)
#' df3 <- loadDF(sqlContext, "data/test_table", "parquet", mergeSchema = "true")
#' }

read.df <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
@@ -474,9 +481,8 @@ read.df <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...)
dataFrame(sdf)
}

#' @aliases loadDF
#' @export

#' @rdname read.df
#' @name loadDF
loadDF <- function(sqlContext, path = NULL, source = NULL, schema = NULL, ...) {
read.df(sqlContext, path, source, schema, ...)
}
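The read.df and loadDF examples above have a direct counterpart in the Scala DataFrameReader API. A minimal sketch, reusing the illustrative paths from the R examples:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.types.{DoubleType, MapType, StringType, StructField, StructType}

object ReadDfSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("read-df").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)

    // Let the JSON source infer the schema.
    val df1 = sqlContext.read.format("json").load("path/to/file.json")

    // Supply an explicit schema, like the structType(...) call in the R example.
    val schema = StructType(Seq(
      StructField("name", StringType),
      StructField("info", MapType(StringType, DoubleType))))
    val df2 = sqlContext.read.format("json").schema(schema).load("path/to/file.json")

    // Source-specific options, such as Parquet schema merging, are passed with option().
    val df3 = sqlContext.read.format("parquet").option("mergeSchema", "true").load("data/test_table")

    Seq(df1, df2, df3).foreach(_.printSchema())
    sc.stop()
  }
}
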
8 changes: 8 additions & 0 deletions R/pkg/R/sparkR.R
@@ -39,6 +39,14 @@ sparkR.stop <- function() {
sc <- get(".sparkRjsc", envir = env)
callJMethod(sc, "stop")
rm(".sparkRjsc", envir = env)

if (exists(".sparkRSQLsc", envir = env)) {
rm(".sparkRSQLsc", envir = env)
}

if (exists(".sparkRHivesc", envir = env)) {
rm(".sparkRHivesc", envir = env)
}
}

if (exists(".backendLaunched", envir = env)) {
10 changes: 10 additions & 0 deletions R/pkg/inst/tests/test_context.R
@@ -26,6 +26,16 @@ test_that("repeatedly starting and stopping SparkR", {
}
})

test_that("repeatedly starting and stopping SparkR SQL", {
for (i in 1:4) {
sc <- sparkR.init()
sqlContext <- sparkRSQL.init(sc)
df <- createDataFrame(sqlContext, data.frame(a = 1:20))
expect_equal(count(df), 20)
sparkR.stop()
}
})

test_that("rdd GC across sparkR.stop", {
sparkR.stop()
sc <- sparkR.init() # sc should get id 0
1 change: 1 addition & 0 deletions bin/spark-class
@@ -42,6 +42,7 @@ else
ASSEMBLY_DIR="$SPARK_HOME/assembly/target/scala-$SPARK_SCALA_VERSION"
fi

GREP_OPTIONS=
num_jars="$(ls -1 "$ASSEMBLY_DIR" | grep "^spark-assembly.*hadoop.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" -a -z "$SPARK_ASSEMBLY_JAR" ]; then
echo "Failed to find Spark assembly in $ASSEMBLY_DIR." 1>&2
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -600,7 +600,7 @@ private[spark] object SparkConf extends Logging {
/**
* Return whether the given config should be passed to an executor on start-up.
*
* Certain akka and authentication configs are required of the executor when it connects to
* Certain akka and authentication configs are required from the executor when it connects to
* the scheduler, while the rest of the spark configs can be inherited from the driver later.
*/
def isExecutorStartupConf(name: String): Boolean = {
50 changes: 37 additions & 13 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -274,7 +274,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus): SparkEnv = {
SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))
}

private[spark] def env: SparkEnv = _env
@@ -1750,6 +1750,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}
SparkEnv.set(null)
}
// Unset YARN mode system env variable, to allow switching between cluster types.
System.clearProperty("SPARK_YARN_MODE")
SparkContext.clearActiveContext()
logInfo("Successfully stopped SparkContext")
}
@@ -2572,25 +2574,29 @@ object SparkContext extends Logging {
res
}

/**
* The number of driver cores to use for execution in local mode, 0 otherwise.
*/
private[spark] def numDriverCores(master: String): Int = {
def convertToInt(threads: String): Int = {
if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
}
master match {
case "local" => 1
case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)
case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
case _ => 0 // driver is not used for execution
}
}

/**
* Create a task scheduler based on a given master URL.
* Return a 2-tuple of the scheduler backend and the task scheduler.
*/
private def createTaskScheduler(
sc: SparkContext,
master: String): (SchedulerBackend, TaskScheduler) = {
// Regular expression used for local[N] and local[*] master formats
val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
// Regular expression for simulating a Spark cluster of [N, cores, memory] locally
val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
// Regular expression for connecting to Spark deploy clusters
val SPARK_REGEX = """spark://(.*)""".r
// Regular expression for connection to Mesos cluster by mesos:// or zk:// url
val MESOS_REGEX = """(mesos|zk)://.*""".r
// Regular expression for connection to Simr cluster
val SIMR_REGEX = """simr://(.*)""".r
import SparkMasterRegex._

// When running locally, don't try to re-execute tasks on failure.
val MAX_LOCAL_TASK_FAILURES = 1
@@ -2731,6 +2737,24 @@ object SparkContext extends Logging {
}
}

/**
* A collection of regexes for extracting information from the master string.
*/
private object SparkMasterRegex {
// Regular expression used for local[N] and local[*] master formats
val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
// Regular expression for local[N, maxRetries], used in tests with failing tasks
val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r
// Regular expression for simulating a Spark cluster of [N, cores, memory] locally
val LOCAL_CLUSTER_REGEX = """local-cluster\[\s*([0-9]+)\s*,\s*([0-9]+)\s*,\s*([0-9]+)\s*]""".r
// Regular expression for connecting to Spark deploy clusters
val SPARK_REGEX = """spark://(.*)""".r
// Regular expression for connection to Mesos cluster by mesos:// or zk:// url
val MESOS_REGEX = """(mesos|zk)://.*""".r
// Regular expression for connection to Simr cluster
val SIMR_REGEX = """simr://(.*)""".r
}

/**
* A class encapsulating how to convert some type T to Writable. It stores both the Writable class
* corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
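The new numDriverCores helper and the extracted SparkMasterRegex object amount to a small, self-contained piece of master-string parsing. The sketch below restates that logic outside Spark (same regexes, same match arms) so its behavior for different master URLs is easy to verify:

object MasterStringSketch {
  // Same regexes as the new SparkMasterRegex object in the diff above.
  val LOCAL_N_REGEX = """local\[([0-9]+|\*)\]""".r
  val LOCAL_N_FAILURES_REGEX = """local\[([0-9]+|\*)\s*,\s*([0-9]+)\]""".r

  // Mirrors SparkContext.numDriverCores: cores the driver uses for execution in
  // local mode, 0 otherwise (on a cluster master the driver does not run tasks).
  def numDriverCores(master: String): Int = {
    def convertToInt(threads: String): Int =
      if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt
    master match {
      case "local" => 1
      case LOCAL_N_REGEX(threads) => convertToInt(threads)
      case LOCAL_N_FAILURES_REGEX(threads, _) => convertToInt(threads)
      case _ => 0
    }
  }

  def main(args: Array[String]): Unit = {
    Seq("local", "local[4]", "local[*]", "local[2, 3]", "spark://host:7077", "yarn-client")
      .foreach(m => println(s"$m -> ${numDriverCores(m)} driver core(s)"))
  }
}

Running this prints 1 for "local", N for local[N], the machine's processor count for local[*], and 0 for cluster masters, which matches what createDriverEnv now receives.
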
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -185,6 +185,7 @@ object SparkEnv extends Logging {
conf: SparkConf,
isLocal: Boolean,
listenerBus: LiveListenerBus,
numCores: Int,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {
assert(conf.contains("spark.driver.host"), "spark.driver.host is not set on the driver!")
assert(conf.contains("spark.driver.port"), "spark.driver.port is not set on the driver!")
@@ -197,6 +198,7 @@ object SparkEnv extends Logging {
port,
isDriver = true,
isLocal = isLocal,
numUsableCores = numCores,
listenerBus = listenerBus,
mockOutputCommitCoordinator = mockOutputCommitCoordinator
)
@@ -236,8 +238,8 @@ object SparkEnv extends Logging {
port: Int,
isDriver: Boolean,
isLocal: Boolean,
numUsableCores: Int,
listenerBus: LiveListenerBus = null,
numUsableCores: Int = 0,
mockOutputCommitCoordinator: Option[OutputCommitCoordinator] = None): SparkEnv = {

// Listener bus is only used on the driver
11 changes: 10 additions & 1 deletion core/src/main/scala/org/apache/spark/deploy/RRunner.scala
@@ -39,7 +39,16 @@ object RRunner {

// Time to wait for SparkR backend to initialize in seconds
val backendTimeout = sys.env.getOrElse("SPARKR_BACKEND_TIMEOUT", "120").toInt
val rCommand = "Rscript"
val rCommand = {
// "spark.sparkr.r.command" is deprecated and replaced by "spark.r.command",
// but kept here for backward compatibility.
var cmd = sys.props.getOrElse("spark.sparkr.r.command", "Rscript")
cmd = sys.props.getOrElse("spark.r.command", cmd)
if (sys.props.getOrElse("spark.submit.deployMode", "client") == "client") {
cmd = sys.props.getOrElse("spark.r.driver.command", cmd)
}
cmd
}

// Check if the file path exists.
// If not, change directory to current working directory for YARN cluster mode
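The new rCommand block gives the three properties a clear precedence: the deprecated spark.sparkr.r.command is overridden by spark.r.command, which in client deploy mode is in turn overridden by spark.r.driver.command. A small sketch of the same lookup chain, with hypothetical property values:

object RCommandSketch {
  // Mirrors the resolution order in RRunner, for illustration only.
  def resolveRCommand(props: Map[String, String]): String = {
    var cmd = props.getOrElse("spark.sparkr.r.command", "Rscript") // deprecated name, lowest priority
    cmd = props.getOrElse("spark.r.command", cmd)                  // preferred name
    if (props.getOrElse("spark.submit.deployMode", "client") == "client") {
      cmd = props.getOrElse("spark.r.driver.command", cmd)         // client-mode driver override
    }
    cmd
  }

  def main(args: Array[String]): Unit = {
    println(resolveRCommand(Map.empty))                                    // Rscript
    println(resolveRCommand(Map("spark.r.command" -> "/usr/bin/Rscript"))) // /usr/bin/Rscript
    println(resolveRCommand(Map(
      "spark.r.command" -> "/usr/bin/Rscript",
      "spark.r.driver.command" -> "/opt/R/bin/Rscript")))                  // /opt/R/bin/Rscript
  }
}

In practice these properties would normally be supplied through spark-submit's --conf flag, for example --conf spark.r.command=/usr/bin/Rscript.
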
30 changes: 15 additions & 15 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -380,27 +380,27 @@ class SparkHadoopUtil extends Logging {

object SparkHadoopUtil {

private val hadoop = {
val yarnMode = java.lang.Boolean.valueOf(
System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
if (yarnMode) {
try {
Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
.newInstance()
.asInstanceOf[SparkHadoopUtil]
} catch {
case e: Exception => throw new SparkException("Unable to load YARN support", e)
}
} else {
new SparkHadoopUtil
}
private lazy val hadoop = new SparkHadoopUtil
private lazy val yarn = try {
Utils.classForName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil")
.newInstance()
.asInstanceOf[SparkHadoopUtil]
} catch {
case e: Exception => throw new SparkException("Unable to load YARN support", e)
}

val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"

val SPARK_YARN_CREDS_COUNTER_DELIM = "-"

def get: SparkHadoopUtil = {
hadoop
// Check each time to support changing to/from YARN
val yarnMode = java.lang.Boolean.valueOf(
System.getProperty("SPARK_YARN_MODE", System.getenv("SPARK_YARN_MODE")))
if (yarnMode) {
yarn
} else {
hadoop
}
}
}
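Because get now re-reads SPARK_YARN_MODE on every call, and SparkContext.stop clears that property (see the SparkContext change above), a single JVM can switch between the YARN and non-YARN Hadoop utilities. A hedged sketch of that behavior, assuming SparkHadoopUtil is visible to the caller and Spark is on the classpath:

import org.apache.spark.deploy.SparkHadoopUtil

object HadoopUtilSwitchSketch {
  def main(args: Array[String]): Unit = {
    // Without SPARK_YARN_MODE, the plain SparkHadoopUtil is returned.
    System.clearProperty("SPARK_YARN_MODE")
    println(SparkHadoopUtil.get.getClass.getName)

    // After setting the property, subsequent calls return YarnSparkHadoopUtil --
    // provided the YARN classes are on the classpath; otherwise get throws
    // SparkException("Unable to load YARN support").
    System.setProperty("SPARK_YARN_MODE", "true")
    // println(SparkHadoopUtil.get.getClass.getName)

    // SparkContext.stop now clears SPARK_YARN_MODE, so a later non-YARN
    // context sees the plain implementation again.
    System.clearProperty("SPARK_YARN_MODE")
    println(SparkHadoopUtil.get.getClass.getName)
  }
}
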
2 changes: 2 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -328,6 +328,8 @@ object SparkSubmit {
case (STANDALONE, CLUSTER) if args.isR =>
printErrorAndExit("Cluster deploy mode is currently not supported for R " +
"applications on standalone clusters.")
case (LOCAL, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is not compatible with master \"local\"")
case (_, CLUSTER) if isShell(args.primaryResource) =>
printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
case (_, CLUSTER) if isSqlShell(args.mainClass) =>
core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -48,8 +48,9 @@ private[spark] object TestClient {
val url = args(0)
val conf = new SparkConf
val rpcEnv = RpcEnv.create("spark", Utils.localHostName(), 0, conf, new SecurityManager(conf))
val executorClassname = TestExecutor.getClass.getCanonicalName.stripSuffix("$")
val desc = new ApplicationDescription("TestClient", Some(1), 512,
Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(), Seq(), Seq()), "ignored")
Command(executorClassname, Seq(), Map(), Seq(), Seq(), Seq()), "ignored")
val listener = new TestListener
val client = new AppClient(rpcEnv, Array(url), desc, listener, new SparkConf)
client.start()
core/src/main/scala/org/apache/spark/metrics/MetricsSystem.scala
@@ -197,7 +197,7 @@ private[spark] class MetricsSystem private (
}
} catch {
case e: Exception => {
logError("Sink class " + classPath + " cannot be instantialized")
logError("Sink class " + classPath + " cannot be instantiated")
throw e
}
}
(Diffs for the remaining changed files are not shown here.)
