merge master
mengxr committed Oct 9, 2014
2 parents 7e5a322 + 9c439d3 commit 61b8e0f
Showing 64 changed files with 1,680 additions and 1,143 deletions.
6 changes: 4 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -216,8 +216,10 @@ private[spark] object UIUtils extends Logging {
<div class="row-fluid">
<div class="span12">
<h3 style="vertical-align: middle; display: inline-block;">
<img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")}
style="margin-right: 15px;" />
<a style="text-decoration: none" href={prependBaseUri("/")}>
<img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")}
style="margin-right: 15px;" />
</a>
{title}
</h3>
</div>
2 changes: 1 addition & 1 deletion dev/run-tests
@@ -42,7 +42,7 @@ function handle_error () {
elif [ "$AMPLAB_JENKINS_BUILD_PROFILE" = "hadoop2.0" ]; then
export SBT_MAVEN_PROFILES_ARGS="-Dhadoop.version=2.0.0-mr1-cdh4.1.1"
elif [ "$AMPLAB_JENKINS_BUILD_PROFILE" = "hadoop2.2" ]; then
export SBT_MAVEN_PROFILES_ARGS="-Pyarn -Dhadoop.version=2.2.0"
export SBT_MAVEN_PROFILES_ARGS="-Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0"
elif [ "$AMPLAB_JENKINS_BUILD_PROFILE" = "hadoop2.3" ]; then
export SBT_MAVEN_PROFILES_ARGS="-Pyarn -Phadoop-2.3 -Dhadoop.version=2.3.0"
fi
2 changes: 2 additions & 0 deletions dev/scalastyle
@@ -26,6 +26,8 @@ echo -e "q\n" | sbt/sbt -Pyarn -Phadoop-2.2 -Dhadoop.version=2.2.0 yarn/scalasty
>> scalastyle.txt

ERRORS=$(cat scalastyle.txt | grep -e "\<error\>")
rm scalastyle.txt

if test ! -z "$ERRORS"; then
echo -e "Scalastyle checks failed at following occurrences:\n$ERRORS"
exit 1
8 changes: 4 additions & 4 deletions docs/README.md
@@ -54,19 +54,19 @@ phase, use the following sytax:
// supported languages too.
{% endhighlight %}

## API Docs (Scaladoc and Epydoc)
## API Docs (Scaladoc and Sphinx)

You can build just the Spark scaladoc by running `sbt/sbt doc` from the SPARK_PROJECT_ROOT directory.

Similarly, you can build just the PySpark epydoc by running `epydoc --config epydoc.conf` from the
SPARK_PROJECT_ROOT/pyspark directory. Documentation is only generated for classes that are listed as
Similarly, you can build just the PySpark docs by running `make html` from the
SPARK_PROJECT_ROOT/python/docs directory. Documentation is only generated for classes that are listed as
public in `__init__.py`.

When you run `jekyll` in the `docs` directory, it will also copy over the scaladoc for the various
Spark subprojects into the `docs` directory (and then also into the `_site` directory). We use a
jekyll plugin to run `sbt/sbt doc` before building the site so if you haven't run it (recently) it
may take some time as it generates all of the scaladoc. The jekyll plugin also generates the
PySpark docs using [epydoc](http://epydoc.sourceforge.net/).
PySpark docs [Sphinx](http://sphinx-doc.org/).

NOTE: To skip the step of building and copying over the Scala and Python API docs, run `SKIP_API=1
jekyll`.
3 changes: 3 additions & 0 deletions docs/_config.yml
@@ -8,6 +8,9 @@ gems:
kramdown:
entity_output: numeric

include:
- _static

# These allow the documentation to be updated with nerw releases
# of Spark, Scala, and Mesos.
SPARK_VERSION: 1.0.0-SNAPSHOT
19 changes: 10 additions & 9 deletions docs/_plugins/copy_api_dirs.rb
@@ -63,19 +63,20 @@
puts "cp -r " + source + "/. " + dest
cp_r(source + "/.", dest)

# Build Epydoc for Python
puts "Moving to python directory and building epydoc."
cd("../python")
puts `epydoc --config epydoc.conf`
# Build Sphinx docs for Python

puts "Moving back into docs dir."
cd("../docs")
puts "Moving to python/docs directory and building sphinx."
cd("../python/docs")
puts `make html`

puts "Moving back into home dir."
cd("../../")

puts "Making directory api/python"
mkdir_p "api/python"
mkdir_p "docs/api/python"

puts "cp -r ../python/docs/. api/python"
cp_r("../python/docs/.", "api/python")
puts "cp -r python/docs/_build/html/. docs/api/python"
cp_r("python/docs/_build/html/.", "docs/api/python")

cd("..")
end
113 changes: 87 additions & 26 deletions ec2/spark_ec2.py
@@ -32,6 +32,7 @@
import tempfile
import time
import urllib2
import warnings
from optparse import OptionParser
from sys import stderr
import boto
@@ -61,8 +62,8 @@ def parse_args():
"-s", "--slaves", type="int", default=1,
help="Number of slaves to launch (default: %default)")
parser.add_option(
"-w", "--wait", type="int", default=120,
help="Seconds to wait for nodes to start (default: %default)")
"-w", "--wait", type="int",
help="DEPRECATED (no longer necessary) - Seconds to wait for nodes to start")
parser.add_option(
"-k", "--key-pair",
help="Key pair to use on instances")
@@ -195,18 +196,6 @@ def get_or_make_group(conn, name):
return conn.create_security_group(name, "Spark EC2 group")


# Wait for a set of launched instances to exit the "pending" state
# (i.e. either to start running or to fail and be terminated)
def wait_for_instances(conn, instances):
while True:
for i in instances:
i.update()
if len([i for i in instances if i.state == 'pending']) > 0:
time.sleep(5)
else:
return


# Check whether a given EC2 instance object is in a state we consider active,
# i.e. not terminating or terminated. We count both stopping and stopped as
# active since we can restart stopped clusters.
@@ -594,7 +583,7 @@ def setup_cluster(conn, master_nodes, slave_nodes, opts, deploy_ssh_key):

# NOTE: We should clone the repository before running deploy_files to
# prevent ec2-variables.sh from being overwritten
ssh(master, opts, "rm -rf spark-ec2 && git clone https://github.com/mesos/spark-ec2.git -b v3")
ssh(master, opts, "rm -rf spark-ec2 && git clone https://github.com/mesos/spark-ec2.git -b v4")

print "Deploying files to master..."
deploy_files(conn, "deploy.generic", opts, master_nodes, slave_nodes, modules)
@@ -619,14 +608,64 @@ def setup_spark_cluster(master, opts):
print "Ganglia started at http://%s:5080/ganglia" % master


# Wait for a whole cluster (masters, slaves and ZooKeeper) to start up
def wait_for_cluster(conn, wait_secs, master_nodes, slave_nodes):
print "Waiting for instances to start up..."
time.sleep(5)
wait_for_instances(conn, master_nodes)
wait_for_instances(conn, slave_nodes)
print "Waiting %d more seconds..." % wait_secs
time.sleep(wait_secs)
def is_ssh_available(host, opts):
"Checks if SSH is available on the host."
try:
with open(os.devnull, 'w') as devnull:
ret = subprocess.check_call(
ssh_command(opts) + ['-t', '-t', '-o', 'ConnectTimeout=3',
'%s@%s' % (opts.user, host), stringify_command('true')],
stdout=devnull,
stderr=devnull
)
return ret == 0
except subprocess.CalledProcessError as e:
return False


def is_cluster_ssh_available(cluster_instances, opts):
for i in cluster_instances:
if not is_ssh_available(host=i.ip_address, opts=opts):
return False
else:
return True


def wait_for_cluster_state(cluster_instances, cluster_state, opts):
"""
cluster_instances: a list of boto.ec2.instance.Instance
cluster_state: a string representing the desired state of all the instances in the cluster
value can be 'ssh-ready' or a valid value from boto.ec2.instance.InstanceState such as
'running', 'terminated', etc.
(would be nice to replace this with a proper enum: http://stackoverflow.com/a/1695250)
"""
sys.stdout.write(
"Waiting for all instances in cluster to enter '{s}' state.".format(s=cluster_state)
)
sys.stdout.flush()

num_attempts = 0

while True:
time.sleep(3 * num_attempts)

for i in cluster_instances:
s = i.update() # capture output to suppress print to screen in newer versions of boto

if cluster_state == 'ssh-ready':
if all(i.state == 'running' for i in cluster_instances) and \
is_cluster_ssh_available(cluster_instances, opts):
break
else:
if all(i.state == cluster_state for i in cluster_instances):
break

num_attempts += 1

sys.stdout.write(".")
sys.stdout.flush()

sys.stdout.write("\n")


# Get number of local disks available for a given EC2 instance type.
@@ -868,6 +907,16 @@ def real_main():
(opts, action, cluster_name) = parse_args()

# Input parameter validation
if opts.wait is not None:
# NOTE: DeprecationWarnings are silent in 2.7+ by default.
# To show them, run Python with the -Wdefault switch.
# See: https://docs.python.org/3.5/whatsnew/2.7.html
warnings.warn(
"This option is deprecated and has no effect. "
"spark-ec2 automatically waits as long as necessary for clusters to startup.",
DeprecationWarning
)

if opts.ebs_vol_num > 8:
print >> stderr, "ebs-vol-num cannot be greater than 8"
sys.exit(1)
@@ -890,7 +939,11 @@ def real_main():
(master_nodes, slave_nodes) = get_existing_cluster(conn, opts, cluster_name)
else:
(master_nodes, slave_nodes) = launch_cluster(conn, opts, cluster_name)
wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
wait_for_cluster_state(
cluster_instances=(master_nodes + slave_nodes),
cluster_state='ssh-ready',
opts=opts
)
setup_cluster(conn, master_nodes, slave_nodes, opts, True)

elif action == "destroy":
@@ -919,7 +972,11 @@ def real_main():
else:
group_names = [opts.security_group_prefix + "-master",
opts.security_group_prefix + "-slaves"]

wait_for_cluster_state(
cluster_instances=(master_nodes + slave_nodes),
cluster_state='terminated',
opts=opts
)
attempt = 1
while attempt <= 3:
print "Attempt %d" % attempt
Expand Down Expand Up @@ -1019,7 +1076,11 @@ def real_main():
for inst in master_nodes:
if inst.state not in ["shutting-down", "terminated"]:
inst.start()
wait_for_cluster(conn, opts.wait, master_nodes, slave_nodes)
wait_for_cluster_state(
cluster_instances=(master_nodes + slave_nodes),
cluster_state='ssh-ready',
opts=opts
)
setup_cluster(conn, master_nodes, slave_nodes, opts, False)

else:
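The spark_ec2.py changes above drop the fixed --wait sleep in favor of active polling: wait_for_cluster_state re-checks the instances with a linearly growing pause (3 * num_attempts seconds before each retry) and, for the 'ssh-ready' state, also requires every instance to accept an SSH connection. A rough sketch of that retry shape in Scala, purely to illustrate the timing; pollUntil and isReady are illustrative names, not part of this commit:

object PollSketch {
  // Re-check a readiness predicate with a linearly increasing pause:
  // the first check is immediate, then 3s, 6s, 9s, ... between attempts,
  // mirroring time.sleep(3 * num_attempts) in wait_for_cluster_state.
  def pollUntil(isReady: () => Boolean): Unit = {
    var attempts = 0
    while (!isReady()) {
      print(".")
      attempts += 1
      Thread.sleep(3000L * attempts)
    }
    println()
  }

  def main(args: Array[String]): Unit = {
    // Toy predicate standing in for the EC2 state and SSH checks.
    val deadline = System.currentTimeMillis() + 10000L
    pollUntil(() => System.currentTimeMillis() > deadline)
  }
}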
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples.mllib

import scala.reflect.runtime.universe._

/**
* Abstract class for parameter case classes.
* This overrides the [[toString]] method to print all case class fields by name and value.
* @tparam T Concrete parameter class.
*/
abstract class AbstractParams[T: TypeTag] {

private def tag: TypeTag[T] = typeTag[T]

/**
* Finds all case class fields in concrete class instance, and outputs them in JSON-style format:
* {
* [field name]:\t[field value]\n
* [field name]:\t[field value]\n
* ...
* }
*/
override def toString: String = {
val tpe = tag.tpe
val allAccessors = tpe.declarations.collect {
case m: MethodSymbol if m.isCaseAccessor => m
}
val mirror = runtimeMirror(getClass.getClassLoader)
val instanceMirror = mirror.reflect(this)
allAccessors.map { f =>
val paramName = f.name.toString
val fieldMirror = instanceMirror.reflectField(f)
val paramValue = fieldMirror.get
s" $paramName:\t$paramValue"
}.mkString("{\n", ",\n", "\n}")
}
}
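
The new AbstractParams base class above derives a readable toString from a case class's accessors via runtime reflection, which is why the example Params classes below only add extends AbstractParams[Params]. A minimal usage sketch; ExampleParams, its fields, and the demo object are hypothetical, not part of this commit:

import org.apache.spark.examples.mllib.AbstractParams

// Hypothetical parameter class following the same pattern as the Params classes below.
case class ExampleParams(input: String = "data/mllib/sample.txt", regParam: Double = 0.1)
  extends AbstractParams[ExampleParams]

object ExampleParamsDemo {
  def main(args: Array[String]): Unit = {
    // The inherited toString reflects over the case accessors and prints
    // something like:
    // {
    //   input:  data/mllib/sample.txt,
    //   regParam:  0.1
    // }
    println(ExampleParams())
  }
}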
@@ -55,7 +55,7 @@ object BinaryClassification {
stepSize: Double = 1.0,
algorithm: Algorithm = LR,
regType: RegType = L2,
regParam: Double = 0.1)
regParam: Double = 0.1) extends AbstractParams[Params]

def main(args: Array[String]) {
val defaultParams = Params()
@@ -35,6 +35,7 @@ import org.apache.spark.{SparkConf, SparkContext}
object Correlations {

case class Params(input: String = "data/mllib/sample_linear_regression_data.txt")
extends AbstractParams[Params]

def main(args: Array[String]) {

@@ -43,6 +43,7 @@ import org.apache.spark.{SparkConf, SparkContext}
*/
object CosineSimilarity {
case class Params(inputFile: String = null, threshold: Double = 0.1)
extends AbstractParams[Params]

def main(args: Array[String]) {
val defaultParams = Params()