").append(m.parseHTML(a)).find(d):a)}).complete(c&&function(a,b){g.each(c,e||[a.responseText,b,a])}),this},m.expr.filters.animated=function(a){return m.grep(m.timers,function(b){return a===b.elem}).length};var cd=a.document.documentElement;function dd(a){return m.isWindow(a)?a:9===a.nodeType?a.defaultView||a.parentWindow:!1}m.offset={setOffset:function(a,b,c){var d,e,f,g,h,i,j,k=m.css(a,"position"),l=m(a),n={};"static"===k&&(a.style.position="relative"),h=l.offset(),f=m.css(a,"top"),i=m.css(a,"left"),j=("absolute"===k||"fixed"===k)&&m.inArray("auto",[f,i])>-1,j?(d=l.position(),g=d.top,e=d.left):(g=parseFloat(f)||0,e=parseFloat(i)||0),m.isFunction(b)&&(b=b.call(a,c,h)),null!=b.top&&(n.top=b.top-h.top+g),null!=b.left&&(n.left=b.left-h.left+e),"using"in b?b.using.call(a,n):l.css(n)}},m.fn.extend({offset:function(a){if(arguments.length)return void 0===a?this:this.each(function(b){m.offset.setOffset(this,a,b)});var b,c,d={top:0,left:0},e=this[0],f=e&&e.ownerDocument;if(f)return b=f.documentElement,m.contains(b,e)?(typeof e.getBoundingClientRect!==K&&(d=e.getBoundingClientRect()),c=dd(f),{top:d.top+(c.pageYOffset||b.scrollTop)-(b.clientTop||0),left:d.left+(c.pageXOffset||b.scrollLeft)-(b.clientLeft||0)}):d},position:function(){if(this[0]){var a,b,c={top:0,left:0},d=this[0];return"fixed"===m.css(d,"position")?b=d.getBoundingClientRect():(a=this.offsetParent(),b=this.offset(),m.nodeName(a[0],"html")||(c=a.offset()),c.top+=m.css(a[0],"borderTopWidth",!0),c.left+=m.css(a[0],"borderLeftWidth",!0)),{top:b.top-c.top-m.css(d,"marginTop",!0),left:b.left-c.left-m.css(d,"marginLeft",!0)}}},offsetParent:function(){return this.map(function(){var a=this.offsetParent||cd;while(a&&!m.nodeName(a,"html")&&"static"===m.css(a,"position"))a=a.offsetParent;return a||cd})}}),m.each({scrollLeft:"pageXOffset",scrollTop:"pageYOffset"},function(a,b){var c=/Y/.test(b);m.fn[a]=function(d){return V(this,function(a,d,e){var f=dd(a);return void 0===e?f?b in f?f[b]:f.document.documentElement[d]:a[d]:void(f?f.scrollTo(c?m(f).scrollLeft():e,c?e:m(f).scrollTop()):a[d]=e)},a,d,arguments.length,null)}}),m.each(["top","left"],function(a,b){m.cssHooks[b]=Lb(k.pixelPosition,function(a,c){return c?(c=Jb(a,b),Hb.test(c)?m(a).position()[b]+"px":c):void 0})}),m.each({Height:"height",Width:"width"},function(a,b){m.each({padding:"inner"+a,content:b,"":"outer"+a},function(c,d){m.fn[d]=function(d,e){var f=arguments.length&&(c||"boolean"!=typeof d),g=c||(d===!0||e===!0?"margin":"border");return V(this,function(b,c,d){var e;return m.isWindow(b)?b.document.documentElement["client"+a]:9===b.nodeType?(e=b.documentElement,Math.max(b.body["scroll"+a],e["scroll"+a],b.body["offset"+a],e["offset"+a],e["client"+a])):void 0===d?m.css(b,c,g):m.style(b,c,d,g)},b,f?d:void 0,f,null)}})}),m.fn.size=function(){return this.length},m.fn.andSelf=m.fn.addBack,"function"==typeof define&&define.amd&&define("jquery",[],function(){return m});var ed=a.jQuery,fd=a.$;return m.noConflict=function(b){return a.$===m&&(a.$=fd),b&&a.jQuery===m&&(a.jQuery=ed),m},typeof b===K&&(a.jQuery=a.$=m),m});
diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.css b/core/src/main/resources/org/apache/spark/ui/static/webui.css
index 11fd956bfbe66..445110d63e184 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.css
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.css
@@ -81,7 +81,9 @@ table.sortable thead {
span.kill-link {
margin-right: 2px;
+ margin-left: 20px;
color: gray;
+ float: right;
}
span.kill-link a {
@@ -112,3 +114,8 @@ pre {
padding-bottom: 0;
border: none;
}
+
+.tooltip {
+ font-weight: normal;
+}
+
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 2b636b085d73a..8f70744d804d9 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -79,7 +79,7 @@ class SparkEnv (
private[spark] def stop() {
pythonWorkers.foreach { case(key, worker) => worker.stop() }
- httpFileServer.stop()
+ Option(httpFileServer).foreach(_.stop())
mapOutputTracker.stop()
shuffleManager.stop()
broadcastManager.stop()
@@ -228,9 +228,15 @@ object SparkEnv extends Logging {
val cacheManager = new CacheManager(blockManager)
- val httpFileServer = new HttpFileServer(securityManager)
- httpFileServer.initialize()
- conf.set("spark.fileserver.uri", httpFileServer.serverUri)
+ val httpFileServer =
+ if (isDriver) {
+ val server = new HttpFileServer(securityManager)
+ server.initialize()
+ conf.set("spark.fileserver.uri", server.serverUri)
+ server
+ } else {
+ null
+ }
val metricsSystem = if (isDriver) {
MetricsSystem.createMetricsSystem("driver", conf, securityManager)
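The hunks above make the HttpFileServer driver-only: executors leave the field null, and stop() guards the reference with Option so shutdown does not throw a NullPointerException. A minimal, self-contained sketch of that guard pattern (the Stoppable class here is hypothetical, not part of Spark):

    // Option(ref) is None when ref is null, so foreach(_.stop()) is a no-op on executors.
    class Stoppable { def stop(): Unit = println("stopped") }

    val onDriver: Stoppable = new Stoppable   // created only when isDriver is true
    val onExecutor: Stoppable = null          // never initialized on executors

    Option(onDriver).foreach(_.stop())    // prints "stopped"
    Option(onExecutor).foreach(_.stop())  // does nothing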
diff --git a/core/src/main/scala/org/apache/spark/ui/ToolTips.scala b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
new file mode 100644
index 0000000000000..37708d75489c8
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/ui/ToolTips.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ui
+
+private[spark] object ToolTips {
+ val SCHEDULER_DELAY =
+ """Scheduler delay includes time to ship the task from the scheduler to
+      the executor, and the time to send a message from the executor to the scheduler stating
+ that the task has completed. When the scheduler becomes overloaded, task completion messages
+ become queued up, and scheduler delay increases."""
+
+ val INPUT = "Bytes read from Hadoop or from Spark storage."
+
+ val SHUFFLE_WRITE = "Bytes written to disk in order to be read by a shuffle in a future stage."
+
+ val SHUFFLE_READ =
+ """Bytes read from remote executors. Typically less than shuffle write bytes
+ because this does not include shuffle data read locally."""
+}
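These constants are plain strings; the UI pages attach them to header cells as Bootstrap tooltips. The exact markup emitted by this patch is not reproduced here, but a minimal sketch of the idea, using Scala XML literals and Bootstrap's standard data-toggle/title attributes, looks like this (tooltipHeader is a hypothetical helper):

    import scala.xml.Node

    // Bootstrap's tooltip plugin looks for data-toggle="tooltip" and shows the title text on hover.
    def tooltipHeader(label: String, tip: String): Node =
      <th><span data-toggle="tooltip" title={tip}>{label}</span></th>

    val shuffleWriteHeader = tooltipHeader("Shuffle Write", ToolTips.SHUFFLE_WRITE)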
diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
index 1b104253d545d..9cb50d9b83dda 100644
--- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
+++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -25,6 +25,7 @@ import org.apache.spark.Logging
/** Utility functions for generating XML pages with spark content. */
private[spark] object UIUtils extends Logging {
+ val TABLE_CLASS = "table table-bordered table-striped table-condensed sortable"
// SimpleDateFormat is not thread-safe. Don't expose it to avoid improper use.
private val dateFormat = new ThreadLocal[SimpleDateFormat]() {
@@ -139,6 +140,18 @@ private[spark] object UIUtils extends Logging {
def prependBaseUri(basePath: String = "", resource: String = "") = uiRoot + basePath + resource
+ val commonHeaderNodes = {
+
+
+
+
+
+
+
+ }
+
/** Returns a spark page with correctly formatted headers */
def headerSparkPage(
content: => Seq[Node],
@@ -157,12 +170,7 @@ private[spark] object UIUtils extends Logging {
-
-
-
-
+ {commonHeaderNodes}
{appName} - {title}
@@ -193,11 +201,7 @@ private[spark] object UIUtils extends Logging {
def basicSparkPage(content: => Seq[Node], title: String): Seq[Node] = {
-
-
-
-
+ {commonHeaderNodes}
{title}
@@ -224,9 +228,9 @@ private[spark] object UIUtils extends Logging {
data: Seq[T],
fixedWidth: Boolean = false): Seq[Node] = {
- var tableClass = "table table-bordered table-striped table-condensed sortable"
+ var listingTableClass = TABLE_CLASS
if (fixedWidth) {
- tableClass += " table-fixed"
+ listingTableClass += " table-fixed"
}
val colWidth = 100.toDouble / headers.size
val colWidthAttr = if (fixedWidth) colWidth + "%" else ""
@@ -246,7 +250,7 @@ private[spark] object UIUtils extends Logging {
}
}
}
-
+
{headerRow}
{data.map(r => generateDataRow(r))}
diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
index 95b4a4e91d333..b358c855e1c88 100644
--- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -22,9 +22,26 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.ui.{ToolTips, UIUtils, WebUIPage}
import org.apache.spark.util.Utils
+/** Summary information about an executor to display in the UI. */
+private case class ExecutorSummaryInfo(
+ id: String,
+ hostPort: String,
+ rddBlocks: Int,
+ memoryUsed: Long,
+ diskUsed: Long,
+ activeTasks: Int,
+ failedTasks: Int,
+ completedTasks: Int,
+ totalTasks: Int,
+ totalDuration: Long,
+ totalInputBytes: Long,
+ totalShuffleRead: Long,
+ totalShuffleWrite: Long,
+ maxMemory: Long)
+
private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
private val appName = parent.appName
private val basePath = parent.basePath
@@ -36,8 +53,36 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
val memUsed = storageStatusList.map(_.memUsed).fold(0L)(_ + _)
val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_ + _)
val execInfo = for (statusId <- 0 until storageStatusList.size) yield getExecInfo(statusId)
- val execInfoSorted = execInfo.sortBy(_.getOrElse("Executor ID", ""))
- val execTable = UIUtils.listingTable(execHeader, execRow, execInfoSorted)
+ val execInfoSorted = execInfo.sortBy(_.id)
+
+ val execTable =
+
+
+ Executor ID
+ Address
+ RDD Blocks
+ Memory Used
+ Disk Used
+ Active Tasks
+ Failed Tasks
+ Complete Tasks
+ Total Tasks
+ Task Time
+ Input
+ Shuffle Read
+
+
+
+ Shuffle Write
+
+
+
+
+ {execInfoSorted.map(execRow(_))}
+
+
val content =
@@ -60,53 +105,43 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
parent.headerTabs, parent)
}
- /** Header fields for the executors table */
- private def execHeader = Seq(
- "Executor ID",
- "Address",
- "RDD Blocks",
- "Memory Used",
- "Disk Used",
- "Active Tasks",
- "Failed Tasks",
- "Complete Tasks",
- "Total Tasks",
- "Task Time",
- "Input Bytes",
- "Shuffle Read",
- "Shuffle Write")
-
/** Render an HTML row representing an executor */
- private def execRow(values: Map[String, String]): Seq[Node] = {
- val maximumMemory = values("Maximum Memory")
- val memoryUsed = values("Memory Used")
- val diskUsed = values("Disk Used")
- // scalastyle:off
+ private def execRow(info: ExecutorSummaryInfo): Seq[Node] = {
+ val maximumMemory = info.maxMemory
+ val memoryUsed = info.memoryUsed
+ val diskUsed = info.diskUsed
- {values("Executor ID")}
- {values("Address")}
- {values("RDD Blocks")}
-
- {Utils.bytesToString(memoryUsed.toLong)} /
- {Utils.bytesToString(maximumMemory.toLong)}
+ {info.id}
+ {info.hostPort}
+ {info.rddBlocks}
+
+ {Utils.bytesToString(memoryUsed)} /
+ {Utils.bytesToString(maximumMemory)}
+
+
+ {Utils.bytesToString(diskUsed)}
+
+ {info.activeTasks}
+ {info.failedTasks}
+ {info.completedTasks}
+ {info.totalTasks}
+
+ {Utils.msDurationToString(info.totalDuration)}
-
- {Utils.bytesToString(diskUsed.toLong)}
+
+ {Utils.bytesToString(info.totalInputBytes)}
+
+
+ {Utils.bytesToString(info.totalShuffleRead)}
+
+
+ {Utils.bytesToString(info.totalShuffleWrite)}
- {values("Active Tasks")}
- {values("Failed Tasks")}
- {values("Complete Tasks")}
- {values("Total Tasks")}
- {Utils.msDurationToString(values("Task Time").toLong)}
- {Utils.bytesToString(values("Input Bytes").toLong)}
- {Utils.bytesToString(values("Shuffle Read").toLong)}
- {Utils.bytesToString(values("Shuffle Write").toLong)}
- // scalastyle:on
}
/** Represent an executor's info as a map given a storage status index */
- private def getExecInfo(statusId: Int): Map[String, String] = {
+ private def getExecInfo(statusId: Int): ExecutorSummaryInfo = {
val status = listener.storageStatusList(statusId)
val execId = status.blockManagerId.executorId
val hostPort = status.blockManagerId.hostPort
@@ -118,15 +153,12 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
val failedTasks = listener.executorToTasksFailed.getOrElse(execId, 0)
val completedTasks = listener.executorToTasksComplete.getOrElse(execId, 0)
val totalTasks = activeTasks + failedTasks + completedTasks
- val totalDuration = listener.executorToDuration.getOrElse(execId, 0)
- val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0)
- val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0)
- val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0)
+ val totalDuration = listener.executorToDuration.getOrElse(execId, 0L)
+ val totalInputBytes = listener.executorToInputBytes.getOrElse(execId, 0L)
+ val totalShuffleRead = listener.executorToShuffleRead.getOrElse(execId, 0L)
+ val totalShuffleWrite = listener.executorToShuffleWrite.getOrElse(execId, 0L)
- // Also include fields not in the header
- val execFields = execHeader ++ Seq("Maximum Memory")
-
- val execValues = Seq(
+ new ExecutorSummaryInfo(
execId,
hostPort,
rddBlocks,
@@ -141,8 +173,6 @@ private[ui] class ExecutorsPage(parent: ExecutorsTab) extends WebUIPage("") {
totalShuffleRead,
totalShuffleWrite,
maxMem
- ).map(_.toString)
-
- execFields.zip(execValues).toMap
+ )
}
}
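One detail behind the getOrElse changes above: the listener maps hold Long values, so an Int default of 0 widens the result type and no longer fits the typed ExecutorSummaryInfo fields. A small sketch with a made-up map:

    val durations: Map[String, Long] = Map("exec-1" -> 1234L)

    // The least upper bound of Long and Int is AnyVal, so the first result cannot be a Long.
    val widened = durations.getOrElse("exec-2", 0)   // inferred as AnyVal
    val typed   = durations.getOrElse("exec-2", 0L)  // stays Long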
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
index 2a34a9af925d6..52020954ea57c 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/ExecutorTable.scala
@@ -20,7 +20,7 @@ package org.apache.spark.ui.jobs
import scala.collection.mutable
import scala.xml.Node
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{ToolTips, UIUtils}
import org.apache.spark.util.Utils
/** Page showing executor summary */
@@ -35,7 +35,7 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
/** Special table which merges two header cells. */
private def executorTable[T](): Seq[Node] = {
-
+
Executor ID
Address
@@ -43,9 +43,9 @@ private[ui] class ExecutorTable(stageId: Int, parent: JobProgressTab) {
Total Tasks
Failed Tasks
Succeeded Tasks
- Input Bytes
- Shuffle Read
- Shuffle Write
+ Input
+ Shuffle Read
+ Shuffle Write
Shuffle Spill (Memory)
Shuffle Spill (Disk)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index afb8ed754ff8b..8c3821bd7c3eb 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -22,7 +22,7 @@ import javax.servlet.http.HttpServletRequest
import scala.xml.Node
-import org.apache.spark.ui.{WebUIPage, UIUtils}
+import org.apache.spark.ui.{ToolTips, WebUIPage, UIUtils}
import org.apache.spark.util.{Utils, Distribution}
/** Page showing statistics and task list for a given stage */
@@ -127,14 +127,14 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
metrics.get.resultSerializationTime.toDouble
}
val serializationQuantiles =
- "Result serialization time" +: Distribution(serializationTimes).
- get.getQuantiles().map(ms => UIUtils.formatDuration(ms.toLong))
+ Result serialization time +: Distribution(serializationTimes).
+ get.getQuantiles().map(ms => {UIUtils.formatDuration(ms.toLong)} )
val serviceTimes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.executorRunTime.toDouble
}
- val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles()
- .map(ms => UIUtils.formatDuration(ms.toLong))
+ val serviceQuantiles = Duration +: Distribution(serviceTimes).get.getQuantiles()
+ .map(ms => {UIUtils.formatDuration(ms.toLong)} )
val gettingResultTimes = validTasks.map { case TaskUIData(info, _, _) =>
if (info.gettingResultTime > 0) {
@@ -143,9 +143,9 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
0.0
}
}
- val gettingResultQuantiles = "Time spent fetching task results" +:
+ val gettingResultQuantiles = Time spent fetching task results +:
Distribution(gettingResultTimes).get.getQuantiles().map { millis =>
- UIUtils.formatDuration(millis.toLong)
+ {UIUtils.formatDuration(millis.toLong)}
}
// The scheduler delay includes the network delay to send the task to the worker
// machine and to send back the result (but not the time to fetch the task result,
@@ -160,42 +160,45 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
}
totalExecutionTime - metrics.get.executorRunTime
}
- val schedulerDelayQuantiles = "Scheduler delay" +:
+ val schedulerDelayTitle = Scheduler delay
+ val schedulerDelayQuantiles = schedulerDelayTitle +:
Distribution(schedulerDelays).get.getQuantiles().map { millis =>
- UIUtils.formatDuration(millis.toLong)
+ {UIUtils.formatDuration(millis.toLong)}
}
def getQuantileCols(data: Seq[Double]) =
- Distribution(data).get.getQuantiles().map(d => Utils.bytesToString(d.toLong))
+ Distribution(data).get.getQuantiles().map(d => {Utils.bytesToString(d.toLong)} )
val inputSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.inputMetrics.map(_.bytesRead).getOrElse(0L).toDouble
}
- val inputQuantiles = "Input" +: getQuantileCols(inputSizes)
+ val inputQuantiles = Input +: getQuantileCols(inputSizes)
val shuffleReadSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
}
- val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes)
+ val shuffleReadQuantiles = Shuffle Read (Remote) +:
+ getQuantileCols(shuffleReadSizes)
val shuffleWriteSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
}
- val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
+ val shuffleWriteQuantiles = Shuffle Write +: getQuantileCols(shuffleWriteSizes)
val memoryBytesSpilledSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.memoryBytesSpilled.toDouble
}
- val memoryBytesSpilledQuantiles = "Shuffle spill (memory)" +:
+ val memoryBytesSpilledQuantiles = Shuffle spill (memory) +:
getQuantileCols(memoryBytesSpilledSizes)
val diskBytesSpilledSizes = validTasks.map { case TaskUIData(_, metrics, _) =>
metrics.get.diskBytesSpilled.toDouble
}
- val diskBytesSpilledQuantiles = "Shuffle spill (disk)" +:
+ val diskBytesSpilledQuantiles = Shuffle spill (disk) +:
getQuantileCols(diskBytesSpilledSizes)
- val listings: Seq[Seq[String]] = Seq(
+ val listings: Seq[Seq[Node]] = Seq(
serializationQuantiles,
serviceQuantiles,
gettingResultQuantiles,
@@ -208,7 +211,7 @@ private[ui] class StagePage(parent: JobProgressTab) extends WebUIPage("stage") {
val quantileHeaders = Seq("Metric", "Min", "25th percentile",
"Median", "75th percentile", "Max")
- def quantileRow(data: Seq[String]): Seq[Node] = {data.map(d => {d} )}
+ def quantileRow(data: Seq[Node]): Seq[Node] = {data}
Some(UIUtils.listingTable(quantileHeaders, quantileRow, listings, fixedWidth = true))
}
val executorTable = new ExecutorTable(stageId, parent)
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
index a9ac6d5bee9c9..fd8d0b5cdde00 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StageTable.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.HashMap
import scala.xml.Node
import org.apache.spark.scheduler.{StageInfo, TaskInfo}
-import org.apache.spark.ui.UIUtils
+import org.apache.spark.ui.{ToolTips, UIUtils}
import org.apache.spark.util.Utils
/** Page showing list of all ongoing and recently finished stages */
@@ -43,9 +43,16 @@ private[ui] class StageTableBase(
Submitted
Duration
Tasks: Succeeded/Total
- Input
- Shuffle Read
- Shuffle Write
+ Input
+ Shuffle Read
+
+
+
+ Shuffle Write
+
+
}
def toNodeSeq: Seq[Node] = {
@@ -82,7 +89,8 @@ private[ui] class StageTableBase(
// scalastyle:off
val killLink = if (killEnabled) {
- (kill )
+ (kill )
}
// scalastyle:on
@@ -102,7 +110,7 @@ private[ui] class StageTableBase(
listener.stageIdToDescription.get(s.stageId)
.map(d => {d}
{nameLink} {killLink}
)
- .getOrElse({killLink} {nameLink} {details}
)
+ .getOrElse({nameLink} {killLink} {details}
)
}
protected def stageRow(s: StageInfo): Seq[Node] = {
diff --git a/dev/run-tests b/dev/run-tests
index d9df020f7563c..edd17b53b3d8c 100755
--- a/dev/run-tests
+++ b/dev/run-tests
@@ -66,10 +66,10 @@ echo "========================================================================="
# (either resolution or compilation) prompts the user for input either q, r,
# etc to quit or retry. This echo is there to make it not block.
if [ -n "$_RUN_SQL_TESTS" ]; then
- echo -e "q\n" | SPARK_HIVE=true sbt/sbt clean assembly test | \
+ echo -e "q\n" | SPARK_HIVE=true sbt/sbt clean package assembly/assembly test | \
grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
else
- echo -e "q\n" | sbt/sbt clean assembly test | \
+ echo -e "q\n" | sbt/sbt clean package assembly/assembly test | \
grep -v -e "info.*Resolving" -e "warn.*Merging" -e "info.*Including"
fi
diff --git a/dev/run-tests-jenkins b/dev/run-tests-jenkins
new file mode 100755
index 0000000000000..8dda671e976ce
--- /dev/null
+++ b/dev/run-tests-jenkins
@@ -0,0 +1,85 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Wrapper script that runs the Spark tests then reports QA results
+# to github via its API.
+
+# Go to the Spark project root directory
+FWDIR="$(cd `dirname $0`/..; pwd)"
+cd $FWDIR
+
+COMMENTS_URL="https://api.github.com/repos/apache/spark/issues/$ghprbPullId/comments"
+
+function post_message {
+ message=$1
+ data="{\"body\": \"$message\"}"
+ echo "Attempting to post to Github:"
+ echo "$data"
+
+ curl -D- -u x-oauth-basic:$GITHUB_OAUTH_KEY -X POST --data "$data" -H \
+ "Content-Type: application/json" \
+ $COMMENTS_URL | head -n 8
+}
+
+start_message="QA tests have started for PR $ghprbPullId."
+if [ "$sha1" == "$ghprbActualCommit" ]; then
+ start_message="$start_message This patch DID NOT merge cleanly! "
+else
+ start_message="$start_message This patch merges cleanly. "
+fi
+start_message="$start_message View progress: "
+start_message="$start_message${BUILD_URL}consoleFull"
+
+post_message "$start_message"
+
+./dev/run-tests
+test_result="$?"
+
+result_message="QA results for PR $ghprbPullId: "
+
+if [ "$test_result" -eq "0" ]; then
+ result_message="$result_message- This patch PASSES unit tests. "
+else
+ result_message="$result_message- This patch FAILED unit tests. "
+fi
+
+if [ "$sha1" != "$ghprbActualCommit" ]; then
+ result_message="$result_message- This patch merges cleanly "
+ non_test_files=$(git diff master --name-only | grep -v "\/test" | tr "\n" " ")
+ new_public_classes=$(git diff master $non_test_files \
+ | grep -e "trait " -e "class " \
+ | grep -e "{" -e "(" \
+ | grep -v -e \@\@ -e private \
+ | grep \+ \
+ | sed "s/\+ *//" \
+ | tr "\n" "~" \
+ | sed "s/~/ /g")
+ if [ "$new_public_classes" == "" ]; then
+ result_message="$result_message- This patch adds no public classes "
+ else
+ result_message="$result_message- This patch adds the following public classes (experimental): "
+ result_message="$result_message$new_public_classes"
+ fi
+fi
+result_message="${result_message} For more information see test ouptut:"
+result_message="${result_message} ${BUILD_URL}consoleFull"
+
+post_message "$result_message"
+
+exit $test_result
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index f5c2bfb697c81..44775ea479ece 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -428,11 +428,11 @@ def launch_cluster(conn, opts, cluster_name):
for master in master_nodes:
master.add_tag(
key='Name',
- value='spark-{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id))
+ value='{cn}-master-{iid}'.format(cn=cluster_name, iid=master.id))
for slave in slave_nodes:
slave.add_tag(
key='Name',
- value='spark-{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id))
+ value='{cn}-slave-{iid}'.format(cn=cluster_name, iid=slave.id))
# Return all the instances
return (master_nodes, slave_nodes)
@@ -699,6 +699,7 @@ def ssh(host, opts, command):
time.sleep(30)
tries = tries + 1
+
# Backported from Python 2.7 for compatiblity with 2.6 (See SPARK-1990)
def _check_output(*popenargs, **kwargs):
if 'stdout' in kwargs:
diff --git a/examples/pom.xml b/examples/pom.xml
index 4f6d7fdb87d47..bd1c387c2eb91 100644
--- a/examples/pom.xml
+++ b/examples/pom.xml
@@ -27,6 +27,9 @@
org.apache.spark
spark-examples_2.10
+
+ examples
+
jar
Spark Project Examples
http://spark.apache.org/
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
index 4d28e0aad6597..79cfedf332436 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkKMeans.scala
@@ -17,8 +17,6 @@
package org.apache.spark.examples
-import java.util.Random
-
import breeze.linalg.{Vector, DenseVector, squaredDistance}
import org.apache.spark.{SparkConf, SparkContext}
@@ -28,15 +26,12 @@ import org.apache.spark.SparkContext._
* K-means clustering.
*/
object SparkKMeans {
- val R = 1000 // Scaling factor
- val rand = new Random(42)
def parseVector(line: String): Vector[Double] = {
DenseVector(line.split(' ').map(_.toDouble))
}
def closestPoint(p: Vector[Double], centers: Array[Vector[Double]]): Int = {
- var index = 0
var bestIndex = 0
var closest = Double.PositiveInfinity
diff --git a/external/flume/pom.xml b/external/flume/pom.xml
index c1f581967777b..61a6aff543aed 100644
--- a/external/flume/pom.xml
+++ b/external/flume/pom.xml
@@ -27,6 +27,9 @@
org.apache.spark
spark-streaming-flume_2.10
+
+ streaming-flume
+
jar
Spark Project External Flume
http://spark.apache.org/
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index ed35e34ad45ab..07ae88febf916 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -20,6 +20,7 @@ package org.apache.spark.streaming.flume
import java.net.InetSocketAddress
import java.io.{ObjectInput, ObjectOutput, Externalizable}
import java.nio.ByteBuffer
+import java.util.concurrent.Executors
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
@@ -29,24 +30,32 @@ import org.apache.flume.source.avro.AvroFlumeEvent
import org.apache.flume.source.avro.Status
import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.avro.ipc.NettyServer
-
+import org.apache.spark.Logging
import org.apache.spark.util.Utils
import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream._
-import org.apache.spark.Logging
+import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.receiver.Receiver
+import org.jboss.netty.channel.ChannelPipelineFactory
+import org.jboss.netty.channel.Channels
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.ChannelFactory
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
+import org.jboss.netty.handler.codec.compression._
+import org.jboss.netty.handler.execution.ExecutionHandler
+
private[streaming]
class FlumeInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
host: String,
port: Int,
- storageLevel: StorageLevel
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
) extends ReceiverInputDStream[SparkFlumeEvent](ssc_) {
override def getReceiver(): Receiver[SparkFlumeEvent] = {
- new FlumeReceiver(host, port, storageLevel)
+ new FlumeReceiver(host, port, storageLevel, enableDecompression)
}
}
@@ -134,22 +143,71 @@ private[streaming]
class FlumeReceiver(
host: String,
port: Int,
- storageLevel: StorageLevel
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
) extends Receiver[SparkFlumeEvent](storageLevel) with Logging {
lazy val responder = new SpecificResponder(
classOf[AvroSourceProtocol], new FlumeEventServer(this))
- lazy val server = new NettyServer(responder, new InetSocketAddress(host, port))
+ var server: NettyServer = null
+
+ private def initServer() = {
+ if (enableDecompression) {
+      val channelFactory = new NioServerSocketChannelFactory(
+        Executors.newCachedThreadPool(), Executors.newCachedThreadPool())
+      val channelPipelineFactory = new CompressionChannelPipelineFactory()
+
+ new NettyServer(
+ responder,
+ new InetSocketAddress(host, port),
+ channelFactory,
+          channelPipelineFactory,
+ null)
+ } else {
+ new NettyServer(responder, new InetSocketAddress(host, port))
+ }
+ }
def onStart() {
- server.start()
+ synchronized {
+ if (server == null) {
+ server = initServer()
+ server.start()
+ } else {
+ logWarning("Flume receiver being asked to start more then once with out close")
+ }
+ }
logInfo("Flume receiver started")
}
def onStop() {
- server.close()
+ synchronized {
+ if (server != null) {
+ server.close()
+ server = null
+ }
+ }
logInfo("Flume receiver stopped")
}
override def preferredLocation = Some(host)
+
+  /** A Netty pipeline factory that will decompress incoming data from
+    * the Netty client and compress data going back to the client.
+ *
+ * The compression on the return is required because Flume requires
+ * a successful response to indicate it can remove the event/batch
+ * from the configured channel
+ */
+ private[streaming]
+ class CompressionChannelPipelineFactory extends ChannelPipelineFactory {
+
+ def getPipeline() = {
+ val pipeline = Channels.pipeline()
+ val encoder = new ZlibEncoder(6)
+ pipeline.addFirst("deflater", encoder)
+ pipeline.addFirst("inflater", new ZlibDecoder())
+ pipeline
+ }
+}
}
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
index 499f3560ef768..716db9fa76031 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala
@@ -36,7 +36,27 @@ object FlumeUtils {
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[SparkFlumeEvent] = {
- val inputStream = new FlumeInputDStream[SparkFlumeEvent](ssc, hostname, port, storageLevel)
+ createStream(ssc, hostname, port, storageLevel, false)
+ }
+
+ /**
+ * Create a input stream from a Flume source.
+ * @param ssc StreamingContext object
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+   * @param enableDecompression  whether the Netty server should decompress the input stream
+ */
+ def createStream (
+ ssc: StreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
+ ): ReceiverInputDStream[SparkFlumeEvent] = {
+ val inputStream = new FlumeInputDStream[SparkFlumeEvent](
+ ssc, hostname, port, storageLevel, enableDecompression)
+
inputStream
}
@@ -66,6 +86,23 @@ object FlumeUtils {
port: Int,
storageLevel: StorageLevel
): JavaReceiverInputDStream[SparkFlumeEvent] = {
- createStream(jssc.ssc, hostname, port, storageLevel)
+ createStream(jssc.ssc, hostname, port, storageLevel, false)
+ }
+
+ /**
+   * Creates an input stream from a Flume source.
+ * @param hostname Hostname of the slave machine to which the flume data will be sent
+ * @param port Port of the slave machine to which the flume data will be sent
+ * @param storageLevel Storage level to use for storing the received objects
+   * @param enableDecompression  whether the Netty server should decompress the input stream
+ */
+ def createStream(
+ jssc: JavaStreamingContext,
+ hostname: String,
+ port: Int,
+ storageLevel: StorageLevel,
+ enableDecompression: Boolean
+ ): JavaReceiverInputDStream[SparkFlumeEvent] = {
+ createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
}
}
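A sketch of how the new overload might be called from a streaming application; the host name, port, and batch interval below are placeholders, not values taken from this patch:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    val conf = new SparkConf().setAppName("FlumeDecompressionExample")
    val ssc = new StreamingContext(conf, Seconds(2))

    // enableDecompression = true makes the embedded Netty server inflate
    // zlib-compressed event batches sent by Flume.
    val events = FlumeUtils.createStream(
      ssc, "receiver-host", 41414, StorageLevel.MEMORY_AND_DISK_SER_2, enableDecompression = true)

    events.count().print()
    ssc.start()
    ssc.awaitTermination()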
diff --git a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
index e0ad4f1015205..3b5e0c7746b2c 100644
--- a/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
+++ b/external/flume/src/test/java/org/apache/spark/streaming/flume/JavaFlumeStreamSuite.java
@@ -30,5 +30,7 @@ public void testFlumeStream() {
JavaReceiverInputDStream test1 = FlumeUtils.createStream(ssc, "localhost", 12345);
JavaReceiverInputDStream test2 = FlumeUtils.createStream(ssc, "localhost", 12345,
StorageLevel.MEMORY_AND_DISK_SER_2());
+ JavaReceiverInputDStream test3 = FlumeUtils.createStream(ssc, "localhost", 12345,
+ StorageLevel.MEMORY_AND_DISK_SER_2(), false);
}
}
diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
index dd287d0ef90a0..73dffef953309 100644
--- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
+++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala
@@ -33,15 +33,26 @@ import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuite
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
-class FlumeStreamSuite extends TestSuiteBase {
+import org.jboss.netty.channel.ChannelPipeline
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
+import org.jboss.netty.channel.socket.SocketChannel
+import org.jboss.netty.handler.codec.compression._
- val testPort = 9999
+class FlumeStreamSuite extends TestSuiteBase {
test("flume input stream") {
+ runFlumeStreamTest(false, 9998)
+ }
+
+ test("flume input compressed stream") {
+ runFlumeStreamTest(true, 9997)
+ }
+
+ def runFlumeStreamTest(enableDecompression: Boolean, testPort: Int) {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
- FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK)
+ FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
@@ -52,8 +63,17 @@ class FlumeStreamSuite extends TestSuiteBase {
val input = Seq(1, 2, 3, 4, 5)
Thread.sleep(1000)
val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
- val client = SpecificRequestor.getClient(
- classOf[AvroSourceProtocol], transceiver)
+ var client: AvroSourceProtocol = null;
+
+ if (enableDecompression) {
+ client = SpecificRequestor.getClient(
+ classOf[AvroSourceProtocol],
+ new NettyTransceiver(new InetSocketAddress("localhost", testPort),
+ new CompressionChannelFactory(6)));
+ } else {
+ client = SpecificRequestor.getClient(
+ classOf[AvroSourceProtocol], transceiver)
+ }
for (i <- 0 until input.size) {
val event = new AvroFlumeEvent
@@ -64,6 +84,8 @@ class FlumeStreamSuite extends TestSuiteBase {
clock.addToTime(batchDuration.milliseconds)
}
+ Thread.sleep(1000)
+
val startTime = System.currentTimeMillis()
while (outputBuffer.size < input.size && System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
logInfo("output.size = " + outputBuffer.size + ", input.size = " + input.size)
@@ -85,4 +107,13 @@ class FlumeStreamSuite extends TestSuiteBase {
assert(outputBuffer(i).head.event.getHeaders.get("test") === "header")
}
}
+
+ class CompressionChannelFactory(compressionLevel: Int) extends NioClientSocketChannelFactory {
+    override def newChannel(pipeline: ChannelPipeline): SocketChannel = {
+      val encoder = new ZlibEncoder(compressionLevel)
+      pipeline.addFirst("deflater", encoder)
+      pipeline.addFirst("inflater", new ZlibDecoder())
+      super.newChannel(pipeline)
+ }
+ }
}
diff --git a/external/kafka/pom.xml b/external/kafka/pom.xml
index d014a7aad0fca..4762c50685a93 100644
--- a/external/kafka/pom.xml
+++ b/external/kafka/pom.xml
@@ -27,6 +27,9 @@
org.apache.spark
spark-streaming-kafka_2.10
+
+ streaming-kafka
+
jar
Spark Project External Kafka
http://spark.apache.org/
diff --git a/external/mqtt/pom.xml b/external/mqtt/pom.xml
index 4980208cba3b0..32c530e600ce0 100644
--- a/external/mqtt/pom.xml
+++ b/external/mqtt/pom.xml
@@ -27,6 +27,9 @@
org.apache.spark
spark-streaming-mqtt_2.10
+
+ streaming-mqtt
+
jar
Spark Project External MQTT
http://spark.apache.org/
diff --git a/external/twitter/pom.xml b/external/twitter/pom.xml
index 7073bd4404d9c..637adb0f00da0 100644
--- a/external/twitter/pom.xml
+++ b/external/twitter/pom.xml
@@ -27,6 +27,9 @@
org.apache.spark
spark-streaming-twitter_2.10
+
+ streaming-twitter
+
jar
Spark Project External Twitter
http://spark.apache.org/
diff --git a/external/zeromq/pom.xml b/external/zeromq/pom.xml
index cf306e0dca8bd..e4d758a04a4cd 100644
--- a/external/zeromq/pom.xml
+++ b/external/zeromq/pom.xml
@@ -27,6 +27,9 @@
org.apache.spark
spark-streaming-zeromq_2.10
+
+ streaming-zeromq
+
jar
Spark Project External ZeroMQ
http://spark.apache.org/
diff --git a/extras/java8-tests/pom.xml b/extras/java8-tests/pom.xml
index 955ec1a8c3033..3eade411b38b7 100644
--- a/extras/java8-tests/pom.xml
+++ b/extras/java8-tests/pom.xml
@@ -28,7 +28,11 @@
java8-tests_2.10
pom
Spark Project Java8 Tests POM
-
+
+
+ java8-tests
+
+
org.apache.spark
diff --git a/extras/spark-ganglia-lgpl/pom.xml b/extras/spark-ganglia-lgpl/pom.xml
index 22ea330b4374d..a5b162a0482e4 100644
--- a/extras/spark-ganglia-lgpl/pom.xml
+++ b/extras/spark-ganglia-lgpl/pom.xml
@@ -29,7 +29,11 @@
spark-ganglia-lgpl_2.10
jar
Spark Ganglia Integration
-
+
+
+ ganglia-lgpl
+
+
org.apache.spark
diff --git a/graphx/pom.xml b/graphx/pom.xml
index 7d5d83e7f3bb9..7e3bcf29dcfbc 100644
--- a/graphx/pom.xml
+++ b/graphx/pom.xml
@@ -27,6 +27,9 @@
org.apache.spark
spark-graphx_2.10
+
+ graphx
+
jar
Spark Project GraphX
http://spark.apache.org/
diff --git a/mllib/pom.xml b/mllib/pom.xml
index b622f96dd7901..87afd7ecf2dd4 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -27,6 +27,9 @@
org.apache.spark
spark-mllib_2.10
+
+ mllib
+
jar
Spark Project ML Library
http://spark.apache.org/
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala
new file mode 100644
index 0000000000000..3515461b52493
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.linalg
+
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}
+import com.github.fommil.netlib.ARPACK
+import org.netlib.util.{intW, doubleW}
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Compute eigen-decomposition.
+ */
+@Experimental
+private[mllib] object EigenValueDecomposition {
+ /**
+ * Compute the leading k eigenvalues and eigenvectors on a symmetric square matrix using ARPACK.
+ * The caller needs to ensure that the input matrix is real symmetric. This function requires
+ * memory for `n*(4*k+4)` doubles.
+ *
+ * @param mul a function that multiplies the symmetric matrix with a DenseVector.
+ * @param n dimension of the square matrix (maximum Int.MaxValue).
+ * @param k number of leading eigenvalues required, 0 < k < n.
+ * @param tol tolerance of the eigs computation.
+ * @param maxIterations the maximum number of Arnoldi update iterations.
+ * @return a dense vector of eigenvalues in descending order and a dense matrix of eigenvectors
+ * (columns of the matrix).
+ * @note The number of computed eigenvalues might be smaller than k when some Ritz values do not
+ * satisfy the convergence criterion specified by tol (see ARPACK Users Guide, Chapter 4.6
+   *       for more details). The maximum number of Arnoldi update iterations is controlled by the
+   *       maxIterations parameter (RowMatrix.computeSVD passes math.max(300, k * 3)).
+ */
+ private[mllib] def symmetricEigs(
+ mul: BDV[Double] => BDV[Double],
+ n: Int,
+ k: Int,
+ tol: Double,
+ maxIterations: Int): (BDV[Double], BDM[Double]) = {
+ // TODO: remove this function and use eigs in breeze when switching breeze version
+ require(n > k, s"Number of required eigenvalues $k must be smaller than matrix dimension $n")
+
+ val arpack = ARPACK.getInstance()
+
+ // tolerance used in stopping criterion
+ val tolW = new doubleW(tol)
+ // number of desired eigenvalues, 0 < nev < n
+ val nev = new intW(k)
+ // nev Lanczos vectors are generated in the first iteration
+ // ncv-nev Lanczos vectors are generated in each subsequent iteration
+ // ncv must be smaller than n
+ val ncv = math.min(2 * k, n)
+
+ // "I" for standard eigenvalue problem, "G" for generalized eigenvalue problem
+ val bmat = "I"
+ // "LM" : compute the NEV largest (in magnitude) eigenvalues
+ val which = "LM"
+
+ var iparam = new Array[Int](11)
+ // use exact shift in each iteration
+ iparam(0) = 1
+ // maximum number of Arnoldi update iterations, or the actual number of iterations on output
+ iparam(2) = maxIterations
+ // Mode 1: A*x = lambda*x, A symmetric
+ iparam(6) = 1
+
+ var ido = new intW(0)
+ var info = new intW(0)
+ var resid = new Array[Double](n)
+ var v = new Array[Double](n * ncv)
+ var workd = new Array[Double](n * 3)
+ var workl = new Array[Double](ncv * (ncv + 8))
+ var ipntr = new Array[Int](11)
+
+ // call ARPACK's reverse communication, first iteration with ido = 0
+ arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr, workd,
+ workl, workl.length, info)
+
+ val w = BDV(workd)
+
+ // ido = 99 : done flag in reverse communication
+ while (ido.`val` != 99) {
+ if (ido.`val` != -1 && ido.`val` != 1) {
+ throw new IllegalStateException("ARPACK returns ido = " + ido.`val` +
+ " This flag is not compatible with Mode 1: A*x = lambda*x, A symmetric.")
+ }
+ // multiply working vector with the matrix
+ val inputOffset = ipntr(0) - 1
+ val outputOffset = ipntr(1) - 1
+ val x = w.slice(inputOffset, inputOffset + n)
+ val y = w.slice(outputOffset, outputOffset + n)
+ y := mul(x)
+ // call ARPACK's reverse communication
+ arpack.dsaupd(ido, bmat, n, which, nev.`val`, tolW, resid, ncv, v, n, iparam, ipntr,
+ workd, workl, workl.length, info)
+ }
+
+ if (info.`val` != 0) {
+ info.`val` match {
+ case 1 => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` +
+ " Maximum number of iterations taken. (Refer ARPACK user guide for details)")
+ case 2 => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` +
+ " No shifts could be applied. Try to increase NCV. " +
+ "(Refer ARPACK user guide for details)")
+ case _ => throw new IllegalStateException("ARPACK returns non-zero info = " + info.`val` +
+ " Please refer ARPACK user guide for error message.")
+ }
+ }
+
+ val d = new Array[Double](nev.`val`)
+ val select = new Array[Boolean](ncv)
+ // copy the Ritz vectors
+ val z = java.util.Arrays.copyOfRange(v, 0, nev.`val` * n)
+
+ // call ARPACK's post-processing for eigenvectors
+ arpack.dseupd(true, "A", select, d, z, n, 0.0, bmat, n, which, nev, tol, resid, ncv, v, n,
+ iparam, ipntr, workd, workl, workl.length, info)
+
+ // number of computed eigenvalues, might be smaller than k
+ val computed = iparam(4)
+
+ val eigenPairs = java.util.Arrays.copyOfRange(d, 0, computed).zipWithIndex.map { r =>
+ (r._1, java.util.Arrays.copyOfRange(z, r._2 * n, r._2 * n + n))
+ }
+
+ // sort the eigen-pairs in descending order
+ val sortedEigenPairs = eigenPairs.sortBy(- _._1)
+
+ // copy eigenvectors in descending order of eigenvalues
+ val sortedU = BDM.zeros[Double](n, computed)
+ sortedEigenPairs.zipWithIndex.foreach { r =>
+ val b = r._2 * n
+ var i = 0
+ while (i < n) {
+ sortedU.data(b + i) = r._1._2(i)
+ i += 1
+ }
+ }
+
+ (BDV[Double](sortedEigenPairs.map(_._1)), sortedU)
+ }
+}
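A small local sketch of driving symmetricEigs with a dense symmetric matrix (arbitrary values; the method is private[mllib], so real callers live inside MLlib, such as RowMatrix below):

    import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV}

    // A 3 x 3 real symmetric matrix; the multiply closure is simply A * v here,
    // whereas RowMatrix passes either a local Gramian product or multiplyGramianMatrixBy.
    val A = BDM((2.0, 1.0, 0.0),
                (1.0, 3.0, 1.0),
                (0.0, 1.0, 2.0))

    val (eigenvalues, eigenvectors) =
      EigenValueDecomposition.symmetricEigs(v => A * v, n = 3, k = 1, tol = 1e-10, maxIterations = 300)
    // eigenvalues: BDV[Double] in descending order; eigenvectors: BDM[Double], one column per eigenvalue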
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
index 695e03b736baf..99cb6516e065c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala
@@ -17,9 +17,10 @@
package org.apache.spark.mllib.linalg.distributed
-import java.util
+import java.util.Arrays
-import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
+import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, SparseVector => BSV}
+import breeze.linalg.{svd => brzSvd, axpy => brzAxpy}
import breeze.numerics.{sqrt => brzSqrt}
import com.github.fommil.netlib.BLAS.{getInstance => blas}
@@ -34,7 +35,7 @@ import org.apache.spark.mllib.stat.MultivariateStatisticalSummary
* [[org.apache.spark.mllib.stat.MultivariateStatisticalSummary]]
* together with add() and merge() function.
* A numerically stable algorithm is implemented to compute sample mean and variance:
- *[[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]].
+ * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]].
* Zero elements (including explicit zero values) are skipped when calling add() and merge(),
* to have time complexity O(nnz) instead of O(n) for each column.
*/
@@ -200,6 +201,26 @@ class RowMatrix(
nRows
}
+ /**
+ * Multiplies the Gramian matrix `A^T A` by a dense vector on the right without computing `A^T A`.
+ *
+ * @param v a dense vector whose length must match the number of columns of this matrix
+ * @return a dense vector representing the product
+ */
+ private[mllib] def multiplyGramianMatrixBy(v: BDV[Double]): BDV[Double] = {
+ val n = numCols().toInt
+ val vbr = rows.context.broadcast(v)
+ rows.aggregate(BDV.zeros[Double](n))(
+ seqOp = (U, r) => {
+ val rBrz = r.toBreeze
+ val a = rBrz.dot(vbr.value)
+ brzAxpy(a, rBrz, U.asInstanceOf[BV[Double]])
+ U
+ },
+ combOp = (U1, U2) => U1 += U2
+ )
+ }
+
/**
* Computes the Gramian matrix `A^T A`.
*/
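The aggregate above works because the Gramian product can be accumulated row by row. Writing the rows of A as r_i, the identity being used is

    (A^\top A)\,v \;=\; \sum_i r_i\,(r_i^\top v) \;=\; \sum_i (r_i \cdot v)\, r_i

so each seqOp call adds (r_i . v) r_i to the running n-vector via axpy, combOp sums the partial vectors, and A^T A itself is never materialized.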
@@ -220,50 +241,135 @@ class RowMatrix(
}
/**
- * Computes the singular value decomposition of this matrix.
- * Denote this matrix by A (m x n), this will compute matrices U, S, V such that A = U * S * V'.
+ * Computes singular value decomposition of this matrix. Denote this matrix by A (m x n). This
+ * will compute matrices U, S, V such that A ~= U * S * V', where S contains the leading k
+ * singular values, U and V contain the corresponding singular vectors.
*
- * There is no restriction on m, but we require `n^2` doubles to fit in memory.
- * Further, n should be less than m.
-
- * The decomposition is computed by first computing A'A = V S^2 V',
- * computing svd locally on that (since n x n is small), from which we recover S and V.
- * Then we compute U via easy matrix multiplication as U = A * (V * S^-1).
- * Note that this approach requires `O(n^3)` time on the master node.
+ * At most k largest non-zero singular values and associated vectors are returned. If there are k
+ * such values, then the dimensions of the return will be:
+ * - U is a RowMatrix of size m x k that satisfies U' * U = eye(k),
+ * - s is a Vector of size k, holding the singular values in descending order,
+ * - V is a Matrix of size n x k that satisfies V' * V = eye(k).
+ *
+ * We assume n is smaller than m. The singular values and the right singular vectors are derived
+ * from the eigenvalues and the eigenvectors of the Gramian matrix A' * A. U, the matrix
+ * storing the right singular vectors, is computed via matrix multiplication as
+ * U = A * (V * S^-1^), if requested by user. The actual method to use is determined
+ * automatically based on the cost:
+ * - If n is small (n < 100) or k is large compared with n (k > n / 2), we compute the Gramian
+ * matrix first and then compute its top eigenvalues and eigenvectors locally on the driver.
+ * This requires a single pass with O(n^2^) storage on each executor and on the driver, and
+ * O(n^2^ k) time on the driver.
+ * - Otherwise, we compute (A' * A) * v in a distributive way and send it to ARPACK's DSAUPD to
+ * compute (A' * A)'s top eigenvalues and eigenvectors on the driver node. This requires O(k)
+ * passes, O(n) storage on each executor, and O(n k) storage on the driver.
*
- * At most k largest non-zero singular values and associated vectors are returned.
- * If there are k such values, then the dimensions of the return will be:
+ * Several internal parameters are set to default values. The reciprocal condition number rCond
+ * is set to 1e-9. All singular values smaller than rCond * sigma(0) are treated as zeros, where
+ * sigma(0) is the largest singular value. The maximum number of Arnoldi update iterations for
+ * ARPACK is set to 300 or k * 3, whichever is larger. The numerical tolerance for ARPACK's
+ * eigen-decomposition is set to 1e-10.
*
- * U is a RowMatrix of size m x k that satisfies U'U = eye(k),
- * s is a Vector of size k, holding the singular values in descending order,
- * and V is a Matrix of size n x k that satisfies V'V = eye(k).
+ * @note The conditions that decide which method to use internally and the default parameters are
+ * subject to change.
*
- * @param k number of singular values to keep. We might return less than k if there are
- * numerically zero singular values. See rCond.
+ * @param k number of leading singular values to keep (0 < k <= n). It might return less than k if
+ * there are numerically zero singular values or there are not enough Ritz values
+ * converged before the maximum number of Arnoldi update iterations is reached (in case
+ * that matrix A is ill-conditioned).
* @param computeU whether to compute U
* @param rCond the reciprocal condition number. All singular values smaller than rCond * sigma(0)
* are treated as zero, where sigma(0) is the largest singular value.
- * @return SingularValueDecomposition(U, s, V)
+ * @return SingularValueDecomposition(U, s, V). U = null if computeU = false.
*/
def computeSVD(
k: Int,
computeU: Boolean = false,
rCond: Double = 1e-9): SingularValueDecomposition[RowMatrix, Matrix] = {
+ // maximum number of Arnoldi update iterations for invoking ARPACK
+ val maxIter = math.max(300, k * 3)
+ // numerical tolerance for invoking ARPACK
+ val tol = 1e-10
+ computeSVD(k, computeU, rCond, maxIter, tol, "auto")
+ }
+
+ /**
+ * The actual SVD implementation, visible for testing.
+ *
+ * @param k number of leading singular values to keep (0 < k <= n)
+ * @param computeU whether to compute U
+ * @param rCond the reciprocal condition number
+ * @param maxIter max number of iterations (if ARPACK is used)
+ * @param tol termination tolerance (if ARPACK is used)
+ * @param mode computation mode (auto: determine automatically which mode to use,
+ * local-svd: compute gram matrix and computes its full SVD locally,
+ * local-eigs: compute gram matrix and computes its top eigenvalues locally,
+ * dist-eigs: compute the top eigenvalues of the gram matrix distributively)
+ * @return SingularValueDecomposition(U, s, V). U = null if computeU = false.
+ */
+ private[mllib] def computeSVD(
+ k: Int,
+ computeU: Boolean,
+ rCond: Double,
+ maxIter: Int,
+ tol: Double,
+ mode: String): SingularValueDecomposition[RowMatrix, Matrix] = {
val n = numCols().toInt
- require(k > 0 && k <= n, s"Request up to n singular values k=$k n=$n.")
+ require(k > 0 && k <= n, s"Request up to n singular values but got k=$k and n=$n.")
- val G = computeGramianMatrix()
+ object SVDMode extends Enumeration {
+ val LocalARPACK, LocalLAPACK, DistARPACK = Value
+ }
+
+ val computeMode = mode match {
+ case "auto" =>
+ // TODO: The conditions below are not fully tested.
+ if (n < 100 || k > n / 2) {
+          // If n is small or k is large compared with n, it is better to compute the Gramian matrix first
+ // and then compute its eigenvalues locally, instead of making multiple passes.
+ if (k < n / 3) {
+ SVDMode.LocalARPACK
+ } else {
+ SVDMode.LocalLAPACK
+ }
+ } else {
+ // If k is small compared with n, we use ARPACK with distributed multiplication.
+ SVDMode.DistARPACK
+ }
+ case "local-svd" => SVDMode.LocalLAPACK
+ case "local-eigs" => SVDMode.LocalARPACK
+ case "dist-eigs" => SVDMode.DistARPACK
+ case _ => throw new IllegalArgumentException(s"Do not support mode $mode.")
+ }
+
+ // Compute the eigen-decomposition of A' * A.
+ val (sigmaSquares: BDV[Double], u: BDM[Double]) = computeMode match {
+ case SVDMode.LocalARPACK =>
+ require(k < n, s"k must be smaller than n in local-eigs mode but got k=$k and n=$n.")
+ val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]]
+ EigenValueDecomposition.symmetricEigs(v => G * v, n, k, tol, maxIter)
+ case SVDMode.LocalLAPACK =>
+ val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]]
+ val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], _) = brzSvd(G)
+ (sigmaSquaresFull, uFull)
+ case SVDMode.DistARPACK =>
+ require(k < n, s"k must be smaller than n in dist-eigs mode but got k=$k and n=$n.")
+ EigenValueDecomposition.symmetricEigs(multiplyGramianMatrixBy, n, k, tol, maxIter)
+ }
- // TODO: Use sparse SVD instead.
- val (u: BDM[Double], sigmaSquares: BDV[Double], v: BDM[Double]) =
- brzSvd(G.toBreeze.asInstanceOf[BDM[Double]])
val sigmas: BDV[Double] = brzSqrt(sigmaSquares)
- // Determine effective rank.
+ // Determine the effective rank.
val sigma0 = sigmas(0)
val threshold = rCond * sigma0
var i = 0
- while (i < k && sigmas(i) >= threshold) {
+ // sigmas might have a length smaller than k, if some Ritz values do not satisfy the convergence
+ // criterion specified by tol after max number of iterations.
+ // Thus use i < min(k, sigmas.length) instead of i < k.
+ if (sigmas.length < k) {
+ logWarning(s"Requested $k singular values but only found ${sigmas.length} converged.")
+ }
+ while (i < math.min(k, sigmas.length) && sigmas(i) >= threshold) {
i += 1
}
val sk = i
@@ -272,12 +378,12 @@ class RowMatrix(
logWarning(s"Requested $k singular values but only found $sk nonzeros.")
}
- val s = Vectors.dense(util.Arrays.copyOfRange(sigmas.data, 0, sk))
- val V = Matrices.dense(n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk))
+ val s = Vectors.dense(Arrays.copyOfRange(sigmas.data, 0, sk))
+ val V = Matrices.dense(n, sk, Arrays.copyOfRange(u.data, 0, n * sk))
if (computeU) {
// N = Vk * Sk^{-1}
- val N = new BDM[Double](n, sk, util.Arrays.copyOfRange(u.data, 0, n * sk))
+ val N = new BDM[Double](n, sk, Arrays.copyOfRange(u.data, 0, n * sk))
var i = 0
var j = 0
while (j < sk) {
@@ -364,7 +470,7 @@ class RowMatrix(
if (k == n) {
Matrices.dense(n, k, u.data)
} else {
- Matrices.dense(n, k, util.Arrays.copyOfRange(u.data, 0, n * k))
+ Matrices.dense(n, k, Arrays.copyOfRange(u.data, 0, n * k))
}
}
@@ -390,15 +496,24 @@ class RowMatrix(
*/
def multiply(B: Matrix): RowMatrix = {
val n = numCols().toInt
+ val k = B.numCols
require(n == B.numRows, s"Dimension mismatch: $n vs ${B.numRows}")
require(B.isInstanceOf[DenseMatrix],
s"Only support dense matrix at this time but found ${B.getClass.getName}.")
- val Bb = rows.context.broadcast(B)
+ val Bb = rows.context.broadcast(B.toBreeze.asInstanceOf[BDM[Double]].toDenseVector.toArray)
val AB = rows.mapPartitions({ iter =>
- val Bi = Bb.value.toBreeze.asInstanceOf[BDM[Double]]
- iter.map(v => Vectors.fromBreeze(Bi.t * v.toBreeze))
+ val Bi = Bb.value
+ iter.map(row => {
+ val v = BDV.zeros[Double](k)
+ var i = 0
+ while (i < k) {
+ v(i) = row.toBreeze.dot(new BDV(Bi, i * n, 1, n))
+ i += 1
+ }
+ Vectors.fromBreeze(v)
+ })
}, preservesPartitioning = true)
new RowMatrix(AB, nRows, B.numCols)
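
The rewritten multiply above broadcasts B as a flat column-major Array[Double] rather than a Matrix object, and each output entry is a dot product of a row of A with one column of B. A plain-Scala, hypothetical local analogue of that per-row computation (no Spark or Breeze; names are illustrative):

// Hypothetical local analogue of the per-row work in the new multiply():
// bFlat holds B in column-major order, so column j starts at offset j * n.
object RowTimesMatrixSketch {
  def multiplyRow(row: Array[Double], bFlat: Array[Double], n: Int, k: Int): Array[Double] = {
    require(row.length == n && bFlat.length == n * k)
    val out = new Array[Double](k)
    var j = 0
    while (j < k) {
      var s = 0.0
      var i = 0
      while (i < n) {
        s += row(i) * bFlat(j * n + i)  // dot product with column j of B
        i += 1
      }
      out(j) = s
      j += 1
    }
    out
  }

  def main(args: Array[String]): Unit = {
    // B = [[1, 3], [2, 4]] stored column-major; [1, 1] * B = [3, 7]
    println(multiplyRow(Array(1.0, 1.0), Array(1.0, 2.0, 3.0, 4.0), n = 2, k = 2).mkString(", "))
  }
}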
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
index 3b13e52a7b445..74d5d7ba10960 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
@@ -807,10 +807,10 @@ object DecisionTree extends Serializable with Logging {
// calculating right node aggregate for a split as a sum of right node aggregate of a
// higher split and the right bin aggregate of a bin where the split is a low split
rightNodeAgg(featureIndex)(2 * (numBins - 2 - splitIndex)) =
- binData(shift + (2 *(numBins - 2 - splitIndex))) +
+ binData(shift + (2 *(numBins - 1 - splitIndex))) +
rightNodeAgg(featureIndex)(2 * (numBins - 1 - splitIndex))
rightNodeAgg(featureIndex)(2 * (numBins - 2 - splitIndex) + 1) =
- binData(shift + (2* (numBins - 2 - splitIndex) + 1)) +
+ binData(shift + (2* (numBins - 1 - splitIndex) + 1)) +
rightNodeAgg(featureIndex)(2 * (numBins - 1 - splitIndex) + 1)
splitIndex += 1
@@ -855,13 +855,13 @@ object DecisionTree extends Serializable with Logging {
// calculating right node aggregate for a split as a sum of right node aggregate of a
// higher split and the right bin aggregate of a bin where the split is a low split
rightNodeAgg(featureIndex)(3 * (numBins - 2 - splitIndex)) =
- binData(shift + (3 * (numBins - 2 - splitIndex))) +
+ binData(shift + (3 * (numBins - 1 - splitIndex))) +
rightNodeAgg(featureIndex)(3 * (numBins - 1 - splitIndex))
rightNodeAgg(featureIndex)(3 * (numBins - 2 - splitIndex) + 1) =
- binData(shift + (3 * (numBins - 2 - splitIndex) + 1)) +
+ binData(shift + (3 * (numBins - 1 - splitIndex) + 1)) +
rightNodeAgg(featureIndex)(3 * (numBins - 1 - splitIndex) + 1)
rightNodeAgg(featureIndex)(3 * (numBins - 2 - splitIndex) + 2) =
- binData(shift + (3 * (numBins - 2 - splitIndex) + 2)) +
+ binData(shift + (3 * (numBins - 1 - splitIndex) + 2)) +
rightNodeAgg(featureIndex)(3 * (numBins - 1 - splitIndex) + 2)
splitIndex += 1
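
Both corrected loops above build, for each candidate split, the aggregate of all bins strictly to the right of that split; the fix reads the bin at index numBins - 1 - splitIndex instead of numBins - 2 - splitIndex when adding to the aggregate of the next-higher split. A simplified, hypothetical one-statistic-per-bin version of that right-to-left accumulation:

// Hypothetical simplification of the corrected right-node aggregation:
// rightAgg(s) = sum of binData(s + 1 .. numBins - 1), built from the highest split down.
object RightNodeAggSketch {
  def rightAggregates(binData: Array[Double]): Array[Double] = {
    val numBins = binData.length
    val numSplits = numBins - 1
    val rightAgg = new Array[Double](numSplits)
    rightAgg(numSplits - 1) = binData(numBins - 1)  // the highest split sees only the last bin
    var s = numSplits - 2
    while (s >= 0) {
      // each lower split adds the bin immediately to its right to the aggregate of the split above
      rightAgg(s) = binData(s + 1) + rightAgg(s + 1)
      s -= 1
    }
    rightAgg
  }

  def main(args: Array[String]): Unit = {
    println(rightAggregates(Array(1.0, 2.0, 3.0, 4.0)).mkString(", "))  // 9.0, 7.0, 4.0
  }
}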
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
index c9f9acf4c1335..a961f89456a18 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala
@@ -96,37 +96,44 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext {
test("svd of a full-rank matrix") {
for (mat <- Seq(denseMat, sparseMat)) {
- val localMat = mat.toBreeze()
- val (localU, localSigma, localVt) = brzSvd(localMat)
- val localV: BDM[Double] = localVt.t.toDenseMatrix
- for (k <- 1 to n) {
- val svd = mat.computeSVD(k, computeU = true)
- val U = svd.U
- val s = svd.s
- val V = svd.V
- assert(U.numRows() === m)
- assert(U.numCols() === k)
- assert(s.size === k)
- assert(V.numRows === n)
- assert(V.numCols === k)
- assertColumnEqualUpToSign(U.toBreeze(), localU, k)
- assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k)
- assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k)))
+ for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) {
+ val localMat = mat.toBreeze()
+ val (localU, localSigma, localVt) = brzSvd(localMat)
+ val localV: BDM[Double] = localVt.t.toDenseMatrix
+ for (k <- 1 to n) {
+ val skip = (mode == "local-eigs" || mode == "dist-eigs") && k == n
+ if (!skip) {
+ val svd = mat.computeSVD(k, computeU = true, 1e-9, 300, 1e-10, mode)
+ val U = svd.U
+ val s = svd.s
+ val V = svd.V
+ assert(U.numRows() === m)
+ assert(U.numCols() === k)
+ assert(s.size === k)
+ assert(V.numRows === n)
+ assert(V.numCols === k)
+ assertColumnEqualUpToSign(U.toBreeze(), localU, k)
+ assertColumnEqualUpToSign(V.toBreeze.asInstanceOf[BDM[Double]], localV, k)
+ assert(closeToZero(s.toBreeze.asInstanceOf[BDV[Double]] - localSigma(0 until k)))
+ }
+ }
+ val svdWithoutU = mat.computeSVD(1, computeU = false, 1e-9, 300, 1e-10, mode)
+ assert(svdWithoutU.U === null)
}
- val svdWithoutU = mat.computeSVD(n)
- assert(svdWithoutU.U === null)
}
}
test("svd of a low-rank matrix") {
- val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0)), 2)
- val mat = new RowMatrix(rows, 4, 2)
- val svd = mat.computeSVD(2, computeU = true)
- assert(svd.s.size === 1, "should not return zero singular values")
- assert(svd.U.numRows() === 4)
- assert(svd.U.numCols() === 1)
- assert(svd.V.numRows === 2)
- assert(svd.V.numCols === 1)
+ val rows = sc.parallelize(Array.fill(4)(Vectors.dense(1.0, 1.0, 1.0)), 2)
+ val mat = new RowMatrix(rows, 4, 3)
+ for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) {
+ val svd = mat.computeSVD(2, computeU = true, 1e-6, 300, 1e-10, mode)
+ assert(svd.s.size === 1, s"should not return zero singular values but got ${svd.s}")
+ assert(svd.U.numRows() === 4)
+ assert(svd.U.numCols() === 1)
+ assert(svd.V.numRows === 3)
+ assert(svd.V.numCols === 1)
+ }
}
def closeToZero(G: BDM[Double]): Boolean = {
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
index 35e92d71dc63f..bcb11876b8f4f 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
@@ -253,8 +253,8 @@ class DecisionTreeSuite extends FunSuite with LocalSparkContext {
val stats = bestSplits(0)._2
assert(stats.gain > 0)
- assert(stats.predict > 0.4)
- assert(stats.predict < 0.5)
+ assert(stats.predict > 0.5)
+ assert(stats.predict < 0.7)
assert(stats.impurity > 0.2)
}
@@ -280,8 +280,8 @@ class DecisionTreeSuite extends FunSuite with LocalSparkContext {
val stats = bestSplits(0)._2
assert(stats.gain > 0)
- assert(stats.predict > 0.4)
- assert(stats.predict < 0.5)
+ assert(stats.predict > 0.5)
+ assert(stats.predict < 0.7)
assert(stats.impurity > 0.2)
}
diff --git a/pom.xml b/pom.xml
index 05f76d566e9d1..fa80707d0929c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -110,7 +110,7 @@
UTF-8
1.6
-
+ spark
2.10.4
2.10
0.18.1
@@ -535,6 +535,10 @@
org.mortbay.jetty
servlet-api-2.5
+
+ javax.servlet
+ servlet-api
+
junit
junit
@@ -618,6 +622,10 @@
hadoop-yarn-api
${yarn.version}
+
+ javax.servlet
+ servlet-api
+
asm
asm
diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala
index bb2d73741c3bf..034ba6a7bf50f 100644
--- a/project/MimaBuild.scala
+++ b/project/MimaBuild.scala
@@ -15,13 +15,16 @@
* limitations under the License.
*/
+import sbt._
+import sbt.Keys.version
+
import com.typesafe.tools.mima.core._
import com.typesafe.tools.mima.core.MissingClassProblem
import com.typesafe.tools.mima.core.MissingTypesProblem
import com.typesafe.tools.mima.core.ProblemFilters._
import com.typesafe.tools.mima.plugin.MimaKeys.{binaryIssueFilters, previousArtifact}
import com.typesafe.tools.mima.plugin.MimaPlugin.mimaDefaultSettings
-import sbt._
+
object MimaBuild {
@@ -53,7 +56,7 @@ object MimaBuild {
excludePackage("org.apache.spark." + packageName)
}
- def ignoredABIProblems(base: File) = {
+ def ignoredABIProblems(base: File, currentSparkVersion: String) = {
// Excludes placed here will be used for all Spark versions
val defaultExcludes = Seq()
@@ -77,11 +80,16 @@ object MimaBuild {
}
defaultExcludes ++ ignoredClasses.flatMap(excludeClass) ++
- ignoredMembers.flatMap(excludeMember) ++ MimaExcludes.excludes
+ ignoredMembers.flatMap(excludeMember) ++ MimaExcludes.excludes(currentSparkVersion)
+ }
+
+ def mimaSettings(sparkHome: File, projectRef: ProjectRef) = {
+ val organization = "org.apache.spark"
+ val previousSparkVersion = "1.0.0"
+ val fullId = "spark-" + projectRef.project + "_2.10"
+ mimaDefaultSettings ++
+ Seq(previousArtifact := Some(organization % fullId % previousSparkVersion),
+ binaryIssueFilters ++= ignoredABIProblems(sparkHome, version.value))
}
- def mimaSettings(sparkHome: File) = mimaDefaultSettings ++ Seq(
- previousArtifact := None,
- binaryIssueFilters ++= ignoredABIProblems(sparkHome)
- )
}
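
mimaSettings is now parameterised by the ProjectRef, so the previous artifact to compare against (spark-<project>_2.10 at version 1.0.0) is derived per project and the current version is forwarded into MimaExcludes.excludes. A plain-Scala illustration of the coordinates being checked (hypothetical helper; the real code builds an sbt ModuleID, not a String):

// Hypothetical illustration of the artifact coordinates the new mimaSettings compares against.
object PreviousArtifactSketch {
  val organization = "org.apache.spark"
  val previousSparkVersion = "1.0.0"

  def previousArtifactId(projectName: String): String =
    s"$organization:spark-${projectName}_2.10:$previousSparkVersion"

  def main(args: Array[String]): Unit = {
    println(previousArtifactId("core"))   // org.apache.spark:spark-core_2.10:1.0.0
    println(previousArtifactId("mllib"))  // org.apache.spark:spark-mllib_2.10:1.0.0
  }
}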
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 1621833e124f5..3b7b87b80cda0 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -31,8 +31,8 @@ import com.typesafe.tools.mima.core._
* MimaBuild.excludeSparkClass("graphx.util.collection.GraphXPrimitiveKeyOpenHashMap")
*/
object MimaExcludes {
- val excludes =
- SparkBuild.SPARK_VERSION match {
+ def excludes(version: String) =
+ version match {
case v if v.startsWith("1.1") =>
Seq(
MimaBuild.excludeSparkPackage("deploy"),
@@ -64,6 +64,9 @@ object MimaExcludes {
"org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$"
+ "createZero$1")
) ++
+ Seq(
+ ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.streaming.flume.FlumeReceiver.this")
+ ) ++
Seq( // Ignore some private methods in ALS.
ProblemFilters.exclude[MissingMethodProblem](
"org.apache.spark.mllib.recommendation.ALS.org$apache$spark$mllib$recommendation$ALS$^dateFeatures"),
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 599714233c18f..b55c50560bb93 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -15,524 +15,159 @@
* limitations under the License.
*/
-import sbt._
-import sbt.Classpaths.publishTask
-import sbt.Keys._
-import sbtassembly.Plugin._
-import AssemblyKeys._
import scala.util.Properties
-import org.scalastyle.sbt.ScalastylePlugin.{Settings => ScalaStyleSettings}
-import com.typesafe.tools.mima.plugin.MimaKeys.previousArtifact
-import sbtunidoc.Plugin._
-import UnidocKeys._
-
import scala.collection.JavaConversions._
-// For Sonatype publishing
-// import com.jsuereth.pgp.sbtplugin.PgpKeys._
-
-object SparkBuild extends Build {
- val SPARK_VERSION = "1.1.0-SNAPSHOT"
- val SPARK_VERSION_SHORT = SPARK_VERSION.replaceAll("-SNAPSHOT", "")
-
- // Hadoop version to build against. For example, "1.0.4" for Apache releases, or
- // "2.0.0-mr1-cdh4.2.0" for Cloudera Hadoop. Note that these variables can be set
- // through the environment variables SPARK_HADOOP_VERSION and SPARK_YARN.
- val DEFAULT_HADOOP_VERSION = "1.0.4"
-
- // Whether the Hadoop version to build against is 2.2.x, or a variant of it. This can be set
- // through the SPARK_IS_NEW_HADOOP environment variable.
- val DEFAULT_IS_NEW_HADOOP = false
-
- val DEFAULT_YARN = false
-
- val DEFAULT_HIVE = false
-
- // HBase version; set as appropriate.
- val HBASE_VERSION = "0.94.6"
-
- // Target JVM version
- val SCALAC_JVM_VERSION = "jvm-1.6"
- val JAVAC_JVM_VERSION = "1.6"
-
- lazy val root = Project("root", file("."), settings = rootSettings) aggregate(allProjects: _*)
-
- lazy val core = Project("core", file("core"), settings = coreSettings)
-
- /** Following project only exists to pull previous artifacts of Spark for generating
- Mima ignores. For more information see: SPARK 2071 */
- lazy val oldDeps = Project("oldDeps", file("dev"), settings = oldDepsSettings)
-
- def replDependencies = Seq[ProjectReference](core, graphx, bagel, mllib, sql) ++ maybeHiveRef
-
- lazy val repl = Project("repl", file("repl"), settings = replSettings)
- .dependsOn(replDependencies.map(a => a: sbt.ClasspathDep[sbt.ProjectReference]): _*)
-
- lazy val tools = Project("tools", file("tools"), settings = toolsSettings) dependsOn(core) dependsOn(streaming)
+import sbt._
+import sbt.Keys._
+import org.scalastyle.sbt.ScalastylePlugin.{Settings => ScalaStyleSettings}
+import com.typesafe.sbt.pom.{PomBuild, SbtPomKeys}
+import net.virtualvoid.sbt.graph.Plugin.graphSettings
- lazy val bagel = Project("bagel", file("bagel"), settings = bagelSettings) dependsOn(core)
+object BuildCommons {
- lazy val graphx = Project("graphx", file("graphx"), settings = graphxSettings) dependsOn(core)
+ private val buildLocation = file(".").getAbsoluteFile.getParentFile
- lazy val catalyst = Project("catalyst", file("sql/catalyst"), settings = catalystSettings) dependsOn(core)
+ val allProjects@Seq(bagel, catalyst, core, graphx, hive, mllib, repl, spark, sql, streaming,
+ streamingFlume, streamingKafka, streamingMqtt, streamingTwitter, streamingZeromq) =
+ Seq("bagel", "catalyst", "core", "graphx", "hive", "mllib", "repl", "spark", "sql",
+ "streaming", "streaming-flume", "streaming-kafka", "streaming-mqtt", "streaming-twitter",
+ "streaming-zeromq").map(ProjectRef(buildLocation, _))
- lazy val sql = Project("sql", file("sql/core"), settings = sqlCoreSettings) dependsOn(core) dependsOn(catalyst % "compile->compile;test->test")
+ val optionallyEnabledProjects@Seq(yarn, yarnStable, yarnAlpha, java8Tests, sparkGangliaLgpl) =
+ Seq("yarn", "yarn-stable", "yarn-alpha", "java8-tests", "ganglia-lgpl")
+ .map(ProjectRef(buildLocation, _))
- lazy val hive = Project("hive", file("sql/hive"), settings = hiveSettings) dependsOn(sql)
+ val assemblyProjects@Seq(assembly, examples) = Seq("assembly", "examples")
+ .map(ProjectRef(buildLocation, _))
- lazy val maybeHive: Seq[ClasspathDependency] = if (isHiveEnabled) Seq(hive) else Seq()
- lazy val maybeHiveRef: Seq[ProjectReference] = if (isHiveEnabled) Seq(hive) else Seq()
+ val tools = "tools"
- lazy val streaming = Project("streaming", file("streaming"), settings = streamingSettings) dependsOn(core)
+ val sparkHome = buildLocation
+}
- lazy val mllib = Project("mllib", file("mllib"), settings = mllibSettings) dependsOn(core)
+object SparkBuild extends PomBuild {
- lazy val assemblyProj = Project("assembly", file("assembly"), settings = assemblyProjSettings)
- .dependsOn(core, graphx, bagel, mllib, streaming, repl, sql) dependsOn(maybeYarn: _*) dependsOn(maybeHive: _*) dependsOn(maybeGanglia: _*)
+ import BuildCommons._
+ import scala.collection.mutable.Map
- lazy val assembleDepsTask = TaskKey[Unit]("assemble-deps")
- lazy val assembleDeps = assembleDepsTask := {
- println()
- println("**** NOTE ****")
- println("'sbt/sbt assemble-deps' is no longer supported.")
- println("Instead create a normal assembly and:")
- println(" export SPARK_PREPEND_CLASSES=1 (toggle on)")
- println(" unset SPARK_PREPEND_CLASSES (toggle off)")
- println()
- }
+ val projectsMap: Map[String, Seq[Setting[_]]] = Map.empty
- // A configuration to set an alternative publishLocalConfiguration
- lazy val MavenCompile = config("m2r") extend(Compile)
- lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy")
- val sparkHome = System.getProperty("user.dir")
-
- // Allows build configuration to be set through environment variables
- lazy val hadoopVersion = Properties.envOrElse("SPARK_HADOOP_VERSION", DEFAULT_HADOOP_VERSION)
- lazy val isNewHadoop = Properties.envOrNone("SPARK_IS_NEW_HADOOP") match {
- case None => {
- val isNewHadoopVersion = "^2\\.[2-9]+".r.findFirstIn(hadoopVersion).isDefined
- (isNewHadoopVersion|| DEFAULT_IS_NEW_HADOOP)
+ // Provides compatibility for older versions of the Spark build
+ def backwardCompatibility = {
+ import scala.collection.mutable
+ var isAlphaYarn = false
+ var profiles: mutable.Seq[String] = mutable.Seq.empty
+ if (Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined) {
+ println("NOTE: SPARK_GANGLIA_LGPL is deprecated, please use -Pganglia-lgpl flag.")
+ profiles ++= Seq("spark-ganglia-lgpl")
}
- case Some(v) => v.toBoolean
- }
-
- lazy val isYarnEnabled = Properties.envOrNone("SPARK_YARN") match {
- case None => DEFAULT_YARN
- case Some(v) => v.toBoolean
+ if (Properties.envOrNone("SPARK_HIVE").isDefined) {
+ println("NOTE: SPARK_HIVE is deprecated, please use -Phive flag.")
+ profiles ++= Seq("hive")
+ }
+ Properties.envOrNone("SPARK_HADOOP_VERSION") match {
+ case Some(v) =>
+ if (v.matches("0.23.*")) isAlphaYarn = true
+ println("NOTE: SPARK_HADOOP_VERSION is deprecated, please use -Dhadoop.version=" + v)
+ System.setProperty("hadoop.version", v)
+ case None =>
+ }
+ if (Properties.envOrNone("SPARK_YARN").isDefined) {
+ if(isAlphaYarn) {
+ println("NOTE: SPARK_YARN is deprecated, please use -Pyarn-alpha flag.")
+ profiles ++= Seq("yarn-alpha")
+ }
+ else {
+ println("NOTE: SPARK_YARN is deprecated, please use -Pyarn flag.")
+ profiles ++= Seq("yarn")
+ }
+ }
+ profiles
}
- lazy val hadoopClient = if (hadoopVersion.startsWith("0.20.") || hadoopVersion == "1.0.0") "hadoop-core" else "hadoop-client"
- val maybeAvro = if (hadoopVersion.startsWith("0.23.")) Seq("org.apache.avro" % "avro" % "1.7.4") else Seq()
- lazy val isHiveEnabled = Properties.envOrNone("SPARK_HIVE") match {
- case None => DEFAULT_HIVE
- case Some(v) => v.toBoolean
+ override val profiles = Properties.envOrNone("MAVEN_PROFILES") match {
+ case None => backwardCompatibility
+ // Rationale: if the -P option is present, there is no need to apply backwardCompatibility.
+ case Some(v) =>
+ if (backwardCompatibility.nonEmpty)
+ println("Note: We ignore environment variables, when use of profile is detected in " +
+ "conjunction with environment variable.")
+ v.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", "")).toSeq
}
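
The profiles override just above tokenises the MAVEN_PROFILES value exported by sbt/sbt's new -P flag, falling back to backwardCompatibility otherwise. A plain-Scala sketch of just that tokenisation (hypothetical helper name; the regex and transformations are the ones used above):

// Hypothetical standalone version of the MAVEN_PROFILES tokenisation:
// split on whitespace or commas, drop empty tokens, strip any "-P" prefixes.
object ProfileParsingSketch {
  def parseProfiles(raw: String): Seq[String] =
    raw.split("(\\s+|,)").filterNot(_.isEmpty).map(_.trim.replaceAll("-P", "")).toSeq

  def main(args: Array[String]): Unit = {
    println(parseProfiles("-Pyarn, -Phive"))  // List(yarn, hive)
  }
}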
- // Include Ganglia integration if the user has enabled Ganglia
- // This is isolated from the normal build due to LGPL-licensed code in the library
- lazy val isGangliaEnabled = Properties.envOrNone("SPARK_GANGLIA_LGPL").isDefined
- lazy val gangliaProj = Project("spark-ganglia-lgpl", file("extras/spark-ganglia-lgpl"), settings = gangliaSettings).dependsOn(core)
- val maybeGanglia: Seq[ClasspathDependency] = if (isGangliaEnabled) Seq(gangliaProj) else Seq()
- val maybeGangliaRef: Seq[ProjectReference] = if (isGangliaEnabled) Seq(gangliaProj) else Seq()
+ override val userPropertiesMap = System.getProperties.toMap
- // Include the Java 8 project if the JVM version is 8+
- lazy val javaVersion = System.getProperty("java.specification.version")
- lazy val isJava8Enabled = javaVersion.toDouble >= "1.8".toDouble
- val maybeJava8Tests = if (isJava8Enabled) Seq[ProjectReference](java8Tests) else Seq[ProjectReference]()
- lazy val java8Tests = Project("java8-tests", file("extras/java8-tests"), settings = java8TestsSettings).
- dependsOn(core) dependsOn(streaming % "compile->compile;test->test")
-
- // Include the YARN project if the user has enabled YARN
- lazy val yarnAlpha = Project("yarn-alpha", file("yarn/alpha"), settings = yarnAlphaSettings) dependsOn(core)
- lazy val yarn = Project("yarn", file("yarn/stable"), settings = yarnSettings) dependsOn(core)
-
- lazy val maybeYarn: Seq[ClasspathDependency] = if (isYarnEnabled) Seq(if (isNewHadoop) yarn else yarnAlpha) else Seq()
- lazy val maybeYarnRef: Seq[ProjectReference] = if (isYarnEnabled) Seq(if (isNewHadoop) yarn else yarnAlpha) else Seq()
-
- lazy val externalTwitter = Project("external-twitter", file("external/twitter"), settings = twitterSettings)
- .dependsOn(streaming % "compile->compile;test->test")
-
- lazy val externalKafka = Project("external-kafka", file("external/kafka"), settings = kafkaSettings)
- .dependsOn(streaming % "compile->compile;test->test")
-
- lazy val externalFlume = Project("external-flume", file("external/flume"), settings = flumeSettings)
- .dependsOn(streaming % "compile->compile;test->test")
-
- lazy val externalZeromq = Project("external-zeromq", file("external/zeromq"), settings = zeromqSettings)
- .dependsOn(streaming % "compile->compile;test->test")
-
- lazy val externalMqtt = Project("external-mqtt", file("external/mqtt"), settings = mqttSettings)
- .dependsOn(streaming % "compile->compile;test->test")
-
- lazy val allExternal = Seq[ClasspathDependency](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt)
- lazy val allExternalRefs = Seq[ProjectReference](externalTwitter, externalKafka, externalFlume, externalZeromq, externalMqtt)
-
- lazy val examples = Project("examples", file("examples"), settings = examplesSettings)
- .dependsOn(core, mllib, graphx, bagel, streaming, hive) dependsOn(allExternal: _*)
-
- // Everything except assembly, hive, tools, java8Tests and examples belong to packageProjects
- lazy val packageProjects = Seq[ProjectReference](core, repl, bagel, streaming, mllib, graphx, catalyst, sql) ++ maybeYarnRef ++ maybeHiveRef ++ maybeGangliaRef
-
- lazy val allProjects = packageProjects ++ allExternalRefs ++
- Seq[ProjectReference](examples, tools, assemblyProj) ++ maybeJava8Tests
-
- def sharedSettings = Defaults.defaultSettings ++ MimaBuild.mimaSettings(file(sparkHome)) ++ Seq(
- organization := "org.apache.spark",
- version := SPARK_VERSION,
- scalaVersion := "2.10.4",
- scalacOptions := Seq("-Xmax-classfile-name", "120", "-unchecked", "-deprecation", "-feature",
- "-target:" + SCALAC_JVM_VERSION),
- javacOptions := Seq("-target", JAVAC_JVM_VERSION, "-source", JAVAC_JVM_VERSION),
- unmanagedJars in Compile <<= baseDirectory map { base => (base / "lib" ** "*.jar").classpath },
+ lazy val sharedSettings = graphSettings ++ ScalaStyleSettings ++ Seq (
+ javaHome := Properties.envOrNone("JAVA_HOME").map(file),
+ incOptions := incOptions.value.withNameHashing(true),
retrieveManaged := true,
- javaHome := Properties.envOrNone("JAVA_HOME").map(file),
- // This is to add convenience of enabling sbt -Dsbt.offline=true for making the build offline.
- offline := "true".equalsIgnoreCase(sys.props("sbt.offline")),
retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
- transitiveClassifiers in Scope.GlobalScope := Seq("sources"),
- testListeners <<= target.map(t => Seq(new eu.henkelmann.sbt.JUnitXmlTestsListener(t.getAbsolutePath))),
- incOptions := incOptions.value.withNameHashing(true),
- // Fork new JVMs for tests and set Java options for those
- fork := true,
- javaOptions in Test += "-Dspark.home=" + sparkHome,
- javaOptions in Test += "-Dspark.testing=1",
- javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
- javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark").map { case (k,v) => s"-D$k=$v" }.toSeq,
- javaOptions in Test ++= "-Xmx3g -XX:PermSize=128M -XX:MaxNewSize=256m -XX:MaxPermSize=1g".split(" ").toSeq,
- javaOptions += "-Xmx3g",
- // Show full stack trace and duration in test cases.
- testOptions in Test += Tests.Argument("-oDF"),
- // Remove certain packages from Scaladoc
- scalacOptions in (Compile, doc) := Seq(
- "-groups",
- "-skip-packages", Seq(
- "akka",
- "org.apache.spark.api.python",
- "org.apache.spark.network",
- "org.apache.spark.deploy",
- "org.apache.spark.util.collection"
- ).mkString(":"),
- "-doc-title", "Spark " + SPARK_VERSION_SHORT + " ScalaDoc"
- ),
-
- // Only allow one test at a time, even across projects, since they run in the same JVM
- concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
-
- resolvers ++= Seq(
- // HTTPS is unavailable for Maven Central
- "Maven Repository" at "http://repo.maven.apache.org/maven2",
- "Apache Repository" at "https://repository.apache.org/content/repositories/releases",
- "JBoss Repository" at "https://repository.jboss.org/nexus/content/repositories/releases/",
- "MQTT Repository" at "https://repo.eclipse.org/content/repositories/paho-releases/",
- "Cloudera Repository" at "http://repository.cloudera.com/artifactory/cloudera-repos/",
- "Pivotal Repository" at "http://repo.spring.io/libs-release/",
- // For Sonatype publishing
- // "sonatype-snapshots" at "https://oss.sonatype.org/content/repositories/snapshots",
- // "sonatype-staging" at "https://oss.sonatype.org/service/local/staging/deploy/maven2/",
- // also check the local Maven repository ~/.m2
- Resolver.mavenLocal
- ),
-
- publishMavenStyle := true,
-
- // useGpg in Global := true,
-
- pomExtra := (
-
- org.apache
- apache
- 14
-
- http://spark.apache.org/
-
-
- Apache 2.0 License
- http://www.apache.org/licenses/LICENSE-2.0.html
- repo
-
-
-
- scm:git:git@github.com:apache/spark.git
- scm:git:git@github.com:apache/spark.git
-
-
-
- matei
- Matei Zaharia
- matei.zaharia@gmail.com
- http://www.cs.berkeley.edu/~matei
- Apache Software Foundation
- http://spark.apache.org
-
-
-
- JIRA
- https://issues.apache.org/jira/browse/SPARK
-
- ),
-
- /*
- publishTo <<= version { (v: String) =>
- val nexus = "https://oss.sonatype.org/"
- if (v.trim.endsWith("SNAPSHOT"))
- Some("sonatype-snapshots" at nexus + "content/repositories/snapshots")
- else
- Some("sonatype-staging" at nexus + "service/local/staging/deploy/maven2")
- },
+ publishMavenStyle := true
+ )
- */
-
- libraryDependencies ++= Seq(
- "io.netty" % "netty-all" % "4.0.17.Final",
- "org.eclipse.jetty" % "jetty-server" % jettyVersion,
- "org.eclipse.jetty" % "jetty-util" % jettyVersion,
- "org.eclipse.jetty" % "jetty-plus" % jettyVersion,
- "org.eclipse.jetty" % "jetty-security" % jettyVersion,
- "org.scalatest" %% "scalatest" % "2.1.5" % "test",
- "org.scalacheck" %% "scalacheck" % "1.11.3" % "test",
- "com.novocode" % "junit-interface" % "0.10" % "test",
- "org.easymock" % "easymockclassextension" % "3.1" % "test",
- "org.mockito" % "mockito-all" % "1.9.0" % "test",
- "junit" % "junit" % "4.10" % "test",
- // Needed by cglib which is needed by easymock.
- "asm" % "asm" % "3.3.1" % "test"
- ),
+ /** The following project exists only to pull previous artifacts of Spark for generating
+ Mima ignores. For more information see: SPARK-2071 */
+ lazy val oldDeps = Project("oldDeps", file("dev"), settings = oldDepsSettings)
- testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
- parallelExecution := true,
- /* Workaround for issue #206 (fixed after SBT 0.11.0) */
- watchTransitiveSources <<= Defaults.inDependencies[Task[Seq[File]]](watchSources.task,
- const(std.TaskExtra.constant(Nil)), aggregate = true, includeRoot = true) apply { _.join.map(_.flatten) },
-
- otherResolvers := Seq(Resolver.file("dotM2", file(Path.userHome + "/.m2/repository"))),
- publishLocalConfiguration in MavenCompile <<= (packagedArtifacts, deliverLocal, ivyLoggingLevel) map {
- (arts, _, level) => new PublishConfiguration(None, "dotM2", arts, Seq(), level)
- },
- publishMavenStyle in MavenCompile := true,
- publishLocal in MavenCompile <<= publishTask(publishLocalConfiguration in MavenCompile, deliverLocal),
- publishLocalBoth <<= Seq(publishLocal in MavenCompile, publishLocal).dependOn
- ) ++ net.virtualvoid.sbt.graph.Plugin.graphSettings ++ ScalaStyleSettings ++ genjavadocSettings
-
- val akkaVersion = "2.2.3-shaded-protobuf"
- val chillVersion = "0.3.6"
- val codahaleMetricsVersion = "3.0.0"
- val jblasVersion = "1.2.3"
- val jets3tVersion = if ("^2\\.[3-9]+".r.findFirstIn(hadoopVersion).isDefined) "0.9.0" else "0.7.1"
- val jettyVersion = "8.1.14.v20131031"
- val hiveVersion = "0.12.0"
- val parquetVersion = "1.4.3"
- val slf4jVersion = "1.7.5"
-
- val excludeJBossNetty = ExclusionRule(organization = "org.jboss.netty")
- val excludeIONetty = ExclusionRule(organization = "io.netty")
- val excludeEclipseJetty = ExclusionRule(organization = "org.eclipse.jetty")
- val excludeAsm = ExclusionRule(organization = "org.ow2.asm")
- val excludeOldAsm = ExclusionRule(organization = "asm")
- val excludeCommonsLogging = ExclusionRule(organization = "commons-logging")
- val excludeSLF4J = ExclusionRule(organization = "org.slf4j")
- val excludeScalap = ExclusionRule(organization = "org.scala-lang", artifact = "scalap")
- val excludeHadoop = ExclusionRule(organization = "org.apache.hadoop")
- val excludeCurator = ExclusionRule(organization = "org.apache.curator")
- val excludePowermock = ExclusionRule(organization = "org.powermock")
- val excludeFastutil = ExclusionRule(organization = "it.unimi.dsi")
- val excludeJruby = ExclusionRule(organization = "org.jruby")
- val excludeThrift = ExclusionRule(organization = "org.apache.thrift")
- val excludeServletApi = ExclusionRule(organization = "javax.servlet", artifact = "servlet-api")
- val excludeJUnit = ExclusionRule(organization = "junit")
-
- def sparkPreviousArtifact(id: String, organization: String = "org.apache.spark",
- version: String = "1.0.0", crossVersion: String = "2.10"): Option[sbt.ModuleID] = {
- val fullId = if (crossVersion.isEmpty) id else id + "_" + crossVersion
- Some(organization % fullId % version) // the artifact to compare binary compatibility with
+ def versionArtifact(id: String): Option[sbt.ModuleID] = {
+ val fullId = id + "_2.10"
+ Some("org.apache.spark" % fullId % "1.0.0")
}
- def coreSettings = sharedSettings ++ Seq(
- name := "spark-core",
- libraryDependencies ++= Seq(
- "com.google.guava" % "guava" % "14.0.1",
- "org.apache.commons" % "commons-lang3" % "3.3.2",
- "org.apache.commons" % "commons-math3" % "3.3" % "test",
- "com.google.code.findbugs" % "jsr305" % "1.3.9",
- "log4j" % "log4j" % "1.2.17",
- "org.slf4j" % "slf4j-api" % slf4jVersion,
- "org.slf4j" % "slf4j-log4j12" % slf4jVersion,
- "org.slf4j" % "jul-to-slf4j" % slf4jVersion,
- "org.slf4j" % "jcl-over-slf4j" % slf4jVersion,
- "commons-daemon" % "commons-daemon" % "1.0.10", // workaround for bug HADOOP-9407
- "com.ning" % "compress-lzf" % "1.0.0",
- "org.xerial.snappy" % "snappy-java" % "1.0.5",
- "org.spark-project.akka" %% "akka-remote" % akkaVersion,
- "org.spark-project.akka" %% "akka-slf4j" % akkaVersion,
- "org.spark-project.akka" %% "akka-testkit" % akkaVersion % "test",
- "org.json4s" %% "json4s-jackson" % "3.2.6" excludeAll(excludeScalap),
- "colt" % "colt" % "1.2.0",
- "org.apache.mesos" % "mesos" % "0.18.1" classifier("shaded-protobuf") exclude("com.google.protobuf", "protobuf-java"),
- "commons-net" % "commons-net" % "2.2",
- "net.java.dev.jets3t" % "jets3t" % jets3tVersion excludeAll(excludeCommonsLogging),
- "commons-codec" % "commons-codec" % "1.5", // Prevent jets3t from including the older version of commons-codec
- "org.apache.derby" % "derby" % "10.4.2.0" % "test",
- "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeJBossNetty, excludeAsm, excludeCommonsLogging, excludeSLF4J, excludeOldAsm, excludeServletApi),
- "org.apache.curator" % "curator-recipes" % "2.4.0" excludeAll(excludeJBossNetty),
- "com.codahale.metrics" % "metrics-core" % codahaleMetricsVersion,
- "com.codahale.metrics" % "metrics-jvm" % codahaleMetricsVersion,
- "com.codahale.metrics" % "metrics-json" % codahaleMetricsVersion,
- "com.codahale.metrics" % "metrics-graphite" % codahaleMetricsVersion,
- "com.twitter" %% "chill" % chillVersion excludeAll(excludeAsm),
- "com.twitter" % "chill-java" % chillVersion excludeAll(excludeAsm),
- "org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock),
- "com.clearspring.analytics" % "stream" % "2.7.0" excludeAll(excludeFastutil), // Only HyperLogLogPlus is used, which does not depend on fastutil.
- "org.spark-project" % "pyrolite" % "2.0.1",
- "net.sf.py4j" % "py4j" % "0.8.1"
- ),
- libraryDependencies ++= maybeAvro,
- assembleDeps,
- previousArtifact := sparkPreviousArtifact("spark-core")
+ def oldDepsSettings() = Defaults.defaultSettings ++ Seq(
+ name := "old-deps",
+ scalaVersion := "2.10.4",
+ retrieveManaged := true,
+ retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
+ libraryDependencies := Seq("spark-streaming-mqtt", "spark-streaming-zeromq",
+ "spark-streaming-flume", "spark-streaming-kafka", "spark-streaming-twitter",
+ "spark-streaming", "spark-mllib", "spark-bagel", "spark-graphx",
+ "spark-core").map(versionArtifact(_).get intransitive())
)
- // Create a colon-separate package list adding "org.apache.spark" in front of all of them,
- // for easier specification of JavaDoc package groups
- def packageList(names: String*): String = {
- names.map(s => "org.apache.spark." + s).mkString(":")
+ def enable(settings: Seq[Setting[_]])(projectRef: ProjectRef) = {
+ val existingSettings = projectsMap.getOrElse(projectRef.project, Seq[Setting[_]]())
+ projectsMap += (projectRef.project -> (existingSettings ++ settings))
}
- def rootSettings = sharedSettings ++ scalaJavaUnidocSettings ++ Seq(
- publish := {},
+ // Note: the ordering of these settings matters.
+ /* Enable shared settings on all projects */
+ (allProjects ++ optionallyEnabledProjects ++ assemblyProjects).foreach(enable(sharedSettings))
- unidocProjectFilter in (ScalaUnidoc, unidoc) :=
- inAnyProject -- inProjects(repl, examples, tools, catalyst, yarn, yarnAlpha),
- unidocProjectFilter in (JavaUnidoc, unidoc) :=
- inAnyProject -- inProjects(repl, examples, bagel, graphx, catalyst, tools, yarn, yarnAlpha),
+ /* Enable tests settings for all projects except examples, assembly and tools */
+ (allProjects ++ optionallyEnabledProjects).foreach(enable(TestSettings.settings))
- // Skip class names containing $ and some internal packages in Javadocs
- unidocAllSources in (JavaUnidoc, unidoc) := {
- (unidocAllSources in (JavaUnidoc, unidoc)).value
- .map(_.filterNot(_.getName.contains("$")))
- .map(_.filterNot(_.getCanonicalPath.contains("akka")))
- .map(_.filterNot(_.getCanonicalPath.contains("deploy")))
- .map(_.filterNot(_.getCanonicalPath.contains("network")))
- .map(_.filterNot(_.getCanonicalPath.contains("executor")))
- .map(_.filterNot(_.getCanonicalPath.contains("python")))
- .map(_.filterNot(_.getCanonicalPath.contains("collection")))
- },
+ /* Enable Mima for all projects except spark, hive, catalyst, sql and repl */
+ // TODO: Add Sql to mima checks
+ allProjects.filterNot(y => Seq(spark, sql, hive, catalyst, repl).exists(x => x == y)).
+ foreach (x => enable(MimaBuild.mimaSettings(sparkHome, x))(x))
- // Javadoc options: create a window title, and group key packages on index page
- javacOptions in doc := Seq(
- "-windowtitle", "Spark " + SPARK_VERSION_SHORT + " JavaDoc",
- "-public",
- "-group", "Core Java API", packageList("api.java", "api.java.function"),
- "-group", "Spark Streaming", packageList(
- "streaming.api.java", "streaming.flume", "streaming.kafka",
- "streaming.mqtt", "streaming.twitter", "streaming.zeromq"
- ),
- "-group", "MLlib", packageList(
- "mllib.classification", "mllib.clustering", "mllib.evaluation.binary", "mllib.linalg",
- "mllib.linalg.distributed", "mllib.optimization", "mllib.rdd", "mllib.recommendation",
- "mllib.regression", "mllib.stat", "mllib.tree", "mllib.tree.configuration",
- "mllib.tree.impurity", "mllib.tree.model", "mllib.util"
- ),
- "-group", "Spark SQL", packageList("sql.api.java", "sql.hive.api.java"),
- "-noqualifier", "java.lang"
- )
- )
+ /* Enable Assembly for all assembly projects */
+ assemblyProjects.foreach(enable(Assembly.settings))
- def replSettings = sharedSettings ++ Seq(
- name := "spark-repl",
- libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-compiler" % v),
- libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "jline" % v),
- libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-reflect" % v)
- )
+ /* Enable unidoc only for the root spark project */
+ enable(Unidoc.settings)(spark)
- def examplesSettings = sharedSettings ++ Seq(
- name := "spark-examples",
- jarName in assembly <<= version map {
- v => "spark-examples-" + v + "-hadoop" + hadoopVersion + ".jar" },
- libraryDependencies ++= Seq(
- "com.twitter" %% "algebird-core" % "0.1.11",
- "org.apache.hbase" % "hbase" % HBASE_VERSION excludeAll(excludeIONetty, excludeJBossNetty, excludeAsm, excludeOldAsm, excludeCommonsLogging, excludeJruby),
- "org.apache.cassandra" % "cassandra-all" % "1.2.6"
- exclude("com.google.guava", "guava")
- exclude("com.googlecode.concurrentlinkedhashmap", "concurrentlinkedhashmap-lru")
- exclude("com.ning","compress-lzf")
- exclude("io.netty", "netty")
- exclude("jline","jline")
- exclude("org.apache.cassandra.deps", "avro")
- excludeAll(excludeSLF4J, excludeIONetty),
- "com.github.scopt" %% "scopt" % "3.2.0"
- )
- ) ++ assemblySettings ++ extraAssemblySettings
-
- def toolsSettings = sharedSettings ++ Seq(
- name := "spark-tools",
- libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-compiler" % v),
- libraryDependencies <+= scalaVersion(v => "org.scala-lang" % "scala-reflect" % v )
- ) ++ assemblySettings ++ extraAssemblySettings
-
- def graphxSettings = sharedSettings ++ Seq(
- name := "spark-graphx",
- previousArtifact := sparkPreviousArtifact("spark-graphx"),
- libraryDependencies ++= Seq(
- "org.jblas" % "jblas" % jblasVersion
- )
- )
+ /* Hive console settings */
+ enable(Hive.settings)(hive)
- def bagelSettings = sharedSettings ++ Seq(
- name := "spark-bagel",
- previousArtifact := sparkPreviousArtifact("spark-bagel")
- )
+ // TODO: move this to its upstream project.
+ override def projectDefinitions(baseDirectory: File): Seq[Project] = {
+ super.projectDefinitions(baseDirectory).map { x =>
+ if (projectsMap.exists(_._1 == x.id)) x.settings(projectsMap(x.id): _*)
+ else x.settings(Seq[Setting[_]](): _*)
+ } ++ Seq[Project](oldDeps)
+ }
- def mllibSettings = sharedSettings ++ Seq(
- name := "spark-mllib",
- previousArtifact := sparkPreviousArtifact("spark-mllib"),
- libraryDependencies ++= Seq(
- "org.jblas" % "jblas" % jblasVersion,
- "org.scalanlp" %% "breeze" % "0.7" excludeAll(excludeJUnit)
- )
- )
+}
- def catalystSettings = sharedSettings ++ Seq(
- name := "catalyst",
- // The mechanics of rewriting expression ids to compare trees in some test cases makes
- // assumptions about the the expression ids being contiguous. Running tests in parallel breaks
- // this non-deterministically. TODO: FIX THIS.
- parallelExecution in Test := false,
- libraryDependencies ++= Seq(
- "com.typesafe" %% "scalalogging-slf4j" % "1.0.1"
- )
- )
+object Hive {
- def sqlCoreSettings = sharedSettings ++ Seq(
- name := "spark-sql",
- libraryDependencies ++= Seq(
- "com.twitter" % "parquet-column" % parquetVersion,
- "com.twitter" % "parquet-hadoop" % parquetVersion,
- "com.fasterxml.jackson.core" % "jackson-databind" % "2.3.0" // json4s-jackson 3.2.6 requires jackson-databind 2.3.0.
- ),
- initialCommands in console :=
- """
- |import org.apache.spark.sql.catalyst.analysis._
- |import org.apache.spark.sql.catalyst.dsl._
- |import org.apache.spark.sql.catalyst.errors._
- |import org.apache.spark.sql.catalyst.expressions._
- |import org.apache.spark.sql.catalyst.plans.logical._
- |import org.apache.spark.sql.catalyst.rules._
- |import org.apache.spark.sql.catalyst.types._
- |import org.apache.spark.sql.catalyst.util._
- |import org.apache.spark.sql.execution
- |import org.apache.spark.sql.test.TestSQLContext._
- |import org.apache.spark.sql.parquet.ParquetTestData""".stripMargin
- )
+ lazy val settings = Seq(
- // Since we don't include hive in the main assembly this project also acts as an alternative
- // assembly jar.
- def hiveSettings = sharedSettings ++ Seq(
- name := "spark-hive",
javaOptions += "-XX:MaxPermSize=1g",
- libraryDependencies ++= Seq(
- "org.spark-project.hive" % "hive-metastore" % hiveVersion,
- "org.spark-project.hive" % "hive-exec" % hiveVersion excludeAll(excludeCommonsLogging),
- "org.spark-project.hive" % "hive-serde" % hiveVersion
- ),
- // Multiple queries rely on the TestHive singleton. See comments there for more details.
+ // Multiple queries rely on the TestHive singleton. See comments there for more details.
parallelExecution in Test := false,
// Supporting all SerDes requires us to depend on deprecated APIs, so we turn off the warnings
// only for this subproject.
@@ -555,67 +190,16 @@ object SparkBuild extends Build {
|import org.apache.spark.sql.parquet.ParquetTestData""".stripMargin
)
- def streamingSettings = sharedSettings ++ Seq(
- name := "spark-streaming",
- previousArtifact := sparkPreviousArtifact("spark-streaming")
- )
-
- def yarnCommonSettings = sharedSettings ++ Seq(
- unmanagedSourceDirectories in Compile <++= baseDirectory { base =>
- Seq(
- base / "../common/src/main/scala"
- )
- },
-
- unmanagedSourceDirectories in Test <++= baseDirectory { base =>
- Seq(
- base / "../common/src/test/scala"
- )
- }
-
- ) ++ extraYarnSettings
-
- def yarnAlphaSettings = yarnCommonSettings ++ Seq(
- name := "spark-yarn-alpha"
- )
-
- def yarnSettings = yarnCommonSettings ++ Seq(
- name := "spark-yarn"
- )
-
- def gangliaSettings = sharedSettings ++ Seq(
- name := "spark-ganglia-lgpl",
- libraryDependencies += "com.codahale.metrics" % "metrics-ganglia" % "3.0.0"
- )
-
- def java8TestsSettings = sharedSettings ++ Seq(
- name := "java8-tests",
- javacOptions := Seq("-target", "1.8", "-source", "1.8"),
- testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a")
- )
-
- // Conditionally include the YARN dependencies because some tools look at all sub-projects and will complain
- // if we refer to nonexistent dependencies (e.g. hadoop-yarn-api from a Hadoop version without YARN).
- def extraYarnSettings = if(isYarnEnabled) yarnEnabledSettings else Seq()
-
- def yarnEnabledSettings = Seq(
- libraryDependencies ++= Seq(
- // Exclude rule required for all ?
- "org.apache.hadoop" % hadoopClient % hadoopVersion excludeAll(excludeJBossNetty, excludeAsm, excludeOldAsm),
- "org.apache.hadoop" % "hadoop-yarn-api" % hadoopVersion excludeAll(excludeJBossNetty, excludeAsm, excludeOldAsm, excludeCommonsLogging),
- "org.apache.hadoop" % "hadoop-yarn-common" % hadoopVersion excludeAll(excludeJBossNetty, excludeAsm, excludeOldAsm, excludeCommonsLogging),
- "org.apache.hadoop" % "hadoop-yarn-client" % hadoopVersion excludeAll(excludeJBossNetty, excludeAsm, excludeOldAsm, excludeCommonsLogging),
- "org.apache.hadoop" % "hadoop-yarn-server-web-proxy" % hadoopVersion excludeAll(excludeJBossNetty, excludeAsm, excludeOldAsm, excludeCommonsLogging, excludeServletApi)
- )
- )
+}
- def assemblyProjSettings = sharedSettings ++ Seq(
- name := "spark-assembly",
- jarName in assembly <<= version map { v => "spark-assembly-" + v + "-hadoop" + hadoopVersion + ".jar" }
- ) ++ assemblySettings ++ extraAssemblySettings
+object Assembly {
+ import sbtassembly.Plugin._
+ import AssemblyKeys._
- def extraAssemblySettings() = Seq(
+ lazy val settings = assemblySettings ++ Seq(
test in assembly := {},
+ jarName in assembly <<= (version, moduleName) map { (v, mName) => mName + "-"+v + "-hadoop" +
+ Option(System.getProperty("hadoop.version")).getOrElse("1.0.4") + ".jar" },
mergeStrategy in assembly := {
case PathList("org", "datanucleus", xs @ _*) => MergeStrategy.discard
case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard
@@ -627,57 +211,95 @@ object SparkBuild extends Build {
}
)
- def oldDepsSettings() = Defaults.defaultSettings ++ Seq(
- name := "old-deps",
- scalaVersion := "2.10.4",
- retrieveManaged := true,
- retrievePattern := "[type]s/[artifact](-[revision])(-[classifier]).[ext]",
- libraryDependencies := Seq("spark-streaming-mqtt", "spark-streaming-zeromq",
- "spark-streaming-flume", "spark-streaming-kafka", "spark-streaming-twitter",
- "spark-streaming", "spark-mllib", "spark-bagel", "spark-graphx",
- "spark-core").map(sparkPreviousArtifact(_).get intransitive())
- )
+}
- def twitterSettings() = sharedSettings ++ Seq(
- name := "spark-streaming-twitter",
- previousArtifact := sparkPreviousArtifact("spark-streaming-twitter"),
- libraryDependencies ++= Seq(
- "org.twitter4j" % "twitter4j-stream" % "3.0.3"
- )
- )
+object Unidoc {
- def kafkaSettings() = sharedSettings ++ Seq(
- name := "spark-streaming-kafka",
- previousArtifact := sparkPreviousArtifact("spark-streaming-kafka"),
- libraryDependencies ++= Seq(
- "com.github.sgroschupf" % "zkclient" % "0.1",
- "org.apache.kafka" %% "kafka" % "0.8.0"
- exclude("com.sun.jdmk", "jmxtools")
- exclude("com.sun.jmx", "jmxri")
- exclude("net.sf.jopt-simple", "jopt-simple")
- excludeAll(excludeSLF4J)
- )
- )
+ import BuildCommons._
+ import sbtunidoc.Plugin._
+ import UnidocKeys._
+
+ // for easier specification of JavaDoc package groups
+ private def packageList(names: String*): String = {
+ names.map(s => "org.apache.spark." + s).mkString(":")
+ }
- def flumeSettings() = sharedSettings ++ Seq(
- name := "spark-streaming-flume",
- previousArtifact := sparkPreviousArtifact("spark-streaming-flume"),
- libraryDependencies ++= Seq(
- "org.apache.flume" % "flume-ng-sdk" % "1.4.0" % "compile" excludeAll(excludeIONetty, excludeThrift)
+ lazy val settings = scalaJavaUnidocSettings ++ Seq (
+ publish := {},
+
+ unidocProjectFilter in(ScalaUnidoc, unidoc) :=
+ inAnyProject -- inProjects(repl, examples, tools, catalyst, yarn, yarnAlpha),
+ unidocProjectFilter in(JavaUnidoc, unidoc) :=
+ inAnyProject -- inProjects(repl, bagel, graphx, examples, tools, catalyst, yarn, yarnAlpha),
+
+ // Skip class names containing $ and some internal packages in Javadocs
+ unidocAllSources in (JavaUnidoc, unidoc) := {
+ (unidocAllSources in (JavaUnidoc, unidoc)).value
+ .map(_.filterNot(_.getName.contains("$")))
+ .map(_.filterNot(_.getCanonicalPath.contains("akka")))
+ .map(_.filterNot(_.getCanonicalPath.contains("deploy")))
+ .map(_.filterNot(_.getCanonicalPath.contains("network")))
+ .map(_.filterNot(_.getCanonicalPath.contains("executor")))
+ .map(_.filterNot(_.getCanonicalPath.contains("python")))
+ .map(_.filterNot(_.getCanonicalPath.contains("collection")))
+ },
+
+ // Javadoc options: create a window title, and group key packages on index page
+ javacOptions in doc := Seq(
+ "-windowtitle", "Spark " + version.value.replaceAll("-SNAPSHOT", "") + " JavaDoc",
+ "-public",
+ "-group", "Core Java API", packageList("api.java", "api.java.function"),
+ "-group", "Spark Streaming", packageList(
+ "streaming.api.java", "streaming.flume", "streaming.kafka",
+ "streaming.mqtt", "streaming.twitter", "streaming.zeromq"
+ ),
+ "-group", "MLlib", packageList(
+ "mllib.classification", "mllib.clustering", "mllib.evaluation.binary", "mllib.linalg",
+ "mllib.linalg.distributed", "mllib.optimization", "mllib.rdd", "mllib.recommendation",
+ "mllib.regression", "mllib.stat", "mllib.tree", "mllib.tree.configuration",
+ "mllib.tree.impurity", "mllib.tree.model", "mllib.util"
+ ),
+ "-group", "Spark SQL", packageList("sql.api.java", "sql.hive.api.java"),
+ "-noqualifier", "java.lang"
)
)
+}
- def zeromqSettings() = sharedSettings ++ Seq(
- name := "spark-streaming-zeromq",
- previousArtifact := sparkPreviousArtifact("spark-streaming-zeromq"),
- libraryDependencies ++= Seq(
- "org.spark-project.akka" %% "akka-zeromq" % akkaVersion
+object TestSettings {
+ import BuildCommons._
+
+ lazy val settings = Seq (
+ // Fork new JVMs for tests and set Java options for those
+ fork := true,
+ javaOptions in Test += "-Dspark.home=" + sparkHome,
+ javaOptions in Test += "-Dspark.testing=1",
+ javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true",
+ javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark")
+ .map { case (k,v) => s"-D$k=$v" }.toSeq,
+ javaOptions in Test ++= "-Xmx3g -XX:PermSize=128M -XX:MaxNewSize=256m -XX:MaxPermSize=1g"
+ .split(" ").toSeq,
+ javaOptions += "-Xmx3g",
+
+ // Show full stack trace and duration in test cases.
+ testOptions in Test += Tests.Argument("-oDF"),
+ testOptions += Tests.Argument(TestFrameworks.JUnit, "-v", "-a"),
+ // Enable Junit testing.
+ libraryDependencies += "com.novocode" % "junit-interface" % "0.9" % "test",
+ // Only allow one test at a time, even across projects, since they run in the same JVM
+ parallelExecution in Test := false,
+ concurrentRestrictions in Global += Tags.limit(Tags.Test, 1),
+ // Remove certain packages from Scaladoc
+ scalacOptions in (Compile, doc) := Seq(
+ "-groups",
+ "-skip-packages", Seq(
+ "akka",
+ "org.apache.spark.api.python",
+ "org.apache.spark.network",
+ "org.apache.spark.deploy",
+ "org.apache.spark.util.collection"
+ ).mkString(":"),
+ "-doc-title", "Spark " + version.value.replaceAll("-SNAPSHOT", "") + " ScalaDoc"
)
)
- def mqttSettings() = streamingSettings ++ Seq(
- name := "spark-streaming-mqtt",
- previousArtifact := sparkPreviousArtifact("spark-streaming-mqtt"),
- libraryDependencies ++= Seq("org.eclipse.paho" % "mqtt-client" % "0.4.0")
- )
}
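
SparkBuild now collects settings per project name in projectsMap via enable() and applies them when sbt-pom-reader materialises the Project definitions in projectDefinitions. A simplified, self-contained sketch of that accumulation pattern (strings stand in for sbt Setting[_] values; all names are illustrative):

// Simplified sketch of the enable()/projectsMap accumulation pattern used above.
object SettingsAccumulationSketch {
  import scala.collection.mutable

  val settingsByProject = mutable.Map.empty[String, Seq[String]]

  def enable(setting: String)(projectName: String): Unit = {
    val existing = settingsByProject.getOrElse(projectName, Seq.empty[String])
    settingsByProject += (projectName -> (existing :+ setting))
  }

  def main(args: Array[String]): Unit = {
    // Shared settings go to every project; Mima checks only to a subset, mirroring the build above.
    Seq("core", "mllib", "sql").foreach(enable("sharedSettings"))
    Seq("core", "mllib").foreach(enable("mimaSettings"))
    println(settingsByProject("core"))  // List(sharedSettings, mimaSettings)
    println(settingsByProject("sql"))   // List(sharedSettings)
  }
}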
diff --git a/project/build.properties b/project/build.properties
index bcde13f4362a7..c12ef652adfcb 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -14,4 +14,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-sbt.version=0.13.2
+sbt.version=0.13.5
diff --git a/project/plugins.sbt b/project/plugins.sbt
index 472819b9fb8ba..d3ac4bf335e87 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -21,6 +21,6 @@ addSbtPlugin("org.scalastyle" %% "scalastyle-sbt-plugin" % "0.4.0")
addSbtPlugin("com.typesafe" % "sbt-mima-plugin" % "0.1.6")
-addSbtPlugin("com.alpinenow" % "junit_xml_listener" % "0.5.0")
+addSbtPlugin("com.alpinenow" % "junit_xml_listener" % "0.5.1")
addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.0")
diff --git a/project/project/SparkPluginBuild.scala b/project/project/SparkPluginBuild.scala
index e9fba641eb8a1..3ef2d5451da0d 100644
--- a/project/project/SparkPluginBuild.scala
+++ b/project/project/SparkPluginBuild.scala
@@ -24,8 +24,10 @@ import sbt.Keys._
* becomes available for scalastyle sbt plugin.
*/
object SparkPluginDef extends Build {
- lazy val root = Project("plugins", file(".")) dependsOn(sparkStyle)
+ lazy val root = Project("plugins", file(".")) dependsOn(sparkStyle, sbtPomReader)
lazy val sparkStyle = Project("spark-style", file("spark-style"), settings = styleSettings)
+ lazy val sbtPomReader = uri("https://github.com/ScrapCodes/sbt-pom-reader.git")
+
// There is actually no need to publish this artifact.
def styleSettings = Defaults.defaultSettings ++ Seq (
name := "spark-style",
diff --git a/repl/pom.xml b/repl/pom.xml
index 4a66408ef3d2d..4ebb1b82f0e8c 100644
--- a/repl/pom.xml
+++ b/repl/pom.xml
@@ -32,6 +32,7 @@
http://spark.apache.org/
+ repl
/usr/share/spark
root
diff --git a/sbt/sbt b/sbt/sbt
index 9de265bd07dcb..1b1aa1483a829 100755
--- a/sbt/sbt
+++ b/sbt/sbt
@@ -72,6 +72,7 @@ Usage: $script_name [options]
-J-X pass option -X directly to the java runtime
(-J is stripped)
-S-X add -X to sbt's scalacOptions (-J is stripped)
+ -PmavenProfiles Enable a Maven profile for the build.
In the case of duplicated or conflicting options, the order above
shows precedence: JAVA_OPTS lowest, command line options highest.
diff --git a/sbt/sbt-launch-lib.bash b/sbt/sbt-launch-lib.bash
index 64e40a88206be..857b62ffa229c 100755
--- a/sbt/sbt-launch-lib.bash
+++ b/sbt/sbt-launch-lib.bash
@@ -16,6 +16,7 @@ declare -a residual_args
declare -a java_args
declare -a scalac_args
declare -a sbt_commands
+declare -a maven_profiles
if test -x "$JAVA_HOME/bin/java"; then
echo -e "Using $JAVA_HOME as default JAVA_HOME."
@@ -87,6 +88,13 @@ addJava () {
dlog "[addJava] arg = '$1'"
java_args=( "${java_args[@]}" "$1" )
}
+
+enableProfile () {
+ dlog "[enableProfile] arg = '$1'"
+ maven_profiles=( "${maven_profiles[@]}" "$1" )
+ export MAVEN_PROFILES="${maven_profiles[@]}"
+}
+
addSbt () {
dlog "[addSbt] arg = '$1'"
sbt_commands=( "${sbt_commands[@]}" "$1" )
@@ -141,7 +149,8 @@ process_args () {
-java-home) require_arg path "$1" "$2" && java_cmd="$2/bin/java" && export JAVA_HOME=$2 && shift 2 ;;
-D*) addJava "$1" && shift ;;
- -J*) addJava "${1:2}" && shift ;;
+ -J*) addJava "${1:2}" && shift ;;
+ -P*) enableProfile "$1" && shift ;;
*) addResidual "$1" && shift ;;
esac
done
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 01d7b569080ea..6decde3fcd62d 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -31,6 +31,9 @@
jar
Spark Project Catalyst
http://spark.apache.org/
+
+ catalyst
+
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index 8210fd1f210d1..c309c43804d97 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -31,6 +31,9 @@
jar
Spark Project SQL
http://spark.apache.org/
+
+ sql
+
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 95ed0f28507fc..2b787e14f3f15 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -25,7 +25,9 @@ import scala.collection.JavaConverters._
* SQLConf holds mutable config parameters and hints. These can be set and
* queried either by passing SET commands into Spark SQL's DSL
* functions (sql(), hql(), etc.), or by programmatically using setters and
- * getters of this class. This class is thread-safe.
+ * getters of this class.
+ *
+ * SQLConf is thread-safe (internally synchronized, so it is safe to use from multiple threads).
*/
trait SQLConf {
@@ -50,7 +52,7 @@ trait SQLConf {
/** ********************** SQLConf functionality methods ************ */
@transient
- protected[sql] val settings = java.util.Collections.synchronizedMap(
+ private val settings = java.util.Collections.synchronizedMap(
new java.util.HashMap[String, String]())
def set(props: Properties): Unit = {
@@ -71,11 +73,9 @@ trait SQLConf {
Option(settings.get(key)).getOrElse(defaultValue)
}
- def getAll: Array[(String, String)] = settings.asScala.toArray
+ def getAll: Array[(String, String)] = settings.synchronized { settings.asScala.toArray }
- def getOption(key: String): Option[String] = {
- Option(settings.get(key))
- }
+ def getOption(key: String): Option[String] = Option(settings.get(key))
def contains(key: String): Boolean = settings.containsKey(key)
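
getAll now wraps the copy in settings.synchronized because Collections.synchronizedMap only guards individual operations; iterating the map, which asScala.toArray does, must hold the map's monitor explicitly. A self-contained sketch of that pattern (the object and key are illustrative; only the pattern comes from the patch):

import scala.collection.JavaConverters._

// Sketch of the synchronized-iteration pattern adopted by getAll above.
object SynchronizedMapSketch {
  private val settings = java.util.Collections.synchronizedMap(
    new java.util.HashMap[String, String]())

  def set(key: String, value: String): Unit = settings.put(key, value)

  // put/get are individually synchronized, but a consistent snapshot of the whole
  // map requires holding its monitor while the iterator runs.
  def getAll: Array[(String, String)] = settings.synchronized {
    settings.asScala.toArray
  }

  def main(args: Array[String]): Unit = {
    set("test.key.0", "test.val.0")
    println(getAll.mkString(", "))
  }
}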
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
index 054b14f8f7ffa..e17ecc87fd52a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala
@@ -36,36 +36,6 @@ class JoinSuite extends QueryTest {
assert(planned.size === 1)
}
- test("plans broadcast hash join, given hints") {
-
- def mkTest(buildSide: BuildSide, leftTable: String, rightTable: String) = {
- TestSQLContext.settings.synchronized {
- TestSQLContext.set("spark.sql.join.broadcastTables",
- s"${if (buildSide == BuildRight) rightTable else leftTable}")
- val rdd = sql( s"""SELECT * FROM $leftTable JOIN $rightTable ON key = a""")
- // Using `sparkPlan` because for relevant patterns in HashJoin to be
- // matched, other strategies need to be applied.
- val physical = rdd.queryExecution.sparkPlan
- val bhj = physical.collect { case j: BroadcastHashJoin if j.buildSide == buildSide => j}
-
- assert(bhj.size === 1, "planner does not pick up hint to generate broadcast hash join")
- checkAnswer(
- rdd,
- Seq(
- (1, "1", 1, 1),
- (1, "1", 1, 2),
- (2, "2", 2, 1),
- (2, "2", 2, 2),
- (3, "3", 3, 1),
- (3, "3", 3, 2)
- ))
- }
- }
-
- mkTest(BuildRight, "testData", "testData2")
- mkTest(BuildLeft, "testData", "testData2")
- }
-
test("multiple-key equi-join is hash-join") {
val x = testData2.as('x)
val y = testData2.as('y)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
index 93792f698cfaf..08293f7f0ca30 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLConfSuite.scala
@@ -28,50 +28,46 @@ class SQLConfSuite extends QueryTest {
val testVal = "test.val.0"
test("programmatic ways of basic setting and getting") {
- TestSQLContext.settings.synchronized {
- clear()
- assert(getOption(testKey).isEmpty)
- assert(getAll.toSet === Set())
+ clear()
+ assert(getOption(testKey).isEmpty)
+ assert(getAll.toSet === Set())
- set(testKey, testVal)
- assert(get(testKey) == testVal)
- assert(get(testKey, testVal + "_") == testVal)
- assert(getOption(testKey) == Some(testVal))
- assert(contains(testKey))
+ set(testKey, testVal)
+ assert(get(testKey) == testVal)
+ assert(get(testKey, testVal + "_") == testVal)
+ assert(getOption(testKey) == Some(testVal))
+ assert(contains(testKey))
- // Tests SQLConf as accessed from a SQLContext is mutable after
- // the latter is initialized, unlike SparkConf inside a SparkContext.
- assert(TestSQLContext.get(testKey) == testVal)
- assert(TestSQLContext.get(testKey, testVal + "_") == testVal)
- assert(TestSQLContext.getOption(testKey) == Some(testVal))
- assert(TestSQLContext.contains(testKey))
+ // Tests that SQLConf, as accessed from a SQLContext, is mutable after
+ // the latter is initialized, unlike SparkConf inside a SparkContext.
+ assert(TestSQLContext.get(testKey) == testVal)
+ assert(TestSQLContext.get(testKey, testVal + "_") == testVal)
+ assert(TestSQLContext.getOption(testKey) == Some(testVal))
+ assert(TestSQLContext.contains(testKey))
- clear()
- }
+ clear()
}
test("parse SQL set commands") {
- TestSQLContext.settings.synchronized {
- clear()
- sql(s"set $testKey=$testVal")
- assert(get(testKey, testVal + "_") == testVal)
- assert(TestSQLContext.get(testKey, testVal + "_") == testVal)
+ clear()
+ sql(s"set $testKey=$testVal")
+ assert(get(testKey, testVal + "_") == testVal)
+ assert(TestSQLContext.get(testKey, testVal + "_") == testVal)
- sql("set mapred.reduce.tasks=20")
- assert(get("mapred.reduce.tasks", "0") == "20")
- sql("set mapred.reduce.tasks = 40")
- assert(get("mapred.reduce.tasks", "0") == "40")
+ sql("set mapred.reduce.tasks=20")
+ assert(get("mapred.reduce.tasks", "0") == "20")
+ sql("set mapred.reduce.tasks = 40")
+ assert(get("mapred.reduce.tasks", "0") == "40")
- val key = "spark.sql.key"
- val vs = "val0,val_1,val2.3,my_table"
- sql(s"set $key=$vs")
- assert(get(key, "0") == vs)
+ val key = "spark.sql.key"
+ val vs = "val0,val_1,val2.3,my_table"
+ sql(s"set $key=$vs")
+ assert(get(key, "0") == vs)
- sql(s"set $key=")
- assert(get(key, "0") == "")
+ sql(s"set $key=")
+ assert(get(key, "0") == "")
- clear()
- }
+ clear()
}
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index fa1f32f8a49a9..0743cfe8cff0f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -397,40 +397,38 @@ class SQLQuerySuite extends QueryTest {
}
test("SET commands semantics using sql()") {
- TestSQLContext.settings.synchronized {
- clear()
- val testKey = "test.key.0"
- val testVal = "test.val.0"
- val nonexistentKey = "nonexistent"
-
- // "set" itself returns all config variables currently specified in SQLConf.
- assert(sql("SET").collect().size == 0)
-
- // "set key=val"
- sql(s"SET $testKey=$testVal")
- checkAnswer(
- sql("SET"),
- Seq(Seq(testKey, testVal))
- )
-
- sql(s"SET ${testKey + testKey}=${testVal + testVal}")
- checkAnswer(
- sql("set"),
- Seq(
- Seq(testKey, testVal),
- Seq(testKey + testKey, testVal + testVal))
- )
-
- // "set key"
- checkAnswer(
- sql(s"SET $testKey"),
- Seq(Seq(testKey, testVal))
- )
- checkAnswer(
- sql(s"SET $nonexistentKey"),
- Seq(Seq(nonexistentKey, ""))
- )
- clear()
- }
+ clear()
+ val testKey = "test.key.0"
+ val testVal = "test.val.0"
+ val nonexistentKey = "nonexistent"
+
+ // "set" itself returns all config variables currently specified in SQLConf.
+ assert(sql("SET").collect().size == 0)
+
+ // "set key=val"
+ sql(s"SET $testKey=$testVal")
+ checkAnswer(
+ sql("SET"),
+ Seq(Seq(testKey, testVal))
+ )
+
+ sql(s"SET ${testKey + testKey}=${testVal + testVal}")
+ checkAnswer(
+ sql("set"),
+ Seq(
+ Seq(testKey, testVal),
+ Seq(testKey + testKey, testVal + testVal))
+ )
+
+ // "set key"
+ checkAnswer(
+ sql(s"SET $testKey"),
+ Seq(Seq(testKey, testVal))
+ )
+ checkAnswer(
+ sql(s"SET $nonexistentKey"),
+ Seq(Seq(nonexistentKey, ""))
+ )
+ clear()
}
}
diff --git a/sql/hive/pom.xml b/sql/hive/pom.xml
index 5ede76e5c3904..f30ae28b81e06 100644
--- a/sql/hive/pom.xml
+++ b/sql/hive/pom.xml
@@ -31,6 +31,9 @@
  <packaging>jar</packaging>
  <name>Spark Project Hive</name>
  <url>http://spark.apache.org/</url>
+  <properties>
+    <sbt.project.name>hive</sbt.project.name>
+  </properties>
@@ -48,6 +51,11 @@
      <artifactId>hive-metastore</artifactId>
      <version>${hive.version}</version>
    </dependency>
+    <dependency>
+      <groupId>commons-httpclient</groupId>
+      <artifactId>commons-httpclient</artifactId>
+      <version>3.1</version>
+    </dependency>
    <dependency>
      <groupId>org.spark-project.hive</groupId>
      <artifactId>hive-exec</artifactId>
diff --git a/streaming/pom.xml b/streaming/pom.xml
index f506d6ce34a6f..f60697ce745b7 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -27,6 +27,9 @@
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-streaming_2.10</artifactId>
+  <properties>
+    <sbt.project.name>streaming</sbt.project.name>
+  </properties>
  <packaging>jar</packaging>
  <name>Spark Project Streaming</name>
  <url>http://spark.apache.org/</url>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index e878285f6a854..9eecbfaef363f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -45,7 +45,7 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
// Files with mod time earlier than this are ignored. This is updated every interval
// such that in the current interval, files older than any file found in the
// previous interval will be ignored. Obviously this time keeps moving forward.
- private var ignoreTime = if (newFilesOnly) 0L else System.currentTimeMillis()
+ private var ignoreTime = if (newFilesOnly) System.currentTimeMillis() else 0L
// Latest file mod time seen till any point of time
@transient private var path_ : Path = null
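The swapped initializer above fixes the newFilesOnly semantics: with newFilesOnly = true the stream should skip files that already existed when it started (threshold = current time), and with false it should pick up everything already in the directory (threshold = 0). A minimal hedged sketch of the intended filter, outside the actual FileInputDStream code:

    // Illustrative only; the real logic lives in FileInputDStream.
    def initialIgnoreTime(newFilesOnly: Boolean): Long =
      if (newFilesOnly) System.currentTimeMillis() else 0L

    // A file is considered only if it was modified at or after the threshold.
    def isNewFile(modTime: Long, ignoreTime: Long): Boolean = modTime >= ignoreTime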
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
index 6376cff78b78a..ed7da6dc1315e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/QueueInputDStream.scala
@@ -41,7 +41,7 @@ class QueueInputDStream[T: ClassTag](
if (oneAtATime && queue.size > 0) {
buffer += queue.dequeue()
} else {
- buffer ++= queue
+ buffer ++= queue.dequeueAll(_ => true)
}
if (buffer.size > 0) {
if (oneAtATime) {
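The dequeueAll change above matters because buffer ++= queue only copies the queue's elements without removing them, so with oneAtATime = false the same RDDs would be re-delivered in every subsequent batch; dequeueAll(_ => true) drains the queue as it copies. A small standalone sketch of the difference (plain Scala collections, not Spark code):

    import scala.collection.mutable

    val queue = new mutable.Queue[Int]()
    queue ++= Seq(1, 2, 3)

    val buffer = mutable.ArrayBuffer[Int]()
    buffer ++= queue                        // copies; queue still holds 1, 2, 3
    assert(queue.size == 3)

    buffer ++= queue.dequeueAll(_ => true)  // drains: queue is left empty
    assert(queue.isEmpty)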
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index cd0aa4d0dce70..cc4a65011dd72 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -29,7 +29,7 @@ import java.nio.charset.Charset
import java.util.concurrent.{Executors, TimeUnit, ArrayBlockingQueue}
import java.util.concurrent.atomic.AtomicInteger
-import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
+import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer, SynchronizedQueue}
import com.google.common.io.Files
import org.scalatest.BeforeAndAfter
@@ -39,6 +39,7 @@ import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.util.Utils
import org.apache.spark.streaming.receiver.{ActorHelper, Receiver}
+import org.apache.spark.rdd.RDD
class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
@@ -234,6 +235,95 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
logInfo("--------------------------------")
assert(output.sum === numTotalRecords)
}
+
+ test("queue input stream - oneAtATime=true") {
+ // Set up the streaming context and input streams
+ val ssc = new StreamingContext(conf, batchDuration)
+ val queue = new SynchronizedQueue[RDD[String]]()
+ val queueStream = ssc.queueStream(queue, oneAtATime = true)
+ val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+ val outputStream = new TestOutputStream(queueStream, outputBuffer)
+ def output = outputBuffer.filter(_.size > 0)
+ outputStream.register()
+ ssc.start()
+
+ // Set up the data to be queued into the stream
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val input = Seq("1", "2", "3", "4", "5")
+ val expectedOutput = input.map(Seq(_))
+ //Thread.sleep(1000)
+ val inputIterator = input.toIterator
+ for (i <- 0 until input.size) {
+ // Enqueue more than 1 item per tick but they should dequeue one at a time
+ inputIterator.take(2).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+ clock.addToTime(batchDuration.milliseconds)
+ }
+ Thread.sleep(1000)
+ logInfo("Stopping context")
+ ssc.stop()
+
+ // Verify whether data received was as expected
+ logInfo("--------------------------------")
+ logInfo("output.size = " + outputBuffer.size)
+ logInfo("output")
+ outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("expected output.size = " + expectedOutput.size)
+ logInfo("expected output")
+ expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("--------------------------------")
+
+ // Verify whether all the elements received are as expected
+ assert(output.size === expectedOutput.size)
+ for (i <- 0 until output.size) {
+ assert(output(i) === expectedOutput(i))
+ }
+ }
+
+ test("queue input stream - oneAtATime=false") {
+ // Set up the streaming context and input streams
+ val ssc = new StreamingContext(conf, batchDuration)
+ val queue = new SynchronizedQueue[RDD[String]]()
+ val queueStream = ssc.queueStream(queue, oneAtATime = false)
+ val outputBuffer = new ArrayBuffer[Seq[String]] with SynchronizedBuffer[Seq[String]]
+ val outputStream = new TestOutputStream(queueStream, outputBuffer)
+ def output = outputBuffer.filter(_.size > 0)
+ outputStream.register()
+ ssc.start()
+
+ // Set up the data to be queued into the stream
+ val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
+ val input = Seq("1", "2", "3", "4", "5")
+ val expectedOutput = Seq(Seq("1", "2", "3"), Seq("4", "5"))
+
+ // Enqueue the first 3 items (one by one), they should be merged in the next batch
+ val inputIterator = input.toIterator
+ inputIterator.take(3).foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+ clock.addToTime(batchDuration.milliseconds)
+ Thread.sleep(1000)
+
+ // Enqueue the remaining items (again one by one), merged in the final batch
+ inputIterator.foreach(i => queue += ssc.sparkContext.makeRDD(Seq(i)))
+ clock.addToTime(batchDuration.milliseconds)
+ Thread.sleep(1000)
+ logInfo("Stopping context")
+ ssc.stop()
+
+ // Verify whether data received was as expected
+ logInfo("--------------------------------")
+ logInfo("output.size = " + outputBuffer.size)
+ logInfo("output")
+ outputBuffer.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("expected output.size = " + expectedOutput.size)
+ logInfo("expected output")
+ expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
+ logInfo("--------------------------------")
+
+ // Verify whether all the elements received are as expected
+ assert(output.size === expectedOutput.size)
+ for (i <- 0 until output.size) {
+ assert(output(i) === expectedOutput(i))
+ }
+ }
}
diff --git a/tools/pom.xml b/tools/pom.xml
index 79cd8551d0722..c0ee8faa7a615 100644
--- a/tools/pom.xml
+++ b/tools/pom.xml
@@ -26,6 +26,9 @@
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-tools_2.10</artifactId>
+  <properties>
+    <sbt.project.name>tools</sbt.project.name>
+  </properties>
  <packaging>jar</packaging>
  <name>Spark Project Tools</name>
  <url>http://spark.apache.org/</url>
diff --git a/yarn/alpha/pom.xml b/yarn/alpha/pom.xml
index b8a631dd0bb3b..5b13a1f002d6e 100644
--- a/yarn/alpha/pom.xml
+++ b/yarn/alpha/pom.xml
@@ -23,6 +23,9 @@
    <version>1.1.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>
+  <properties>
+    <sbt.project.name>yarn-alpha</sbt.project.name>
+  </properties>

  <groupId>org.apache.spark</groupId>
  <artifactId>spark-yarn-alpha_2.10</artifactId>
diff --git a/yarn/pom.xml b/yarn/pom.xml
index ef7066ef1fdfc..efb473aa1b261 100644
--- a/yarn/pom.xml
+++ b/yarn/pom.xml
@@ -28,6 +28,9 @@
  <artifactId>yarn-parent_2.10</artifactId>
  <packaging>pom</packaging>
  <name>Spark Project YARN Parent POM</name>
+  <properties>
+    <sbt.project.name>yarn</sbt.project.name>
+  </properties>
diff --git a/yarn/stable/pom.xml b/yarn/stable/pom.xml
index 0931beb505508..ceaf9f9d71001 100644
--- a/yarn/stable/pom.xml
+++ b/yarn/stable/pom.xml
@@ -23,6 +23,9 @@
    <version>1.1.0-SNAPSHOT</version>
    <relativePath>../pom.xml</relativePath>
  </parent>
+  <properties>
+    <sbt.project.name>yarn-stable</sbt.project.name>
+  </properties>

  <groupId>org.apache.spark</groupId>
  <artifactId>spark-yarn_2.10</artifactId>