Merge branch 'master' of github.com:apache/spark into rest
Andrew Or committed Jan 29, 2015
2 parents e2104e6 + 7156322 commit 914fdff
Showing 129 changed files with 3,197 additions and 1,044 deletions.
22 changes: 0 additions & 22 deletions assembly/pom.xml
@@ -43,12 +43,6 @@
</properties>

<dependencies>
<!-- Promote Guava to compile scope in this module so it's included while shading. -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
@@ -133,22 +127,6 @@
<goal>shade</goal>
</goals>
<configuration>
<relocations>
<relocation>
<pattern>com.google</pattern>
<shadedPattern>org.spark-project.guava</shadedPattern>
<includes>
<include>com.google.common.**</include>
</includes>
<excludes>
<exclude>com/google/common/base/Absent*</exclude>
<exclude>com/google/common/base/Function</exclude>
<exclude>com/google/common/base/Optional*</exclude>
<exclude>com/google/common/base/Present*</exclude>
<exclude>com/google/common/base/Supplier</exclude>
</excludes>
</relocation>
</relocations>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
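Note: the classes excluded from relocation above (Optional, its subclasses Absent and Present, and the Function and Supplier interfaces its signatures use) appear in Spark's public Java API, so they must keep their original com.google.common names even though the rest of Guava is shaded. A minimal, illustrative Scala sketch of user code that relies on the unshaded name (not part of this commit):

import com.google.common.base.Optional

// Illustrative only: callers of the Java API receive Guava's Optional directly
// (for example in the values produced by JavaPairRDD.leftOuterJoin). If Optional were
// relocated to org.spark-project.guava, code like this would no longer link.
def describe(opt: Optional[Integer]): String =
  if (opt.isPresent) s"matched: ${opt.get}" else "no match"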
4 changes: 2 additions & 2 deletions build/mvn
@@ -48,11 +48,11 @@ install_app() {
# check if we already have the tarball
# check if we have curl installed
# download application
[ ! -f "${local_tarball}" ] && [ -n "`which curl 2>/dev/null`" ] && \
[ ! -f "${local_tarball}" ] && [ $(command -v curl) ] && \
echo "exec: curl ${curl_opts} ${remote_tarball}" && \
curl ${curl_opts} "${remote_tarball}" > "${local_tarball}"
# if the file still doesn't exist, lets try `wget` and cross our fingers
[ ! -f "${local_tarball}" ] && [ -n "`which wget 2>/dev/null`" ] && \
[ ! -f "${local_tarball}" ] && [ $(command -v wget) ] && \
echo "exec: wget ${wget_opts} ${remote_tarball}" && \
wget ${wget_opts} -O "${local_tarball}" "${remote_tarball}"
# if both were unsuccessful, exit
4 changes: 2 additions & 2 deletions build/sbt-launch-lib.bash
@@ -50,9 +50,9 @@ acquire_sbt_jar () {
# Download
printf "Attempting to fetch sbt\n"
JAR_DL="${JAR}.part"
if hash curl 2>/dev/null; then
if [ $(command -v curl) ]; then
(curl --silent ${URL1} > "${JAR_DL}" || curl --silent ${URL2} > "${JAR_DL}") && mv "${JAR_DL}" "${JAR}"
elif hash wget 2>/dev/null; then
elif [ $(command -v wget) ]; then
(wget --quiet ${URL1} -O "${JAR_DL}" || wget --quiet ${URL2} -O "${JAR_DL}") && mv "${JAR_DL}" "${JAR}"
else
printf "You do not have curl or wget installed, please install sbt manually from http://www.scala-sbt.org/\n"
52 changes: 4 additions & 48 deletions core/pom.xml
@@ -34,6 +34,10 @@
<name>Spark Project Core</name>
<url>http://spark.apache.org/</url>
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill_${scala.binary.version}</artifactId>
@@ -106,16 +110,6 @@
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
</dependency>
<!--
Promote Guava to "compile" so that maven-shade-plugin picks it up (for packaging the Optional
class exposed in the Java API). The plugin will then remove this dependency from the published
pom, so that Guava does not pollute the client's compilation classpath.
-->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -350,44 +344,6 @@
<verbose>true</verbose>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<shadedArtifactAttached>false</shadedArtifactAttached>
<artifactSet>
<includes>
<include>com.google.guava:guava</include>
</includes>
</artifactSet>
<filters>
<!-- See comment in the guava dependency declaration above. -->
<filter>
<artifact>com.google.guava:guava</artifact>
<includes>
<include>com/google/common/base/Absent*</include>
<include>com/google/common/base/Function</include>
<include>com/google/common/base/Optional*</include>
<include>com/google/common/base/Present*</include>
<include>com/google/common/base/Supplier</include>
</includes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
<!--
Copy guava to the build directory. This is needed to make the SPARK_PREPEND_CLASSES
option work in compute-classpath.sh, since it would put the non-shaded Spark classes in
the runtime classpath.
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
37 changes: 37 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala
@@ -348,6 +348,19 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
*/
def reduce(f: JFunction2[T, T, T]): T = rdd.reduce(f)

/**
* Reduces the elements of this RDD in a multi-level tree pattern.
*
* @param depth suggested depth of the tree
* @see [[org.apache.spark.api.java.JavaRDDLike#reduce]]
*/
def treeReduce(f: JFunction2[T, T, T], depth: Int): T = rdd.treeReduce(f, depth)

/**
* [[org.apache.spark.api.java.JavaRDDLike#treeReduce]] with suggested depth 2.
*/
def treeReduce(f: JFunction2[T, T, T]): T = treeReduce(f, 2)

/**
* Aggregate the elements of each partition, and then the results for all the partitions, using a
* given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
@@ -369,6 +382,30 @@
combOp: JFunction2[U, U, U]): U =
rdd.aggregate(zeroValue)(seqOp, combOp)(fakeClassTag[U])

/**
* Aggregates the elements of this RDD in a multi-level tree pattern.
*
* @param depth suggested depth of the tree
* @see [[org.apache.spark.api.java.JavaRDDLike#aggregate]]
*/
def treeAggregate[U](
zeroValue: U,
seqOp: JFunction2[U, T, U],
combOp: JFunction2[U, U, U],
depth: Int): U = {
rdd.treeAggregate(zeroValue)(seqOp, combOp, depth)(fakeClassTag[U])
}

/**
* [[org.apache.spark.api.java.JavaRDDLike#treeAggregate]] with suggested depth 2.
*/
def treeAggregate[U](
zeroValue: U,
seqOp: JFunction2[U, T, U],
combOp: JFunction2[U, U, U]): U = {
treeAggregate(zeroValue, seqOp, combOp, 2)
}

/**
* Return the number of elements in the RDD.
*/
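Note: a rough usage sketch of the new Java-facing overloads, written in Scala for brevity. The names jsc, data, and add are illustrative and not from this commit:

import java.util.Arrays
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.api.java.function.{Function2 => JFunction2}

// Illustrative: sum 1..4 with an explicit tree depth, then with the default depth of 2.
val jsc = new JavaSparkContext("local[4]", "tree-reduce-demo")
val data = jsc.parallelize(Arrays.asList(Int.box(1), Int.box(2), Int.box(3), Int.box(4)), 4)
val add = new JFunction2[Integer, Integer, Integer] {
  def call(a: Integer, b: Integer): Integer = Int.box(a + b)
}
println(data.treeReduce(add, 3)) // suggested depth of 3
println(data.treeReduce(add))    // delegates to treeReduce(f, 2)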
core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala
@@ -138,6 +138,11 @@ private[python] class JavaToWritableConverter extends Converter[Any, Writable] {
mapWritable.put(convertToWritable(k), convertToWritable(v))
}
mapWritable
case array: Array[Any] => {
val arrayWriteable = new ArrayWritable(classOf[Writable])
arrayWriteable.set(array.map(convertToWritable(_)))
arrayWriteable
}
case other => throw new SparkException(
s"Data of type ${other.getClass.getName} cannot be used")
}
18 changes: 13 additions & 5 deletions core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -153,7 +153,10 @@ private[spark] object SerDeUtil extends Logging {
iter.flatMap { row =>
val obj = unpickle.loads(row)
if (batched) {
obj.asInstanceOf[JArrayList[_]].asScala
obj match {
case array: Array[Any] => array.toSeq
case _ => obj.asInstanceOf[JArrayList[_]].asScala
}
} else {
Seq(obj)
}
@@ -199,7 +202,10 @@
* representation is serialized
*/
def pairRDDToPython(rdd: RDD[(Any, Any)], batchSize: Int): RDD[Array[Byte]] = {
val (keyFailed, valueFailed) = checkPickle(rdd.first())
val (keyFailed, valueFailed) = rdd.take(1) match {
case Array() => (false, false)
case Array(first) => checkPickle(first)
}

rdd.mapPartitions { iter =>
val cleaned = iter.map { case (k, v) =>
@@ -226,10 +232,12 @@
}

val rdd = pythonToJava(pyRDD, batched).rdd
rdd.first match {
case obj if isPair(obj) =>
rdd.take(1) match {
case Array(obj) if isPair(obj) =>
// we only accept (K, V)
case other => throw new SparkException(
case Array() =>
// we also accept empty collections
case Array(other) => throw new SparkException(
s"RDD element of type ${other.getClass.getName} cannot be used")
}
rdd.map { obj =>
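Note: the switch from rdd.first() to rdd.take(1) is what lets empty RDDs pass through, since first() throws on an empty RDD while take(1) returns an empty array. A small, self-contained sketch of the pattern (illustrative, not part of the commit):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("empty-rdd-demo"))
val empty = sc.parallelize(Seq.empty[(Int, String)])
// empty.first()              // would throw UnsupportedOperationException("empty collection")
empty.take(1) match {
  case Array()      => println("empty RDD, nothing to validate")
  case Array(first) => println(s"validate against $first")
}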
core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -76,7 +76,6 @@ private[spark] class Executor(
}

val executorSource = new ExecutorSource(this, executorId)
conf.set("spark.executor.id", executorId)

if (!isLocal) {
env.metricsSystem.registerSource(executorSource)
core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -184,14 +184,16 @@ private[nio] class ConnectionManager(
// to be able to track asynchronous messages
private val idCount: AtomicInteger = new AtomicInteger(1)

private val writeRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]()
private val readRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]()

private val selectorThread = new Thread("connection-manager-thread") {
override def run() = ConnectionManager.this.run()
}
selectorThread.setDaemon(true)
// start this thread last, since it invokes run(), which accesses members above
selectorThread.start()

private val writeRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]()

private def triggerWrite(key: SelectionKey) {
val conn = connectionsByKey.getOrElse(key, null)
if (conn == null) return
@@ -232,7 +234,6 @@
} )
}

private val readRunnableStarted: HashSet[SelectionKey] = new HashSet[SelectionKey]()

private def triggerRead(key: SelectionKey) {
val conn = connectionsByKey.getOrElse(key, null)
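Note: moving the two HashSet declarations above selectorThread.start() matters because a Scala class body initializes its vals in declaration order, and the started thread immediately enters run(), which can touch those sets. A minimal sketch of the hazard the reordering avoids (illustrative class, not Spark code):

class Racy {
  private val worker = new Thread(new Runnable {
    def run(): Unit = println(items) // can print null if it runs before `items` is assigned
  })
  worker.start() // starting the thread before later fields are initialized is the bug
  private val items = new scala.collection.mutable.HashSet[String]()
}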
63 changes: 63 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -900,6 +900,38 @@ abstract class RDD[T: ClassTag](
jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

/**
* Reduces the elements of this RDD in a multi-level tree pattern.
*
* @param depth suggested depth of the tree (default: 2)
* @see [[org.apache.spark.rdd.RDD#reduce]]
*/
def treeReduce(f: (T, T) => T, depth: Int = 2): T = {
require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
val cleanF = context.clean(f)
val reducePartition: Iterator[T] => Option[T] = iter => {
if (iter.hasNext) {
Some(iter.reduceLeft(cleanF))
} else {
None
}
}
val partiallyReduced = mapPartitions(it => Iterator(reducePartition(it)))
val op: (Option[T], Option[T]) => Option[T] = (c, x) => {
if (c.isDefined && x.isDefined) {
Some(cleanF(c.get, x.get))
} else if (c.isDefined) {
c
} else if (x.isDefined) {
x
} else {
None
}
}
partiallyReduced.treeAggregate(Option.empty[T])(op, op, depth)
.getOrElse(throw new UnsupportedOperationException("empty collection"))
}

/**
* Aggregate the elements of each partition, and then the results for all the partitions, using a
* given associative function and a neutral "zero value". The function op(t1, t2) is allowed to
@@ -935,6 +967,37 @@
jobResult
}

/**
* Aggregates the elements of this RDD in a multi-level tree pattern.
*
* @param depth suggested depth of the tree (default: 2)
* @see [[org.apache.spark.rdd.RDD#aggregate]]
*/
def treeAggregate[U: ClassTag](zeroValue: U)(
seqOp: (U, T) => U,
combOp: (U, U) => U,
depth: Int = 2): U = {
require(depth >= 1, s"Depth must be greater than or equal to 1 but got $depth.")
if (partitions.size == 0) {
return Utils.clone(zeroValue, context.env.closureSerializer.newInstance())
}
val cleanSeqOp = context.clean(seqOp)
val cleanCombOp = context.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
var partiallyAggregated = mapPartitions(it => Iterator(aggregatePartition(it)))
var numPartitions = partiallyAggregated.partitions.size
val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
// If creating an extra level doesn't help reduce the wall-clock time, we stop tree aggregation.
while (numPartitions > scale + numPartitions / scale) {
numPartitions /= scale
val curNumPartitions = numPartitions
partiallyAggregated = partiallyAggregated.mapPartitionsWithIndex { (i, iter) =>
iter.map((i % curNumPartitions, _))
}.reduceByKey(new HashPartitioner(curNumPartitions), cleanCombOp).values
}
partiallyAggregated.reduce(cleanCombOp)
}

/**
* Return the number of elements in the RDD.
*/
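Note: a rough usage sketch of the new treeAggregate and its depth parameter, under assumed inputs (a local SparkContext, one million values in 1000 partitions); the names below are illustrative and not part of the commit:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative: compute (count, sum) over 1000 partitions. With depth = 2 the scale
// factor is max(ceil(1000^(1/2)), 2) = 32, so the 1000 per-partition results are first
// combined down to 1000 / 32 = 31 intermediate partitions before the final reduce,
// rather than sending all 1000 partial results straight to the driver.
val sc = new SparkContext(new SparkConf().setMaster("local[4]").setAppName("tree-aggregate-demo"))
val data = sc.parallelize(1 to 1000000, 1000)
val (count, sum) = data.treeAggregate((0L, 0L))(
  seqOp  = (acc, x) => (acc._1 + 1, acc._2 + x),
  combOp = (a, b)   => (a._1 + b._1, a._2 + b._2),
  depth  = 2)
// treeReduce applies the same idea to a plain reduce, e.g. data.treeReduce(_ + _, depth = 3)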
core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -91,11 +91,11 @@ case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockMan
case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent

@DeveloperApi
case class SparkListenerExecutorAdded(executorId: String, executorInfo: ExecutorInfo)
case class SparkListenerExecutorAdded(time: Long, executorId: String, executorInfo: ExecutorInfo)
extends SparkListenerEvent

@DeveloperApi
case class SparkListenerExecutorRemoved(executorId: String)
case class SparkListenerExecutorRemoved(time: Long, executorId: String, reason: String)
extends SparkListenerEvent

/**
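Note: the new time and reason fields flow through to SparkListener callbacks. A hypothetical listener that consumes them (names are illustrative; it assumes the onExecutorAdded/onExecutorRemoved callbacks introduced alongside these events):

import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorAdded, SparkListenerExecutorRemoved}

class ExecutorEventLogger extends SparkListener {
  override def onExecutorAdded(event: SparkListenerExecutorAdded): Unit =
    println(s"executor ${event.executorId} added at ${event.time} " +
      s"with ${event.executorInfo.totalCores} cores")
  override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit =
    println(s"executor ${event.executorId} removed at ${event.time}: ${event.reason}")
}
// Registered via sc.addSparkListener(new ExecutorEventLogger()) on an existing SparkContext.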
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -108,7 +108,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
listenerBus.post(SparkListenerExecutorAdded(executorId, data))
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
makeOffers()
}

@@ -216,7 +217,8 @@
totalCoreCount.addAndGet(-executorInfo.totalCores)
totalRegisteredExecutors.addAndGet(-1)
scheduler.executorLost(executorId, SlaveLost(reason))
listenerBus.post(SparkListenerExecutorRemoved(executorId))
listenerBus.post(
SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason))
case None => logError(s"Asked to remove non-existent executor $executorId")
}
}
(Diffs for the remaining changed files are not shown.)

