Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/master' into SPARK-3278
Browse files Browse the repository at this point in the history
  • Loading branch information
zapletal-martin committed Jan 17, 2015
2 parents 0d14bd3 + f3bfc76 commit f90c8c7
Show file tree
Hide file tree
Showing 314 changed files with 4,723 additions and 4,893 deletions.
2 changes: 1 addition & 1 deletion bin/spark-class
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ fi
if [[ "$1" =~ org.apache.spark.tools.* ]]; then
if test -z "$SPARK_TOOLS_JAR"; then
echo "Failed to find Spark Tools Jar in $FWDIR/tools/target/scala-$SPARK_SCALA_VERSION/" 1>&2
echo "You need to build Spark before running $1." 1>&2
echo "You need to run \"build/sbt tools/package\" before running $1." 1>&2
exit 1
fi
CLASSPATH="$CLASSPATH:$SPARK_TOOLS_JAR"
Expand Down
97 changes: 97 additions & 0 deletions core/src/main/java/org/apache/spark/JavaSparkListener.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark;

import org.apache.spark.scheduler.SparkListener;
import org.apache.spark.scheduler.SparkListenerApplicationEnd;
import org.apache.spark.scheduler.SparkListenerApplicationStart;
import org.apache.spark.scheduler.SparkListenerBlockManagerAdded;
import org.apache.spark.scheduler.SparkListenerBlockManagerRemoved;
import org.apache.spark.scheduler.SparkListenerEnvironmentUpdate;
import org.apache.spark.scheduler.SparkListenerExecutorAdded;
import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate;
import org.apache.spark.scheduler.SparkListenerExecutorRemoved;
import org.apache.spark.scheduler.SparkListenerJobEnd;
import org.apache.spark.scheduler.SparkListenerJobStart;
import org.apache.spark.scheduler.SparkListenerStageCompleted;
import org.apache.spark.scheduler.SparkListenerStageSubmitted;
import org.apache.spark.scheduler.SparkListenerTaskEnd;
import org.apache.spark.scheduler.SparkListenerTaskGettingResult;
import org.apache.spark.scheduler.SparkListenerTaskStart;
import org.apache.spark.scheduler.SparkListenerUnpersistRDD;

/**
* Java clients should extend this class instead of implementing
* SparkListener directly. This is to prevent java clients
* from breaking when new events are added to the SparkListener
* trait.
*
* This is a concrete class instead of abstract to enforce
* new events get added to both the SparkListener and this adapter
* in lockstep.
*/
public class JavaSparkListener implements SparkListener {

@Override
public void onStageCompleted(SparkListenerStageCompleted stageCompleted) { }

@Override
public void onStageSubmitted(SparkListenerStageSubmitted stageSubmitted) { }

@Override
public void onTaskStart(SparkListenerTaskStart taskStart) { }

@Override
public void onTaskGettingResult(SparkListenerTaskGettingResult taskGettingResult) { }

@Override
public void onTaskEnd(SparkListenerTaskEnd taskEnd) { }

@Override
public void onJobStart(SparkListenerJobStart jobStart) { }

@Override
public void onJobEnd(SparkListenerJobEnd jobEnd) { }

@Override
public void onEnvironmentUpdate(SparkListenerEnvironmentUpdate environmentUpdate) { }

@Override
public void onBlockManagerAdded(SparkListenerBlockManagerAdded blockManagerAdded) { }

@Override
public void onBlockManagerRemoved(SparkListenerBlockManagerRemoved blockManagerRemoved) { }

@Override
public void onUnpersistRDD(SparkListenerUnpersistRDD unpersistRDD) { }

@Override
public void onApplicationStart(SparkListenerApplicationStart applicationStart) { }

@Override
public void onApplicationEnd(SparkListenerApplicationEnd applicationEnd) { }

@Override
public void onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate executorMetricsUpdate) { }

@Override
public void onExecutorAdded(SparkListenerExecutorAdded executorAdded) { }

@Override
public void onExecutorRemoved(SparkListenerExecutorRemoved executorRemoved) { }
}
24 changes: 22 additions & 2 deletions core/src/main/java/org/apache/spark/TaskContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ static void unset() {
*/
public abstract boolean isInterrupted();

/** @deprecated: use isRunningLocally() */
/** @deprecated use {@link #isRunningLocally()} */
@Deprecated
public abstract boolean runningLocally();

Expand All @@ -87,19 +87,39 @@ static void unset() {
* is for HadoopRDD to register a callback to close the input stream.
* Will be called in any situation - success, failure, or cancellation.
*
* @deprecated: use addTaskCompletionListener
* @deprecated use {@link #addTaskCompletionListener(scala.Function1)}
*
* @param f Callback function.
*/
@Deprecated
public abstract void addOnCompleteCallback(final Function0<Unit> f);

/**
* The ID of the stage that this task belong to.
*/
public abstract int stageId();

/**
* The ID of the RDD partition that is computed by this task.
*/
public abstract int partitionId();

/**
* How many times this task has been attempted. The first task attempt will be assigned
* attemptNumber = 0, and subsequent attempts will have increasing attempt numbers.
*/
public abstract int attemptNumber();

/** @deprecated use {@link #taskAttemptId()}; it was renamed to avoid ambiguity. */
@Deprecated
public abstract long attemptId();

/**
* An ID that is unique to this task attempt (within the same SparkContext, no two task attempts
* will share the same attempt ID). This is roughly equivalent to Hadoop's TaskAttemptID.
*/
public abstract long taskAttemptId();

/** ::DeveloperApi:: */
@DeveloperApi
public abstract TaskMetrics taskMetrics();
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
height: 50px;
font-size: 15px;
margin-bottom: 15px;
min-width: 1200px
}

.navbar .navbar-inner {
Expand All @@ -39,12 +40,12 @@

.navbar .nav > li a {
height: 30px;
line-height: 30px;
line-height: 2;
}

.navbar-text {
height: 50px;
line-height: 50px;
line-height: 3.3;
}

table.sortable thead {
Expand Down Expand Up @@ -170,7 +171,7 @@ span.additional-metric-title {
}

.version {
line-height: 30px;
line-height: 2.5;
vertical-align: bottom;
font-size: 12px;
padding: 0;
Expand Down
6 changes: 5 additions & 1 deletion core/src/main/scala/org/apache/spark/CacheManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
val inputMetrics = blockResult.inputMetrics
val existingMetrics = context.taskMetrics
.getInputMetricsForReadMethod(inputMetrics.readMethod)
existingMetrics.addBytesRead(inputMetrics.bytesRead)

new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])

case None =>
Expand Down
Loading

0 comments on commit f90c8c7

Please sign in to comment.