Merge with apache #28

Merged (3 commits, Jan 15, 2016)
2 changes: 1 addition & 1 deletion LICENSE
@@ -264,7 +264,7 @@ The text of each license is also included at licenses/LICENSE-[project].txt.
(New BSD license) Protocol Buffer Java API (org.spark-project.protobuf:protobuf-java:2.4.1-shaded - http://code.google.com/p/protobuf)
(The BSD License) Fortran to Java ARPACK (net.sourceforge.f2j:arpack_combined_all:0.1 - http://f2j.sourceforge.net)
(The BSD License) xmlenc Library (xmlenc:xmlenc:0.52 - http://xmlenc.sourceforge.net)
(The New BSD License) Py4J (net.sf.py4j:py4j:0.9 - http://py4j.sourceforge.net/)
(The New BSD License) Py4J (net.sf.py4j:py4j:0.9.1 - http://py4j.sourceforge.net/)
(Two-clause BSD-style license) JUnit-Interface (com.novocode:junit-interface:0.10 - http://github.com/szeiger/junit-interface/)
(BSD licence) sbt and sbt-launch-lib.bash
(BSD 3 Clause) d3.min.js (https://github.com/mbostock/d3/blob/master/LICENSE)
2 changes: 1 addition & 1 deletion R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1173,7 +1173,7 @@ test_that("group by, agg functions", {

expect_equal(3, count(mean(gd)))
expect_equal(3, count(max(gd)))
expect_equal(30, collect(max(gd))[1, 2])
expect_equal(30, collect(max(gd))[2, 2])
expect_equal(1, collect(count(gd))[1, 2])

mockLines2 <- c("{\"name\":\"ID1\", \"value\": \"10\"}",
2 changes: 1 addition & 1 deletion bin/pyspark
@@ -67,7 +67,7 @@ export PYSPARK_PYTHON

# Add the PySpark classes to the Python path:
export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9-src.zip:$PYTHONPATH"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.9.1-src.zip:$PYTHONPATH"

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"
2 changes: 1 addition & 1 deletion bin/pyspark2.cmd
@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
)

set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9-src.zip;%PYTHONPATH%
set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.9.1-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -350,7 +350,7 @@
<dependency>
<groupId>net.sf.py4j</groupId>
<artifactId>py4j</artifactId>
<version>0.9</version>
<version>0.9.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
@@ -138,7 +138,7 @@ public void write(Iterator<Product2<K, V>> records) throws IOException {
final File file = tempShuffleBlockIdPlusFile._2();
final BlockId blockId = tempShuffleBlockIdPlusFile._1();
partitionWriters[i] =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics).open();
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
@@ -185,16 +185,19 @@ private long[] writePartitionedFile(File outputFile) throws IOException {
boolean threwException = true;
try {
for (int i = 0; i < numPartitions; i++) {
final FileInputStream in = new FileInputStream(partitionWriters[i].fileSegment().file());
boolean copyThrewException = true;
try {
lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
copyThrewException = false;
} finally {
Closeables.close(in, copyThrewException);
}
if (!partitionWriters[i].fileSegment().file().delete()) {
logger.error("Unable to delete file for partition {}", i);
final File file = partitionWriters[i].fileSegment().file();
if (file.exists()) {
final FileInputStream in = new FileInputStream(file);
boolean copyThrewException = true;
try {
lengths[i] = Utils.copyStream(in, out, false, transferToEnabled);
copyThrewException = false;
} finally {
Closeables.close(in, copyThrewException);
}
if (!file.delete()) {
logger.error("Unable to delete file for partition {}", i);
}
}
}
threwException = false;
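Since the disk writer is no longer opened eagerly, a partition that never receives a record never creates a file, so the merge step now checks that each partition file exists before copying and deleting it. Below is a minimal Scala sketch of that pattern; the concatPartitionFiles helper is hypothetical and only stands in for the actual Java logic in the shuffle writer.

import java.io.{File, FileInputStream, FileOutputStream}

object PartitionFileConcat {
  // Concatenate per-partition spill files into `output`, skipping partitions whose
  // file was never created (empty partitions) and deleting each file once copied.
  def concatPartitionFiles(partitionFiles: Array[File], output: File): Array[Long] = {
    val lengths = new Array[Long](partitionFiles.length)
    val out = new FileOutputStream(output, true)
    try {
      for (i <- partitionFiles.indices) {
        val file = partitionFiles(i)
        if (file.exists()) {                 // empty partitions have no file on disk
          val in = new FileInputStream(file)
          try {
            val buf = new Array[Byte](8192)
            var n = in.read(buf)
            while (n != -1) {
              out.write(buf, 0, n)
              lengths(i) += n
              n = in.read(buf)
            }
          } finally {
            in.close()
          }
          if (!file.delete()) {
            System.err.println(s"Unable to delete file for partition $i")
          }
        }
      }
    } finally {
      out.close()
    }
    lengths
  }
}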
@@ -116,8 +116,10 @@ public UnsafeInMemorySorter(
* Free the memory used by pointer array.
*/
public void free() {
consumer.freeArray(array);
array = null;
if (consumer != null) {
consumer.freeArray(array);
array = null;
}
}

public void reset() {
5 changes: 0 additions & 5 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -68,11 +68,6 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(partition, context)

// If the task is running locally, do not persist the result
if (context.isRunningLocally) {
return computedValues
}

// Otherwise, cache the values and keep track of any updates in block statuses
val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]
val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)
3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -97,8 +97,9 @@ abstract class TaskContext extends Serializable {

/**
* Returns true if the task is running locally in the driver program.
* @return
* @return false
*/
@deprecated("Local execution was removed, so this always returns false", "2.0.0")
def isRunningLocally(): Boolean

/**
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/TaskContextImpl.scala
@@ -33,7 +33,6 @@ private[spark] class TaskContextImpl(
override val taskMemoryManager: TaskMemoryManager,
@transient private val metricsSystem: MetricsSystem,
internalAccumulators: Seq[Accumulator[Long]],
val runningLocally: Boolean = false,
val taskMetrics: TaskMetrics = TaskMetrics.empty)
extends TaskContext
with Logging {
@@ -85,7 +84,7 @@ private[spark] class TaskContextImpl(

override def isCompleted(): Boolean = completed

override def isRunningLocally(): Boolean = runningLocally
override def isRunningLocally(): Boolean = false

override def isInterrupted(): Boolean = interrupted

@@ -32,7 +32,7 @@ private[spark] object PythonUtils {
val pythonPath = new ArrayBuffer[String]
for (sparkHome <- sys.env.get("SPARK_HOME")) {
pythonPath += Seq(sparkHome, "python", "lib", "pyspark.zip").mkString(File.separator)
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9-src.zip").mkString(File.separator)
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.9.1-src.zip").mkString(File.separator)
}
pythonPath ++= SparkContext.jarOfObject(this)
pythonPath.mkString(File.pathSeparator)
@@ -50,7 +50,7 @@ class CartesianRDD[T: ClassTag, U: ClassTag](
sc: SparkContext,
var rdd1 : RDD[T],
var rdd2 : RDD[U])
extends RDD[Pair[T, U]](sc, Nil)
extends RDD[(T, U)](sc, Nil)
with Serializable {

val numPartitionsInRdd2 = rdd2.partitions.length
16 changes: 15 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -95,7 +95,7 @@ abstract class RDD[T: ClassTag](

/** Construct an RDD with just a one-to-one dependency on one parent */
def this(@transient oneParent: RDD[_]) =
this(oneParent.context , List(new OneToOneDependency(oneParent)))
this(oneParent.context, List(new OneToOneDependency(oneParent)))

private[spark] def conf = sc.conf
// =======================================================================
@@ -970,6 +970,13 @@ abstract class RDD[T: ClassTag](
* apply the fold to each element sequentially in some defined ordering. For functions
* that are not commutative, the result may differ from that of a fold applied to a
* non-distributed collection.
*
* @param zeroValue the initial value for the accumulated result of each partition for the `op`
* operator, and also the initial value for the combine results from different
* partitions for the `op` operator - this will typically be the neutral
* element (e.g. `Nil` for list concatenation or `0` for summation)
* @param op an operator used to both accumulate results within a partition and combine results
* from different partitions
*/
def fold(zeroValue: T)(op: (T, T) => T): T = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
@@ -988,6 +995,13 @@
* and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
* allowed to modify and return their first argument instead of creating a new U to avoid memory
* allocation.
*
* @param zeroValue the initial value for the accumulated result of each partition for the
* `seqOp` operator, and also the initial value for the combine results from
* different partitions for the `combOp` operator - this will typically be the
* neutral element (e.g. `Nil` for list concatenation or `0` for summation)
* @param seqOp an operator used to accumulate results within a partition
* @param combOp an associative operator used to combine results from different partitions
*/
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = withScope {
// Clone the zero value since we will also be serializing it as part of tasks
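The expanded scaladoc spells out the contract of zeroValue, op, seqOp and combOp; here is a short usage sketch, assuming an already-constructed SparkContext named sc.

// Illustrative only; `sc` is assumed to be an existing SparkContext.
val rdd = sc.parallelize(Seq(1, 2, 3, 4), 2)

// fold: the zero value (0) is the neutral element for `op`; it seeds the fold inside
// each partition and also the final combine across partitions.
val sum = rdd.fold(0)(_ + _)                              // 10

// aggregate: `seqOp` accumulates elements within a partition, `combOp` merges the
// per-partition accumulators; the accumulator here is a (sum, count) pair.
val (total, count) = rdd.aggregate((0, 0))(
  (acc, x) => (acc._1 + x, acc._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2))
val mean = total.toDouble / count                         // 2.5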
3 changes: 1 addition & 2 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -74,8 +74,7 @@ private[spark] abstract class Task[T](
attemptNumber,
taskMemoryManager,
metricsSystem,
internalAccumulators,
runningLocally = false)
internalAccumulators)
TaskContext.setTaskContext(context)
context.taskMetrics.setHostname(Utils.localHostName())
context.taskMetrics.setAccumulatorsUpdater(context.collectInternalAccumulators)
@@ -28,9 +28,13 @@ private[v1] class ExecutorListResource(ui: SparkUI) {
@GET
def executorList(): Seq[ExecutorSummary] = {
val listener = ui.executorsListener
val storageStatusList = listener.storageStatusList
(0 until storageStatusList.size).map { statusId =>
ExecutorsPage.getExecInfo(listener, statusId)
listener.synchronized {
// The following code must be protected by the `listener` lock so that no executors are
// removed before we query their status. See SPARK-12784.
val storageStatusList = listener.storageStatusList
(0 until storageStatusList.size).map { statusId =>
ExecutorsPage.getExecInfo(listener, statusId)
}
}
}
}
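Reduced to its essentials, the fix is the usual "mutate and read under the same lock" pattern. The sketch below uses a hypothetical ExecutorTracker rather than Spark's ExecutorsListener, but shows the same idea: readers take a consistent snapshot under the listener's monitor so no executor can disappear mid-query.

import scala.collection.mutable

class ExecutorTracker {
  private val executors = mutable.Map.empty[String, Long]   // executor id -> memory used

  // Event-bus thread: every mutation holds the tracker's monitor.
  def onExecutorAdded(id: String, memUsed: Long): Unit = synchronized {
    executors(id) = memUsed
  }
  def onExecutorRemoved(id: String): Unit = synchronized {
    executors -= id
  }

  // REST/UI thread: snapshot under the same lock, so an executor cannot be removed
  // between "list the ids" and "look up each id" (the SPARK-12784 race).
  def snapshot(): Map[String, Long] = synchronized {
    executors.toMap
  }
}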
@@ -115,7 +115,7 @@ class StageData private[spark](
val status: StageStatus,
val stageId: Int,
val attemptId: Int,
val numActiveTasks: Int ,
val numActiveTasks: Int,
val numCompleteTasks: Int,
val numFailedTasks: Int,

13 changes: 10 additions & 3 deletions core/src/main/scala/org/apache/spark/ui/exec/ExecutorsPage.scala
@@ -52,12 +52,19 @@ private[ui] class ExecutorsPage(
private val listener = parent.listener

def render(request: HttpServletRequest): Seq[Node] = {
val storageStatusList = listener.storageStatusList
val (storageStatusList, execInfo) = listener.synchronized {
// The following code must be protected by the `listener` lock so that no executors are
// removed before we query their status. See SPARK-12784.
val _storageStatusList = listener.storageStatusList
val _execInfo = {
for (statusId <- 0 until _storageStatusList.size)
yield ExecutorsPage.getExecInfo(listener, statusId)
}
(_storageStatusList, _execInfo)
}
val maxMem = storageStatusList.map(_.maxMem).sum
val memUsed = storageStatusList.map(_.memUsed).sum
val diskUsed = storageStatusList.map(_.diskUsed).sum
val execInfo = for (statusId <- 0 until storageStatusList.size) yield
ExecutorsPage.getExecInfo(listener, statusId)
val execInfoSorted = execInfo.sortBy(_.id)
val logsExist = execInfo.filter(_.executorLogs.nonEmpty).nonEmpty

@@ -17,7 +17,7 @@

package org.apache.spark.util.logging

import java.io.{File, FileOutputStream, InputStream}
import java.io.{File, FileOutputStream, InputStream, IOException}

import org.apache.spark.{Logging, SparkConf}
import org.apache.spark.util.{IntParam, Utils}
@@ -58,20 +58,28 @@ private[spark] class FileAppender(inputStream: InputStream, file: File, bufferSi
protected def appendStreamToFile() {
try {
logDebug("Started appending thread")
openFile()
val buf = new Array[Byte](bufferSize)
var n = 0
while (!markedForStop && n != -1) {
n = inputStream.read(buf)
if (n != -1) {
appendToFile(buf, n)
Utils.tryWithSafeFinally {
openFile()
val buf = new Array[Byte](bufferSize)
var n = 0
while (!markedForStop && n != -1) {
try {
n = inputStream.read(buf)
} catch {
// An InputStream can throw IOException during read if the stream is closed
// asynchronously, so once appender has been flagged to stop these will be ignored
case _: IOException if markedForStop => // do nothing and proceed to stop appending
}
if (n > 0) {
appendToFile(buf, n)
}
}
} {
closeFile()
}
} catch {
case e: Exception =>
logError(s"Error writing stream to file $file", e)
} finally {
closeFile()
}
}

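The appender's read loop now tolerates the input stream being closed from another thread once the appender has been told to stop. Below is a stripped-down Scala sketch of the same idea, using a plain try/finally where the real code uses Utils.tryWithSafeFinally, and a hypothetical StreamPump in place of FileAppender.

import java.io.{IOException, InputStream, OutputStream}

object StreamPump {
  @volatile private var markedForStop = false

  def stop(): Unit = { markedForStop = true }

  def pump(in: InputStream, out: OutputStream, bufferSize: Int = 8192): Unit = {
    try {
      val buf = new Array[Byte](bufferSize)
      var n = 0
      while (!markedForStop && n != -1) {
        try {
          n = in.read(buf)
        } catch {
          // The input stream may be closed asynchronously after stop() is called;
          // that IOException is expected, so swallow it and let the loop exit.
          case _: IOException if markedForStop => n = -1
        }
        if (n > 0) out.write(buf, 0, n)
      }
    } finally {
      out.close()   // always release the output, even if the copy loop failed
    }
  }
}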
9 changes: 0 additions & 9 deletions core/src/test/scala/org/apache/spark/CacheManagerSuite.scala
@@ -82,15 +82,6 @@ class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with Before
assert(value.toList === List(5, 6, 7))
}

test("get uncached local rdd") {
// Local computation should not persist the resulting value, so don't expect a put().
when(blockManager.get(RDDBlockId(0, 0))).thenReturn(None)

val context = new TaskContextImpl(0, 0, 0, 0, null, null, Seq.empty, runningLocally = true)
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(1, 2, 3, 4))
}

test("verify task metrics updated correctly") {
cacheManager = sc.env.cacheManager
val context = TaskContext.empty()
@@ -178,7 +178,7 @@ class DoubleRDDSuite extends SparkFunSuite with SharedSparkContext {
test("WorksWithOutOfRangeWithInfiniteBuckets") {
// Verify that out of range works with two buckets
val rdd = sc.parallelize(Seq(10.01, -0.01, Double.NaN))
val buckets = Array(-1.0/0.0 , 0.0, 1.0/0.0)
val buckets = Array(-1.0/0.0, 0.0, 1.0/0.0)
val histogramResults = rdd.histogram(buckets)
val expectedHistogramResults = Array(1, 1)
assert(histogramResults === expectedHistogramResults)
@@ -76,7 +76,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi

test("check spark-class location correctly") {
val conf = new SparkConf
conf.set("spark.mesos.executor.home" , "/mesos-home")
conf.set("spark.mesos.executor.home", "/mesos-home")

val listenerBus = mock[LiveListenerBus]
listenerBus.post(
@@ -105,7 +105,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
new Answer[(TempShuffleBlockId, File)] {
override def answer(invocation: InvocationOnMock): (TempShuffleBlockId, File) = {
val blockId = new TempShuffleBlockId(UUID.randomUUID)
val file = File.createTempFile(blockId.toString, null, tempDir)
val file = new File(tempDir, blockId.name)
blockIdToFileMap.put(blockId, file)
temporaryFilesCreated.append(file)
(blockId, file)
@@ -166,6 +166,7 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
writer.stop( /* success = */ true)
assert(temporaryFilesCreated.nonEmpty)
assert(writer.getPartitionLengths.sum === outputFile.length())
assert(writer.getPartitionLengths.filter(_ == 0L).size === 4) // should be 4 zero length files
assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted
val shuffleWriteMetrics = taskContext.taskMetrics().shuffleWriteMetrics.get
assert(shuffleWriteMetrics.shuffleBytesWritten === outputFile.length())
@@ -174,6 +175,41 @@ class BypassMergeSortShuffleWriterSuite extends SparkFunSuite with BeforeAndAfte
assert(taskMetrics.memoryBytesSpilled === 0)
}

test("only generate temp shuffle file for non-empty partition") {
// Use an exception to check that only non-empty partitions create temp shuffle files:
// temp shuffle files are only cleaned up after calling stop(false) in the failure case,
// so the failure path lets us validate which temp shuffle files were created.
def records: Iterator[(Int, Int)] =
Iterator((1, 1), (5, 5)) ++
(0 until 100000).iterator.map { i =>
if (i == 99990) {
throw new SparkException("intentional failure")
} else {
(2, 2)
}
}

val writer = new BypassMergeSortShuffleWriter[Int, Int](
blockManager,
blockResolver,
shuffleHandle,
0, // MapId
taskContext,
conf
)

intercept[SparkException] {
writer.write(records)
}

assert(temporaryFilesCreated.nonEmpty)
// Only 3 temp shuffle files will be created: the records contain just the distinct
// keys 1, 5 and 2, so only three partitions ever receive data
assert(temporaryFilesCreated.count(_.exists()) === 3)

writer.stop( /* success = */ false)
assert(temporaryFilesCreated.count(_.exists()) === 0) // check that temporary files were deleted
}

test("cleanup of intermediate files after errors") {
val writer = new BypassMergeSortShuffleWriter[Int, Int](
blockManager,