Commit

Merge branch 'master' of http://git-wip-us.apache.org/repos/asf/spark into numpy

Conflicts:
	python/pyspark/rddsampler.py
Davies Liu committed Nov 19, 2014
2 parents f583023 + 7f22fa8 commit 13f7b05
Showing 195 changed files with 8,470 additions and 1,994 deletions.
2 changes: 0 additions & 2 deletions bin/pyspark
@@ -132,7 +132,5 @@ if [[ "$1" =~ \.py$ ]]; then
gatherSparkSubmitOpts "$@"
exec "$FWDIR"/bin/spark-submit "${SUBMISSION_OPTS[@]}" "$primary" "${APPLICATION_OPTS[@]}"
else
# PySpark shell requires special handling downstream
export PYSPARK_SHELL=1
exec "$PYSPARK_DRIVER_PYTHON" $PYSPARK_DRIVER_PYTHON_OPTS
fi
1 change: 0 additions & 1 deletion bin/pyspark2.cmd
@@ -59,7 +59,6 @@ for /f %%i in ('echo %1^| findstr /R "\.py"') do (
)

if [%PYTHON_FILE%] == [] (
set PYSPARK_SHELL=1
if [%IPYTHON%] == [1] (
ipython %IPYTHON_OPTS%
) else (
3 changes: 3 additions & 0 deletions bin/spark-submit
@@ -22,6 +22,9 @@
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
ORIG_ARGS=("$@")

# Set COLUMNS for progress bar
export COLUMNS=`tput cols`

while (($#)); do
if [ "$1" = "--deploy-mode" ]; then
SPARK_SUBMIT_DEPLOY_MODE=$2
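The COLUMNS export above hands the driver process the terminal width so that the console progress bar can size its output. A rough, non-authoritative sketch of the idea in Scala (ProgressBarSketch, render and the "[=====>   ] 42/100" format are invented for illustration; Spark's real progress bar implementation differs):

    object ProgressBarSketch {
      // Terminal width, falling back to 80 columns when COLUMNS is not exported.
      private val width =
        sys.env.get("COLUMNS").flatMap(c => scala.util.Try(c.toInt).toOption).getOrElse(80)

      // Render a single-line bar such as "[=====>     ] 42/100" within the terminal width.
      def render(done: Int, total: Int): String = {
        val header = s" $done/$total"
        val barWidth = math.max(10, width - header.length - 2)
        val filled = if (total <= 0) 0 else math.min(barWidth, barWidth * done / total)
        "[" + "=" * filled + ">" + " " * (barWidth - filled) + "]" + header
      }

      def main(args: Array[String]): Unit =
        println(render(done = 42, total = 100))
    }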
9 changes: 8 additions & 1 deletion conf/spark-env.sh.template
@@ -28,7 +28,7 @@
# - SPARK_YARN_DIST_FILES, Comma separated list of files to be distributed with the job.
# - SPARK_YARN_DIST_ARCHIVES, Comma separated list of archives to be distributed with the job.

# Options for the daemons used in the standalone deploy mode:
# Options for the daemons used in the standalone deploy mode
# - SPARK_MASTER_IP, to bind the master to a different IP address or hostname
# - SPARK_MASTER_PORT / SPARK_MASTER_WEBUI_PORT, to use non-default ports for the master
# - SPARK_MASTER_OPTS, to set config properties only for the master (e.g. "-Dx=y")
@@ -41,3 +41,10 @@
# - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y")
# - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y")
# - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers

# Generic options for the daemons used in the standalone deploy mode
# - SPARK_CONF_DIR Alternate conf dir. (Default: ${SPARK_HOME}/conf)
# - SPARK_LOG_DIR Where log files are stored. (Default: ${SPARK_HOME}/logs)
# - SPARK_PID_DIR Where the pid file is stored. (Default: /tmp)
# - SPARK_IDENT_STRING A string representing this instance of spark. (Default: $USER)
# - SPARK_NICENESS The scheduling priority for daemons. (Default: 0)
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/spark/SparkStageInfo.java
@@ -26,6 +26,7 @@
public interface SparkStageInfo {
int stageId();
int currentAttemptId();
long submissionTime();
String name();
int numTasks();
int numActiveTasks();
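The new submissionTime() accessor extends the status API. As a hedged example of how a driver might consume it (statusTracker, getActiveStageIds and getStageInfo are the existing status-tracker entry points; StageAgeReporter and the printed format are invented for this sketch, and a live SparkContext is assumed to be supplied by the caller):

    import org.apache.spark.SparkContext

    object StageAgeReporter {
      // Print how long each currently active stage has been running, using the new submissionTime().
      def reportStageAges(sc: SparkContext): Unit = {
        val now = System.currentTimeMillis()
        for {
          stageId <- sc.statusTracker.getActiveStageIds()
          info <- sc.statusTracker.getStageInfo(stageId)
        } {
          val ageMs = now - info.submissionTime()
          println(s"stage ${info.stageId()} (${info.name()}): submitted $ageMs ms ago, " +
            s"${info.numActiveTasks()} of ${info.numTasks()} tasks active")
        }
      }
    }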
Additional changed file:
@@ -22,4 +22,4 @@ package org.apache.spark.api.java
* these interfaces to pass functions to various Java API methods for Spark. Please visit Spark's
* Java programming guide for more details.
*/
package object function
package object function
Additional changed file:
@@ -26,13 +26,6 @@ $(function() {
// Switch the class of the arrow from open to closed.
$(this).find('.expand-additional-metrics-arrow').toggleClass('arrow-open');
$(this).find('.expand-additional-metrics-arrow').toggleClass('arrow-closed');

// If clicking caused the metrics to expand, automatically check all options for additional
// metrics (don't trigger a click when collapsing metrics, because it leads to weird
// toggling behavior).
if (!$(additionalMetricsDiv).hasClass('collapsed')) {
$(this).parent().find('input:checkbox:not(:checked)').trigger('click');
}
});

$("input:checkbox:not(:checked)").each(function() {
@@ -48,6 +41,16 @@
stripeTables();
});

$("#select-all-metrics").click(function() {
if (this.checked) {
// Toggle all un-checked options.
$('input:checkbox:not(:checked)').trigger('click');
} else {
// Toggle all checked options.
$('input:checkbox:checked').trigger('click');
}
});

// Trigger a click on the checkbox if a user clicks the label next to it.
$("span.additional-metric-title").click(function() {
$(this).parent().find('input:checkbox').trigger('click');
Additional changed file:
@@ -28,7 +28,9 @@ import org.apache.spark.scheduler._
* the scheduler queue is not drained in N seconds, then new executors are added. If the queue
* persists for another M seconds, then more executors are added and so on. The number added
* in each round increases exponentially from the previous round until an upper bound on the
* number of executors has been reached.
* number of executors has been reached. The upper bound is based both on a configured property
* and on the number of tasks pending: the policy will never increase the number of executor
* requests past the number needed to handle all pending tasks.
*
* The rationale for the exponential increase is twofold: (1) Executors should be added slowly
* in the beginning in case the number of extra executors needed turns out to be small. Otherwise,
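Stepping outside the diff for a moment: the comment above says the amount added grows exponentially per round up to the two bounds. A toy illustration of that schedule, assuming the per-round amount doubles (RampUpSketch and requestsPerRound are invented names; only the capping rule comes from the surrounding code):

    object RampUpSketch {
      // Executors requested per round: 1, 2, 4, ... capped by the smaller of the two upper bounds.
      def requestsPerRound(maxExecutors: Int, executorsNeededForPendingTasks: Int): Seq[Int] = {
        val upperBound = math.min(maxExecutors, executorsNeededForPendingTasks)
        val rounds = scala.collection.mutable.ArrayBuffer[Int]()
        var total = 0
        var toAdd = 1
        while (total < upperBound) {
          val granted = math.min(toAdd, upperBound - total)
          rounds += granted
          total += granted
          toAdd *= 2
        }
        rounds.toSeq
      }

      def main(args: Array[String]): Unit = {
        // 100 executors allowed but only 12 needed for pending tasks: rounds of 1, 2, 4, 5.
        println(requestsPerRound(maxExecutors = 100, executorsNeededForPendingTasks = 12).mkString(", "))
      }
    }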
@@ -82,6 +84,12 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
// During testing, the methods to actually kill and add executors are mocked out
private val testing = conf.getBoolean("spark.dynamicAllocation.testing", false)

// TODO: The default value of 1 for spark.executor.cores works right now because dynamic
// allocation is only supported for YARN and the default number of cores per executor in YARN is
// 1, but it might need to be attained differently for different cluster managers
private val tasksPerExecutor =
conf.getInt("spark.executor.cores", 1) / conf.getInt("spark.task.cpus", 1)

validateSettings()

// Number of executors to add in the next round
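To make the tasksPerExecutor division above concrete (config values assumed for illustration): with spark.executor.cores=4 and spark.task.cpus=2, tasksPerExecutor = 4 / 2 = 2; with spark.executor.cores=1 and spark.task.cpus=2, the integer division yields 0, which is exactly the misconfiguration the new validateSettings() check further down rejects with a SparkException.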
@@ -110,6 +118,9 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
// Clock used to schedule when executors should be added and removed
private var clock: Clock = new RealClock

// Listener for Spark events that impact the allocation policy
private val listener = new ExecutorAllocationListener(this)

/**
* Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
Expand Down Expand Up @@ -141,6 +152,9 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
if (tasksPerExecutor == 0) {
throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.cores")
}
}

/**
@@ -154,7 +168,6 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
* Register for scheduler callbacks to decide when to add and remove executors.
*/
def start(): Unit = {
val listener = new ExecutorAllocationListener(this)
sc.addSparkListener(listener)
startPolling()
}
@@ -218,13 +231,27 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
return 0
}

// Request executors with respect to the upper bound
val actualNumExecutorsToAdd =
if (numExistingExecutors + numExecutorsToAdd <= maxNumExecutors) {
numExecutorsToAdd
} else {
maxNumExecutors - numExistingExecutors
}
// The number of executors needed to satisfy all pending tasks is the number of tasks pending
// divided by the number of tasks each executor can fit, rounded up.
val maxNumExecutorsPending =
(listener.totalPendingTasks() + tasksPerExecutor - 1) / tasksPerExecutor
if (numExecutorsPending >= maxNumExecutorsPending) {
logDebug(s"Not adding executors because there are already $numExecutorsPending " +
s"pending and pending tasks could only fill $maxNumExecutorsPending")
numExecutorsToAdd = 1
return 0
}

// It's never useful to request more executors than could satisfy all the pending tasks, so
// cap request at that amount.
// Also cap request with respect to the configured upper bound.
val maxNumExecutorsToAdd = math.min(
maxNumExecutorsPending - numExecutorsPending,
maxNumExecutors - numExistingExecutors)
assert(maxNumExecutorsToAdd > 0)

val actualNumExecutorsToAdd = math.min(numExecutorsToAdd, maxNumExecutorsToAdd)

val newTotalExecutors = numExistingExecutors + actualNumExecutorsToAdd
val addRequestAcknowledged = testing || sc.requestExecutors(actualNumExecutorsToAdd)
if (addRequestAcknowledged) {
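A worked example of the capping arithmetic above, with all numbers assumed: 23 pending tasks and tasksPerExecutor = 4 give maxNumExecutorsPending = (23 + 4 - 1) / 4 = 6 (ceiling division); with 2 executors already pending, 10 existing, a configured maximum of 12 and numExecutorsToAdd = 8, the request is min(8, min(6 - 2, 12 - 10)) = 2 executors. The same logic extracted into a standalone function for experimentation (executorsToRequest is an invented name, and the real method's early returns are folded into a clamp):

    object AddExecutorsSketch {
      def executorsToRequest(
          pendingTasks: Int,
          tasksPerExecutor: Int,
          numExecutorsPending: Int,
          numExistingExecutors: Int,
          maxNumExecutors: Int,
          numExecutorsToAdd: Int): Int = {
        // Executors needed for all pending tasks, rounded up.
        val maxNumExecutorsPending = (pendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
        // Cap by both the pending-task need and the configured upper bound.
        val maxNumExecutorsToAdd = math.min(
          maxNumExecutorsPending - numExecutorsPending,
          maxNumExecutors - numExistingExecutors)
        math.min(numExecutorsToAdd, math.max(maxNumExecutorsToAdd, 0))
      }

      def main(args: Array[String]): Unit = {
        // 23 pending tasks, 4 tasks/executor, 2 pending + 10 existing executors, max 12, round size 8.
        println(executorsToRequest(23, 4, 2, 10, 12, 8))  // prints 2
      }
    }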
@@ -445,6 +472,16 @@ private[spark] class ExecutorAllocationManager(sc: SparkContext) extends Logging
blockManagerRemoved: SparkListenerBlockManagerRemoved): Unit = {
allocationManager.onExecutorRemoved(blockManagerRemoved.blockManagerId.executorId)
}

/**
* An estimate of the total number of pending tasks remaining for currently running stages. Does
* not account for tasks which may have failed and been resubmitted.
*/
def totalPendingTasks(): Int = {
stageIdToNumTasks.map { case (stageId, numTasks) =>
numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
}.sum
}
}

}
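For the pending-task estimate above, a tiny self-contained illustration (the two maps are hypothetical listener state): stage 1 has 10 tasks of which 3 have started, stage 2 has 4 tasks and none have started, so the estimate is (10 - 3) + (4 - 0) = 11.

    object PendingTasksSketch extends App {
      // Hypothetical listener state mirroring stageIdToNumTasks / stageIdToTaskIndices.
      val stageIdToNumTasks = Map(1 -> 10, 2 -> 4)
      val stageIdToTaskIndices = Map(1 -> Set(0, 1, 2))  // stage 2 has not started any tasks

      val totalPendingTasks = stageIdToNumTasks.map { case (stageId, numTasks) =>
        numTasks - stageIdToTaskIndices.get(stageId).map(_.size).getOrElse(0)
      }.sum

      println(totalPendingTasks)  // (10 - 3) + (4 - 0) = 11
    }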