Skip to content

Commit

Permalink
Merge branch 'master' of github.com:apache/spark into dynamic-allocat…
Browse files Browse the repository at this point in the history
…ion-sc

Conflicts:
	core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
  • Loading branch information
Andrew Or committed Dec 10, 2014
2 parents 347a348 + 2b9b726 commit 59baf6c
Show file tree
Hide file tree
Showing 83 changed files with 808 additions and 1,304 deletions.
3 changes: 2 additions & 1 deletion LICENSE
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,8 @@ THE SOFTWARE.

========================================================================
For Scala Interpreter classes (all .scala files in repl/src/main/scala
except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala):
except for Main.Scala, SparkHelper.scala and ExecutorClassLoader.scala),
and for SerializableMapWrapper in JavaUtils.scala:
========================================================================

Copyright (c) 2002-2013 EPFL
Expand Down
10 changes: 0 additions & 10 deletions assembly/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,6 @@
</build>

<profiles>
<profile>
<id>yarn-alpha</id>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-yarn-alpha_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
</profile>
<profile>
<id>yarn</id>
<dependencies>
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/spark/SparkJobInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

package org.apache.spark;

import java.io.Serializable;

/**
* Exposes information about Spark Jobs.
*
* This interface is not designed to be implemented outside of Spark. We may add additional methods
* which may break binary compatibility with outside implementations.
*/
public interface SparkJobInfo {
public interface SparkJobInfo extends Serializable {
int jobId();
int[] stageIds();
JobExecutionStatus status();
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/spark/SparkStageInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

package org.apache.spark;

import java.io.Serializable;

/**
* Exposes information about Spark Stages.
*
* This interface is not designed to be implemented outside of Spark. We may add additional methods
* which may break binary compatibility with outside implementations.
*/
public interface SparkStageInfo {
public interface SparkStageInfo extends Serializable {
int stageId();
int currentAttemptId();
long submissionTime();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,6 @@ span.additional-metric-title {

/* Hide all additional metrics by default. This is done here rather than using JavaScript to
* avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
.scheduler_delay, .gc_time, .deserialization_time, .serialization_time, .getting_result_time {
.scheduler_delay, .deserialization_time, .serialization_time, .getting_result_time {
display: none;
}
62 changes: 60 additions & 2 deletions core/src/main/scala/org/apache/spark/api/java/JavaUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package org.apache.spark.api.java

import com.google.common.base.Optional

import scala.collection.convert.Wrappers.MapWrapper
import java.{util => ju}
import scala.collection.mutable

private[spark] object JavaUtils {
def optionToOptional[T](option: Option[T]): Optional[T] =
Expand All @@ -32,7 +33,64 @@ private[spark] object JavaUtils {
def mapAsSerializableJavaMap[A, B](underlying: collection.Map[A, B]) =
new SerializableMapWrapper(underlying)

// Implementation is copied from scala.collection.convert.Wrappers.MapWrapper,
// but implements java.io.Serializable. It can't just be subclassed to make it
// Serializable since the MapWrapper class has no no-arg constructor. This class
// doesn't need a no-arg constructor though.
class SerializableMapWrapper[A, B](underlying: collection.Map[A, B])
extends MapWrapper(underlying) with java.io.Serializable
extends ju.AbstractMap[A, B] with java.io.Serializable { self =>

override def size = underlying.size

override def get(key: AnyRef): B = try {
underlying get key.asInstanceOf[A] match {
case None => null.asInstanceOf[B]
case Some(v) => v
}
} catch {
case ex: ClassCastException => null.asInstanceOf[B]
}

override def entrySet: ju.Set[ju.Map.Entry[A, B]] = new ju.AbstractSet[ju.Map.Entry[A, B]] {
def size = self.size

def iterator = new ju.Iterator[ju.Map.Entry[A, B]] {
val ui = underlying.iterator
var prev : Option[A] = None

def hasNext = ui.hasNext

def next() = {
val (k, v) = ui.next
prev = Some(k)
new ju.Map.Entry[A, B] {
import scala.util.hashing.byteswap32
def getKey = k
def getValue = v
def setValue(v1 : B) = self.put(k, v1)
override def hashCode = byteswap32(k.hashCode) + (byteswap32(v.hashCode) << 16)
override def equals(other: Any) = other match {
case e: ju.Map.Entry[_, _] => k == e.getKey && v == e.getValue
case _ => false
}
}
}

def remove() {
prev match {
case Some(k) =>
underlying match {
case mm: mutable.Map[a, _] =>
mm remove k
prev = None
case _ =>
throw new UnsupportedOperationException("remove")
}
case _ =>
throw new IllegalStateException("next must be called at least once before remove")
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
* Request that the cluster manager kill the specified executors.
* Return whether the kill request is acknowledged.
*/
final override def killExecutors(executorIds: Seq[String]): Boolean = {
final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
logInfo(s"Requesting to kill executor(s) ${executorIds.mkString(", ")}")
val filteredExecutorIds = new ArrayBuffer[String]
executorIds.foreach { id =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ private[spark] class HashShuffleReader[K, C](
} else {
new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
}
} else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
throw new IllegalStateException("Aggregator is empty for map-side combine")
} else {
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")

// Convert the Product2s to pairs since this is what downstream RDDs currently expect
iter.asInstanceOf[Iterator[Product2[K, C]]].map(pair => (pair._1, pair._2))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,8 @@ private[spark] class HashShuffleWriter[K, V](
} else {
records
}
} else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
throw new IllegalStateException("Aggregator is empty for map-side combine")
} else {
require(!dep.mapSideCombine, "Map-side combine without Aggregator specified!")
records
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@ private[spark] class SortShuffleWriter[K, V, C](
/** Write a bunch of records to this task's output */
override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
if (dep.mapSideCombine) {
if (!dep.aggregator.isDefined) {
throw new IllegalStateException("Aggregator is empty for map-side combine")
}
require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!")
sorter = new ExternalSorter[K, V, C](
dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
sorter.insertAll(records)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1014,8 +1014,10 @@ private[spark] class BlockManager(
// If we get here, the block write failed.
logWarning(s"Block $blockId was marked as failure. Nothing to drop")
return None
} else if (blockInfo.get(blockId).isEmpty) {
logWarning(s"Block $blockId was already dropped.")
return None
}

var blockIsUpdated = false
val level = info.level

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,8 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus

case UpdateBlockInfo(
blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize) =>
// TODO: Ideally we want to handle all the message replies in receive instead of in the
// individual private methods.
updateBlockInfo(blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize)
sender ! updateBlockInfo(
blockManagerId, blockId, storageLevel, deserializedSize, size, tachyonSize)

case GetLocations(blockId) =>
sender ! getLocations(blockId)
Expand Down Expand Up @@ -355,23 +354,21 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
storageLevel: StorageLevel,
memSize: Long,
diskSize: Long,
tachyonSize: Long) {
tachyonSize: Long): Boolean = {

if (!blockManagerInfo.contains(blockManagerId)) {
if (blockManagerId.isDriver && !isLocal) {
// We intentionally do not register the master (except in local mode),
// so we should not indicate failure.
sender ! true
return true
} else {
sender ! false
return false
}
return
}

if (blockId == null) {
blockManagerInfo(blockManagerId).updateLastSeenMs()
sender ! true
return
return true
}

blockManagerInfo(blockManagerId).updateBlockInfo(
Expand All @@ -395,7 +392,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
if (locations.size == 0) {
blockLocations.remove(blockId)
}
sender ! true
true
}

private def getLocations(blockId: BlockId): Seq[BlockManagerId] = {
Expand Down
13 changes: 3 additions & 10 deletions core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,6 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<span class="additional-metric-title">Task Deserialization Time</span>
</span>
</li>
<li>
<span data-toggle="tooltip"
title={ToolTips.GC_TIME} data-placement="right">
<input type="checkbox" name={TaskDetailsClassNames.GC_TIME}/>
<span class="additional-metric-title">GC Time</span>
</span>
</li>
<li>
<span data-toggle="tooltip"
title={ToolTips.RESULT_SERIALIZATION_TIME} data-placement="right">
Expand Down Expand Up @@ -168,7 +161,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
("Executor ID / Host", ""), ("Launch Time", ""), ("Duration", ""),
("Scheduler Delay", TaskDetailsClassNames.SCHEDULER_DELAY),
("Task Deserialization Time", TaskDetailsClassNames.TASK_DESERIALIZATION_TIME),
("GC Time", TaskDetailsClassNames.GC_TIME),
("GC Time", ""),
("Result Serialization Time", TaskDetailsClassNames.RESULT_SERIALIZATION_TIME),
("Getting Result Time", TaskDetailsClassNames.GETTING_RESULT_TIME)) ++
{if (hasAccumulators) Seq(("Accumulators", "")) else Nil} ++
Expand Down Expand Up @@ -308,7 +301,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
<tr class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
{deserializationQuantiles}
</tr>
<tr class={TaskDetailsClassNames.GC_TIME}>{gcQuantiles}</tr>,
<tr>{gcQuantiles}</tr>,
<tr class={TaskDetailsClassNames.RESULT_SERIALIZATION_TIME}>
{serializationQuantiles}
</tr>,
Expand Down Expand Up @@ -429,7 +422,7 @@ private[ui] class StagePage(parent: StagesTab) extends WebUIPage("stage") {
class={TaskDetailsClassNames.TASK_DESERIALIZATION_TIME}>
{UIUtils.formatDuration(taskDeserializationTime.toLong)}
</td>
<td sorttable_customkey={gcTime.toString} class={TaskDetailsClassNames.GC_TIME}>
<td sorttable_customkey={gcTime.toString}>
{if (gcTime > 0) UIUtils.formatDuration(gcTime) else ""}
</td>
<td sorttable_customkey={serializationTime.toString}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ package org.apache.spark.ui.jobs
* If new optional metrics are added here, they should also be added to the end of webui.css
* to have the style set to "display: none;" by default.
*/
private object TaskDetailsClassNames {
private[spark] object TaskDetailsClassNames {
val SCHEDULER_DELAY = "scheduler_delay"
val GC_TIME = "gc_time"
val TASK_DESERIALIZATION_TIME = "deserialization_time"
val RESULT_SERIALIZATION_TIME = "serialization_time"
val GETTING_RESULT_TIME = "getting_result_time"
Expand Down
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -425,8 +425,7 @@ private[spark] object Utils extends Logging {
conf: SparkConf,
securityMgr: SecurityManager,
hadoopConf: Configuration) {
val tempDir = getLocalDir(conf)
val tempFile = File.createTempFile("fetchFileTemp", null, new File(tempDir))
val tempFile = File.createTempFile("fetchFileTemp", null, new File(targetDir.getAbsolutePath))
val targetFile = new File(targetDir, filename)
val uri = new URI(url)
val fileOverwrite = conf.getBoolean("spark.files.overwrite", defaultValue = false)
Expand Down
13 changes: 13 additions & 0 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
Original file line number Diff line number Diff line change
Expand Up @@ -1357,6 +1357,19 @@ public Tuple2<Integer, int[]> call(Integer x) {
pairRDD.collectAsMap(); // Used to crash with ClassCastException
}

@SuppressWarnings("unchecked")
@Test
public void collectAsMapAndSerialize() throws Exception {
JavaPairRDD<String,Integer> rdd =
sc.parallelizePairs(Arrays.asList(new Tuple2<String,Integer>("foo", 1)));
Map<String,Integer> map = rdd.collectAsMap();
ByteArrayOutputStream bytes = new ByteArrayOutputStream();
new ObjectOutputStream(bytes).writeObject(map);
Map<String,Integer> deserializedMap = (Map<String,Integer>)
new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray())).readObject();
Assert.assertEquals(1, deserializedMap.get("foo").intValue());
}

@Test
@SuppressWarnings("unchecked")
public void sampleByKey() {
Expand Down
5 changes: 1 addition & 4 deletions dev/scalastyle
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,8 @@
#

echo -e "q\n" | sbt/sbt -Phive -Phive-thriftserver scalastyle > scalastyle.txt
# Check style with YARN alpha built too
echo -e "q\n" | sbt/sbt -Pyarn-alpha -Phadoop-0.23 -Dhadoop.version=0.23.9 yarn-alpha/scalastyle \
>> scalastyle.txt
# Check style with YARN built too
echo -e "q\n" | sbt/sbt -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 yarn/scalastyle \
echo -e "q\n" | sbt/sbt -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 scalastyle \
>> scalastyle.txt

ERRORS=$(cat scalastyle.txt | awk '{if($1~/error/)print}')
Expand Down
25 changes: 2 additions & 23 deletions docs/building-spark.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,32 +60,11 @@ mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests clean package
mvn -Phadoop-0.23 -Dhadoop.version=0.23.7 -DskipTests clean package
{% endhighlight %}

For Apache Hadoop 2.x, 0.23.x, Cloudera CDH, and other Hadoop versions with YARN, you can enable the "yarn-alpha" or "yarn" profile and optionally set the "yarn.version" property if it is different from "hadoop.version". The additional build profile required depends on the YARN version:

<table class="table">
<thead>
<tr><th>YARN version</th><th>Profile required</th></tr>
</thead>
<tbody>
<tr><td>0.23.x to 2.1.x</td><td>yarn-alpha (Deprecated.)</td></tr>
<tr><td>2.2.x and later</td><td>yarn</td></tr>
</tbody>
</table>

Note: Support for YARN-alpha API's will be removed in Spark 1.3 (see SPARK-3445).
For Apache Hadoop 2.x, 0.23.x, Cloudera CDH, and other Hadoop versions with YARN, you can enable the "yarn" profile and optionally set the "yarn.version" property if it is different from "hadoop.version". As of Spark 1.3, Spark only supports YARN versions 2.2.0 and later.

Examples:

{% highlight bash %}
# Apache Hadoop 2.0.5-alpha
mvn -Pyarn-alpha -Dhadoop.version=2.0.5-alpha -DskipTests clean package

# Cloudera CDH 4.2.0
mvn -Pyarn-alpha -Dhadoop.version=2.0.0-cdh4.2.0 -DskipTests clean package

# Apache Hadoop 0.23.x
mvn -Pyarn-alpha -Phadoop-0.23 -Dhadoop.version=0.23.7 -DskipTests clean package

# Apache Hadoop 2.2.X
mvn -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean package

Expand All @@ -99,7 +78,7 @@ Versions of Hadoop after 2.5.X may or may not work with the -Phadoop-2.4 profile
released after this version of Spark).

# Different versions of HDFS and YARN.
mvn -Pyarn-alpha -Phadoop-2.3 -Dhadoop.version=2.3.0 -Dyarn.version=0.23.7 -DskipTests clean package
mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -Dyarn.version=2.2.0 -DskipTests clean package
{% endhighlight %}

# Building With Hive and JDBC Support
Expand Down
Loading

0 comments on commit 59baf6c

Please sign in to comment.