")
diff --git a/docs/bagel-programming-guide.md b/docs/bagel-programming-guide.md
index b280df0c8eeb8..7e55131754a3f 100644
--- a/docs/bagel-programming-guide.md
+++ b/docs/bagel-programming-guide.md
@@ -46,7 +46,7 @@ import org.apache.spark.bagel.Bagel._
Next, we load a sample graph from a text file as a distributed dataset and package it into `PRVertex` objects. We also cache the distributed dataset because Bagel will use it multiple times and we'd like to avoid recomputing it.
{% highlight scala %}
-val input = sc.textFile("data/pagerank_data.txt")
+val input = sc.textFile("data/mllib/pagerank_data.txt")
val numVerts = input.count()
diff --git a/docs/configuration.md b/docs/configuration.md
index b84104cc7e653..a70007c165442 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -336,13 +336,12 @@ Apart from these, the following properties are also available, and may be useful
spark.io.compression.codec |
- org.apache.spark.io.LZFCompressionCodec |
+ org.apache.spark.io.SnappyCompressionCodec |
The codec used to compress internal data such as RDD partitions and shuffle outputs.
- By default, Spark provides two codecs: org.apache.spark.io.LZFCompressionCodec
- and org.apache.spark.io.SnappyCompressionCodec. Of these two choices,
- Snappy offers faster compression and decompression, while LZF offers a better compression
- ratio.
+ By default, Spark provides three codecs: org.apache.spark.io.LZ4CompressionCodec,
+ org.apache.spark.io.LZFCompressionCodec,
+ and org.apache.spark.io.SnappyCompressionCodec.
|
@@ -350,7 +349,15 @@ Apart from these, the following properties are also available, and may be useful
32768 |
Block size (in bytes) used in Snappy compression, in the case when Snappy compression codec
- is used.
+ is used. Lowering this block size will also lower shuffle memory usage when Snappy is used.
+ |
+
+
+ spark.io.compression.lz4.block.size |
+ 32768 |
+
+ Block size (in bytes) used in LZ4 compression, in the case when LZ4 compression codec
+ is used. Lowering this block size will also lower shuffle memory usage when LZ4 is used.
|
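
For reference, a minimal sketch of how these compression properties could be set on a `SparkConf` before creating a context; the application name and the block-size value are arbitrary placeholders, not part of the patch:

{% highlight scala %}
import org.apache.spark.{SparkConf, SparkContext}

// Select the LZ4 codec explicitly and shrink its block size, which also
// lowers shuffle memory usage as described above.
val conf = new SparkConf()
  .setAppName("CompressionCodecExample")
  .set("spark.io.compression.codec", "org.apache.spark.io.LZ4CompressionCodec")
  .set("spark.io.compression.lz4.block.size", "16384") // bytes

val sc = new SparkContext(conf)
{% endhighlight %}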
@@ -412,7 +419,7 @@ Apart from these, the following properties are also available, and may be useful
spark.broadcast.factory |
- org.apache.spark.broadcast.HttpBroadcastFactory |
+ org.apache.spark.broadcast.TorrentBroadcastFactory |
Which broadcast implementation to use.
|
@@ -699,6 +706,25 @@ Apart from these, the following properties are also available, and may be useful
(in milliseconds)
+
+ spark.scheduler.minRegisteredExecutorsRatio |
+ 0 |
+
+ The minimum ratio of registered executors (registered executors / total expected executors)
+ to wait for before scheduling begins. Specified as a double between 0 and 1.
+ Regardless of whether the minimum ratio of executors has been reached,
+ the maximum amount of time it will wait before scheduling begins is controlled by config
+ spark.scheduler.maxRegisteredExecutorsWaitingTime.
+ |
+
+
+ spark.scheduler.maxRegisteredExecutorsWaitingTime |
+ 30000 |
+
+ Maximum amount of time to wait for executors to register before scheduling begins
+ (in milliseconds).
+ |
+
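
As an illustration only, a minimal sketch of configuring these two scheduler properties programmatically; the 0.8 ratio and the 60-second timeout are arbitrary example values:

{% highlight scala %}
import org.apache.spark.{SparkConf, SparkContext}

// Wait until at least 80% of the expected executors have registered,
// but no longer than 60 seconds, before task scheduling begins.
val conf = new SparkConf()
  .setAppName("RegisteredExecutorsExample")
  .set("spark.scheduler.minRegisteredExecutorsRatio", "0.8")
  .set("spark.scheduler.maxRegisteredExecutorsWaitingTime", "60000") // milliseconds

val sc = new SparkContext(conf)
{% endhighlight %}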
#### Security
@@ -773,6 +799,15 @@ Apart from these, the following properties are also available, and may be useful
into blocks of data before storing them in Spark.
+
+ spark.streaming.receiver.maxRate |
+ infinite |
+
+ Maximum rate (per second) at which each receiver will push data into blocks. Effectively,
+ each stream will consume at most this number of records per second.
+ Setting this configuration to 0 or a negative number will put no limit on the rate.
+ |
+
spark.streaming.unpersist |
true |
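
Relating to `spark.streaming.receiver.maxRate` above, a minimal sketch of capping the receiver ingest rate when building a streaming application; the rate and the batch interval are arbitrary example values:

{% highlight scala %}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Cap each receiver at 10,000 records per second; 0 or a negative value
// would leave the rate unlimited.
val conf = new SparkConf()
  .setAppName("ReceiverMaxRateExample")
  .set("spark.streaming.receiver.maxRate", "10000")

val ssc = new StreamingContext(conf, Seconds(1))
{% endhighlight %}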
diff --git a/docs/hadoop-third-party-distributions.md b/docs/hadoop-third-party-distributions.md
index 32403bc6957a2..ab1023b8f1842 100644
--- a/docs/hadoop-third-party-distributions.md
+++ b/docs/hadoop-third-party-distributions.md
@@ -48,9 +48,9 @@ the _exact_ Hadoop version you are running to avoid any compatibility errors.
-In SBT, the equivalent can be achieved by setting the SPARK_HADOOP_VERSION flag:
+In SBT, the equivalent can be achieved by setting the `hadoop.version` property:
- SPARK_HADOOP_VERSION=1.0.4 sbt/sbt assembly
+ sbt/sbt -Dhadoop.version=1.0.4 assembly
# Linking Applications to the Hadoop Version
diff --git a/docs/mllib-basics.md b/docs/mllib-basics.md
index 5796e16e8f99c..f9585251fafac 100644
--- a/docs/mllib-basics.md
+++ b/docs/mllib-basics.md
@@ -193,7 +193,7 @@ import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.rdd.RDD
-val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "mllib/data/sample_libsvm_data.txt")
+val examples: RDD[LabeledPoint] = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
{% endhighlight %}
@@ -207,7 +207,7 @@ import org.apache.spark.mllib.util.MLUtils;
import org.apache.spark.api.java.JavaRDD;
JavaRDD<LabeledPoint> examples =
- MLUtils.loadLibSVMFile(jsc.sc(), "mllib/data/sample_libsvm_data.txt").toJavaRDD();
+ MLUtils.loadLibSVMFile(jsc.sc(), "data/mllib/sample_libsvm_data.txt").toJavaRDD();
{% endhighlight %}
@@ -218,7 +218,7 @@ examples stored in LIBSVM format.
{% highlight python %}
from pyspark.mllib.util import MLUtils
-examples = MLUtils.loadLibSVMFile(sc, "mllib/data/sample_libsvm_data.txt")
+examples = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
{% endhighlight %}
diff --git a/docs/mllib-clustering.md b/docs/mllib-clustering.md
index 429cdf8d40cec..c76ac010d3f81 100644
--- a/docs/mllib-clustering.md
+++ b/docs/mllib-clustering.md
@@ -51,7 +51,7 @@ import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
-val data = sc.textFile("data/kmeans_data.txt")
+val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
// Cluster the data into two classes using KMeans
@@ -86,7 +86,7 @@ from numpy import array
from math import sqrt
# Load and parse the data
-data = sc.textFile("data/kmeans_data.txt")
+data = sc.textFile("data/mllib/kmeans_data.txt")
parsedData = data.map(lambda line: array([float(x) for x in line.split(' ')]))
# Build the model (cluster the data)
diff --git a/docs/mllib-collaborative-filtering.md b/docs/mllib-collaborative-filtering.md
index d51002f015670..5cd71738722a9 100644
--- a/docs/mllib-collaborative-filtering.md
+++ b/docs/mllib-collaborative-filtering.md
@@ -58,7 +58,7 @@ import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
// Load and parse the data
-val data = sc.textFile("mllib/data/als/test.data")
+val data = sc.textFile("data/mllib/als/test.data")
val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
Rating(user.toInt, item.toInt, rate.toDouble)
})
@@ -112,7 +112,7 @@ from pyspark.mllib.recommendation import ALS
from numpy import array
# Load and parse the data
-data = sc.textFile("mllib/data/als/test.data")
+data = sc.textFile("data/mllib/als/test.data")
ratings = data.map(lambda line: array([float(x) for x in line.split(',')]))
# Build the recommendation model using Alternating Least Squares
diff --git a/docs/mllib-decision-tree.md b/docs/mllib-decision-tree.md
index 6109991121feb..9cbd880897578 100644
--- a/docs/mllib-decision-tree.md
+++ b/docs/mllib-decision-tree.md
@@ -124,7 +124,7 @@ import org.apache.spark.mllib.tree.configuration.Algo._
import org.apache.spark.mllib.tree.impurity.Gini
// Load and parse the data file
-val data = sc.textFile("mllib/data/sample_tree_data.csv")
+val data = sc.textFile("data/mllib/sample_tree_data.csv")
val parsedData = data.map { line =>
val parts = line.split(',').map(_.toDouble)
LabeledPoint(parts(0), Vectors.dense(parts.tail))
@@ -163,7 +163,7 @@ import org.apache.spark.mllib.tree.configuration.Algo._
import org.apache.spark.mllib.tree.impurity.Variance
// Load and parse the data file
-val data = sc.textFile("mllib/data/sample_tree_data.csv")
+val data = sc.textFile("data/mllib/sample_tree_data.csv")
val parsedData = data.map { line =>
val parts = line.split(',').map(_.toDouble)
LabeledPoint(parts(0), Vectors.dense(parts.tail))
diff --git a/docs/mllib-linear-methods.md b/docs/mllib-linear-methods.md
index 4dfbebbcd04b7..b4d22e0df5a85 100644
--- a/docs/mllib-linear-methods.md
+++ b/docs/mllib-linear-methods.md
@@ -187,7 +187,7 @@ import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
// Load training data in LIBSVM format.
-val data = MLUtils.loadLibSVMFile(sc, "mllib/data/sample_libsvm_data.txt")
+val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
// Split data into training (60%) and test (40%).
val splits = data.randomSplit(Array(0.6, 0.4), seed = 11L)
@@ -259,7 +259,7 @@ def parsePoint(line):
values = [float(x) for x in line.split(' ')]
return LabeledPoint(values[0], values[1:])
-data = sc.textFile("mllib/data/sample_svm_data.txt")
+data = sc.textFile("data/mllib/sample_svm_data.txt")
parsedData = data.map(parsePoint)
# Build the model
@@ -309,7 +309,7 @@ import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data
-val data = sc.textFile("mllib/data/ridge-data/lpsa.data")
+val data = sc.textFile("data/mllib/ridge-data/lpsa.data")
val parsedData = data.map { line =>
val parts = line.split(',')
LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
@@ -356,7 +356,7 @@ def parsePoint(line):
values = [float(x) for x in line.replace(',', ' ').split(' ')]
return LabeledPoint(values[0], values[1:])
-data = sc.textFile("mllib/data/ridge-data/lpsa.data")
+data = sc.textFile("data/mllib/ridge-data/lpsa.data")
parsedData = data.map(parsePoint)
# Build the model
diff --git a/docs/mllib-naive-bayes.md b/docs/mllib-naive-bayes.md
index 1d1d7dcf6ffcb..b1650c83c98b9 100644
--- a/docs/mllib-naive-bayes.md
+++ b/docs/mllib-naive-bayes.md
@@ -40,7 +40,7 @@ import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
-val data = sc.textFile("mllib/data/sample_naive_bayes_data.txt")
+val data = sc.textFile("data/mllib/sample_naive_bayes_data.txt")
val parsedData = data.map { line =>
val parts = line.split(',')
LabeledPoint(parts(0).toDouble, Vectors.dense(parts(1).split(' ').map(_.toDouble)))
diff --git a/docs/mllib-optimization.md b/docs/mllib-optimization.md
index ae9ede58e8e60..651958c7812f2 100644
--- a/docs/mllib-optimization.md
+++ b/docs/mllib-optimization.md
@@ -214,7 +214,7 @@ import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import org.apache.spark.mllib.classification.LogisticRegressionModel
-val data = MLUtils.loadLibSVMFile(sc, "mllib/data/sample_libsvm_data.txt")
+val data = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")
val numFeatures = data.take(1)(0).features.size
// Split data into training (60%) and test (40%).
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 522c83884ef42..38728534a46e0 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -474,7 +474,7 @@ anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)
Spark SQL also supports reading and writing data stored in [Apache Hive](http://hive.apache.org/).
However, since Hive has a large number of dependencies, it is not included in the default Spark assembly.
-In order to use Hive you must first run '`SPARK_HIVE=true sbt/sbt assembly/assembly`' (or use `-Phive` for maven).
+In order to use Hive you must first run '`sbt/sbt -Phive assembly/assembly`' (or use `-Phive` for maven).
This command builds a new assembly jar that includes Hive. Note that this Hive assembly jar must also be present
on all of the worker nodes, as they will need access to the Hive serialization and deserialization libraries
(SerDes) in order to access data stored in Hive.
diff --git a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
index 4893b017ed819..822673347bdce 100644
--- a/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/HBaseTest.scala
@@ -31,12 +31,12 @@ object HBaseTest {
val conf = HBaseConfiguration.create()
// Other options for configuring scan behavior are available. More information available at
// http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/mapreduce/TableInputFormat.html
- conf.set(TableInputFormat.INPUT_TABLE, args(1))
+ conf.set(TableInputFormat.INPUT_TABLE, args(0))
// Initialize hBase table if necessary
val admin = new HBaseAdmin(conf)
- if(!admin.isTableAvailable(args(1))) {
- val tableDesc = new HTableDescriptor(args(1))
+ if (!admin.isTableAvailable(args(0))) {
+ val tableDesc = new HTableDescriptor(args(0))
admin.createTable(tableDesc)
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
index 331de3ad1ef53..ed2b38e2ca6f8 100644
--- a/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/HdfsTest.scala
@@ -19,16 +19,22 @@ package org.apache.spark.examples
import org.apache.spark._
+
object HdfsTest {
+
+ /** Usage: HdfsTest [file] */
def main(args: Array[String]) {
+ if (args.length < 1) {
+ System.err.println("Usage: HdfsTest ")
+ System.exit(1)
+ }
val sparkConf = new SparkConf().setAppName("HdfsTest")
val sc = new SparkContext(sparkConf)
- val file = sc.textFile(args(1))
+ val file = sc.textFile(args(0))
val mapped = file.map(s => s.length).cache()
for (iter <- 1 to 10) {
val start = System.currentTimeMillis()
for (x <- mapped) { x + 2 }
- // println("Processing: " + x)
val end = System.currentTimeMillis()
println("Iteration " + iter + " took " + (end-start) + " ms")
}
diff --git a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
index 40b36c779afd6..4c7e006da0618 100644
--- a/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/SparkPageRank.scala
@@ -31,8 +31,12 @@ import org.apache.spark.{SparkConf, SparkContext}
*/
object SparkPageRank {
def main(args: Array[String]) {
+ if (args.length < 1) {
+ System.err.println("Usage: SparkPageRank ")
+ System.exit(1)
+ }
val sparkConf = new SparkConf().setAppName("PageRank")
- var iters = args(1).toInt
+ val iters = if (args.length > 1) args(1).toInt else 10
val ctx = new SparkContext(sparkConf)
val lines = ctx.textFile(args(0), 1)
val links = lines.map{ s =>
diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
index 07ae88febf916..56d2886b26878 100644
--- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
+++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala
@@ -153,15 +153,15 @@ class FlumeReceiver(
private def initServer() = {
if (enableDecompression) {
- val channelFactory = new NioServerSocketChannelFactory
- (Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
- val channelPipelieFactory = new CompressionChannelPipelineFactory()
+ val channelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool())
+ val channelPipelineFactory = new CompressionChannelPipelineFactory()
new NettyServer(
responder,
new InetSocketAddress(host, port),
- channelFactory,
- channelPipelieFactory,
+ channelFactory,
+ channelPipelineFactory,
null)
} else {
new NettyServer(responder, new InetSocketAddress(host, port))
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
index 4db45c9af8fae..3507f358bfb40 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -107,14 +107,16 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
/**
* Repartitions the edges in the graph according to `partitionStrategy`.
*
- * @param the partitioning strategy to use when partitioning the edges in the graph.
+ * @param partitionStrategy the partitioning strategy to use when partitioning the edges
+ * in the graph.
*/
def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED]
/**
* Repartitions the edges in the graph according to `partitionStrategy`.
*
- * @param the partitioning strategy to use when partitioning the edges in the graph.
+ * @param partitionStrategy the partitioning strategy to use when partitioning the edges
+ * in the graph.
* @param numPartitions the number of edge partitions in the new graph.
*/
def partitionBy(partitionStrategy: PartitionStrategy, numPartitions: Int): Graph[VD, ED]
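
To illustrate the two overloads documented above, a minimal sketch of repartitioning a graph's edges; the chosen strategy and the partition count of 8 are arbitrary values for the example:

{% highlight scala %}
import scala.reflect.ClassTag

import org.apache.spark.graphx.{Graph, PartitionStrategy}

// Repartition the edges with the 2-D edge partitioning strategy,
// explicitly requesting 8 edge partitions.
def repartitionExample[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VD, ED] =
  graph.partitionBy(PartitionStrategy.EdgePartition2D, numPartitions = 8)
{% endhighlight %}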
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index f1b6df9a3025e..4825d12fc27b3 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -182,8 +182,8 @@ class VertexRDD[@specialized VD: ClassTag](
/**
* Left joins this RDD with another VertexRDD with the same index. This function will fail if
* both VertexRDDs do not share the same index. The resulting vertex set contains an entry for
- * each
- * vertex in `this`. If `other` is missing any vertex in this VertexRDD, `f` is passed `None`.
+ * each vertex in `this`.
+ * If `other` is missing any vertex in this VertexRDD, `f` is passed `None`.
*
* @tparam VD2 the attribute type of the other VertexRDD
* @tparam VD3 the attribute type of the resulting VertexRDD
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
index 3827ac8d0fd6a..502b112d31c2e 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/RoutingTablePartition.scala
@@ -119,7 +119,7 @@ object RoutingTablePartition {
*/
private[graphx]
class RoutingTablePartition(
- private val routingTable: Array[(Array[VertexId], BitSet, BitSet)]) {
+ private val routingTable: Array[(Array[VertexId], BitSet, BitSet)]) extends Serializable {
/** The maximum number of edge partitions this `RoutingTablePartition` is built to join with. */
val numEdgePartitions: Int = routingTable.size
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
index 34939b24440aa..5ad6390a56c4f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBase.scala
@@ -60,7 +60,8 @@ private[graphx] object VertexPartitionBase {
* `VertexPartitionBaseOpsConstructor` typeclass (for example,
* [[VertexPartition.VertexPartitionOpsConstructor]]).
*/
-private[graphx] abstract class VertexPartitionBase[@specialized(Long, Int, Double) VD: ClassTag] {
+private[graphx] abstract class VertexPartitionBase[@specialized(Long, Int, Double) VD: ClassTag]
+ extends Serializable {
def index: VertexIdToIndexMap
def values: Array[VD]
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
index a4f769b294010..b40aa1b417a0f 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/VertexPartitionBaseOps.scala
@@ -35,7 +35,7 @@ import org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap
private[graphx] abstract class VertexPartitionBaseOps
[VD: ClassTag, Self[X] <: VertexPartitionBase[X] : VertexPartitionBaseOpsConstructor]
(self: Self[VD])
- extends Logging {
+ extends Serializable with Logging {
def withIndex(index: VertexIdToIndexMap): Self[VD]
def withValues[VD2: ClassTag](values: Array[VD2]): Self[VD2]
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
index 28fd112f2b124..9d00f76327e4c 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/EdgePartitionSuite.scala
@@ -23,6 +23,7 @@ import scala.util.Random
import org.scalatest.FunSuite
import org.apache.spark.SparkConf
+import org.apache.spark.serializer.JavaSerializer
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.graphx._
@@ -124,18 +125,21 @@ class EdgePartitionSuite extends FunSuite {
assert(ep.numActives == Some(2))
}
- test("Kryo serialization") {
+ test("serialization") {
val aList = List((0, 1, 0), (1, 0, 0), (1, 2, 0), (5, 4, 0), (5, 5, 0))
val a: EdgePartition[Int, Int] = makeEdgePartition(aList)
- val conf = new SparkConf()
+ val javaSer = new JavaSerializer(new SparkConf())
+ val kryoSer = new KryoSerializer(new SparkConf()
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
- .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator")
- val s = new KryoSerializer(conf).newInstance()
- val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a))
- assert(aSer.srcIds.toList === a.srcIds.toList)
- assert(aSer.dstIds.toList === a.dstIds.toList)
- assert(aSer.data.toList === a.data.toList)
- assert(aSer.index != null)
- assert(aSer.vertices.iterator.toSet === a.vertices.iterator.toSet)
+ .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator"))
+
+ for (ser <- List(javaSer, kryoSer); s = ser.newInstance()) {
+ val aSer: EdgePartition[Int, Int] = s.deserialize(s.serialize(a))
+ assert(aSer.srcIds.toList === a.srcIds.toList)
+ assert(aSer.dstIds.toList === a.dstIds.toList)
+ assert(aSer.data.toList === a.data.toList)
+ assert(aSer.index != null)
+ assert(aSer.vertices.iterator.toSet === a.vertices.iterator.toSet)
+ }
}
}
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
index 8bf1384d514c1..f9e771a900013 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/impl/VertexPartitionSuite.scala
@@ -17,9 +17,14 @@
package org.apache.spark.graphx.impl
-import org.apache.spark.graphx._
import org.scalatest.FunSuite
+import org.apache.spark.SparkConf
+import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.serializer.KryoSerializer
+
+import org.apache.spark.graphx._
+
class VertexPartitionSuite extends FunSuite {
test("isDefined, filter") {
@@ -116,4 +121,17 @@ class VertexPartitionSuite extends FunSuite {
assert(vp3.index.getPos(2) === -1)
}
+ test("serialization") {
+ val verts = Set((0L, 1), (1L, 1), (2L, 1))
+ val vp = VertexPartition(verts.iterator)
+ val javaSer = new JavaSerializer(new SparkConf())
+ val kryoSer = new KryoSerializer(new SparkConf()
+ .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+ .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator"))
+
+ for (ser <- List(javaSer, kryoSer); s = ser.newInstance()) {
+ val vpSer: VertexPartition[Int] = s.deserialize(s.serialize(vp))
+ assert(vpSer.iterator.toSet === verts)
+ }
+ }
}
diff --git a/make-distribution.sh b/make-distribution.sh
index 94b473bf91cd3..b5a90f0f3bfe9 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -23,21 +23,6 @@
# The distribution contains fat (assembly) jars that include the Scala library,
# so it is completely self contained.
# It does not contain source or *.class files.
-#
-# Optional Arguments
-# --tgz: Additionally creates spark-$VERSION-bin.tar.gz
-# --hadoop VERSION: Builds against specified version of Hadoop.
-# --with-yarn: Enables support for Hadoop YARN.
-# --with-hive: Enable support for reading Hive tables.
-# --name: A moniker for the release target. Defaults to the Hadoop verison.
-#
-# Recommended deploy/testing procedure (standalone mode):
-# 1) Rsync / deploy the dist/ dir to one host
-# 2) cd to deploy dir; ./sbin/start-master.sh
-# 3) Verify master is up by visiting web page, ie http://master-ip:8080. Note the spark:// URL.
-# 4) ./sbin/start-slave.sh 1 <>
-# 5) ./bin/spark-shell --master spark://my-master-ip:7077
-#
set -o pipefail
set -e
@@ -46,26 +31,35 @@ set -e
FWDIR="$(cd `dirname $0`; pwd)"
DISTDIR="$FWDIR/dist"
-# Initialize defaults
-SPARK_HADOOP_VERSION=1.0.4
-SPARK_YARN=false
-SPARK_HIVE=false
SPARK_TACHYON=false
MAKE_TGZ=false
NAME=none
+function exit_with_usage {
+ echo "make-distribution.sh - tool for making binary distributions of Spark"
+ echo ""
+ echo "usage:"
+ echo "./make-distribution.sh [--name] [--tgz] [--with-tachyon] "
+ echo "See Spark's \"Building with Maven\" doc for correct Maven options."
+ echo ""
+ exit 1
+}
+
# Parse arguments
while (( "$#" )); do
case $1 in
--hadoop)
- SPARK_HADOOP_VERSION="$2"
- shift
+ echo "Error: '--hadoop' is no longer supported:"
+ echo "Error: use Maven options -Phadoop.version and -Pyarn.version"
+ exit_with_usage
;;
--with-yarn)
- SPARK_YARN=true
+ echo "Error: '--with-yarn' is no longer supported, use Maven option -Pyarn"
+ exit_with_usage
;;
--with-hive)
- SPARK_HIVE=true
+ echo "Error: '--with-hive' is no longer supported, use Maven option -Pyarn"
+ exit_with_usage
;;
--skip-java-test)
SKIP_JAVA_TEST=true
@@ -80,6 +74,12 @@ while (( "$#" )); do
NAME="$2"
shift
;;
+ --help)
+ exit_with_usage
+ ;;
+ *)
+ break
+ ;;
esac
shift
done
@@ -143,14 +143,6 @@ else
echo "Making distribution for Spark $VERSION in $DISTDIR..."
fi
-echo "Hadoop version set to $SPARK_HADOOP_VERSION"
-echo "Release name set to $NAME"
-if [ "$SPARK_YARN" == "true" ]; then
- echo "YARN enabled"
-else
- echo "YARN disabled"
-fi
-
if [ "$SPARK_TACHYON" == "true" ]; then
echo "Tachyon Enabled"
else
@@ -162,33 +154,12 @@ cd $FWDIR
export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M -XX:ReservedCodeCacheSize=512m"
-BUILD_COMMAND="mvn clean package"
-
-# Use special profiles for hadoop versions 0.23.x, 2.2.x, 2.3.x, 2.4.x
-if [[ "$SPARK_HADOOP_VERSION" =~ ^0\.23\. ]]; then BUILD_COMMAND="$BUILD_COMMAND -Phadoop-0.23"; fi
-if [[ "$SPARK_HADOOP_VERSION" =~ ^2\.2\. ]]; then BUILD_COMMAND="$BUILD_COMMAND -Phadoop-2.2"; fi
-if [[ "$SPARK_HADOOP_VERSION" =~ ^2\.3\. ]]; then BUILD_COMMAND="$BUILD_COMMAND -Phadoop-2.3"; fi
-if [[ "$SPARK_HADOOP_VERSION" =~ ^2\.4\. ]]; then BUILD_COMMAND="$BUILD_COMMAND -Phadoop-2.4"; fi
-if [[ "$SPARK_HIVE" == "true" ]]; then BUILD_COMMAND="$BUILD_COMMAND -Phive"; fi
-if [[ "$SPARK_YARN" == "true" ]]; then
- # For hadoop versions 0.23.x to 2.1.x, use the yarn-alpha profile
- if [[ "$SPARK_HADOOP_VERSION" =~ ^0\.2[3-9]\. ]] ||
- [[ "$SPARK_HADOOP_VERSION" =~ ^0\.[3-9][0-9]\. ]] ||
- [[ "$SPARK_HADOOP_VERSION" =~ ^1\.[0-9]\. ]] ||
- [[ "$SPARK_HADOOP_VERSION" =~ ^2\.[0-1]\. ]]; then
- BUILD_COMMAND="$BUILD_COMMAND -Pyarn-alpha"
- # For hadoop versions 2.2+, use the yarn profile
- elif [[ "$SPARK_HADOOP_VERSION" =~ ^2.[2-9]. ]]; then
- BUILD_COMMAND="$BUILD_COMMAND -Pyarn"
- fi
- BUILD_COMMAND="$BUILD_COMMAND -Dyarn.version=$SPARK_HADOOP_VERSION"
-fi
-BUILD_COMMAND="$BUILD_COMMAND -Dhadoop.version=$SPARK_HADOOP_VERSION"
-BUILD_COMMAND="$BUILD_COMMAND -DskipTests"
+BUILD_COMMAND="mvn clean package -DskipTests $@"
# Actually build the jar
echo -e "\nBuilding with..."
echo -e "\$ $BUILD_COMMAND\n"
+
${BUILD_COMMAND}
# Make directories
diff --git a/mllib/pom.xml b/mllib/pom.xml
index 87afd7ecf2dd4..92b07e2357db1 100644
--- a/mllib/pom.xml
+++ b/mllib/pom.xml
@@ -78,6 +78,19 @@
test