Commit 6568ca5

Merge branch 'master' of github.com:apache/spark into rest
Andrew Or committed Jan 27, 2015
2 parents 77774ba + f2ba5c6 commit 6568ca5
Showing 388 changed files with 9,413 additions and 8,102 deletions.
2 changes: 2 additions & 0 deletions assembly/pom.xml
@@ -142,8 +142,10 @@
</includes>
<excludes>
<exclude>com/google/common/base/Absent*</exclude>
<exclude>com/google/common/base/Function</exclude>
<exclude>com/google/common/base/Optional*</exclude>
<exclude>com/google/common/base/Present*</exclude>
<exclude>com/google/common/base/Supplier</exclude>
</excludes>
</relocation>
</relocations>
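
The two new excludes keep com.google.common.base.Function and com.google.common.base.Supplier at their original names in the assembly, alongside the already-preserved Optional, Absent and Present classes: Optional's public methods (transform, or) take those two interfaces, so shading them while Optional keeps its original name would leave Optional's signatures pointing at classes that no longer exist under that package. The illustrative Scala snippet below (not part of the patch) shows the Guava calls that rely on all of these classes staying together:

import com.google.common.base.{Function, Optional, Supplier}

object GuavaOptionalCheck {
  def main(args: Array[String]): Unit = {
    val opt: Optional[String] = Optional.of("spark")
    // Optional.transform takes a com.google.common.base.Function...
    val upper: Optional[String] = opt.transform(new Function[String, String] {
      override def apply(s: String): String = s.toUpperCase
    })
    // ...and Optional.or accepts a com.google.common.base.Supplier.
    val value: String = upper.or(new Supplier[String] {
      override def get(): String = "fallback"
    })
    println(value) // prints SPARK
  }
}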
27 changes: 15 additions & 12 deletions bin/compute-classpath.sh
@@ -72,22 +72,25 @@ else
assembly_folder="$ASSEMBLY_DIR"
fi

num_jars="$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" ]; then
echo "Failed to find Spark assembly in $assembly_folder"
echo "You need to build Spark before running this program."
exit 1
fi
num_jars=0

for f in ${assembly_folder}/spark-assembly*hadoop*.jar; do
if [[ ! -e "$f" ]]; then
echo "Failed to find Spark assembly in $assembly_folder" 1>&2
echo "You need to build Spark before running this program." 1>&2
exit 1
fi
ASSEMBLY_JAR="$f"
num_jars=$((num_jars+1))
done

if [ "$num_jars" -gt "1" ]; then
jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar$")
echo "Found multiple Spark assembly jars in $assembly_folder:"
echo "$jars_list"
echo "Please remove all but one jar."
echo "Found multiple Spark assembly jars in $assembly_folder:" 1>&2
ls ${assembly_folder}/spark-assembly*hadoop*.jar 1>&2
echo "Please remove all but one jar." 1>&2
exit 1
fi

ASSEMBLY_JAR="$(ls "$assembly_folder"/spark-assembly*hadoop*.jar 2>/dev/null)"

# Verify that versions of java used to build the jars and run Spark are compatible
jar_error_check=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" nonexistent/class/path 2>&1)
if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then
27 changes: 21 additions & 6 deletions bin/run-example
@@ -35,17 +35,32 @@ else
fi

if [ -f "$FWDIR/RELEASE" ]; then
export SPARK_EXAMPLES_JAR="`ls "$FWDIR"/lib/spark-examples-*hadoop*.jar`"
elif [ -e "$EXAMPLES_DIR"/target/scala-$SPARK_SCALA_VERSION/spark-examples-*hadoop*.jar ]; then
export SPARK_EXAMPLES_JAR="`ls "$EXAMPLES_DIR"/target/scala-$SPARK_SCALA_VERSION/spark-examples-*hadoop*.jar`"
JAR_PATH="${FWDIR}/lib"
else
JAR_PATH="${EXAMPLES_DIR}/target/scala-${SPARK_SCALA_VERSION}"
fi

if [[ -z "$SPARK_EXAMPLES_JAR" ]]; then
echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" 1>&2
echo "You need to build Spark before running this program" 1>&2
JAR_COUNT=0

for f in ${JAR_PATH}/spark-examples-*hadoop*.jar; do
if [[ ! -e "$f" ]]; then
echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" 1>&2
echo "You need to build Spark before running this program" 1>&2
exit 1
fi
SPARK_EXAMPLES_JAR="$f"
JAR_COUNT=$((JAR_COUNT+1))
done

if [ "$JAR_COUNT" -gt "1" ]; then
echo "Found multiple Spark examples assembly jars in ${JAR_PATH}" 1>&2
ls ${JAR_PATH}/spark-examples-*hadoop*.jar 1>&2
echo "Please remove all but one jar." 1>&2
exit 1
fi

export SPARK_EXAMPLES_JAR

EXAMPLE_MASTER=${MASTER:-"local[*]"}

if [[ ! $EXAMPLE_CLASS == org.apache.spark.examples* ]]; then
9 changes: 6 additions & 3 deletions bin/spark-class
@@ -29,6 +29,7 @@ FWDIR="$(cd "`dirname "$0"`"/..; pwd)"

# Export this as SPARK_HOME
export SPARK_HOME="$FWDIR"
export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"$SPARK_HOME/conf"}"

. "$FWDIR"/bin/load-spark-env.sh

@@ -71,6 +72,8 @@ case "$1" in
'org.apache.spark.executor.MesosExecutorBackend')
OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
export PYTHONPATH="$FWDIR/python:$PYTHONPATH"
export PYTHONPATH="$FWDIR/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
;;

# Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS +
@@ -118,8 +121,8 @@ fi
JAVA_OPTS="$JAVA_OPTS -Xms$OUR_JAVA_MEM -Xmx$OUR_JAVA_MEM"

# Load extra JAVA_OPTS from conf/java-opts, if it exists
if [ -e "$FWDIR/conf/java-opts" ] ; then
JAVA_OPTS="$JAVA_OPTS `cat "$FWDIR"/conf/java-opts`"
if [ -e "$SPARK_CONF_DIR/java-opts" ] ; then
JAVA_OPTS="$JAVA_OPTS `cat "$SPARK_CONF_DIR"/java-opts`"
fi

# Attention: when changing the way the JAVA_OPTS are assembled, the change must be reflected in CommandUtils.scala!
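
With this change the script honors an externally set SPARK_CONF_DIR, falling back to $SPARK_HOME/conf, and reads java-opts from that directory rather than from the hard-coded $FWDIR/conf. A minimal Scala sketch of the same precedence, assuming SPARK_HOME is set (runnable as a script):

// An explicit SPARK_CONF_DIR wins; otherwise fall back to $SPARK_HOME/conf.
val confDir: String =
  sys.env.getOrElse("SPARK_CONF_DIR", sys.env("SPARK_HOME") + "/conf")
// Extra JVM options are appended only if <confDir>/java-opts exists.
val javaOptsFile = new java.io.File(confDir, "java-opts")
val extraJavaOpts: String =
  if (javaOptsFile.isFile) scala.io.Source.fromFile(javaOptsFile).mkString.trim
  else ""
println(s"conf dir: $confDir, extra opts: '$extraJavaOpts'")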
@@ -148,7 +151,7 @@ fi
if [[ "$1" =~ org.apache.spark.tools.* ]]; then
if test -z "$SPARK_TOOLS_JAR"; then
echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/" 1>&2
echo "You need to build Spark before running $1." 1>&2
echo "You need to run \"build/sbt tools/package\" before running $1." 1>&2
exit 1
fi
CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
8 changes: 4 additions & 4 deletions build/mvn
@@ -68,10 +68,10 @@ install_app() {
# Install maven under the build/ folder
install_mvn() {
install_app \
"http://apache.claz.org/maven/maven-3/3.2.3/binaries" \
"apache-maven-3.2.3-bin.tar.gz" \
"apache-maven-3.2.3/bin/mvn"
MVN_BIN="${_DIR}/apache-maven-3.2.3/bin/mvn"
"http://archive.apache.org/dist/maven/maven-3/3.2.5/binaries" \
"apache-maven-3.2.5-bin.tar.gz" \
"apache-maven-3.2.5/bin/mvn"
MVN_BIN="${_DIR}/apache-maven-3.2.5/bin/mvn"
}

# Install zinc under the build/ folder
2 changes: 2 additions & 0 deletions core/pom.xml
@@ -372,8 +372,10 @@
<artifact>com.google.guava:guava</artifact>
<includes>
<include>com/google/common/base/Absent*</include>
<include>com/google/common/base/Function</include>
<include>com/google/common/base/Optional*</include>
<include>com/google/common/base/Present*</include>
<include>com/google/common/base/Supplier</include>
</includes>
</filter>
</filters>
97 changes: 97 additions & 0 deletions core/src/main/java/org/apache/spark/JavaSparkListener.java
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark;

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
import org.apache.spark.scheduler.SparkListenerTaskStart;
import org.apache.spark.scheduler.SparkListenerUnpersistRDD;

/**
* Java clients should extend this class instead of implementing
* SparkListener directly. This is to prevent Java clients
* from breaking when new events are added to the SparkListener
* trait.
*
* This is a concrete class rather than an abstract one so that any
* new event must be added to both the SparkListener trait and this
* adapter in lockstep.
*/
public class JavaSparkListener implements SparkListener {

@Override
public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { }

@Override
public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { }

@Override
public void onTaskStart(SparkListenerTaskStart taskStart) { }

@Override
public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { }

@Override
public void onTaskEnd(SparkListenerTaskEnd taskEnd) { }

@Override
public void onJobStart(SparkListenerJobStart jobStart) { }

@Override
public void onJobEnd(SparkListenerJobEnd jobEnd) { }

@Override
public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { }

@Override
public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { }

@Override
public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { }

@Override
public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { }

@Override
public void onApplicationStart(SparkListenerApplicationStart applicationStart) { }

@Override
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { }

@Override
public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { }

@Override
public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { }

@Override
public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }
}
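
Because every callback has an empty body, a client extends the adapter and overrides only the events it cares about; events added later simply become no-ops instead of compilation failures for Java implementers. A hedged usage sketch (in Scala for consistency with the rest of these examples, though the adapter primarily targets Java clients; class and application names are illustrative):

import org.apache.spark.{JavaSparkListener, SparkConf, SparkContext}
import org.apache.spark.scheduler.SparkListenerJobEnd

// Override only the callback of interest; all other SparkListener
// events fall through to the empty implementations in the adapter.
class JobEndLogger extends JavaSparkListener {
  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = {
    println(s"Job ${jobEnd.jobId} finished with result ${jobEnd.jobResult}")
  }
}

object ListenerDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("listener-demo").setMaster("local[*]"))
    sc.addSparkListener(new JobEndLogger)
    sc.parallelize(1 to 10).count()  // triggers a job, hence one onJobEnd call
    sc.stop()
  }
}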
24 changes: 22 additions & 2 deletions core/src/main/java/org/apache/spark/TaskContext.java
@@ -62,7 +62,7 @@ static void unset() {
*/
public abstract boolean isInterrupted();

/** @deprecated: use isRunningLocally() */
/** @deprecated use {@link #isRunningLocally()} */
@Deprecated
public abstract boolean runningLocally();

@@ -87,19 +87,39 @@ static void unset() {
* is for HadoopRDD to register a callback to close the input stream.
* Will be called in any situation - success, failure, or cancellation.
*
* @deprecated: use addTaskCompletionListener
* @deprecated use {@link #addTaskCompletionListener(scala.Function1)}
*
* @param f Callback function.
*/
@Deprecated
public abstract void addOnCompleteCallback(final Function0<Unit> f);

/**
* The ID of the stage that this task belongs to.
*/
public abstract int stageId();

/**
* The ID of the RDD partition that is computed by this task.
*/
public abstract int partitionId();

/**
* How many times this task has been attempted. The first task attempt will be assigned
* attemptNumber = 0, and subsequent attempts will have increasing attempt numbers.
*/
public abstract int attemptNumber();

/** @deprecated use {@link #taskAttemptId()}; it was renamed to avoid ambiguity. */
@Deprecated
public abstract long attemptId();

/**
* An ID that is unique to this task attempt (within the same SparkContext, no two task attempts
* will share the same attempt ID). This is roughly equivalent to Hadoop's TaskAttemptID.
*/
public abstract long taskAttemptId();

/** ::DeveloperApi:: */
@DeveloperApi
public abstract TaskMetrics taskMetrics();
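
The new accessors expose the stage ID, partition ID, attempt number, and the SparkContext-unique task attempt ID to user code, while deprecating the ambiguous attemptId(). A hedged sketch of reading them from inside a task via the TaskContext.get() static accessor (local mode, illustrative app name):

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object TaskContextDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("taskcontext-demo").setMaster("local[2]"))
    val ids = sc.parallelize(1 to 4, numSlices = 2).map { _ =>
      // Each tuple: (stageId, partitionId, attemptNumber, taskAttemptId)
      val tc = TaskContext.get()
      (tc.stageId(), tc.partitionId(), tc.attemptNumber(), tc.taskAttemptId())
    }.distinct().collect()
    ids.foreach(println)
    sc.stop()
  }
}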
@@ -10,3 +10,4 @@ log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
log4j.logger.org.apache.hadoop.yarn.util.RackResolver=WARN
18 changes: 14 additions & 4 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -19,6 +19,7 @@
height: 50px;
font-size: 15px;
margin-bottom: 15px;
min-width: 1200px
}

.navbar .navbar-inner {
@@ -39,12 +40,12 @@

.navbar .nav > li a {
height: 30px;
line-height: 30px;
line-height: 2;
}

.navbar-text {
height: 50px;
line-height: 50px;
line-height: 3.3;
}

table.sortable thead {
@@ -120,6 +121,14 @@ pre {
border: none;
}

.description-input {
overflow: hidden;
text-overflow: ellipsis;
width: 100%;
white-space: nowrap;
display: block;
}

.stacktrace-details {
max-height: 300px;
overflow-y: auto;
@@ -170,7 +179,7 @@ span.additional-metric-title {
}

.version {
line-height: 30px;
line-height: 2.5;
vertical-align: bottom;
font-size: 12px;
padding: 0;
Expand All @@ -181,6 +190,7 @@ span.additional-metric-title {

/* Hide all additional metrics by default. This is done here rather than using JavaScript to
* avoid slow page loads for stage pages with large numbers (e.g., thousands) of tasks. */
.scheduler_delay, .deserialization_time, .serialization_time, .getting_result_time {
.scheduler_delay, .deserialization_time, .fetch_wait_time, .serialization_time,
.getting_result_time {
display: none;
}
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -61,8 +61,8 @@ case class Aggregator[K, V, C] (
// Update task metrics if context is not null
// TODO: Make context non optional in a future release
Option(context).foreach { c =>
c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled)
c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled)
}
combiners.iterator
}
@@ -95,8 +95,8 @@ case class Aggregator[K, V, C] (
// Update task metrics if context is not null
// TODO: Make context non-optional in a future release
Option(context).foreach { c =>
c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled)
c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled)
}
combiners.iterator
}
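
The direct += updates on taskMetrics are replaced with incMemoryBytesSpilled / incDiskBytesSpilled, which suggests the spill counters in TaskMetrics are being hidden behind accessor methods rather than exposed as mutable fields. A simplified sketch of that accessor pattern (not the real TaskMetrics class):

class SpillMetrics {
  // The raw counters are private; callers go through inc* methods, so every
  // update funnels through one place that can be audited or instrumented.
  private var _memoryBytesSpilled: Long = 0L
  private var _diskBytesSpilled: Long = 0L

  def memoryBytesSpilled: Long = _memoryBytesSpilled
  def diskBytesSpilled: Long = _diskBytesSpilled

  def incMemoryBytesSpilled(value: Long): Unit = { _memoryBytesSpilled += value }
  def incDiskBytesSpilled(value: Long): Unit = { _diskBytesSpilled += value }
}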
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -44,7 +44,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
val inputMetrics = blockResult.inputMetrics
val existingMetrics = context.taskMetrics
.getInputMetricsForReadMethod(inputMetrics.readMethod)
existingMetrics.addBytesRead(inputMetrics.bytesRead)

new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])

case None =>
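
Instead of overwriting taskMetrics.inputMetrics with the cached block's metrics, the task now looks up the input-metrics object for the matching read method and adds the block's bytes to it, so bytes the task has already recorded (for example when several cached blocks feed one task) are preserved. A simplified, hypothetical model of that accumulate-rather-than-replace behavior (not the real TaskMetrics API):

import scala.collection.mutable

object InputMetricsModel {
  // Hypothetical stand-in for TaskMetrics: bytes read are keyed by read
  // method and accumulated, never replaced.
  private val bytesReadByMethod = mutable.Map.empty[String, Long].withDefaultValue(0L)

  def addBytesRead(readMethod: String, bytes: Long): Unit = {
    bytesReadByMethod(readMethod) += bytes
  }

  def main(args: Array[String]): Unit = {
    addBytesRead("Hadoop", 1024L)  // bytes read from the original input source
    addBytesRead("Memory", 512L)   // bytes served from the block manager cache
    println(bytesReadByMethod)     // both contributions survive
  }
}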