Merge branch 'master' of github.com:apache/spark into dag-viz-streaming
Andrew Or committed May 14, 2015
2 parents 5703939 + 7fb715d commit 5c30360
Showing 74 changed files with 665 additions and 396 deletions.
@@ -41,28 +41,41 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)

/** Return the graph metadata for the given stage, or None if no such information exists. */
def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = {
val stageIds = jobIdToStageIds.get(jobId).getOrElse { Seq.empty }
val graphs = stageIds.flatMap { sid => stageIdToGraph.get(sid) }
def getOperationGraphForJob(jobId: Int): Seq[RDDOperationGraph] = synchronized {
val _stageIds = jobIdToStageIds.get(jobId).getOrElse { Seq.empty }
val graphs = _stageIds.flatMap { sid => stageIdToGraph.get(sid) }
// If the metadata for some stages have been removed, do not bother rendering this job
if (stageIds.size != graphs.size) {
if (_stageIds.size != graphs.size) {
Seq.empty
} else {
graphs
}
}

/** Return the graph metadata for the given stage, or None if no such information exists. */
def getOperationGraphForStage(stageId: Int): Option[RDDOperationGraph] = {
def getOperationGraphForStage(stageId: Int): Option[RDDOperationGraph] = synchronized {
stageIdToGraph.get(stageId)
}

/** On job start, construct a RDDOperationGraph for each stage in the job for display later. */
override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
val jobId = jobStart.jobId
val stageInfos = jobStart.stageInfos

jobIds += jobId
jobIdToStageIds(jobId) = jobStart.stageInfos.map(_.stageId).sorted

stageInfos.foreach { stageInfo =>
stageIds += stageInfo.stageId
stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
// Remove state for old stages
if (stageIds.size >= retainedStages) {
val toRemove = math.max(retainedStages / 10, 1)
stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
stageIds.trimStart(toRemove)
}
}

// Remove state for old jobs
if (jobIds.size >= retainedJobs) {
val toRemove = math.max(retainedJobs / 10, 1)
@@ -71,15 +84,4 @@ private[ui] class RDDOperationGraphListener(conf: SparkConf) extends SparkListen
}
}

/** Remove graph metadata for old stages */
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
val stageInfo = stageSubmitted.stageInfo
stageIds += stageInfo.stageId
stageIdToGraph(stageInfo.stageId) = RDDOperationGraph.makeOperationGraph(stageInfo)
if (stageIds.size >= retainedStages) {
val toRemove = math.max(retainedStages / 10, 1)
stageIds.take(toRemove).foreach { id => stageIdToGraph.remove(id) }
stageIds.trimStart(toRemove)
}
}
}
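
For context, both getters above now take the listener's lock: they are called from UI render threads while `onJobStart` mutates the same maps on the listener-bus thread. A minimal sketch of the pattern being guarded (simplified types and names, not the actual class):

```scala
import scala.collection.mutable

// Illustrative only: one writer (listener-bus thread), many readers (UI threads).
class GraphRegistry {
  private val jobIdToStageIds = new mutable.HashMap[Int, Seq[Int]]
  private val stageIdToGraph = new mutable.HashMap[Int, String] // graph stubbed as a String

  // Called on the listener-bus thread.
  def onJobStart(jobId: Int, stages: Seq[(Int, String)]): Unit = synchronized {
    jobIdToStageIds(jobId) = stages.map(_._1).sorted
    stages.foreach { case (id, graph) => stageIdToGraph(id) = graph }
  }

  // Called from UI render threads; must see a consistent view of both maps.
  def graphsForJob(jobId: Int): Seq[String] = synchronized {
    val stageIds = jobIdToStageIds.getOrElse(jobId, Seq.empty)
    val graphs = stageIds.flatMap(stageIdToGraph.get)
    if (stageIds.size != graphs.size) Seq.empty else graphs
  }
}
```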
@@ -31,7 +31,6 @@ class RDDOperationGraphListenerSuite extends FunSuite {
assert(numStages > 0, "I will not run a job with 0 stages for you.")
val stageInfos = (0 until numStages).map { _ =>
val stageInfo = new StageInfo(stageIdCounter, 0, "s", 0, Seq.empty, Seq.empty, "d")
listener.onStageSubmitted(new SparkListenerStageSubmitted(stageInfo))
stageIdCounter += 1
stageInfo
}
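Since graphs are now registered in `onJobStart`, the test no longer needs to fire `SparkListenerStageSubmitted` for each stage. A simplified sketch of driving the listener directly (hypothetical suite, placed in the same `org.apache.spark.ui.scope` package as the real one because the listener is `private[ui]`):

```scala
package org.apache.spark.ui.scope

import org.scalatest.FunSuite

import org.apache.spark.SparkConf
import org.apache.spark.scheduler.{SparkListenerJobStart, StageInfo}

// Sketch only, not the actual suite.
class RDDOperationGraphListenerSketchSuite extends FunSuite {
  test("onJobStart registers one graph per stage") {
    val listener = new RDDOperationGraphListener(new SparkConf)
    val stageInfos = (0 until 3).map { i =>
      new StageInfo(i, 0, "s", 0, Seq.empty, Seq.empty, "d")
    }
    listener.onJobStart(SparkListenerJobStart(0, 0L, stageInfos))
    assert(listener.getOperationGraphForJob(0).size === 3)
  }
}
```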
14 changes: 7 additions & 7 deletions dev/create-release/create-release.sh
@@ -118,14 +118,14 @@ if [[ ! "$@" =~ --skip-publish ]]; then

rm -rf $SPARK_REPO

build/mvn -DskipTests -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-Pyarn -Phive -Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
build/mvn -DskipTests -Pyarn -Phive \
-Phive-thriftserver -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
clean install

./dev/change-version-to-2.11.sh

build/mvn -DskipTests -Dhadoop.version=2.2.0 -Dyarn.version=2.2.0 \
-Dscala-2.11 -Pyarn -Phive -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
build/mvn -DskipTests -Pyarn -Phive \
-Dscala-2.11 -Phadoop-2.2 -Pspark-ganglia-lgpl -Pkinesis-asl \
clean install

./dev/change-version-to-2.10.sh
@@ -228,9 +228,9 @@ if [[ ! "$@" =~ --skip-package ]]; then

# We increment the Zinc port each time to avoid OOM's and other craziness if multiple builds
# share the same Zinc server.
make_binary_release "hadoop1" "-Phive -Phive-thriftserver -Dhadoop.version=1.0.4" "3030" &
make_binary_release "hadoop1-scala2.11" "-Phive -Dscala-2.11" "3031" &
make_binary_release "cdh4" "-Phive -Phive-thriftserver -Dhadoop.version=2.0.0-mr1-cdh4.2.0" "3032" &
make_binary_release "hadoop1" "-Phadoop-1 -Phive -Phive-thriftserver" "3030" &
make_binary_release "hadoop1-scala2.11" "-Phadoop-1 -Phive -Dscala-2.11" "3031" &
make_binary_release "cdh4" "-Phadoop-1 -Phive -Phive-thriftserver -Dhadoop.version=2.0.0-mr1-cdh4.2.0" "3032" &
make_binary_release "hadoop2.3" "-Phadoop-2.3 -Phive -Phive-thriftserver -Pyarn" "3033" &
make_binary_release "hadoop2.4" "-Phadoop-2.4 -Phive -Phive-thriftserver -Pyarn" "3034" &
make_binary_release "mapr3" "-Pmapr3 -Phive -Phive-thriftserver" "3035" &
3 changes: 1 addition & 2 deletions dev/merge_spark_pr.py
@@ -266,10 +266,9 @@ def get_version_json(version_str):

resolve = filter(lambda a: a['name'] == "Resolve Issue", asf_jira.transitions(jira_id))[0]
resolution = filter(lambda r: r.raw['name'] == "Fixed", asf_jira.resolutions())[0]
custom_fields = {'resolution': {'id': resolution.raw['id']}}
asf_jira.transition_issue(
jira_id, resolve["id"], fixVersions = jira_fix_versions,
comment = comment, fields = custom_fields)
comment = comment, resolution = {'id': resolution.raw['id']})

print "Successfully resolved %s with fixVersions=%s!" % (jira_id, fix_versions)

6 changes: 3 additions & 3 deletions dev/run-tests
@@ -40,11 +40,11 @@ function handle_error () {
{
if [ -n "$AMPLAB_JENKINS_BUILD_PROFILE" ]; then
if [ "$AMPLAB_JENKINS_BUILD_PROFILE" = "hadoop1.0" ]; then
export SBT_MAVEN_PROFILES_ARGS="-Dhadoop.version=1.0.4"
export SBT_MAVEN_PROFILES_ARGS="-Phadoop-1 -Dhadoop.version=1.0.4"
elif [ "$AMPLAB_JENKINS_BUILD_PROFILE" = "hadoop2.0" ]; then
export SBT_MAVEN_PROFILES_ARGS="-Dhadoop.version=2.0.0-mr1-cdh4.1.1"
export SBT_MAVEN_PROFILES_ARGS="-Phadoop-1 -Dhadoop.version=2.0.0-mr1-cdh4.1.1"
elif [ "$AMPLAB_JENKINS_BUILD_PROFILE" = "hadoop2.2" ]; then
export SBT_MAVEN_PROFILES_ARGS="-Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0"
export SBT_MAVEN_PROFILES_ARGS="-Pyarn -Phadoop-2.2"
elif [ "$AMPLAB_JENKINS_BUILD_PROFILE" = "hadoop2.3" ]; then
export SBT_MAVEN_PROFILES_ARGS="-Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0"
fi
4 changes: 2 additions & 2 deletions dev/scalastyle
@@ -20,8 +20,8 @@
echo -e "q\n" | build/sbt -Phive -Phive-thriftserver scalastyle > scalastyle.txt
echo -e "q\n" | build/sbt -Phive -Phive-thriftserver test:scalastyle >> scalastyle.txt
# Check style with YARN built too
echo -e "q\n" | build/sbt -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 scalastyle >> scalastyle.txt
echo -e "q\n" | build/sbt -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 test:scalastyle >> scalastyle.txt
echo -e "q\n" | build/sbt -Pyarn -Phadoop-2.2 scalastyle >> scalastyle.txt
echo -e "q\n" | build/sbt -Pyarn -Phadoop-2.2 test:scalastyle >> scalastyle.txt

ERRORS=$(cat scalastyle.txt | awk '{if($1~/error/)print}')
rm scalastyle.txt
11 changes: 6 additions & 5 deletions docs/building-spark.md
@@ -59,14 +59,14 @@ You can fix this by setting the `MAVEN_OPTS` variable as discussed before.

# Specifying the Hadoop Version

Because HDFS is not protocol-compatible across versions, if you want to read from HDFS, you'll need to build Spark against the specific HDFS version in your environment. You can do this through the "hadoop.version" property. If unset, Spark will build against Hadoop 1.0.4 by default. Note that certain build profiles are required for particular Hadoop versions:
Because HDFS is not protocol-compatible across versions, if you want to read from HDFS, you'll need to build Spark against the specific HDFS version in your environment. You can do this through the "hadoop.version" property. If unset, Spark will build against Hadoop 2.2.0 by default. Note that certain build profiles are required for particular Hadoop versions:

<table class="table">
<thead>
<tr><th>Hadoop version</th><th>Profile required</th></tr>
</thead>
<tbody>
<tr><td>1.x to 2.1.x</td><td>(none)</td></tr>
<tr><td>1.x to 2.1.x</td><td>hadoop-1</td></tr>
<tr><td>2.2.x</td><td>hadoop-2.2</td></tr>
<tr><td>2.3.x</td><td>hadoop-2.3</td></tr>
<tr><td>2.4.x</td><td>hadoop-2.4</td></tr>
@@ -77,19 +77,20 @@ For Apache Hadoop versions 1.x, Cloudera CDH "mr1" distributions, and other Hado

{% highlight bash %}
# Apache Hadoop 1.2.1
mvn -Dhadoop.version=1.2.1 -DskipTests clean package
mvn -Dhadoop.version=1.2.1 -Phadoop-1 -DskipTests clean package

# Cloudera CDH 4.2.0 with MapReduce v1
mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -DskipTests clean package
mvn -Dhadoop.version=2.0.0-mr1-cdh4.2.0 -Phadoop-1 -DskipTests clean package
{% endhighlight %}

You can enable the "yarn" profile and optionally set the "yarn.version" property if it is different from "hadoop.version". Spark only supports YARN versions 2.2.0 and later.

Examples:

{% highlight bash %}

# Apache Hadoop 2.2.X
mvn -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean package
mvn -Pyarn -Phadoop-2.2 -DskipTests clean package

# Apache Hadoop 2.3.X
mvn -Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package
2 changes: 1 addition & 1 deletion docs/hadoop-third-party-distributions.md
@@ -14,7 +14,7 @@ property. For certain versions, you will need to specify additional profiles. Fo
see the guide on [building with maven](building-spark.html#specifying-the-hadoop-version):

mvn -Dhadoop.version=1.0.4 -DskipTests clean package
mvn -Phadoop-2.2 -Dhadoop.version=2.2.0 -DskipTests clean package
mvn -Phadoop-2.3 -Dhadoop.version=2.3.0 -DskipTests clean package

The table below lists the corresponding `hadoop.version` code for each CDH/HDP release. Note that
some Hadoop releases are binary compatible across client versions. This means the pre-built Spark
@@ -28,6 +28,7 @@
import org.apache.spark.ml.classification.ClassificationModel;
import org.apache.spark.ml.param.IntParam;
import org.apache.spark.ml.param.ParamMap;
import org.apache.spark.ml.util.Identifiable$;
import org.apache.spark.mllib.linalg.BLAS;
import org.apache.spark.mllib.linalg.Vector;
import org.apache.spark.mllib.linalg.Vectors;
@@ -103,7 +104,23 @@ public static void main(String[] args) throws Exception {
* However, this should still compile and run successfully.
*/
class MyJavaLogisticRegression
extends Classifier<Vector, MyJavaLogisticRegression, MyJavaLogisticRegressionModel> {
extends Classifier<Vector, MyJavaLogisticRegression, MyJavaLogisticRegressionModel> {

public MyJavaLogisticRegression() {
init();
}

public MyJavaLogisticRegression(String uid) {
this.uid_ = uid;
init();
}

private String uid_ = Identifiable$.MODULE$.randomUID("myJavaLogReg");

@Override
public String uid() {
return uid_;
}

/**
* Param for max number of iterations
@@ -117,7 +134,7 @@ class MyJavaLogisticRegression

int getMaxIter() { return (Integer) getOrDefault(maxIter); }

public MyJavaLogisticRegression() {
private void init() {
setMaxIter(100);
}

@@ -137,7 +154,7 @@ public MyJavaLogisticRegressionModel train(DataFrame dataset) {
Vector weights = Vectors.zeros(numFeatures); // Learning would happen here.

// Create a model, and return it.
return new MyJavaLogisticRegressionModel(this, weights);
return new MyJavaLogisticRegressionModel(uid(), weights).setParent(this);
}
}

@@ -149,17 +166,21 @@ public MyJavaLogisticRegressionModel train(DataFrame dataset) {
* However, this should still compile and run successfully.
*/
class MyJavaLogisticRegressionModel
extends ClassificationModel<Vector, MyJavaLogisticRegressionModel> {

private MyJavaLogisticRegression parent_;
public MyJavaLogisticRegression parent() { return parent_; }
extends ClassificationModel<Vector, MyJavaLogisticRegressionModel> {

private Vector weights_;
public Vector weights() { return weights_; }

public MyJavaLogisticRegressionModel(MyJavaLogisticRegression parent_, Vector weights_) {
this.parent_ = parent_;
this.weights_ = weights_;
public MyJavaLogisticRegressionModel(String uid, Vector weights) {
this.uid_ = uid;
this.weights_ = weights;
}

private String uid_ = Identifiable$.MODULE$.randomUID("myJavaLogReg");

@Override
public String uid() {
return uid_;
}

// This uses the default implementation of transform(), which reads column "features" and outputs
@@ -204,6 +225,6 @@ public Vector predictRaw(Vector features) {
*/
@Override
public MyJavaLogisticRegressionModel copy(ParamMap extra) {
return copyValues(new MyJavaLogisticRegressionModel(parent_, weights_), extra);
return copyValues(new MyJavaLogisticRegressionModel(uid(), weights_), extra);
}
}
@@ -66,7 +66,7 @@ public static void main(String[] args) {
.setOutputCol("features");
LogisticRegression lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.01);
.setRegParam(0.001);
Pipeline pipeline = new Pipeline()
.setStages(new PipelineStage[] {tokenizer, hashingTF, lr});

@@ -77,7 +77,7 @@ public static void main(String[] args) {
List<Document> localTest = Lists.newArrayList(
new Document(4L, "spark i j k"),
new Document(5L, "l m n"),
new Document(6L, "mapreduce spark"),
new Document(6L, "spark hadoop spark"),
new Document(7L, "apache hadoop"));
DataFrame test = jsql.createDataFrame(jsc.parallelize(localTest), Document.class);

@@ -48,7 +48,7 @@
# Configure an ML pipeline, which consists of tree stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
@@ -58,7 +58,7 @@
Document = Row("id", "text")
test = sc.parallelize([(4, "spark i j k"),
(5, "l m n"),
(6, "mapreduce spark"),
(6, "spark hadoop spark"),
(7, "apache hadoop")]) \
.map(lambda x: Document(*x)).toDF()

@@ -20,6 +20,7 @@ package org.apache.spark.examples.ml
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.ml.classification.{ClassificationModel, Classifier, ClassifierParams}
import org.apache.spark.ml.param.{IntParam, ParamMap}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
@@ -106,10 +107,12 @@ private trait MyLogisticRegressionParams extends ClassifierParams {
*
* NOTE: This is private since it is an example. In practice, you may not want it to be private.
*/
private class MyLogisticRegression
private class MyLogisticRegression(override val uid: String)
extends Classifier[Vector, MyLogisticRegression, MyLogisticRegressionModel]
with MyLogisticRegressionParams {

def this() = this(Identifiable.randomUID("myLogReg"))

setMaxIter(100) // Initialize

// The parameter setter is in this class since it should return type MyLogisticRegression.
@@ -125,7 +128,7 @@ private class MyLogisticRegression
val weights = Vectors.zeros(numFeatures) // Learning would happen here.

// Create a model, and return it.
new MyLogisticRegressionModel(this, weights)
new MyLogisticRegressionModel(uid, weights).setParent(this)
}
}

@@ -135,7 +138,7 @@ private class MyLogisticRegression
* NOTE: This is private since it is an example. In practice, you may not want it to be private.
*/
private class MyLogisticRegressionModel(
override val parent: MyLogisticRegression,
override val uid: String,
val weights: Vector)
extends ClassificationModel[Vector, MyLogisticRegressionModel]
with MyLogisticRegressionParams {
@@ -173,6 +176,6 @@ private class MyLogisticRegressionModel(
* This is used for the default implementation of [[transform()]].
*/
override def copy(extra: ParamMap): MyLogisticRegressionModel = {
copyValues(new MyLogisticRegressionModel(parent, weights), extra)
copyValues(new MyLogisticRegressionModel(uid, weights), extra)
}
}
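
The net effect of the `uid` plumbing above: an estimator's identifier is fixed at construction (explicitly, or via `Identifiable.randomUID`), and the model it produces reuses that identifier instead of holding a constructor reference to its parent. A minimal standalone sketch of the pattern (hypothetical classes, not the Spark API):

```scala
import org.apache.spark.ml.util.Identifiable

// Primary constructor takes an explicit uid; the auxiliary constructor generates one.
class SketchEstimator(val uid: String) {
  def this() = this(Identifiable.randomUID("sketchEst"))
  def fit(): SketchModel = new SketchModel(uid) // the model reuses the estimator's uid
}

class SketchModel(val uid: String)

val est = new SketchEstimator()
val model = est.fit()
assert(model.uid == est.uid) // estimator and model share one stable identifier
```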
@@ -64,7 +64,7 @@ object SimpleTextClassificationPipeline {
.setOutputCol("features")
val lr = new LogisticRegression()
.setMaxIter(10)
.setRegParam(0.01)
.setRegParam(0.001)
val pipeline = new Pipeline()
.setStages(Array(tokenizer, hashingTF, lr))

@@ -75,7 +75,7 @@ object SimpleTextClassificationPipeline {
val test = sc.parallelize(Seq(
Document(4L, "spark i j k"),
Document(5L, "l m n"),
Document(6L, "mapreduce spark"),
Document(6L, "spark hadoop spark"),
Document(7L, "apache hadoop")))

// Make predictions on test documents.
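For orientation, a condensed sketch of what the updated Scala example now runs end to end (the SparkContext, SQLContext implicits, and the `LabeledDocument`/`Document` case classes are assumed from the surrounding file):

```scala
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol(tokenizer.getOutputCol).setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10).setRegParam(0.001) // regParam lowered above
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))

val model = pipeline.fit(training.toDF()) // training: RDD[LabeledDocument] from the example
model.transform(test.toDF())              // test now includes Document(6L, "spark hadoop spark")
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach(println)
```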
2 changes: 1 addition & 1 deletion make-distribution.sh
@@ -58,7 +58,7 @@ while (( "$#" )); do
--hadoop)
echo "Error: '--hadoop' is no longer supported:"
echo "Error: use Maven profiles and options -Dhadoop.version and -Dyarn.version instead."
echo "Error: Related profiles include hadoop-2.2, hadoop-2.3 and hadoop-2.4."
echo "Error: Related profiles include hadoop-1, hadoop-2.2, hadoop-2.3 and hadoop-2.4."
exit_with_usage
;;
--with-yarn)
[Diffs for the remaining changed files in this commit are not shown.]
