Merge branch 'master' of git://git.apache.org/spark into SPARK-5339
sarutak committed Jan 23, 2015
2 parents 0e012d1 + 820ce03 commit 6e96121
Showing 108 changed files with 2,604 additions and 3,049 deletions.
@@ -10,3 +10,4 @@ log4j.logger.org.eclipse.jetty=WARN
log4j.logger.org.eclipse.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=INFO
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=INFO
+log4j.logger.org.apache.hadoop.yarn.util.RackResolver=WARN
8 changes: 8 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -121,6 +121,14 @@ pre {
border: none;
}

+.description-input {
+  overflow: hidden;
+  text-overflow: ellipsis;
+  width: 100%;
+  white-space: nowrap;
+  display: block;
+}

.stacktrace-details {
max-height: 300px;
overflow-y: auto;
@@ -158,7 +158,7 @@ private[spark] class ExecutorAllocationManager(
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
if (tasksPerExecutor == 0) {
-      throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.cores")
+      throw new SparkException("spark.executor.cores must not be less than spark.task.cpus.")
}
}

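The corrected message reflects how the check works: tasksPerExecutor is derived by integer division of spark.executor.cores by spark.task.cpus, so it is zero exactly when an executor has fewer cores than a single task needs. A minimal sketch of the arithmetic, with illustrative values (the variable names are not part of this diff):

    // 1 executor core / 2 CPUs per task == 0 tasks per executor (integer
    // division), so no task could ever be scheduled -- hence the exception.
    val executorCores = 1                            // spark.executor.cores
    val taskCpus = 2                                 // spark.task.cpus
    val tasksPerExecutor = executorCores / taskCpus  // 0 -> misconfiguration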
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -18,6 +18,7 @@
package org.apache.spark

import scala.collection.JavaConverters._
+import scala.collection.concurrent.TrieMap
import scala.collection.mutable.{HashMap, LinkedHashSet}
import org.apache.spark.serializer.KryoSerializer

@@ -46,7 +47,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)

-  private[spark] val settings = new HashMap[String, String]()
+  private[spark] val settings = new TrieMap[String, String]()

if (loadDefaults) {
// Load any spark.* system properties
@@ -177,7 +178,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
}

/** Get all parameters as a list of pairs */
-  def getAll: Array[(String, String)] = settings.clone().toArray
+  def getAll: Array[(String, String)] = settings.toArray

/** Get a parameter as an integer, falling back to a default if not set */
def getInt(key: String, defaultValue: Int): Int = {
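Replacing the mutable HashMap with TrieMap makes SparkConf safe for concurrent reads and writes without external locking, which is also why getAll can drop the defensive clone(). A minimal standalone sketch of the property being relied on (not part of the diff):

    import scala.collection.concurrent.TrieMap

    // TrieMap is a lock-free concurrent map whose iteration works on a
    // snapshot, so producing an array is safe while other threads write.
    val settings = new TrieMap[String, String]()
    settings.put("spark.app.name", "demo")
    settings.put("spark.master", "local[2]")
    val all: Array[(String, String)] = settings.toArray  // no clone() needed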
@@ -168,7 +168,7 @@ private[spark] class TaskSchedulerImpl(
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient memory")
"and have sufficient resources")
} else {
this.cancel()
}
@@ -92,7 +92,7 @@ private[spark] class JavaSerializerInstance(counterReset: Int, defaultClassLoader
}

override def deserializeStream(s: InputStream): DeserializationStream = {
-    new JavaDeserializationStream(s, Utils.getContextOrSparkClassLoader)
+    new JavaDeserializationStream(s, defaultClassLoader)
}

def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {
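The one-argument deserializeStream now honors the class loader the serializer instance was configured with, rather than whatever context loader the calling thread happens to carry. A hedged sketch of why the loader matters, mirroring what a loader-aware deserialization stream has to do (the class below is illustrative, not Spark's):

    import java.io.{InputStream, ObjectInputStream, ObjectStreamClass}

    // ObjectInputStream resolves classes through the loader it is given, so
    // classes visible only to user code fail to load when deserialization
    // falls back to the wrong ambient loader.
    class LoaderAwareObjectInputStream(in: InputStream, loader: ClassLoader)
        extends ObjectInputStream(in) {
      override def resolveClass(desc: ObjectStreamClass): Class[_] =
        Class.forName(desc.getName, false, loader)
    }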
@@ -22,20 +22,23 @@ import scala.util.Random
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.scheduler.SchedulingMode

+// scalastyle:off
/**
* Continuously generates jobs that expose various features of the WebUI (internal testing tool).
*
- * Usage: ./bin/spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]
+ * Usage: ./bin/spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR] [#job set (4 jobs per set)]
*/
+// scalastyle:on
private[spark] object UIWorkloadGenerator {

val NUM_PARTITIONS = 100
val INTER_JOB_WAIT_MS = 5000

def main(args: Array[String]) {
-    if (args.length < 2) {
+    if (args.length < 3) {
println(
"usage: ./bin/spark-class org.apache.spark.ui.UIWorkloadGenerator [master] [FIFO|FAIR]")
"usage: ./bin/spark-class org.apache.spark.ui.UIWorkloadGenerator " +
"[master] [FIFO|FAIR] [#job set (4 jobs per set)]")
System.exit(1)
}

@@ -45,6 +48,7 @@ private[spark] object UIWorkloadGenerator {
if (schedulingMode == SchedulingMode.FAIR) {
conf.set("spark.scheduler.mode", "FAIR")
}
+    val nJobSet = args(2).toInt
val sc = new SparkContext(conf)

def setProperties(s: String) = {
@@ -84,7 +88,7 @@ private[spark] object UIWorkloadGenerator {
("Job with delays", baseData.map(x => Thread.sleep(100)).count)
)

-    while (true) {
+    (1 to nJobSet).foreach { _ =>
for ((desc, job) <- jobs) {
new Thread {
override def run() {
@@ -101,5 +105,6 @@
Thread.sleep(INTER_JOB_WAIT_MS)
}
}
+    sc.stop()
}
}
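With the job-set count now a required third argument, an invocation might look like this (values are illustrative):

    ./bin/spark-class org.apache.spark.ui.UIWorkloadGenerator local[4] FAIR 2

This runs 2 sets of the 4 predefined jobs, pausing INTER_JOB_WAIT_MS between jobs, and then shuts down cleanly via the new sc.stop() call instead of looping forever.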
@@ -64,7 +64,7 @@ private[ui] class AllJobsPage(parent: JobsTab) extends WebUIPage("") {
{job.jobId} {job.jobGroup.map(id => s"($id)").getOrElse("")}
</td>
<td>
-        <div><em>{lastStageDescription}</em></div>
+        <span class="description-input" title={lastStageDescription}>{lastStageDescription}</span>
<a href={detailUrl}>{lastStageName}</a>
</td>
<td sorttable_customkey={job.submissionTime.getOrElse(-1).toString}>
106 changes: 69 additions & 37 deletions core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala
@@ -59,54 +59,86 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable])
val poolTable = new PoolTable(pools, parent)

+    val shouldShowActiveStages = activeStages.nonEmpty
+    val shouldShowPendingStages = pendingStages.nonEmpty
+    val shouldShowCompletedStages = completedStages.nonEmpty
+    val shouldShowFailedStages = failedStages.nonEmpty

val summary: NodeSeq =
<div>
<ul class="unstyled">
-        {if (sc.isDefined) {
-          // Total duration is not meaningful unless the UI is live
-          <li>
-            <strong>Total Duration: </strong>
-            {UIUtils.formatDuration(now - sc.get.startTime)}
-          </li>
-        }}
+        {
+          if (sc.isDefined) {
+            // Total duration is not meaningful unless the UI is live
+            <li>
+              <strong>Total Duration: </strong>
+              {UIUtils.formatDuration(now - sc.get.startTime)}
+            </li>
+          }
+        }
<li>
<strong>Scheduling Mode: </strong>
{listener.schedulingMode.map(_.toString).getOrElse("Unknown")}
</li>
-        <li>
-          <a href="#active"><strong>Active Stages:</strong></a>
-          {activeStages.size}
-        </li>
-        <li>
-          <a href="#pending"><strong>Pending Stages:</strong></a>
-          {pendingStages.size}
-        </li>
-        <li>
-          <a href="#completed"><strong>Completed Stages:</strong></a>
-          {numCompletedStages}
-        </li>
-        <li>
-          <a href="#failed"><strong>Failed Stages:</strong></a>
-          {numFailedStages}
-        </li>
+        {
+          if (shouldShowActiveStages) {
+            <li>
+              <a href="#active"><strong>Active Stages:</strong></a>
+              {activeStages.size}
+            </li>
+          }
+        }
+        {
+          if (shouldShowPendingStages) {
+            <li>
+              <a href="#pending"><strong>Pending Stages:</strong></a>
+              {pendingStages.size}
+            </li>
+          }
+        }
+        {
+          if (shouldShowCompletedStages) {
+            <li>
+              <a href="#completed"><strong>Completed Stages:</strong></a>
+              {numCompletedStages}
+            </li>
+          }
+        }
+        {
+          if (shouldShowFailedStages) {
+            <li>
+              <a href="#failed"><strong>Failed Stages:</strong></a>
+              {numFailedStages}
+            </li>
+          }
+        }
</ul>
</div>

-    val content = summary ++
-      {if (sc.isDefined && isFairScheduler) {
-        <h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq
-      } else {
-        Seq[Node]()
-      }} ++
-      <h4 id="active">Active Stages ({activeStages.size})</h4> ++
-      activeStagesTable.toNodeSeq ++
-      <h4 id="pending">Pending Stages ({pendingStages.size})</h4> ++
-      pendingStagesTable.toNodeSeq ++
-      <h4 id="completed">Completed Stages ({numCompletedStages})</h4> ++
-      completedStagesTable.toNodeSeq ++
-      <h4 id="failed">Failed Stages ({numFailedStages})</h4> ++
-      failedStagesTable.toNodeSeq
+    var content = summary ++
+      {
+        if (sc.isDefined && isFairScheduler) {
+          <h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq
+        } else {
+          Seq[Node]()
+        }
+      }
+    if (shouldShowActiveStages) {
+      content ++= <h4 id="active">Active Stages ({activeStages.size})</h4> ++
+        activeStagesTable.toNodeSeq
+    }
+    if (shouldShowPendingStages) {
+      content ++= <h4 id="pending">Pending Stages ({pendingStages.size})</h4> ++
+        pendingStagesTable.toNodeSeq
+    }
+    if (shouldShowCompletedStages) {
+      content ++= <h4 id="completed">Completed Stages ({numCompletedStages})</h4> ++
+        completedStagesTable.toNodeSeq
+    }
+    if (shouldShowFailedStages) {
+      content ++= <h4 id="failed">Failed Stages ({numFailedStages})</h4> ++
+        failedStagesTable.toNodeSeq
+    }
UIUtils.headerSparkPage("Spark Stages (for all jobs)", content, parent)
}
}
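The rewrite leans on scala.xml.NodeSeq supporting ++, so sections can be appended to a var only when they have content. A minimal standalone sketch of the pattern (names are illustrative):

    import scala.xml.NodeSeq

    // Start from a summary fragment; optional sections are appended only
    // when non-empty, which is what the shouldShow* flags above gate.
    var content: NodeSeq = <p>summary</p>
    val shouldShowFailed = false
    if (shouldShowFailed) {
      content ++= <h4 id="failed">Failed Stages (0)</h4>
    }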
@@ -112,9 +112,8 @@ private[ui] class StageTableBase(
stageData <- listener.stageIdToData.get((s.stageId, s.attemptId))
desc <- stageData.description
} yield {
-      <div><em>{desc}</em></div>
+      <span class="description-input" title={desc}>{desc}</span>
}

<div>{stageDesc.getOrElse("")} {killLink} {nameLink} {details}</div>
}

@@ -54,7 +54,7 @@ object DenseGmmEM {

for (i <- 0 until clusters.k) {
println("weight=%f\nmu=%s\nsigma=\n%s\n" format
-      (clusters.weight(i), clusters.mu(i), clusters.sigma(i)))
+      (clusters.weights(i), clusters.gaussians(i).mu, clusters.gaussians(i).sigma))
}

println("Cluster labels (first <= 100):")
@@ -133,6 +133,12 @@ object GraphGenerators {
// This ensures that the 4 quadrants are the same size at all recursion levels
val numVertices = math.round(
math.pow(2.0, math.ceil(math.log(requestedNumVertices) / math.log(2.0)))).toInt
+    val numEdgesUpperBound =
+      math.pow(2.0, 2 * ((math.log(numVertices) / math.log(2.0)) - 1)).toInt
+    if (numEdgesUpperBound < numEdges) {
+      throw new IllegalArgumentException(
+        s"numEdges must be <= $numEdgesUpperBound but was $numEdges")
+    }
var edges: Set[Edge[Int]] = Set()
while (edges.size < numEdges) {
if (edges.size % 100 == 0) {
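Algebraically the new bound is 2^(2 * (log2 n - 1)) = n^2 / 4 for n = numVertices. Checking the arithmetic against the values used in the new test below (a restatement, not new behavior):

    val requestedNumVertices = 4
    val numVertices = math.round(
      math.pow(2.0, math.ceil(math.log(requestedNumVertices) / math.log(2.0)))).toInt  // 4
    val numEdgesUpperBound =
      math.pow(2.0, 2 * ((math.log(numVertices) / math.log(2.0)) - 1)).toInt           // 4 * 4 / 4 = 4
    // rmatGraph(sc, 4, 4) is therefore accepted, while rmatGraph(sc, 4, 8) throws.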
@@ -110,4 +110,14 @@ class GraphGeneratorsSuite extends FunSuite with LocalSparkContext {
}
}

test("SPARK-5064 GraphGenerators.rmatGraph numEdges upper bound") {
withSpark { sc =>
val g1 = GraphGenerators.rmatGraph(sc, 4, 4)
assert(g1.edges.count() === 4)
intercept[IllegalArgumentException] {
val g2 = GraphGenerators.rmatGraph(sc, 4, 8)
}
}
}

}
@@ -266,12 +266,16 @@ class PythonMLLibAPI extends Serializable {
k: Int,
maxIterations: Int,
runs: Int,
-      initializationMode: String): KMeansModel = {
+      initializationMode: String,
+      seed: java.lang.Long): KMeansModel = {
val kMeansAlg = new KMeans()
.setK(k)
.setMaxIterations(maxIterations)
.setRuns(runs)
.setInitializationMode(initializationMode)

+    if (seed != null) kMeansAlg.setSeed(seed)

try {
kMeansAlg.run(data.rdd.persist(StorageLevel.MEMORY_AND_DISK))
} finally {
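On the Scala side this maps onto KMeans's optional seed, which the bridge now forwards when Python passes one; a minimal sketch of the resulting builder call (values are illustrative):

    import org.apache.spark.mllib.clustering.KMeans

    // A fixed seed makes the random initialization, and hence the fitted
    // model, reproducible across runs; leaving it unset keeps old behavior.
    val kMeansAlg = new KMeans()
      .setK(3)
      .setMaxIterations(20)
      .setRuns(1)
      .setInitializationMode("k-means||")
      .setSeed(42L)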
@@ -134,9 +134,7 @@ class GaussianMixtureEM private (
// diagonal covariance matrices using component variances
// derived from the samples
val (weights, gaussians) = initialModel match {
-      case Some(gmm) => (gmm.weight, gmm.mu.zip(gmm.sigma).map { case(mu, sigma) =>
-        new MultivariateGaussian(mu, sigma)
-      })
+      case Some(gmm) => (gmm.weights, gmm.gaussians)

case None => {
val samples = breezeData.takeSample(withReplacement = true, k * nSamples, seed)
@@ -176,10 +174,7 @@
iter += 1
}

-    // Need to convert the breeze matrices to MLlib matrices
-    val means = Array.tabulate(k) { i => gaussians(i).mu }
-    val sigmas = Array.tabulate(k) { i => gaussians(i).sigma }
-    new GaussianMixtureModel(weights, means, sigmas)
+    new GaussianMixtureModel(weights, gaussians)
}

/** Average of dense breeze vectors */
@@ -20,7 +20,7 @@ package org.apache.spark.mllib.clustering
import breeze.linalg.{DenseVector => BreezeVector}

import org.apache.spark.rdd.RDD
-import org.apache.spark.mllib.linalg.{Matrix, Vector}
+import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.distribution.MultivariateGaussian
import org.apache.spark.mllib.util.MLUtils

@@ -36,12 +36,13 @@ import org.apache.spark.mllib.util.MLUtils
* covariance matrix for Gaussian i
*/
class GaussianMixtureModel(
-    val weight: Array[Double],
-    val mu: Array[Vector],
-    val sigma: Array[Matrix]) extends Serializable {
+    val weights: Array[Double],
+    val gaussians: Array[MultivariateGaussian]) extends Serializable {

+  require(weights.length == gaussians.length, "Length of weight and Gaussian arrays must match")

/** Number of gaussians in mixture */
-  def k: Int = weight.length
+  def k: Int = weights.length

/** Maps given points to their cluster indices. */
def predict(points: RDD[Vector]): RDD[Int] = {
@@ -55,14 +56,10 @@
*/
def predictSoft(points: RDD[Vector]): RDD[Array[Double]] = {
val sc = points.sparkContext
-    val dists = sc.broadcast {
-      (0 until k).map { i =>
-        new MultivariateGaussian(mu(i).toBreeze.toDenseVector, sigma(i).toBreeze.toDenseMatrix)
-      }.toArray
-    }
-    val weights = sc.broadcast(weight)
+    val bcDists = sc.broadcast(gaussians)
+    val bcWeights = sc.broadcast(weights)
points.map { x =>
-      computeSoftAssignments(x.toBreeze.toDenseVector, dists.value, weights.value, k)
+      computeSoftAssignments(x.toBreeze.toDenseVector, bcDists.value, bcWeights.value, k)
}
}

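Taken together with the GaussianMixtureEM and DenseGmmEM changes above, the model's parallel weight/mu/sigma arrays are replaced by one array of MultivariateGaussian distributions. A hedged sketch of constructing and querying the reshaped model (values are illustrative):

    import org.apache.spark.mllib.clustering.GaussianMixtureModel
    import org.apache.spark.mllib.linalg.{Matrices, Vectors}
    import org.apache.spark.mllib.stat.distribution.MultivariateGaussian

    // Two one-dimensional components with equal weight; each component now
    // carries its mean vector and covariance matrix in one object.
    val gmm = new GaussianMixtureModel(
      Array(0.5, 0.5),
      Array(
        new MultivariateGaussian(Vectors.dense(0.0), Matrices.dense(1, 1, Array(1.0))),
        new MultivariateGaussian(Vectors.dense(5.0), Matrices.dense(1, 1, Array(1.0)))))
    val mu0 = gmm.gaussians(0).mu  // was gmm.mu(0)
    val w0 = gmm.weights(0)        // was gmm.weight(0)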