Commit
Merge branch 'master' of github.com:apache/spark into dynamic-allocation-sc

Conflicts:
	core/src/main/scala/org/apache/spark/SparkContext.scala
Andrew Or committed Dec 10, 2014
2 parents 59baf6c + 36bdb5b commit 187070d
Showing 15 changed files with 114 additions and 43 deletions.
6 changes: 3 additions & 3 deletions bin/compute-classpath.sh
@@ -68,14 +68,14 @@ else
assembly_folder="$ASSEMBLY_DIR"
fi

num_jars="$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar" | wc -l)"
num_jars="$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" ]; then
echo "Failed to find Spark assembly in $assembly_folder"
echo "You need to build Spark before running this program."
exit 1
fi
if [ "$num_jars" -gt "1" ]; then
jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar")
jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar$")
echo "Found multiple Spark assembly jars in $assembly_folder:"
echo "$jars_list"
echo "Please remove all but one jar."
@@ -108,7 +108,7 @@ else
datanucleus_dir="$FWDIR"/lib_managed/jars
fi

datanucleus_jars="$(find "$datanucleus_dir" 2>/dev/null | grep "datanucleus-.*\\.jar")"
datanucleus_jars="$(find "$datanucleus_dir" 2>/dev/null | grep "datanucleus-.*\\.jar$")"
datanucleus_jars="$(echo "$datanucleus_jars" | tr "\n" : | sed s/:$//g)"

if [ -n "$datanucleus_jars" ]; then
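
The `$` anchors added above ensure that only files actually ending in `.jar` are counted; without them, stray files such as backups of the assembly jar would also match. A quick illustration with made-up file names (not taken from the repository):

    $ printf '%s\n' spark-assembly-1.2.0-hadoop2.4.0.jar spark-assembly-1.2.0-hadoop2.4.0.jar.bak \
        | grep "spark-assembly.*hadoop.*\.jar"
    spark-assembly-1.2.0-hadoop2.4.0.jar
    spark-assembly-1.2.0-hadoop2.4.0.jar.bak

    $ printf '%s\n' spark-assembly-1.2.0-hadoop2.4.0.jar spark-assembly-1.2.0-hadoop2.4.0.jar.bak \
        | grep "spark-assembly.*hadoop.*\.jar$"
    spark-assembly-1.2.0-hadoop2.4.0.jar
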
7 changes: 7 additions & 0 deletions bin/spark-shell
@@ -45,6 +45,13 @@ source "$FWDIR"/bin/utils.sh
SUBMIT_USAGE_FUNCTION=usage
gatherSparkSubmitOpts "$@"

# SPARK-4161: scala does not assume use of the java classpath,
# so we need to add the "-Dscala.usejavacp=true" flag manually. We
# do this specifically for the Spark shell because the scala REPL
# has its own class loader, and any additional classpath specified
# through spark.driver.extraClassPath is not automatically propagated.
SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true"

function main() {
if $cygwin; then
# Workaround for issue involving JLine and Cygwin
14 changes: 8 additions & 6 deletions core/src/main/scala/org/apache/spark/Accumulators.scala
@@ -19,6 +19,7 @@ package org.apache.spark

import java.io.{ObjectInputStream, Serializable}
import java.util.concurrent.atomic.AtomicLong
import java.lang.ThreadLocal

import scala.collection.generic.Growable
import scala.collection.mutable.Map
@@ -278,10 +279,12 @@ object AccumulatorParam {

// TODO: The multi-thread support in accumulators is kind of lame; check
// if there's a more intuitive way of doing it right
private object Accumulators {
private[spark] object Accumulators {
// TODO: Use soft references? => need to make readObject work properly then
val originals = Map[Long, Accumulable[_, _]]()
val localAccums = Map[Thread, Map[Long, Accumulable[_, _]]]()
val localAccums = new ThreadLocal[Map[Long, Accumulable[_, _]]]() {
override protected def initialValue() = Map[Long, Accumulable[_, _]]()
}
var lastId: Long = 0

def newId(): Long = synchronized {
@@ -293,22 +296,21 @@ private object Accumulators {
if (original) {
originals(a.id) = a
} else {
val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
accums(a.id) = a
localAccums.get()(a.id) = a
}
}

// Clear the local (non-original) accumulators for the current thread
def clear() {
synchronized {
localAccums.remove(Thread.currentThread)
localAccums.get.clear
}
}

// Get the values of the local accumulators for the current thread (by ID)
def values: Map[Long, Any] = synchronized {
val ret = Map[Long, Any]()
for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map())) {
for ((id, accum) <- localAccums.get) {
ret(id) = accum.localValue
}
return ret
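
The registry change above swaps a map keyed by Thread for a ThreadLocal whose initialValue gives each thread its own map, so lookups no longer need Thread.currentThread and entries cannot leak across threads. A minimal, self-contained sketch of that pattern (a hypothetical registry, not Spark's Accumulators object):

    import scala.collection.mutable.Map

    object ThreadLocalRegistry {
      // Each thread lazily gets its own mutable map the first time it calls get().
      private val perThread = new ThreadLocal[Map[Long, String]]() {
        override protected def initialValue() = Map[Long, String]()
      }

      def add(id: Long, value: String): Unit = perThread.get()(id) = value

      // Only the calling thread's entries are visible here.
      def values: scala.collection.immutable.Map[Long, String] = perThread.get().toMap

      def clear(): Unit = perThread.get().clear()
    }
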
10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -34,15 +34,17 @@ case class Aggregator[K, V, C] (
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {

private val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
// When spilling is enabled sorting will happen externally, but not necessarily with an
// ExternalSorter.
private val isSpillEnabled = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)

@deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
combineValuesByKey(iter, null)

def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
context: TaskContext): Iterator[(K, C)] = {
if (!externalSorting) {
if (!isSpillEnabled) {
val combiners = new AppendOnlyMap[K,C]
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
@@ -71,9 +73,9 @@ case class Aggregator[K, V, C] (
combineCombinersByKey(iter, null)

def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]], context: TaskContext)
: Iterator[(K, C)] =
: Iterator[(K, C)] =
{
if (!externalSorting) {
if (!isSpillEnabled) {
val combiners = new AppendOnlyMap[K,C]
var kc: Product2[K, C] = null
val update = (hadValue: Boolean, oldValue: C) => {
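
The renamed isSpillEnabled flag still reads the same configuration key. As a hedged example, a job that wants the purely in-memory path could disable spilling like this (illustrative only):

    import org.apache.spark.SparkConf

    // With spilling off, combineValuesByKey/combineCombinersByKey above take the
    // in-memory AppendOnlyMap branch instead of the external (spilling) one.
    val conf = new SparkConf().set("spark.shuffle.spill", "false")
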
10 changes: 9 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -357,8 +357,12 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

// Optionally scale number of executors dynamically based on workload. Exposed for testing.
private val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false)
private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
if (conf.getBoolean("spark.dynamicAllocation.enabled", false)) {
if (dynamicAllocationEnabled) {
assert(master.contains("yarn") || dynamicAllocationTesting,
"Dynamic allocation of executors is currently only supported in YARN mode")
Some(new ExecutorAllocationManager(this, listenerBus, conf))
} else {
None
@@ -989,6 +993,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
@DeveloperApi
override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
assert(master.contains("yarn") || dynamicAllocationTesting,
"Requesting executors is currently only supported in YARN mode")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.requestExecutors(numAdditionalExecutors)
@@ -1005,6 +1011,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*/
@DeveloperApi
override def killExecutors(executorIds: Seq[String]): Boolean = {
assert(master.contains("yarn") || dynamicAllocationTesting,
"Killing executors is currently only supported in YARN mode")
schedulerBackend match {
case b: CoarseGrainedSchedulerBackend =>
b.killExecutors(executorIds)
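
The new assertions mean requestExecutors and killExecutors are rejected outside YARN unless spark.dynamicAllocation.testing is set, which is exactly what the updated test suite below relies on. A hedged sketch of a configuration that satisfies the guard on a local master (the executor bounds are assumptions; the allocation manager performs its own validation beyond these asserts):

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("dynamic-allocation-sketch")
      .set("spark.dynamicAllocation.enabled", "true")
      .set("spark.dynamicAllocation.testing", "true")   // satisfies the "YARN only" asserts
      .set("spark.dynamicAllocation.minExecutors", "1") // assumed bounds, for illustration
      .set("spark.dynamicAllocation.maxExecutors", "4")

    val sc = new SparkContext(conf)
    sc.requestExecutors(1)      // passes the new assert because testing == true
    sc.killExecutors(Seq("1"))  // same guard; falls through outside a coarse-grained backend
    sc.stop()
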
core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -26,6 +26,7 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}
private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {

private val pageSize = 20
private val plusOrMinus = 2

def render(request: HttpServletRequest): Seq[Node] = {
val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt
@@ -39,6 +40,9 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
val last = Math.min(actualFirst + pageSize, allApps.size) - 1
val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0)

val secondPageFromLeft = 2
val secondPageFromRight = pageCount - 1

val appTable = UIUtils.listingTable(appHeader, appRow, apps)
val providerConfig = parent.getProviderConfig()
val content =
@@ -48,13 +52,38 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
{providerConfig.map { case (k, v) => <li><strong>{k}:</strong> {v}</li> }}
</ul>
{
// This displays the indices of pages that are within `plusOrMinus` pages of
// the current page. Regardless of where the current page is, this also links
// to the first and last page. If the current page +/- `plusOrMinus` is greater
// than the 2nd page from the first page or less than the 2nd page from the last
// page, `...` will be displayed.
if (allApps.size > 0) {
val leftSideIndices =
rangeIndices(actualPage - plusOrMinus until actualPage, 1 < _)
val rightSideIndices =
rangeIndices(actualPage + 1 to actualPage + plusOrMinus, _ < pageCount)

<h4>
Showing {actualFirst + 1}-{last + 1} of {allApps.size}
<span style="float: right">
{if (actualPage > 1) <a href={"/?page=" + (actualPage - 1)}>&lt;</a>}
{if (actualPage < pageCount) <a href={"/?page=" + (actualPage + 1)}>&gt;</a>}
</span>
<span style="float: right">
{
if (actualPage > 1) {
<a href={"/?page=" + (actualPage - 1)}>&lt; </a>
<a href={"/?page=1"}>1</a>
}
}
{if (actualPage - plusOrMinus > secondPageFromLeft) " ... "}
{leftSideIndices}
{actualPage}
{rightSideIndices}
{if (actualPage + plusOrMinus < secondPageFromRight) " ... "}
{
if (actualPage < pageCount) {
<a href={"/?page=" + pageCount}>{pageCount}</a>
<a href={"/?page=" + (actualPage + 1)}> &gt;</a>
}
}
</span>
</h4> ++
appTable
} else {
@@ -81,6 +110,10 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
"Spark User",
"Last Updated")

private def rangeIndices(range: Seq[Int], condition: Int => Boolean): Seq[Node] = {
range.filter(condition).map(nextPage => <a href={"/?page=" + nextPage}> {nextPage} </a>)
}

private def appRow(info: ApplicationHistoryInfo): Seq[Node] = {
val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${info.id}"
val startTime = UIUtils.formatDate(info.startTime)
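
The pagination block above reads as a small windowing rule: always link the first and last pages, show up to plusOrMinus neighbours on either side of the current page, and print " ... " only when the window stops short of the second page from either end. A standalone sketch of that rule using plain strings instead of XML nodes (the function name and token format are illustrative):

    // Returns pager tokens, e.g. pagerTokens(6, 12) ==
    //   Seq("<", "1", "...", "4", "5", "[6]", "7", "8", "...", "12", ">")
    def pagerTokens(currentPage: Int, pageCount: Int, plusOrMinus: Int = 2): Seq[String] = {
      val secondPageFromLeft = 2
      val secondPageFromRight = pageCount - 1

      val leftSide  = (currentPage - plusOrMinus until currentPage).filter(_ > 1).map(_.toString)
      val rightSide = (currentPage + 1 to currentPage + plusOrMinus).filter(_ < pageCount).map(_.toString)

      val prefix =
        (if (currentPage > 1) Seq("<", "1") else Seq.empty[String]) ++
        (if (currentPage - plusOrMinus > secondPageFromLeft) Seq("...") else Seq.empty[String])
      val suffix =
        (if (currentPage + plusOrMinus < secondPageFromRight) Seq("...") else Seq.empty[String]) ++
        (if (currentPage < pageCount) Seq(pageCount.toString, ">") else Seq.empty[String])

      prefix ++ leftSide ++ Seq(s"[$currentPage]") ++ rightSide ++ suffix
    }
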
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -172,7 +172,6 @@ private[spark] class Executor(
val startGCTime = gcTime

try {
Accumulators.clear()
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
@@ -278,6 +277,8 @@ private[spark] class Executor(
env.shuffleMemoryManager.releaseMemoryForThisThread()
// Release memory used by this thread for unrolling blocks
env.blockManager.memoryStore.releaseUnrollMemoryForThisThread()
// Release memory used by this thread for accumulators
Accumulators.clear()
runningTasks.remove(taskId)
}
}
36 changes: 21 additions & 15 deletions core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -35,11 +35,10 @@ import org.apache.spark.util.Utils
* @param preferredLocation the preferred location for this partition
*/
private[spark] case class CoalescedRDDPartition(
index: Int,
@transient rdd: RDD[_],
parentsIndices: Array[Int],
@transient preferredLocation: String = ""
) extends Partition {
index: Int,
@transient rdd: RDD[_],
parentsIndices: Array[Int],
@transient preferredLocation: Option[String] = None) extends Partition {
var parents: Seq[Partition] = parentsIndices.map(rdd.partitions(_))

@throws(classOf[IOException])
@@ -55,9 +54,10 @@ private[spark] case class CoalescedRDDPartition(
* @return locality of this coalesced partition between 0 and 1
*/
def localFraction: Double = {
val loc = parents.count(p =>
rdd.context.getPreferredLocs(rdd, p.index).map(tl => tl.host).contains(preferredLocation))

val loc = parents.count { p =>
val parentPreferredLocations = rdd.context.getPreferredLocs(rdd, p.index).map(_.host)
preferredLocation.exists(parentPreferredLocations.contains)
}
if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
}
}
@@ -73,9 +73,9 @@ private[spark] case class CoalescedRDDPartition(
* @param balanceSlack used to trade-off balance and locality. 1.0 is all locality, 0 is all balance
*/
private[spark] class CoalescedRDD[T: ClassTag](
@transient var prev: RDD[T],
maxPartitions: Int,
balanceSlack: Double = 0.10)
@transient var prev: RDD[T],
maxPartitions: Int,
balanceSlack: Double = 0.10)
extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies

override def getPartitions: Array[Partition] = {
@@ -113,7 +113,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
* @return the machine most preferred by split
*/
override def getPreferredLocations(partition: Partition): Seq[String] = {
List(partition.asInstanceOf[CoalescedRDDPartition].preferredLocation)
partition.asInstanceOf[CoalescedRDDPartition].preferredLocation.toSeq
}
}

@@ -147,7 +147,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
*
*/

private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {
private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: Double) {

def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.size < o2.size
def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
@@ -341,8 +341,14 @@ private[spark] class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanc
}
}

private[spark] case class PartitionGroup(prefLoc: String = "") {
private case class PartitionGroup(prefLoc: Option[String] = None) {
var arr = mutable.ArrayBuffer[Partition]()

def size = arr.size
}

private object PartitionGroup {
def apply(prefLoc: String): PartitionGroup = {
require(prefLoc != "", "Preferred location must not be empty")
PartitionGroup(Some(prefLoc))
}
}
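
Replacing the empty-string sentinel with Option[String] also simplifies getPreferredLocations above: toSeq on the option yields either one real host or no preference at all, instead of a meaningless "" entry. A tiny illustration (the host name is made up):

    val preferred: Option[String] = Some("host-a")  // illustrative
    val none: Option[String] = None

    preferred.toSeq  // Seq("host-a")
    none.toSeq       // Seq(); the old code returned List(""), advertising a bogus empty host
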
4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -181,7 +181,9 @@ private[spark] object UIUtils extends Logging {
</li>
}
val helpButton: Seq[Node] = helpText.map { helpText =>
<a data-toggle="tooltip" data-placement="bottom" title={helpText}>(?)</a>
<sup>
(<a data-toggle="tooltip" data-placement="bottom" title={helpText}>?</a>)
</sup>
}.getOrElse(Seq.empty)

<html>
core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -35,6 +35,7 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
.setMaster("local")
.setAppName("test-executor-allocation-manager")
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.testing", "true")
intercept[SparkException] { new SparkContext(conf) }
SparkEnv.get.stop() // cleanup the created environment
SparkContext.clearActiveContext()
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -294,7 +294,7 @@ class RDDSuite extends FunSuite with SharedSparkContext {
test("coalesced RDDs with locality") {
val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b"))))
val coal3 = data3.coalesce(3)
val list3 = coal3.partitions.map(p => p.asInstanceOf[CoalescedRDDPartition].preferredLocation)
val list3 = coal3.partitions.flatMap(_.asInstanceOf[CoalescedRDDPartition].preferredLocation)
assert(list3.sorted === Array("a","b","c"), "Locality preferences are dropped")

// RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5
11 changes: 10 additions & 1 deletion docs/spark-standalone.md
@@ -257,7 +257,7 @@ To run an interactive Spark shell against the cluster, run the following command

You can also pass an option `--total-executor-cores <numCores>` to control the number of cores that spark-shell uses on the cluster.

# Launching Compiled Spark Applications
# Launching Spark Applications

The [`spark-submit` script](submitting-applications.html) provides the most straightforward way to
submit a compiled Spark application to the cluster. For standalone clusters, Spark currently
@@ -272,6 +272,15 @@ should specify them through the `--jars` flag using comma as a delimiter (e.g. `
To control the application's configuration or execution environment, see
[Spark Configuration](configuration.html).

Additionally, standalone `cluster` mode supports restarting your application automatically if it
exits with a non-zero exit code. To use this feature, you may pass in the `--supervise` flag to
`spark-submit` when launching your application. Then, if you wish to kill an application that is
failing repeatedly, you may do so through:

./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>

You can find the driver ID through the standalone Master web UI at `http://<master url>:8080`.
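
For example, a supervised cluster-mode submission might look like the following (the class name, jar path, and master host are placeholders):

    ./bin/spark-submit \
      --class org.example.MyApp \
      --master spark://<master url>:7077 \
      --deploy-mode cluster \
      --supervise \
      /path/to/my-app.jar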

# Resource Scheduling

The standalone cluster mode currently only supports a simple FIFO scheduler across applications.