This repository has been archived by the owner on Nov 30, 2019. It is now read-only.

Merge remote-tracking branch 'upstream/master'
bilna committed Dec 31, 2014
2 parents 4b58094 + 352ed6b commit fc8eb28
Showing 75 changed files with 866 additions and 355 deletions.

core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -26,6 +26,7 @@ import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer}
import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator}

import org.apache.spark._
import org.apache.spark.api.python.PythonBroadcast
import org.apache.spark.broadcast.HttpBroadcast
import org.apache.spark.network.nio.{PutBlock, GotBlock, GetBlock}
import org.apache.spark.scheduler.MapStatus
@@ -90,6 +91,7 @@ class KryoSerializer(conf: SparkConf)
// Allow sending SerializableWritable
kryo.register(classOf[SerializableWritable[_]], new KryoJavaSerializer())
kryo.register(classOf[HttpBroadcast[_]], new KryoJavaSerializer())
kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer())

try {
// Use the default classloader when calling the user registrator.
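
The new registration gives PythonBroadcast the same treatment HttpBroadcast already gets: instead of Kryo's default field-by-field serializer, the class goes through KryoJavaSerializer, which preserves any custom writeObject/readObject logic. Below is a minimal, self-contained sketch of that fallback pattern against the plain Kryo API; CustomSerializedThing is a hypothetical stand-in, not Spark's PythonBroadcast, and the example assumes only the kryo library on the classpath.

```scala
import java.io.ByteArrayOutputStream

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import com.esotericsoftware.kryo.serializers.JavaSerializer

// Hypothetical stand-in for a class (like PythonBroadcast) whose state is written by
// hand in writeObject/readObject rather than recovered by copying its fields.
class CustomSerializedThing(@transient var payload: String) extends Serializable {
  private def writeObject(out: java.io.ObjectOutputStream): Unit =
    out.writeUTF(if (payload == null) "" else payload)
  private def readObject(in: java.io.ObjectInputStream): Unit =
    payload = in.readUTF()
}

object KryoJavaFallbackSketch {
  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()
    // Route the class through Java serialization so its writeObject/readObject hooks run,
    // mirroring kryo.register(classOf[PythonBroadcast], new KryoJavaSerializer()) above.
    kryo.register(classOf[CustomSerializedThing], new JavaSerializer())

    val bytes = new ByteArrayOutputStream()
    val output = new Output(bytes)
    kryo.writeObject(output, new CustomSerializedThing("hello"))
    output.close()

    val input = new Input(bytes.toByteArray)
    val restored = kryo.readObject(input, classOf[CustomSerializedThing])
    input.close()
    println(restored.payload) // "hello"
  }
}
```

Without such a registration, Kryo's default serializer would copy fields directly and silently skip the custom hooks.
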
5 changes: 3 additions & 2 deletions core/src/main/scala/org/apache/spark/ui/UIUtils.scala
@@ -234,8 +234,9 @@ private[spark] object UIUtils extends Logging {
<div class="span12">
<h3 style="vertical-align: middle; display: inline-block;">
<a style="text-decoration: none" href={prependBaseUri("/")}>
<img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")}
style="margin-right: 15px;" />
<img src={prependBaseUri("/static/spark-logo-77x50px-hd.png")} />
<span class="version"
style="margin-right: 15px;">{org.apache.spark.SPARK_VERSION}</span>
</a>
{title}
</h3>
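
The UIUtils change drops the inline margin from the logo <img> and renders the Spark version in its own <span class="version">, so stylesheets can target or hide the version text independently of the logo. For reference, here is a small sketch of the Scala XML literal interpolation used above; the URI and version strings are made up, while the real code splices in prependBaseUri(...) and org.apache.spark.SPARK_VERSION, and the sketch assumes the scala-xml support that shipped with Scala in this era.

```scala
object VersionSpanSketch {
  // Illustrative values only; UIUtils uses prependBaseUri(...) and SPARK_VERSION.
  val logoUri = "/static/spark-logo-77x50px-hd.png"
  val version = "1.x.y-SNAPSHOT"

  // In a Scala XML literal, braces splice Scala expressions into attributes and bodies.
  val header =
    <a style="text-decoration: none" href="/">
      <img src={logoUri} />
      <span class="version" style="margin-right: 15px;">{version}</span>
    </a>

  def main(args: Array[String]): Unit = println(header)
}
```
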
21 changes: 5 additions & 16 deletions core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -17,7 +17,6 @@

package org.apache.spark

import org.scalatest.BeforeAndAfter
import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts._
import org.scalatest.Matchers
@@ -29,16 +28,10 @@ class NotSerializableClass
class NotSerializableExn(val notSer: NotSerializableClass) extends Throwable() {}


class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
with LocalSparkContext {
class DistributedSuite extends FunSuite with Matchers with LocalSparkContext {

val clusterUrl = "local-cluster[2,1,512]"

after {
System.clearProperty("spark.reducer.maxMbInFlight")
System.clearProperty("spark.storage.memoryFraction")
}

test("task throws not serializable exception") {
// Ensures that executors do not crash when an exn is not serializable. If executors crash,
// this test will hang. Correct behavior is that executors don't crash but fail tasks
@@ -84,15 +77,14 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
}

test("groupByKey where map output sizes exceed maxMbInFlight") {
System.setProperty("spark.reducer.maxMbInFlight", "1")
sc = new SparkContext(clusterUrl, "test")
val conf = new SparkConf().set("spark.reducer.maxMbInFlight", "1")
sc = new SparkContext(clusterUrl, "test", conf)
// This data should be around 20 MB, so even with 4 mappers and 2 reducers, each map output
// file should be about 2.5 MB
val pairs = sc.parallelize(1 to 2000, 4).map(x => (x % 16, new Array[Byte](10000)))
val groups = pairs.groupByKey(2).map(x => (x._1, x._2.size)).collect()
assert(groups.length === 16)
assert(groups.map(_._2).sum === 2000)
// Note that spark.reducer.maxMbInFlight will be cleared in the test suite's after{} block
}

test("accumulators") {
@@ -210,28 +202,25 @@ class DistributedSuite extends FunSuite with Matchers with BeforeAndAfter
}

test("compute without caching when no partitions fit in memory") {
System.setProperty("spark.storage.memoryFraction", "0.0001")
sc = new SparkContext(clusterUrl, "test")
// data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
// to only 50 KB (0.0001 of 512 MB), so no partitions should fit in memory
val data = sc.parallelize(1 to 4000000, 2).persist(StorageLevel.MEMORY_ONLY_SER)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
System.clearProperty("spark.storage.memoryFraction")
}

test("compute when only some partitions fit in memory") {
System.setProperty("spark.storage.memoryFraction", "0.01")
sc = new SparkContext(clusterUrl, "test")
val conf = new SparkConf().set("spark.storage.memoryFraction", "0.01")
sc = new SparkContext(clusterUrl, "test", conf)
// data will be 4 million * 4 bytes = 16 MB in size, but our memoryFraction set the cache
// to only 5 MB (0.01 of 512 MB), so not all of it will fit in memory; we use 20 partitions
// to make sure that *some* of them do fit though
val data = sc.parallelize(1 to 4000000, 20).persist(StorageLevel.MEMORY_ONLY_SER)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
assert(data.count() === 4000000)
System.clearProperty("spark.storage.memoryFraction")
}

test("passing environment variables to cluster") {
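
The pattern repeated through this file, and through the rest of the commit, is to stop mutating JVM-wide system properties inside tests and instead put each test's settings on a SparkConf handed to the SparkContext constructor, so nothing needs to be cleared in an after {} block. A minimal sketch of the idea, assuming a Spark 1.x classpath; the app name and the toy job are illustrative only:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ConfScopedSettingSketch {
  def main(args: Array[String]): Unit = {
    // Old style, removed by this commit: a JVM-wide mutation that leaks into every
    // later test unless an after {} block remembers to clear it:
    //   System.setProperty("spark.reducer.maxMbInFlight", "1")
    //   sc = new SparkContext(clusterUrl, "test")

    // New style: the setting lives on a SparkConf that only this context ever sees.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("conf-scoped-setting")
      .set("spark.reducer.maxMbInFlight", "1")
    val sc = new SparkContext(conf)
    try {
      val sum = sc.parallelize(1 to 100, 4).map(_.toLong).reduce(_ + _)
      println(s"sum = $sum") // 5050
    } finally {
      sc.stop() // nothing global left behind to clean up
    }
  }
}
```
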
16 changes: 8 additions & 8 deletions core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -31,10 +31,11 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
@transient var tmpFile: File = _
@transient var tmpJarUrl: String = _

def newConf: SparkConf = new SparkConf(loadDefaults = false).set("spark.authenticate", "false")

override def beforeEach() {
super.beforeEach()
resetSparkContext()
System.setProperty("spark.authenticate", "false")
}

override def beforeAll() {
@@ -52,7 +53,6 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
val jarFile = new File(testTempDir, "test.jar")
val jarStream = new FileOutputStream(jarFile)
val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
System.setProperty("spark.authenticate", "false")

val jarEntry = new JarEntry(textFile.getName)
jar.putNextEntry(jarEntry)
@@ -74,7 +74,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
}

test("Distributing files locally") {
sc = new SparkContext("local[4]", "test")
sc = new SparkContext("local[4]", "test", newConf)
sc.addFile(tmpFile.toString)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
val result = sc.parallelize(testData).reduceByKey {
@@ -108,7 +108,7 @@

test("Distributing files locally using URL as input") {
// addFile("file:///....")
sc = new SparkContext("local[4]", "test")
sc = new SparkContext("local[4]", "test", newConf)
sc.addFile(new File(tmpFile.toString).toURI.toString)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
val result = sc.parallelize(testData).reduceByKey {
@@ -122,7 +122,7 @@
}

test ("Dynamically adding JARS locally") {
sc = new SparkContext("local[4]", "test")
sc = new SparkContext("local[4]", "test", newConf)
sc.addJar(tmpJarUrl)
val testData = Array((1, 1))
sc.parallelize(testData).foreach { x =>
@@ -133,7 +133,7 @@
}

test("Distributing files on a standalone cluster") {
sc = new SparkContext("local-cluster[1,1,512]", "test")
sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
sc.addFile(tmpFile.toString)
val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0))
val result = sc.parallelize(testData).reduceByKey {
@@ -147,7 +147,7 @@
}

test ("Dynamically adding JARS on a standalone cluster") {
sc = new SparkContext("local-cluster[1,1,512]", "test")
sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
sc.addJar(tmpJarUrl)
val testData = Array((1,1))
sc.parallelize(testData).foreach { x =>
@@ -158,7 +158,7 @@
}

test ("Dynamically adding JARS on a standalone cluster using local: URL") {
sc = new SparkContext("local-cluster[1,1,512]", "test")
sc = new SparkContext("local-cluster[1,1,512]", "test", newConf)
sc.addJar(tmpJarUrl.replace("file", "local"))
val testData = Array((1,1))
sc.parallelize(testData).foreach { x =>
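
FileServerSuite replaces the System.setProperty("spark.authenticate", "false") call with a shared newConf helper built as new SparkConf(loadDefaults = false). Passing loadDefaults = false skips copying spark.* JVM system properties into the conf, so each test starts from a clean, explicit configuration regardless of what other suites may have left behind. A small sketch of what that flag controls, using a made-up property name:

```scala
import org.apache.spark.SparkConf

object LoadDefaultsSketch {
  def main(args: Array[String]): Unit = {
    // Pretend some earlier code left a stray spark.* system property behind.
    System.setProperty("spark.test.leftover", "true")

    // loadDefaults = true (the normal constructor): spark.* system properties are copied in.
    val withDefaults = new SparkConf(loadDefaults = true)
    println(withDefaults.contains("spark.test.leftover")) // true

    // loadDefaults = false, as in FileServerSuite's newConf: the conf starts empty.
    val isolated = new SparkConf(loadDefaults = false).set("spark.authenticate", "false")
    println(isolated.contains("spark.test.leftover")) // false
    println(isolated.get("spark.authenticate"))       // "false"

    System.clearProperty("spark.test.leftover")
  }
}
```
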
21 changes: 10 additions & 11 deletions core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -40,43 +40,42 @@ class JobCancellationSuite extends FunSuite with Matchers with BeforeAndAfter
override def afterEach() {
super.afterEach()
resetSparkContext()
System.clearProperty("spark.scheduler.mode")
}

test("local mode, FIFO scheduler") {
System.setProperty("spark.scheduler.mode", "FIFO")
sc = new SparkContext("local[2]", "test")
val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
sc = new SparkContext("local[2]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
assert(sc.parallelize(1 to 10, 2).count === 10)
}

test("local mode, fair scheduler") {
System.setProperty("spark.scheduler.mode", "FAIR")
val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
System.setProperty("spark.scheduler.allocation.file", xmlPath)
sc = new SparkContext("local[2]", "test")
conf.set("spark.scheduler.allocation.file", xmlPath)
sc = new SparkContext("local[2]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
assert(sc.parallelize(1 to 10, 2).count === 10)
}

test("cluster mode, FIFO scheduler") {
System.setProperty("spark.scheduler.mode", "FIFO")
sc = new SparkContext("local-cluster[2,1,512]", "test")
val conf = new SparkConf().set("spark.scheduler.mode", "FIFO")
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
assert(sc.parallelize(1 to 10, 2).count === 10)
}

test("cluster mode, fair scheduler") {
System.setProperty("spark.scheduler.mode", "FAIR")
val conf = new SparkConf().set("spark.scheduler.mode", "FAIR")
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
System.setProperty("spark.scheduler.allocation.file", xmlPath)
sc = new SparkContext("local-cluster[2,1,512]", "test")
conf.set("spark.scheduler.allocation.file", xmlPath)
sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
testCount()
testTake()
// Make sure we can still launch tasks.
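
Both fair-scheduler tests now carry the scheduler mode and the fairscheduler.xml allocation-file path on a SparkConf instead of setting global properties. As a rough illustration of what those settings drive, the sketch below enables FAIR scheduling and assigns a pool for jobs submitted from the current thread; it deliberately omits the allocation file so it can run as-is (pools not declared in a file just get default settings), and the pool name is invented.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FairSchedulerSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("fair-scheduler-sketch")
      .set("spark.scheduler.mode", "FAIR")
    // The suite additionally points at an allocation file from its test resources:
    //   conf.set("spark.scheduler.allocation.file", xmlPath)

    val sc = new SparkContext(conf)
    try {
      // Jobs submitted from this thread are scheduled in the named pool.
      sc.setLocalProperty("spark.scheduler.pool", "testPool")
      println(sc.parallelize(1 to 10, 2).count()) // 10
    } finally {
      sc.stop()
    }
  }
}
```
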
22 changes: 9 additions & 13 deletions core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -35,19 +35,15 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContext
conf.set("spark.test.noStageRetry", "true")

test("groupByKey without compression") {
try {
System.setProperty("spark.shuffle.compress", "false")
sc = new SparkContext("local", "test", conf)
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
val groups = pairs.groupByKey(4).collect()
assert(groups.size === 2)
val valuesFor1 = groups.find(_._1 == 1).get._2
assert(valuesFor1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
} finally {
System.setProperty("spark.shuffle.compress", "true")
}
val myConf = conf.clone().set("spark.shuffle.compress", "false")
sc = new SparkContext("local", "test", myConf)
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
val groups = pairs.groupByKey(4).collect()
assert(groups.size === 2)
val valuesFor1 = groups.find(_._1 == 1).get._2
assert(valuesFor1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
}

test("shuffle non-zero block size") {
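
Here the per-test override of spark.shuffle.compress is applied to conf.clone() rather than to the suite-level conf or to a global system property, so the other shuffle tests keep compression at its default and the old try/finally that flipped the property back is no longer needed. A small sketch of the clone semantics:

```scala
import org.apache.spark.SparkConf

object ConfCloneSketch {
  def main(args: Array[String]): Unit = {
    // A conf shared by a whole suite, as in ShuffleSuite.
    val shared = new SparkConf(loadDefaults = false).set("spark.test.noStageRetry", "true")

    // clone() copies the settings; changes to the copy never touch the shared conf.
    val perTest = shared.clone().set("spark.shuffle.compress", "false")

    println(perTest.get("spark.shuffle.compress"))     // "false"
    println(perTest.get("spark.test.noStageRetry"))    // "true", inherited from shared
    println(shared.contains("spark.shuffle.compress")) // false: shared conf untouched
  }
}
```
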
51 changes: 19 additions & 32 deletions core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -19,27 +19,20 @@ package org.apache.spark

import org.scalatest.FunSuite
import org.apache.spark.serializer.{KryoRegistrator, KryoSerializer}
import org.apache.spark.util.ResetSystemProperties
import com.esotericsoftware.kryo.Kryo

class SparkConfSuite extends FunSuite with LocalSparkContext {
class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemProperties {
test("loading from system properties") {
try {
System.setProperty("spark.test.testProperty", "2")
val conf = new SparkConf()
assert(conf.get("spark.test.testProperty") === "2")
} finally {
System.clearProperty("spark.test.testProperty")
}
System.setProperty("spark.test.testProperty", "2")
val conf = new SparkConf()
assert(conf.get("spark.test.testProperty") === "2")
}

test("initializing without loading defaults") {
try {
System.setProperty("spark.test.testProperty", "2")
val conf = new SparkConf(false)
assert(!conf.contains("spark.test.testProperty"))
} finally {
System.clearProperty("spark.test.testProperty")
}
System.setProperty("spark.test.testProperty", "2")
val conf = new SparkConf(false)
assert(!conf.contains("spark.test.testProperty"))
}

test("named set methods") {
@@ -117,23 +110,17 @@ class SparkConfSuite extends FunSuite with LocalSparkContext {

test("nested property names") {
// This wasn't supported by some external conf parsing libraries
try {
System.setProperty("spark.test.a", "a")
System.setProperty("spark.test.a.b", "a.b")
System.setProperty("spark.test.a.b.c", "a.b.c")
val conf = new SparkConf()
assert(conf.get("spark.test.a") === "a")
assert(conf.get("spark.test.a.b") === "a.b")
assert(conf.get("spark.test.a.b.c") === "a.b.c")
conf.set("spark.test.a.b", "A.B")
assert(conf.get("spark.test.a") === "a")
assert(conf.get("spark.test.a.b") === "A.B")
assert(conf.get("spark.test.a.b.c") === "a.b.c")
} finally {
System.clearProperty("spark.test.a")
System.clearProperty("spark.test.a.b")
System.clearProperty("spark.test.a.b.c")
}
System.setProperty("spark.test.a", "a")
System.setProperty("spark.test.a.b", "a.b")
System.setProperty("spark.test.a.b.c", "a.b.c")
val conf = new SparkConf()
assert(conf.get("spark.test.a") === "a")
assert(conf.get("spark.test.a.b") === "a.b")
assert(conf.get("spark.test.a.b.c") === "a.b.c")
conf.set("spark.test.a.b", "A.B")
assert(conf.get("spark.test.a") === "a")
assert(conf.get("spark.test.a.b") === "A.B")
assert(conf.get("spark.test.a.b.c") === "a.b.c")
}

test("register kryo classes through registerKryoClasses") {
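
SparkConfSuite still has to touch real system properties, because loading them into a SparkConf is exactly what it tests, so instead of per-test try/finally cleanup it now mixes in org.apache.spark.util.ResetSystemProperties, a trait that restores the JVM's system properties after each test. That trait's source is not part of this page, so the following is only a sketch, under the assumption that such a mixin can be built on ScalaTest's BeforeAndAfterEach: snapshot System.getProperties before each test and put the snapshot back afterwards. The trait name below is deliberately different from Spark's.

```scala
import java.util.Properties

import org.scalatest.{BeforeAndAfterEach, Suite}

// Sketch only; the real org.apache.spark.util.ResetSystemProperties may differ in detail.
trait SnapshotSystemProperties extends BeforeAndAfterEach { this: Suite =>
  private var saved: Properties = _

  override def beforeEach(): Unit = {
    // Copy, don't alias: System.getProperties returns the live Properties object.
    saved = new Properties()
    saved.putAll(System.getProperties)
    super.beforeEach()
  }

  override def afterEach(): Unit = {
    try {
      super.afterEach()
    } finally {
      System.setProperties(saved)
    }
  }
}
```

A suite would then extend FunSuite with this mixin and set whatever properties a test needs directly in the test body, exactly as the rewritten tests above do.
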
(The remaining changed files in this commit are not shown on this page.)

0 comments on commit fc8eb28
