SKIPME Merge Apache branch-1.1 bug fixes #30

Merged
merged 13 commits on Jan 30, 2015
27 changes: 15 additions & 12 deletions bin/compute-classpath.sh
@@ -69,22 +69,25 @@ else
assembly_folder="$ASSEMBLY_DIR"
fi

num_jars=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*\.jar" | wc -l)
if [ "$num_jars" -eq "0" ]; then
echo "Failed to find Spark assembly in $assembly_folder"
echo "You need to build Spark before running this program."
exit 1
fi
num_jars=0

for f in ${assembly_folder}/spark-assembly*hadoop*.jar; do
if [[ ! -e "$f" ]]; then
echo "Failed to find Spark assembly in $assembly_folder" 1>&2
echo "You need to build Spark before running this program." 1>&2
exit 1
fi
ASSEMBLY_JAR="$f"
num_jars=$((num_jars+1))
done

if [ "$num_jars" -gt "1" ]; then
jars_list=$(ls "$assembly_folder" | grep "spark-assembly.*hadoop.*.jar")
echo "Found multiple Spark assembly jars in $assembly_folder:"
echo "$jars_list"
echo "Please remove all but one jar."
echo "Found multiple Spark assembly jars in $assembly_folder:" 1>&2
ls ${assembly_folder}/spark-assembly*hadoop*.jar 1>&2
echo "Please remove all but one jar." 1>&2
exit 1
fi

ASSEMBLY_JAR=$(ls "$assembly_folder"/spark-assembly*hadoop*.jar 2>/dev/null)

# Verify that versions of java used to build the jars and run Spark are compatible
jar_error_check=$("$JAR_CMD" -tf "$ASSEMBLY_JAR" nonexistent/class/path 2>&1)
if [[ "$jar_error_check" =~ "invalid CEN header" ]]; then
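The rewritten check above counts matching jars with a glob loop instead of parsing ls output, so it fails cleanly both when no assembly jar exists and when several do. A rough Scala sketch of the same "exactly one assembly jar" rule, for readers following the logic (names and messages are illustrative, not part of the patch):

import java.io.File

// Return the single spark-assembly*hadoop*.jar under `dir`, or fail loudly.
def findSingleAssemblyJar(dir: String): File = {
  val matches = Option(new File(dir).listFiles()).getOrElse(Array.empty[File]).filter { f =>
    val name = f.getName
    name.startsWith("spark-assembly") && name.contains("hadoop") && name.endsWith(".jar")
  }
  matches match {
    case Array(jar) => jar                               // exactly one match: use it
    case Array()    => sys.error(s"Failed to find Spark assembly in $dir")
    case many       => sys.error(s"Found multiple Spark assembly jars in $dir:\n" +
                                 many.map(_.getName).mkString("\n"))
  }
}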
27 changes: 21 additions & 6 deletions bin/run-example
@@ -35,17 +35,32 @@ else
fi

if [ -f "$FWDIR/RELEASE" ]; then
export SPARK_EXAMPLES_JAR=`ls "$FWDIR"/lib/spark-examples-*hadoop*.jar`
elif [ -e "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/spark-examples-*hadoop*.jar ]; then
export SPARK_EXAMPLES_JAR=`ls "$EXAMPLES_DIR"/target/scala-$SCALA_VERSION/spark-examples-*hadoop*.jar`
JAR_PATH="${FWDIR}/lib"
else
JAR_PATH="${EXAMPLES_DIR}/target/scala-${SPARK_SCALA_VERSION}"
fi

if [[ -z $SPARK_EXAMPLES_JAR ]]; then
echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" 1>&2
echo "You need to build Spark before running this program" 1>&2
JAR_COUNT=0

for f in ${JAR_PATH}/spark-examples-*hadoop*.jar; do
if [[ ! -e "$f" ]]; then
echo "Failed to find Spark examples assembly in $FWDIR/lib or $FWDIR/examples/target" 1>&2
echo "You need to build Spark before running this program" 1>&2
exit 1
fi
SPARK_EXAMPLES_JAR="$f"
JAR_COUNT=$((JAR_COUNT+1))
done

if [ "$JAR_COUNT" -gt "1" ]; then
echo "Found multiple Spark examples assembly jars in ${JAR_PATH}" 1>&2
ls ${JAR_PATH}/spark-examples-*hadoop*.jar 1>&2
echo "Please remove all but one jar." 1>&2
exit 1
fi

export SPARK_EXAMPLES_JAR

EXAMPLE_MASTER=${MASTER:-"local[*]"}

if [[ ! $EXAMPLE_CLASS == org.apache.spark.examples* ]]; then
7 changes: 7 additions & 0 deletions bin/spark-shell
@@ -45,6 +45,13 @@ source $FWDIR/bin/utils.sh
SUBMIT_USAGE_FUNCTION=usage
gatherSparkSubmitOpts "$@"

# SPARK-4161: scala does not assume use of the java classpath,
# so we need to add the "-Dscala.usejavacp=true" flag manually. We
# do this specifically for the Spark shell because the scala REPL
# has its own class loader, and any additional classpath specified
# through spark.driver.extraClassPath is not automatically propagated.
SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true"

function main() {
if $cygwin; then
# Workaround for issue involving JLine and Cygwin
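The new SPARK_SUBMIT_OPTS line addresses SPARK-4161 as described in the comment. For reviewers who want to confirm the flag actually reached the REPL's JVM, a quick check to run inside spark-shell (illustrative snippet, not part of the patch):

// Should print Some(true) once SPARK_SUBMIT_OPTS carries -Dscala.usejavacp=true
println(sys.props.get("scala.usejavacp"))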
10 changes: 10 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -120,6 +120,16 @@ pre {
border: none;
}

.version {
line-height: 30px;
vertical-align: bottom;
font-size: 12px;
padding: 0;
margin: 0;
font-weight: bold;
color: #777;
}

.tooltip {
font-weight: normal;
}
10 changes: 6 additions & 4 deletions core/src/main/scala/org/apache/spark/Aggregator.scala
@@ -34,15 +34,17 @@ case class Aggregator[K, V, C] (
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C) {

private val externalSorting = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)
// When spilling is enabled, sorting will happen externally, but not necessarily with an
// ExternalSorter.
private val isSpillEnabled = SparkEnv.get.conf.getBoolean("spark.shuffle.spill", true)

@deprecated("use combineValuesByKey with TaskContext argument", "0.9.0")
def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]): Iterator[(K, C)] =
combineValuesByKey(iter, null)

def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
context: TaskContext): Iterator[(K, C)] = {
if (!externalSorting) {
if (!isSpillEnabled) {
val combiners = new AppendOnlyMap[K,C]
var kv: Product2[K, V] = null
val update = (hadValue: Boolean, oldValue: C) => {
@@ -71,9 +73,9 @@ case class Aggregator[K, V, C] (
combineCombinersByKey(iter, null)

def combineCombinersByKey(iter: Iterator[_ <: Product2[K, C]], context: TaskContext)
: Iterator[(K, C)] =
: Iterator[(K, C)] =
{
if (!externalSorting) {
if (!isSpillEnabled) {
val combiners = new AppendOnlyMap[K,C]
var kc: Product2[K, C] = null
val update = (hadValue: Boolean, oldValue: C) => {
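The renamed isSpillEnabled flag is driven entirely by the spark.shuffle.spill setting read above. A minimal sketch of toggling it from an application, assuming a throwaway local job (app name and data are made up):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD implicits on this branch

// spark.shuffle.spill=false forces Aggregator down the in-memory AppendOnlyMap path;
// the default (true) keeps external spilling enabled.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("spill-demo")
  .set("spark.shuffle.spill", "false")
val sc = new SparkContext(conf)
val counts = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3)).reduceByKey(_ + _)
println(counts.collect().toSeq)
sc.stop()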
4 changes: 1 addition & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -858,7 +858,7 @@ class SparkContext(config: SparkConf) extends Logging {
}

/** The version of Spark on which this application is running. */
def version = SparkContext.SPARK_VERSION
def version = SPARK_VERSION

/**
* Return a map from the slave to the max memory available for caching and the remaining
@@ -1339,8 +1339,6 @@
*/
object SparkContext extends Logging {

private[spark] val SPARK_VERSION = "1.1.2-SNAPSHOT"

private[spark] val SPARK_JOB_DESCRIPTION = "spark.job.description"

private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"
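With SPARK_VERSION moved out of the SparkContext companion object, call sites reference the package-level constant, as the later hunks in this PR do. A tiny usage sketch (assuming the org.apache.spark package object exposes SPARK_VERSION, which is what the updated imports indicate):

import org.apache.spark.SPARK_VERSION

// Applications can still ask a running context via sc.version;
// internal code now reads the package-level constant directly.
println(s"Built against Spark $SPARK_VERSION")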
32 changes: 16 additions & 16 deletions core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -679,23 +679,23 @@ private[spark] class Master(
app.desc.appUiUrl = notFoundBasePath
return false
}
val fileSystem = Utils.getHadoopFileSystem(eventLogDir)
val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem)
val eventLogPaths = eventLogInfo.logPaths
val compressionCodec = eventLogInfo.compressionCodec

if (eventLogPaths.isEmpty) {
// Event logging is enabled for this application, but no event logs are found
val title = s"Application history not found (${app.id})"
var msg = s"No event logs found for application $appName in $eventLogDir."
logWarning(msg)
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
return false
}

try {
val fileSystem = Utils.getHadoopFileSystem(eventLogDir)
val eventLogInfo = EventLoggingListener.parseLoggingInfo(eventLogDir, fileSystem)
val eventLogPaths = eventLogInfo.logPaths
val compressionCodec = eventLogInfo.compressionCodec

if (eventLogPaths.isEmpty) {
// Event logging is enabled for this application, but no event logs are found
val title = s"Application history not found (${app.id})"
var msg = s"No event logs found for application $appName in $eventLogDir."
logWarning(msg)
msg += " Did you specify the correct logging directory?"
msg = URLEncoder.encode(msg, "UTF-8")
app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
return false
}

val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)",
HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.permission.FsPermission
import org.json4s.JsonAST.JValue
import org.json4s.jackson.JsonMethods._

import org.apache.spark.{Logging, SparkConf, SparkContext}
import org.apache.spark.{Logging, SparkConf, SPARK_VERSION}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}
@@ -83,7 +83,7 @@ private[spark] class EventLoggingListener(
sparkConf.get("spark.io.compression.codec", CompressionCodec.DEFAULT_COMPRESSION_CODEC)
logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
}
logger.newFile(SPARK_VERSION_PREFIX + SparkContext.SPARK_VERSION)
logger.newFile(SPARK_VERSION_PREFIX + SPARK_VERSION)
logger.newFile(LOG_PREFIX + logger.fileIndex)
}

@@ -92,7 +92,7 @@ private[spark] class JavaSerializerInstance(counterReset: Int, defaultClassLoade
}

override def deserializeStream(s: InputStream): DeserializationStream = {
new JavaDeserializationStream(s, Utils.getContextOrSparkClassLoader)
new JavaDeserializationStream(s, defaultClassLoader)
}

def deserializeStream(s: InputStream, loader: ClassLoader): DeserializationStream = {
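The one-line change makes the deserialization stream honour the class loader configured on the serializer instance instead of always falling back to the context/Spark loader. A hedged round-trip sketch (it assumes the setDefaultClassLoader hook on Serializer in this branch; the payload is illustrative):

import java.io.ByteArrayInputStream
import org.apache.spark.SparkConf
import org.apache.spark.serializer.JavaSerializer

val serializer = new JavaSerializer(new SparkConf())
serializer.setDefaultClassLoader(getClass.getClassLoader)  // this loader now reaches the stream
val instance = serializer.newInstance()

val bytes = instance.serialize(Seq(1, 2, 3)).array()
val in = instance.deserializeStream(new ByteArrayInputStream(bytes))
println(in.readObject[Seq[Int]]())                         // Seq(1, 2, 3)
in.close()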
23 changes: 9 additions & 14 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -23,7 +23,7 @@ import java.util.{Locale, Date}
import scala.xml.Node

import org.apache.spark.Logging
import org.apache.spark.SparkContext
import org.apache.spark.SPARK_VERSION

/** Utility functions for generating XML pages with spark content. */
private[spark] object UIUtils extends Logging {
@@ -186,9 +186,12 @@ private[spark] object UIUtils extends Logging {
<body>
<div class="navbar navbar-static-top">
<div class="navbar-inner">
<a href={prependBaseUri("/")} class="brand">
<img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} />
</a>
<div class="brand">
<a href={prependBaseUri("/")} class="brand">
<img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} />
<span class="version">{SPARK_VERSION}</span>
</a>
</div>
<ul class="nav">{header}</ul>
<p class="navbar-text pull-right">
<strong title={appName}>{shortAppName}</strong> application UI
@@ -205,11 +208,6 @@
</div>
{content}
</div>
<div id="footer">
<div class="container-fluid">
<p class="muted credit">Spark {SparkContext.SPARK_VERSION}</p>
</div>
</div>
</body>
</html>
}
@@ -229,18 +227,15 @@
<a style="text-decoration: none" href={prependBaseUri("/")}>
<img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")}
style="margin-right: 15px;" />
<span class="version"
style="margin-right: 15px;">{SPARK_VERSION}</span>
</a>
{title}
</h3>
</div>
</div>
{content}
</div>
<div id="footer">
<div class="container-fluid">
<p class="muted credit">Spark {SparkContext.SPARK_VERSION}</p>
</div>
</div>
</body>
</html>
}
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.{FileStatus, Path}
import org.json4s.jackson.JsonMethods._
import org.scalatest.{BeforeAndAfter, FunSuite}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.{SparkConf, SparkContext, SPARK_VERSION}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.util.{JsonProtocol, Utils}

@@ -194,7 +194,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {

def assertInfoCorrect(info: EventLoggingInfo, loggerStopped: Boolean) {
assert(info.logPaths.size > 0)
assert(info.sparkVersion === SparkContext.SPARK_VERSION)
assert(info.sparkVersion === SPARK_VERSION)
assert(info.compressionCodec.isDefined === compressionCodec.isDefined)
info.compressionCodec.foreach { codec =>
assert(compressionCodec.isDefined)
@@ -378,7 +378,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter {
private def assertSparkVersionIsValid(logFiles: Array[FileStatus]) {
val file = logFiles.map(_.getPath.getName).find(EventLoggingListener.isSparkVersionFile)
assert(file.isDefined)
assert(EventLoggingListener.parseSparkVersion(file.get) === SparkContext.SPARK_VERSION)
assert(EventLoggingListener.parseSparkVersion(file.get) === SPARK_VERSION)
}

private def assertCompressionCodecIsValid(logFiles: Array[FileStatus], compressionCodec: String) {
8 changes: 8 additions & 0 deletions dev/mima
@@ -25,11 +25,19 @@ FWDIR="$(cd `dirname $0`/..; pwd)"
cd "$FWDIR"

echo -e "q\n" | sbt/sbt oldDeps/update
rm -f .generated-mima*

# GenerateMIMAIgnore is called twice: first with the latest built jars on the classpath,
# and then again with the previous version's jars on the classpath. This works around
# a bug in GenerateMIMAIgnore where, if the old jars are ahead on the classpath, it
# does not process the new classes (which are in the assembly jar).
./bin/spark-class org.apache.spark.tools.GenerateMIMAIgnore

export SPARK_CLASSPATH=`find lib_managed \( -name '*spark*jar' -a -type f \) | tr "\\n" ":"`
echo "SPARK_CLASSPATH=$SPARK_CLASSPATH"

./bin/spark-class org.apache.spark.tools.GenerateMIMAIgnore

echo -e "q\n" | sbt/sbt mima-report-binary-issues | grep -v -e "info.*Resolving"
ret_val=$?

2 changes: 1 addition & 1 deletion docs/programming-guide.md
@@ -911,7 +911,7 @@ for details.
</tr>
<tr>
<td> <b>cogroup</b>(<i>otherDataset</i>, [<i>numTasks</i>]) </td>
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, Iterable&lt;V&gt;, Iterable&lt;W&gt;) tuples. This operation is also called <code>groupWith</code>. </td>
<td> When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable&lt;V&gt;, Iterable&lt;W&gt;)) tuples. This operation is also called <code>groupWith</code>. </td>
</tr>
<tr>
<td> <b>cartesian</b>(<i>otherDataset</i>) </td>
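The corrected cogroup row now matches the actual return type: one tuple holding two Iterables per key, rather than two separate Iterables. A quick illustration with made-up data (assumes an existing SparkContext sc and the pair-RDD implicits from import org.apache.spark.SparkContext._):

val users  = sc.parallelize(Seq((1, "alice"), (2, "bob")))
val orders = sc.parallelize(Seq((1, "book"), (1, "pen")))

// RDD[(Int, (Iterable[String], Iterable[String]))]
val grouped = users.cogroup(orders)
grouped.collect().foreach { case (id, (names, items)) =>
  println(s"$id -> names=${names.mkString(",")} items=${items.mkString(",")}")
}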
4 changes: 2 additions & 2 deletions docs/submitting-applications.md
@@ -57,8 +57,8 @@ to the console. Thus, this mode is especially suitable for applications that inv

Alternatively, if your application is submitted from a machine far from the worker machines (e.g.
locally on your laptop), it is common to use `cluster` mode to minimize network latency between
the drivers and the executors. Note that `cluster` mode is currently not supported for standalone
clusters, Mesos clusters, or python applications.
the drivers and the executors. Note that `cluster` mode is currently not supported for Mesos
clusters or python applications.

For Python applications, simply pass a `.py` file in the place of `<application-jar>` instead of a JAR,
and add Python `.zip`, `.egg` or `.py` files to the search path with `--py-files`.
6 changes: 6 additions & 0 deletions project/MimaBuild.scala
@@ -30,6 +30,12 @@ object MimaBuild {

def excludeMember(fullName: String) = Seq(
ProblemFilters.exclude[MissingMethodProblem](fullName),
// Sometimes excluded methods have default arguments, which are translated into
// public methods/fields ($default$N) in the generated bytecode. It is not possible
// to exhaustively list everything, but this should be okay.
ProblemFilters.exclude[MissingMethodProblem](fullName+"$default$2"),
ProblemFilters.exclude[MissingMethodProblem](fullName+"$default$1"),
ProblemFilters.exclude[MissingFieldProblem](fullName),
ProblemFilters.exclude[IncompatibleResultTypeProblem](fullName),
ProblemFilters.exclude[IncompatibleMethTypeProblem](fullName),
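For context on the new filters: a Scala method with default arguments compiles to additional synthetic public members named <method>$default$N, one per defaulted parameter, and MiMa reports them as independent members. A small sketch of the pattern (class and method are made up):

class Endpoint {
  // Besides connect(String, Int), scalac emits a synthetic member roughly equivalent to
  //   def connect$default$2: Int = 7077
  // which is why excludeMember also filters fullName + "$default$1" / "$default$2".
  def connect(host: String, port: Int = 7077): String = s"$host:$port"
}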
2 changes: 1 addition & 1 deletion project/SparkBuild.scala
@@ -187,7 +187,7 @@ object OldDeps {
Some("org.apache.spark" % fullId % "1.0.0")
}

def oldDepsSettings() = Defaults.defaultSettings ++ Seq(
def oldDepsSettings() = Defaults.coreDefaultSettings ++ Seq(
name := "old-deps",
scalaVersion := "2.10.4",
retrieveManaged := true,
10 changes: 0 additions & 10 deletions python/pyspark/__init__.py
@@ -49,16 +49,6 @@
Main entry point for accessing data stored in Apache Hive.
"""

# The following block allows us to import python's random instead of mllib.random for scripts in
# mllib that depend on top level pyspark packages, which transitively depend on python's random.
# Since Python's import logic looks for modules in the current package first, we eliminate
# mllib.random as a candidate for C{import random} by removing the first search path, the script's
# location, in order to force the loader to look in Python's top-level modules for C{random}.
import sys
s = sys.path.pop(0)
import random
sys.path.insert(0, s)

from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SQLContext