fixed merge conflicts and set preference to use the diff(other: VertexRDD[VD]) method
Brennon York committed Feb 25, 2015
2 parents 2c678c6 + 9f603fc commit 753c963
Showing 66 changed files with 1,441 additions and 954 deletions.
9 changes: 9 additions & 0 deletions conf/metrics.properties.template
@@ -122,6 +122,15 @@

#worker.sink.csv.unit=minutes

# Enable Slf4jSink for all instances by class name
#*.sink.slf4j.class=org.apache.spark.metrics.sink.Slf4jSink

# Polling period for Slf4jSink
#*.sink.slf4j.period=1

#*.sink.slf4j.unit=minutes


# Enable jvm source for instance master, worker, driver and executor
#master.source.jvm.class=org.apache.spark.metrics.source.JvmSource

@@ -90,9 +90,9 @@ private[spark] class ApplicationInfo(
}
}

private val myMaxCores = desc.maxCores.getOrElse(defaultCores)
val requestedCores = desc.maxCores.getOrElse(defaultCores)

def coresLeft: Int = myMaxCores - coresGranted
def coresLeft: Int = requestedCores - coresGranted

private var _retryCount = 0

@@ -50,12 +50,16 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
val workers = state.workers.sortBy(_.id)
val workerTable = UIUtils.listingTable(workerHeaders, workerRow, workers)

val appHeaders = Seq("Application ID", "Name", "Cores", "Memory per Node", "Submitted Time",
"User", "State", "Duration")
val activeAppHeaders = Seq("Application ID", "Name", "Cores in Use",
"Cores Requested", "Memory per Node", "Submitted Time", "User", "State", "Duration")
val activeApps = state.activeApps.sortBy(_.startTime).reverse
val activeAppsTable = UIUtils.listingTable(appHeaders, appRow, activeApps)
val activeAppsTable = UIUtils.listingTable(activeAppHeaders, activeAppRow, activeApps)

val completedAppHeaders = Seq("Application ID", "Name", "Cores Requested", "Memory per Node",
"Submitted Time", "User", "State", "Duration")
val completedApps = state.completedApps.sortBy(_.endTime).reverse
val completedAppsTable = UIUtils.listingTable(appHeaders, appRow, completedApps)
val completedAppsTable = UIUtils.listingTable(completedAppHeaders, completeAppRow,
completedApps)

val driverHeaders = Seq("Submission ID", "Submitted Time", "Worker", "State", "Cores",
"Memory", "Main Class")
@@ -162,16 +166,23 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
</tr>
}

private def appRow(app: ApplicationInfo): Seq[Node] = {
private def appRow(app: ApplicationInfo, active: Boolean): Seq[Node] = {
<tr>
<td>
<a href={"app?appId=" + app.id}>{app.id}</a>
</td>
<td>
<a href={app.desc.appUiUrl}>{app.desc.name}</a>
</td>
{
if (active) {
<td>
{app.coresGranted}
</td>
}
}
<td>
{app.coresGranted}
{app.requestedCores}
</td>
<td sorttable_customkey={app.desc.memoryPerSlave.toString}>
{Utils.megabytesToString(app.desc.memoryPerSlave)}
Expand All @@ -183,6 +194,14 @@ private[spark] class MasterPage(parent: MasterWebUI) extends WebUIPage("") {
</tr>
}

private def activeAppRow(app: ApplicationInfo): Seq[Node] = {
appRow(app, active = true)
}

private def completeAppRow(app: ApplicationInfo): Seq[Node] = {
appRow(app, active = false)
}

private def driverRow(driver: DriverInfo): Seq[Node] = {
<tr>
<td>{driver.id} </td>
@@ -134,7 +134,7 @@ private[spark] class WorkerPage(parent: WorkerWebUI) extends WebUIPage("") {
def driverRow(driver: DriverRunner): Seq[Node] = {
<tr>
<td>{driver.driverId}</td>
<td>{driver.driverDesc.command.arguments(1)}</td>
<td>{driver.driverDesc.command.arguments(2)}</td>
<td>{driver.finalState.getOrElse(DriverState.RUNNING)}</td>
<td sorttable_customkey={driver.driverDesc.cores.toString}>
{driver.driverDesc.cores.toString}
68 changes: 68 additions & 0 deletions core/src/main/scala/org/apache/spark/metrics/sink/Slf4jSink.scala
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.metrics.sink

import java.util.Properties
import java.util.concurrent.TimeUnit

import com.codahale.metrics.{Slf4jReporter, MetricRegistry}

import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem

private[spark] class Slf4jSink(
val property: Properties,
val registry: MetricRegistry,
securityMgr: SecurityManager)
extends Sink {
val SLF4J_DEFAULT_PERIOD = 10
val SLF4J_DEFAULT_UNIT = "SECONDS"

val SLF4J_KEY_PERIOD = "period"
val SLF4J_KEY_UNIT = "unit"

val pollPeriod = Option(property.getProperty(SLF4J_KEY_PERIOD)) match {
case Some(s) => s.toInt
case None => SLF4J_DEFAULT_PERIOD
}

val pollUnit: TimeUnit = Option(property.getProperty(SLF4J_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case None => TimeUnit.valueOf(SLF4J_DEFAULT_UNIT)
}

MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)

val reporter: Slf4jReporter = Slf4jReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build()

override def start() {
reporter.start(pollPeriod, pollUnit)
}

override def stop() {
reporter.stop()
}

override def report() {
reporter.report()
}
}
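Usage note (not part of the commit): because `Slf4jSink` is `private[spark]`, applications enable it through the metrics configuration shown in `conf/metrics.properties.template` above rather than constructing it directly. The following is only a minimal sketch of how code living inside the `org.apache.spark` namespace (for example, a unit test) could drive the sink by hand, assuming nothing beyond the constructor and lifecycle methods defined in this file:

{% highlight scala %}
package org.apache.spark.metrics.sink

import java.util.Properties

import com.codahale.metrics.MetricRegistry

import org.apache.spark.{SecurityManager, SparkConf}

// Sketch only: drive the sink directly, the way an in-package test could.
object Slf4jSinkSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.setProperty("period", "1")      // read via SLF4J_KEY_PERIOD above
    props.setProperty("unit", "seconds")  // upper-cased into TimeUnit.SECONDS

    val registry = new MetricRegistry()
    registry.counter("sketch.counter").inc()  // give the reporter something to log

    val sink = new Slf4jSink(props, registry, new SecurityManager(new SparkConf()))
    sink.start()   // schedules periodic reporting at pollPeriod / pollUnit
    sink.report()  // force one immediate report to the SLF4J logger
    sink.stop()
  }
}
{% endhighlight %}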

@@ -203,6 +203,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
for (stageId <- jobData.stageIds) {
stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage =>
jobsUsingStage.remove(jobEnd.jobId)
if (jobsUsingStage.isEmpty) {
stageIdToActiveJobIds.remove(stageId)
}
stageIdToInfo.get(stageId).foreach { stageInfo =>
if (stageInfo.submissionTime.isEmpty) {
// if this stage is pending, it won't complete, so mark it as "skipped":
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -696,7 +696,7 @@ private[spark] object Utils extends Logging {
try {
val rootDir = new File(root)
if (rootDir.exists || rootDir.mkdirs()) {
val dir = createDirectory(root)
val dir = createTempDir(root)
chmod700(dir)
Some(dir.getAbsolutePath)
} else {
@@ -88,6 +88,28 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc
listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46))
}

test("test clearing of stageIdToActiveJobs") {
val conf = new SparkConf()
conf.set("spark.ui.retainedStages", 5.toString)
val listener = new JobProgressListener(conf)
val jobId = 0
val stageIds = 1 to 50
// Start a job with 50 stages
listener.onJobStart(createJobStartEvent(jobId, stageIds))
for (stageId <- stageIds) {
listener.onStageSubmitted(createStageStartEvent(stageId))
}
listener.stageIdToActiveJobIds.size should be > 0

// Complete the stages and job
for (stageId <- stageIds) {
listener.onStageCompleted(createStageEndEvent(stageId, failed = false))
}
listener.onJobEnd(createJobEndEvent(jobId, false))
assertActiveJobsStateIsEmpty(listener)
listener.stageIdToActiveJobIds.size should be (0)
}

test("test LRU eviction of jobs") {
val conf = new SparkConf()
conf.set("spark.ui.retainedStages", 5.toString)
1 change: 1 addition & 0 deletions docs/_config.yml
@@ -10,6 +10,7 @@ kramdown:

include:
- _static
- _modules

# These allow the documentation to be updated with newer releases
# of Spark, Scala, and Mesos.
4 changes: 3 additions & 1 deletion docs/configuration.md
@@ -955,7 +955,9 @@ Apart from these, the following properties are also available, and may be useful
<td>5</td>
<td>
(Netty only) Seconds to wait between retries of fetches. The maximum delay caused by retrying
is simply <code>maxRetries * retryWait</code>, by default 15 seconds.
is simply <code>maxRetries * retryWait</code>. The default maximum delay is therefore
15 seconds, because the default value of <code>maxRetries</code> is 3, and the default
<code>retryWait</code> here is 5 seconds.
</td>
</tr>
</table>
8 changes: 4 additions & 4 deletions docs/graphx-programming-guide.md
@@ -538,7 +538,7 @@ val joinedGraph = graph.joinVertices(uniqueCosts,

## Neighborhood Aggregation

A key step in may graph analytics tasks is aggregating information about the neighborhood of each
A key step in many graph analytics tasks is aggregating information about the neighborhood of each
vertex.
For example, we might want to know the number of followers each user has or the average age of the
followers of each user. Many iterative graph algorithms (e.g., PageRank, Shortest Path, and
@@ -634,7 +634,7 @@ avgAgeOfOlderFollowers.collect.foreach(println(_))

### Map Reduce Triplets Transition Guide (Legacy)

In earlier versions of GraphX we neighborhood aggregation was accomplished using the
In earlier versions of GraphX neighborhood aggregation was accomplished using the
[`mapReduceTriplets`][Graph.mapReduceTriplets] operator:

{% highlight scala %}
@@ -682,8 +682,8 @@ val result = graph.aggregateMessages[String](msgFun, reduceFun)
### Computing Degree Information

A common aggregation task is computing the degree of each vertex: the number of edges adjacent to
each vertex. In the context of directed graphs it often necessary to know the in-degree, out-
degree, and the total degree of each vertex. The [`GraphOps`][GraphOps] class contains a
each vertex. In the context of directed graphs it is often necessary to know the in-degree,
out-degree, and the total degree of each vertex. The [`GraphOps`][GraphOps] class contains a
collection of operators to compute the degrees of each vertex. For example in the following we
compute the max in, out, and total degrees:
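(The example referred to here is collapsed in this diff view. As a sketch of the kind of computation the guide describes, assuming a `graph: Graph[VD, ED]` is already in scope, the `GraphOps` degree operators can be used as follows.)

{% highlight scala %}
import org.apache.spark.graphx._

// Sketch: keep the (vertexId, degree) pair with the larger degree.
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) =
  if (a._2 > b._2) a else b

val maxInDegree: (VertexId, Int)  = graph.inDegrees.reduce(max)   // most incoming edges
val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)  // most outgoing edges
val maxDegrees: (VertexId, Int)   = graph.degrees.reduce(max)     // most edges overall
{% endhighlight %}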

41 changes: 15 additions & 26 deletions docs/mllib-data-types.md
@@ -298,23 +298,22 @@ In general the use of non-deterministic RDDs can lead to errors.

### BlockMatrix

A `BlockMatrix` is a distributed matrix backed by an RDD of `MatrixBlock`s, where `MatrixBlock` is
A `BlockMatrix` is a distributed matrix backed by an RDD of `MatrixBlock`s, where a `MatrixBlock` is
a tuple of `((Int, Int), Matrix)`, where the `(Int, Int)` is the index of the block, and `Matrix` is
the sub-matrix at the given index with size `rowsPerBlock` x `colsPerBlock`.
`BlockMatrix` supports methods such as `.add` and `.multiply` with another `BlockMatrix`.
`BlockMatrix` also has a helper function `.validate` which can be used to debug whether the
`BlockMatrix` supports methods such as `add` and `multiply` with another `BlockMatrix`.
`BlockMatrix` also has a helper function `validate` which can be used to check whether the
`BlockMatrix` is set up properly.

<div class="codetabs">
<div data-lang="scala" markdown="1">

A [`BlockMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) can be
most easily created from an `IndexedRowMatrix` or `CoordinateMatrix` using `.toBlockMatrix()`.
`.toBlockMatrix()` will create blocks of size 1024 x 1024. Users may change the sizes of their blocks
by supplying the values through `.toBlockMatrix(rowsPerBlock, colsPerBlock)`.
most easily created from an `IndexedRowMatrix` or `CoordinateMatrix` by calling `toBlockMatrix`.
`toBlockMatrix` creates blocks of size 1024 x 1024 by default.
Users may change the block size by supplying the values through `toBlockMatrix(rowsPerBlock, colsPerBlock)`.

{% highlight scala %}
import org.apache.spark.mllib.linalg.SingularValueDecomposition
import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}

val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries
@@ -323,29 +322,24 @@ val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
// Transform the CoordinateMatrix to a BlockMatrix
val matA: BlockMatrix = coordMat.toBlockMatrix().cache()

// validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate
matA.validate()

// Calculate A^T A.
val AtransposeA = matA.transpose.multiply(matA)

// get SVD of 2 * A
val A2 = matA.add(matA)
val svd = A2.toIndexedRowMatrix().computeSVD(20, false, 1e-9)
val ata = matA.transpose.multiply(matA)
{% endhighlight %}
</div>

<div data-lang="java" markdown="1">

A [`BlockMatrix`](api/scala/index.html#org.apache.spark.mllib.linalg.distributed.BlockMatrix) can be
most easily created from an `IndexedRowMatrix` or `CoordinateMatrix` using `.toBlockMatrix()`.
`.toBlockMatrix()` will create blocks of size 1024 x 1024. Users may change the sizes of their blocks
by supplying the values through `.toBlockMatrix(rowsPerBlock, colsPerBlock)`.
A [`BlockMatrix`](api/java/org/apache/spark/mllib/linalg/distributed/BlockMatrix.html) can be
most easily created from an `IndexedRowMatrix` or `CoordinateMatrix` by calling `toBlockMatrix`.
`toBlockMatrix` creates blocks of size 1024 x 1024 by default.
Users may change the block size by supplying the values through `toBlockMatrix(rowsPerBlock, colsPerBlock)`.

{% highlight java %}
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.mllib.linalg.SingularValueDecomposition;
import org.apache.spark.mllib.linalg.distributed.BlockMatrix;
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix;
import org.apache.spark.mllib.linalg.distributed.IndexedRowMatrix;
@@ -356,17 +350,12 @@ CoordinateMatrix coordMat = new CoordinateMatrix(entries.rdd());
// Transform the CoordinateMatrix to a BlockMatrix
BlockMatrix matA = coordMat.toBlockMatrix().cache();

// validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate();

// Calculate A^T A.
BlockMatrix AtransposeA = matA.transpose().multiply(matA);

// get SVD of 2 * A
BlockMatrix A2 = matA.add(matA);
SingularValueDecomposition<IndexedRowMatrix, Matrix> svd =
A2.toIndexedRowMatrix().computeSVD(20, false, 1e-9);
BlockMatrix ata = matA.transpose().multiply(matA);
{% endhighlight %}
</div>
</div>
4 changes: 2 additions & 2 deletions docs/mllib-decision-tree.md
@@ -54,8 +54,8 @@ impurity measure for regression (variance).
<tr>
<td>Variance</td>
<td>Regression</td>
<td>$\frac{1}{N} \sum_{i=1}^{N} (x_i - \mu)^2$</td><td>$y_i$ is label for an instance,
$N$ is the number of instances and $\mu$ is the mean given by $\frac{1}{N} \sum_{i=1}^N x_i$.</td>
<td>$\frac{1}{N} \sum_{i=1}^{N} (y_i - \mu)^2$</td><td>$y_i$ is label for an instance,
$N$ is the number of instances and $\mu$ is the mean given by $\frac{1}{N} \sum_{i=1}^N y_i$.</td>
</tr>
</tbody>
</table>
11 changes: 11 additions & 0 deletions docs/mllib-ensembles.md
@@ -427,6 +427,17 @@ We omit some decision tree parameters since those are covered in the [decision t

* **`algo`**: The algorithm or task (classification vs. regression) is set using the tree [Strategy] parameter.

#### Validation while training

Gradient boosting can overfit when trained with more trees. In order to prevent overfitting, it is useful to validate while
training. The method `runWithValidation` has been provided to make use of this option. It takes a pair of RDDs as arguments, the
first one being the training dataset and the second being the validation dataset.

The training is stopped when the improvement in the validation error is not more than a certain tolerance
(supplied by the `validationTol` argument in `BoostingStrategy`). In practice, the validation error
decreases initially and later increases. There might be cases in which the validation error does not change monotonically,
and the user is advised to set a large enough negative tolerance and examine the validation curve to tune the number of
iterations.
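A minimal sketch of this option (not part of the commit; it assumes a prepared `data: RDD[LabeledPoint]` and the `validationTol` field described above):

{% highlight scala %}
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.configuration.BoostingStrategy

// Hold out part of the data as a validation set.
val Array(training, validation) = data.randomSplit(Array(0.8, 0.2))

val boostingStrategy = BoostingStrategy.defaultParams("Regression")
boostingStrategy.numIterations = 100   // upper bound; validation may stop training earlier
boostingStrategy.validationTol = 1e-5  // minimum improvement in validation error to continue

// Train on the first RDD, validate on the second, stopping early if warranted.
val model = new GradientBoostedTrees(boostingStrategy).runWithValidation(training, validation)
{% endhighlight %}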

### Examples

