This repository has been archived by the owner on Nov 30, 2019. It is now read-only.

Commit

Merge branch 'master' of git://git.apache.org/spark into SPARK-5188
sarutak committed Jan 20, 2015
2 parents 7cc8255 + bc20a52 commit 3caa4cb
Showing 366 changed files with 6,666 additions and 5,722 deletions.
27 changes: 15 additions & 12 deletions bin/compute-classpath.sh
@@ -72,22 +72,25 @@ else
assembly_folder="$ASSEMBLY_DIR"
fi

num_jars="$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar$" | wc -l)"
if [ "$num_jars" -eq "0" ]; then
echo "Failed to find Spark assembly in $assembly_folder"
echo "You need to build Spark before running this program."
exit 1
fi
num_jars=0

for f in ${assembly_folder}/spark-assembly*hadoop*.jar; do
if [[ ! -e "$f" ]]; then
echo "Failed to find Spark assembly in $assembly_folder" 1>&2
echo "You need to build Spark before running this program." 1>&2
exit 1
fi
ASSEMBLY_JAR="$f"
num_jars=$((num_jars+1))
done

if [ "$num_jars" -gt "1" ]; then
jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar$")
echo "Found multiple Spark assembly jars in $assembly_folder:"
echo "$jars_list"
echo "Please remove all but one jar."
echo "Found multiple Spark assembly jars in $assembly_folder:" 1>&2
ls ${assembly_folder}/spark-assembly*hadoop*.jar 1>&2
echo "Please remove all but one jar." 1>&2
exit 1
fi

ASSEMBLY_JAR="$(ls "$assembly_folder"/spark-assembly*hadoop*.jar 2>/dev/null)"

# Verify that versions of java used to build the jars and run Spark are compatible
jar_error_check=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" nonexistent/class/path 2>&1)
if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then
27 changes: 21 additions & 6 deletions bin/run-example
@@ -35,17 +35,32 @@ else
fi

if [ -f "$FWDIR/RELEASE" ]; then
export SPARK_EXAMPLES_JAR="`ls "$FWDIR"/lib/spark-examples-*hadoop*.jar`"
elif [ -e "$EXAMPLES_DIR"/target/scala-$SPARK_SCALA_VERSION/spark-examples-*hadoop*.jar ]; then
export SPARK_EXAMPLES_JAR="`ls "$EXAMPLES_DIR"/target/scala-$SPARK_SCALA_VERSION/spark-examples-*hadoop*.jar`"
JAR_PATH="${FWDIR}/lib"
else
JAR_PATH="${EXAMPLES_DIR}/target/scala-${SPARK_SCALA_VERSION}"
fi

if [[ -z "$SPARK_EXAMPLES_JAR" ]]; then
echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" 1>&2
echo "You need to build Spark before running this program" 1>&2
JAR_COUNT=0

for f in ${JAR_PATH}/spark-examples-*hadoop*.jar; do
if [[ ! -e "$f" ]]; then
echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" 1>&2
echo "You need to build Spark before running this program" 1>&2
exit 1
fi
SPARK_EXAMPLES_JAR="$f"
JAR_COUNT=$((JAR_COUNT+1))
done

if [ "$JAR_COUNT" -gt "1" ]; then
echo "Found multiple Spark examples assembly jars in ${JAR_PATH}" 1>&2
ls ${JAR_PATH}/spark-examples-*hadoop*.jar 1>&2
echo "Please remove all but one jar." 1>&2
exit 1
fi

export SPARK_EXAMPLES_JAR

EXAMPLE_MASTER=${MASTER:-"local[*]"}

if [[ ! $EXAMPLE_CLASS == org.apache.spark.examples* ]]; then
4 changes: 3 additions & 1 deletion bin/spark-class
@@ -71,6 +71,8 @@ case "$1" in
'org.apache.spark.executor.MesosExecutorBackend')
OUR_JAVA_OPTS="$SPARK_JAVA_OPTS $SPARK_EXECUTOR_OPTS"
OUR_JAVA_MEM=${SPARK_EXECUTOR_MEMORY:-$DEFAULT_MEM}
export PYTHONPATH="$FWDIR/python:$PYTHONPATH"
export PYTHONPATH="$FWDIR/python/lib/py4j-0.8.2.1-src.zip:$PYTHONPATH"
;;

# Spark submit uses SPARK_JAVA_OPTS + SPARK_SUBMIT_OPTS +
@@ -148,7 +150,7 @@ fi
if [[ "$1" =~ org.apache.spark.tools.* ]]; then
if test -z "$SPARK_TOOLS_JAR"; then
echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/" 1>&2
echo "You need to build Spark before running $1." 1>&2
echo "You need to run \"build/sbt tools/package\" before running $1." 1>&2
exit 1
fi
CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
5 changes: 4 additions & 1 deletion bin/spark-submit
@@ -44,7 +44,10 @@ while (($#)); do
shift
done

DEFAULT_PROPERTIES_FILE="$SPARK_HOME/conf/spark-defaults.conf"
if [ -z "$SPARK_CONF_DIR" ]; then
export SPARK_CONF_DIR="$SPARK_HOME/conf"
fi
DEFAULT_PROPERTIES_FILE="$SPARK_CONF_DIR/spark-defaults.conf"
if [ "$MASTER" == "yarn-cluster" ]; then
SPARK_SUBMIT_DEPLOY_MODE=cluster
fi
6 changes: 5 additions & 1 deletion bin/spark-submit2.cmd
@@ -24,7 +24,11 @@ set ORIG_ARGS=%*

rem Reset the values of all variables used
set SPARK_SUBMIT_DEPLOY_MODE=client
set SPARK_SUBMIT_PROPERTIES_FILE=%SPARK_HOME%\conf\spark-defaults.conf

if not defined %SPARK_CONF_DIR% (
set SPARK_CONF_DIR=%SPARK_HOME%\conf
)
set SPARK_SUBMIT_PROPERTIES_FILE=%SPARK_CONF_DIR%\spark-defaults.conf
set SPARK_SUBMIT_DRIVER_MEMORY=
set SPARK_SUBMIT_LIBRARY_PATH=
set SPARK_SUBMIT_CLASSPATH=
97 changes: 97 additions & 0 deletions core/src/main/java/org/apache/spark/JavaSparkListener.java
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark;

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
import org.apache.spark.scheduler.SparkListenerTaskStart;
import org.apache.spark.scheduler.SparkListenerUnpersistRDD;

/**
* Java clients should extend this class instead of implementing
* SparkListener directly. This is to prevent java clients
* from breaking when new events are added to the SparkListener
* trait.
*
* This is a concrete class instead of abstract to enforce
* new events get added to both the SparkListener and this adapter
* in lockstep.
*/
public class JavaSparkListener implements SparkListener {

@Override
public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { }

@Override
public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { }

@Override
public void onTaskStart(SparkListenerTaskStart taskStart) { }

@Override
public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { }

@Override
public void onTaskEnd(SparkListenerTaskEnd taskEnd) { }

@Override
public void onJobStart(SparkListenerJobStart jobStart) { }

@Override
public void onJobEnd(SparkListenerJobEnd jobEnd) { }

@Override
public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { }

@Override
public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { }

@Override
public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { }

@Override
public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { }

@Override
public void onApplicationStart(SparkListenerApplicationStart applicationStart) { }

@Override
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { }

@Override
public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { }

@Override
public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { }

@Override
public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }
}
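
A minimal usage sketch of the new adapter, not part of the commit: the class name StageLoggingListener, the local[2] master, and the registration through SparkContext.addSparkListener (a DeveloperApi at the time) are illustrative assumptions. Only the callback of interest is overridden; every other event keeps its empty default, which is the point of the adapter.

import java.util.Arrays;

import org.apache.spark.JavaSparkListener;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.scheduler.SparkListenerStageCompleted;

// Hypothetical example class; extends the adapter instead of implementing SparkListener.
public class StageLoggingListener extends JavaSparkListener {

  // Override only the event of interest; all other callbacks stay as no-ops.
  @Override
  public void onStageCompleted(SparkListenerStageCompleted stageCompleted) {
    System.out.println("Completed stage " + stageCompleted.stageInfo().stageId());
  }

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("StageLoggingListener").setMaster("local[2]");
    JavaSparkContext jsc = new JavaSparkContext(conf);
    // Registration is assumed to go through the underlying Scala SparkContext.
    jsc.sc().addSparkListener(new StageLoggingListener());
    jsc.parallelize(Arrays.asList(1, 2, 3)).count();
    jsc.stop();
  }
}

Because the adapter supplies a no-op body for every event, a subclass like this keeps compiling when new events are added to the SparkListener trait.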
24 changes: 22 additions & 2 deletions core/src/main/java/org/apache/spark/TaskContext.java
@@ -62,7 +62,7 @@ static void unset() {
*/
public abstract boolean isInterrupted();

/** @deprecated: use isRunningLocally() */
/** @deprecated use {@link #isRunningLocally()} */
@Deprecated
public abstract boolean runningLocally();

@@ -87,19 +87,39 @@ static void unset() {
* is for HadoopRDD to register a callback to close the input stream.
* Will be called in any situation - success, failure, or cancellation.
*
* @deprecated: use addTaskCompletionListener
* @deprecated use {@link #addTaskCompletionListener(scala.Function1)}
*
* @param f Callback function.
*/
@Deprecated
public abstract void addOnCompleteCallback(final Function0<Unit> f);

/**
* The ID of the stage that this task belongs to.
*/
public abstract int stageId();

/**
* The ID of the RDD partition that is computed by this task.
*/
public abstract int partitionId();

/**
* How many times this task has been attempted. The first task attempt will be assigned
* attemptNumber = 0, and subsequent attempts will have increasing attempt numbers.
*/
public abstract int attemptNumber();

/** @deprecated use {@link #taskAttemptId()}; it was renamed to avoid ambiguity. */
@Deprecated
public abstract long attemptId();

/**
* An ID that is unique to this task attempt (within the same SparkContext, no two task attempts
* will share the same attempt ID). This is roughly equivalent to Hadoop's TaskAttemptID.
*/
public abstract long taskAttemptId();

/** ::DeveloperApi:: */
@DeveloperApi
public abstract TaskMetrics taskMetrics();
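
A hedged sketch of the newly exposed task-identification methods, not part of the commit: it assumes TaskContext.get() and JavaRDD.foreachPartition are available as in Spark releases of this era, and the class name TaskContextIdsExample is illustrative. It prints attemptNumber(), which starts at 0 per task and grows only on retries, alongside taskAttemptId(), which is unique across all task attempts in the SparkContext.

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;

// Hypothetical example class, shown only to illustrate the API above.
public class TaskContextIdsExample {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("TaskContextIdsExample").setMaster("local[2]");
    JavaSparkContext jsc = new JavaSparkContext(conf);

    jsc.parallelize(Arrays.asList(1, 2, 3, 4), 2).foreachPartition(
      new VoidFunction<Iterator<Integer>>() {
        @Override
        public void call(Iterator<Integer> rows) {
          TaskContext tc = TaskContext.get();  // assumed static accessor inside a running task
          System.out.println("stage=" + tc.stageId()
              + " partition=" + tc.partitionId()
              + " attemptNumber=" + tc.attemptNumber()    // 0 on the first try of this task
              + " taskAttemptId=" + tc.taskAttemptId());  // unique within this SparkContext
        }
      });

    jsc.stop();
  }
}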
7 changes: 4 additions & 3 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -19,6 +19,7 @@
height: 50px;
font-size: 15px;
margin-bottom: 15px;
min-width: 1200px
}

.navbar .navbar-inner {
@@ -39,12 +40,12 @@

.navbar .nav > li a {
height: 30px;
line-height: 30px;
line-height: 2;
}

.navbar-text {
height: 50px;
line-height: 50px;
line-height: 3.3;
}

table.sortable thead {
@@ -170,7 +171,7 @@ span.additional-metric-title {
}

.version {
line-height: 30px;
line-height: 2.5;
vertical-align: bottom;
font-size: 12px;
padding: 0;
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -61,8 +61,8 @@ case class Aggregator[K, V, C] (
// Update task metrics if context is not null
// TODO: Make context non optional in a future release
Option(context).foreach { c =>
c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled)
c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled)
}
combiners.iterator
}
@@ -95,8 +95,8 @@ case class Aggregator[K, V, C] (
// Update task metrics if context is not null
// TODO: Make context non-optional in a future release
Option(context).foreach { c =>
c.taskMetrics.memoryBytesSpilled += combiners.memoryBytesSpilled
c.taskMetrics.diskBytesSpilled += combiners.diskBytesSpilled
c.taskMetrics.incMemoryBytesSpilled(combiners.memoryBytesSpilled)
c.taskMetrics.incDiskBytesSpilled(combiners.diskBytesSpilled)
}
combiners.iterator
}
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -44,7 +44,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
val inputMetrics = blockResult.inputMetrics
val existingMetrics = context.taskMetrics
.getInputMetricsForReadMethod(inputMetrics.readMethod)
existingMetrics.addBytesRead(inputMetrics.bytesRead)

new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])

case None =>
(Diff truncated; the remaining changed files are not shown.)
