Merge branch 'master' into openstack
rxin committed Jun 17, 2014
2 parents 99f095d + 237b96b commit 47ce99d
Showing 60 changed files with 599 additions and 307 deletions.
1 change: 1 addition & 0 deletions core/pom.xml
@@ -70,6 +70,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-math3</artifactId>
+ <version>3.3</version>
<scope>test</scope>
</dependency>
<dependency>
core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -19,9 +19,9 @@ package org.apache.spark.deploy

private[spark] object ExecutorState extends Enumeration {

- val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST = Value
+ val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value

type ExecutorState = Value

- def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST).contains(state)
+ def isFinished(state: ExecutorState): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(state)
}
core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -303,10 +303,11 @@ private[spark] class Master(
appInfo.removeExecutor(exec)
exec.worker.removeExecutor(exec)

+ val normalExit = exitStatus.exists(_ == 0)
// Only retry certain number of times so we don't go into an infinite loop.
- if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
+ if (!normalExit && appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
schedule()
- } else {
+ } else if (!normalExit) {
logError("Application %s with ID %s failed %d times, removing it".format(
appInfo.desc.name, appInfo.id, appInfo.retryCount))
removeApplication(appInfo, ApplicationState.FAILED)
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -154,11 +154,10 @@ private[spark] class ExecutorRunner(
Files.write(header, stderr, Charsets.UTF_8)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

- // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
- // long-lived processes only. However, in the future, we might restart the executor a few
- // times on the same machine.
+ // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
+ // or with nonzero exit code
val exitCode = process.waitFor()
- state = ExecutorState.FAILED
+ state = ExecutorState.EXITED
val message = "Command exited with code " + exitCode
worker ! ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))
} catch {
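To make the combined effect of the new `EXITED` state, the updated `isFinished`, and the master's `normalExit` check easier to follow, here is a small standalone sketch; the enumeration below is a stripped-down stand-in, not the real `org.apache.spark.deploy.ExecutorState`, and the exit-status handling only loosely mirrors the diff.

```scala
// Standalone sketch (not Spark code) of the new exit semantics:
// the worker reports EXITED for any terminated executor, and the master
// only counts the exit toward the retry limit when the code was nonzero.
object ExecutorExitSketch {
  object State extends Enumeration {
    val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
    def isFinished(s: Value): Boolean = Seq(KILLED, FAILED, LOST, EXITED).contains(s)
  }

  def shouldCountAsFailure(exitStatus: Option[Int]): Boolean =
    !exitStatus.exists(_ == 0)   // Some(0) => clean shutdown; Some(1) or None => failure

  def main(args: Array[String]): Unit = {
    println(State.isFinished(State.EXITED))   // true: the executor is gone either way
    println(shouldCountAsFailure(Some(0)))    // false: clean exit, not retried
    println(shouldCountAsFailure(Some(137)))  // true: counts toward the retry limit
  }
}
```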
11 changes: 7 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -139,10 +139,13 @@ class HadoopRDD[K, V](
// Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
// local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
- val newJobConf = new JobConf(broadcastedConf.value.value)
- initLocalJobConfFuncOpt.map(f => f(newJobConf))
- HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
- newJobConf
+ // synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456)
+ broadcastedConf.synchronized {
+ val newJobConf = new JobConf(broadcastedConf.value.value)
+ initLocalJobConfFuncOpt.map(f => f(newJobConf))
+ HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+ newJobConf
+ }
}
}

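The new comment points at SPARK-1097 / HADOOP-10456, where cloning a Hadoop `Configuration` that another thread is mutating can throw `ConcurrentModificationException`. A minimal sketch of the locking pattern applied above, using a plain `java.util.Properties` as a stand-in for the broadcast configuration:

```scala
import java.util.Properties

// Sketch only: many tasks clone one shared, mutable config object, so each
// clone is taken while holding that object's lock (cf. broadcastedConf.synchronized).
object SynchronizedCloneSketch {
  private val sharedConf = new Properties()   // stands in for broadcastedConf.value.value

  def newLocalCopy(): Properties =
    sharedConf.synchronized {
      val copy = new Properties()
      copy.putAll(sharedConf)                 // analogous to new JobConf(broadcastedConf.value.value)
      copy
    }
}
```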
core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
@@ -787,8 +787,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val outfmt = job.getOutputFormatClass
val jobFormat = outfmt.newInstance

- if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
- jobFormat.isInstanceOf[NewFileOutputFormat[_, _]]) {
+ if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
// FileOutputFormat ignores the filesystem parameter
jobFormat.checkOutputSpecs(job)
}
@@ -854,8 +853,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
valueClass.getSimpleName + ")")

- if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true) &&
- outputFormatInstance.isInstanceOf[FileOutputFormat[_, _]]) {
+ if (self.conf.getBoolean("spark.hadoop.validateOutputSpecs", true)) {
// FileOutputFormat ignores the filesystem parameter
val ignoredFs = FileSystem.get(conf)
conf.getOutputFormat.checkOutputSpecs(ignoredFs, conf)
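After this change, `checkOutputSpecs` runs for any output format (not only `FileOutputFormat`) whenever `spark.hadoop.validateOutputSpecs` is true, which is the default. A hedged sketch of how a job that deliberately overwrites existing output might opt out; the configuration key comes from the diff, while the master URL, path, and data are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: turn off output-spec validation so the Hadoop save paths
// (saveAsTextFile, saveAsHadoopFile, ...) do not fail on a pre-existing directory.
object DisableOutputValidationSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("overwrite-example")
      .set("spark.hadoop.validateOutputSpecs", "false")
    val sc = new SparkContext(conf)
    sc.parallelize(Seq("a", "b", "c")).saveAsTextFile("/tmp/overwrite-example")  // hypothetical path
    sc.stop()
  }
}
```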
core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala
@@ -117,6 +117,15 @@ private object ParallelCollectionRDD {
if (numSlices < 1) {
throw new IllegalArgumentException("Positive number of slices required")
}
+ // Sequences need to be sliced at the same set of index positions for operations
+ // like RDD.zip() to behave as expected
+ def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
+ (0 until numSlices).iterator.map(i => {
+ val start = ((i * length) / numSlices).toInt
+ val end = (((i + 1) * length) / numSlices).toInt
+ (start, end)
+ })
+ }
seq match {
case r: Range.Inclusive => {
val sign = if (r.step < 0) {
@@ -128,30 +137,28 @@
r.start, r.end + sign, r.step).asInstanceOf[Seq[T]], numSlices)
}
case r: Range => {
- (0 until numSlices).map(i => {
- val start = ((i * r.length.toLong) / numSlices).toInt
- val end = (((i + 1) * r.length.toLong) / numSlices).toInt
- new Range(r.start + start * r.step, r.start + end * r.step, r.step)
- }).asInstanceOf[Seq[Seq[T]]]
+ positions(r.length, numSlices).map({
+ case (start, end) =>
+ new Range(r.start + start * r.step, r.start + end * r.step, r.step)
+ }).toSeq.asInstanceOf[Seq[Seq[T]]]
}
case nr: NumericRange[_] => {
// For ranges of Long, Double, BigInteger, etc
val slices = new ArrayBuffer[Seq[T]](numSlices)
- val sliceSize = (nr.size + numSlices - 1) / numSlices // Round up to catch everything
var r = nr
- for (i <- 0 until numSlices) {
+ for ((start, end) <- positions(nr.length, numSlices)) {
+ val sliceSize = end - start
slices += r.take(sliceSize).asInstanceOf[Seq[T]]
r = r.drop(sliceSize)
}
slices
}
case _ => {
val array = seq.toArray // To prevent O(n^2) operations for List etc
- (0 until numSlices).map(i => {
- val start = ((i * array.length.toLong) / numSlices).toInt
- val end = (((i + 1) * array.length.toLong) / numSlices).toInt
- array.slice(start, end).toSeq
- })
+ positions(array.length, numSlices).map({
+ case (start, end) =>
+ array.slice(start, end).toSeq
+ }).toSeq
}
}
}
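The new `positions` helper is what makes the `Range`, `NumericRange`, and array cases cut at exactly the same indices. A worked sketch of the arithmetic for a 7-element sequence split into 4 slices, matching the tests added further down:

```scala
// Sketch reproducing the slicing arithmetic from the patch.
object PositionsSketch {
  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] =
    (0 until numSlices).iterator.map { i =>
      (((i * length) / numSlices).toInt, (((i + 1) * length) / numSlices).toInt)
    }

  def main(args: Array[String]): Unit = {
    // Prints (0,1), (1,3), (3,5), (5,7): slice sizes 1, 2, 2, 2, and the same
    // boundaries are used whether the input is 1 to 7 or 1L to 7L.
    positions(7, 4).foreach(println)
  }
}
```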
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/storage/RDDInfo.scala
@@ -26,7 +26,7 @@ class RDDInfo(
val id: Int,
val name: String,
val numPartitions: Int,
- val storageLevel: StorageLevel)
+ var storageLevel: StorageLevel)
extends Ordered[RDDInfo] {

var numCachedPartitions = 0
@@ -36,8 +36,8 @@ class RDDInfo(

override def toString = {
import Utils.bytesToString
("RDD \"%s\" (%d) Storage: %s; CachedPartitions: %d; TotalPartitions: %d; MemorySize: %s; " +
"TachyonSize: %s; DiskSize: %s").format(
("RDD \"%s\" (%d) StorageLevel: %s; CachedPartitions: %d; TotalPartitions: %d; " +
"MemorySize: %s; TachyonSize: %s; DiskSize: %s").format(
name, id, storageLevel.toString, numCachedPartitions, numPartitions,
bytesToString(memSize), bytesToString(tachyonSize), bytesToString(diskSize))
}
core/src/main/scala/org/apache/spark/storage/StorageUtils.scala
@@ -89,10 +89,13 @@ private[spark] object StorageUtils {
// Add up memory, disk and Tachyon sizes
val persistedBlocks =
blocks.filter { status => status.memSize + status.diskSize + status.tachyonSize > 0 }
+ val _storageLevel =
+ if (persistedBlocks.length > 0) persistedBlocks(0).storageLevel else StorageLevel.NONE
val memSize = persistedBlocks.map(_.memSize).reduceOption(_ + _).getOrElse(0L)
val diskSize = persistedBlocks.map(_.diskSize).reduceOption(_ + _).getOrElse(0L)
val tachyonSize = persistedBlocks.map(_.tachyonSize).reduceOption(_ + _).getOrElse(0L)
rddInfoMap.get(rddId).map { rddInfo =>
+ rddInfo.storageLevel = _storageLevel
rddInfo.numCachedPartitions = persistedBlocks.length
rddInfo.memSize = memSize
rddInfo.diskSize = diskSize
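The per-RDD sizes above are folded with `reduceOption(_ + _).getOrElse(0L)`, which, unlike `reduce`, is safe when no blocks are persisted at all. A tiny sketch of that behaviour:

```scala
// reduceOption yields None on an empty collection instead of throwing,
// so an RDD with no persisted blocks simply reports a size of 0.
object ReduceOptionSketch {
  def main(args: Array[String]): Unit = {
    println(Seq.empty[Long].reduceOption(_ + _).getOrElse(0L))   // 0
    println(Seq(512L, 1024L).reduceOption(_ + _).getOrElse(0L))  // 1536
  }
}
```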
core/src/test/scala/org/apache/spark/rdd/ParallelCollectionSplitSuite.scala
@@ -111,6 +111,24 @@ class ParallelCollectionSplitSuite extends FunSuite with Checkers {
assert(slices.forall(_.isInstanceOf[Range]))
}

test("identical slice sizes between Range and NumericRange") {
val r = ParallelCollectionRDD.slice(1 to 7, 4)
val nr = ParallelCollectionRDD.slice(1L to 7L, 4)
assert(r.size === 4)
for (i <- 0 until r.size) {
assert(r(i).size === nr(i).size)
}
}

test("identical slice sizes between List and NumericRange") {
val r = ParallelCollectionRDD.slice(List(1, 2), 4)
val nr = ParallelCollectionRDD.slice(1L to 2L, 4)
assert(r.size === 4)
for (i <- 0 until r.size) {
assert(r(i).size === nr(i).size)
}
}

test("large ranges don't overflow") {
val N = 100 * 1000 * 1000
val data = 0 until N
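The two tests above pin down the property that the `positions` comment calls out: parallelizing equal-length sequences with the same slice count must yield identical per-partition sizes, or `RDD.zip()` fails at runtime. A hedged usage sketch (assumes a local `SparkContext`; names are illustrative):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Illustrative only: zip requires matching element counts in each partition,
// which the shared positions() slicing now guarantees across sequence types.
object ZipSlicingSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("zip-sketch"))
    val ints  = sc.parallelize(1 to 7, 4)      // Range, sliced via positions()
    val longs = sc.parallelize(1L to 7L, 4)    // NumericRange, sliced at the same boundaries
    ints.zip(longs).collect().foreach(println) // (1,1), (2,2), ..., (7,7)
    sc.stop()
  }
}
```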
28 changes: 21 additions & 7 deletions docs/programming-guide.md
@@ -377,13 +377,15 @@ Some notes on reading files with Spark:

* The `textFile` method also takes an optional second argument for controlling the number of slices of the file. By default, Spark creates one slice for each block of the file (blocks being 64MB by default in HDFS), but you can also ask for a higher number of slices by passing a larger value. Note that you cannot have fewer slices than blocks.

- Apart from reading files as a collection of lines,
- `SparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file.
+ Apart from text files, Spark's Python API also supports several other data formats:

- ### SequenceFile and Hadoop InputFormats
+ * `SparkContext.wholeTextFiles` lets you read a directory containing multiple small text files, and returns each of them as (filename, content) pairs. This is in contrast with `textFile`, which would return one record per line in each file.

+ * `RDD.saveAsPickleFile` and `SparkContext.pickleFile` support saving an RDD in a simple format consisting of pickled Python objects. Batching is used on pickle serialization, with default batch size 10.

- In addition to reading text files, PySpark supports reading ```SequenceFile```
- and any arbitrary ```InputFormat```.
+ * Details on reading `SequenceFile` and arbitrary Hadoop `InputFormat` are given below.

+ ### SequenceFile and Hadoop InputFormats

**Note** this feature is currently marked ```Experimental``` and is intended for advanced users. It may be replaced in future with read/write support based on SparkSQL, in which case SparkSQL is the preferred approach.

@@ -760,6 +762,11 @@ val counts = pairs.reduceByKey((a, b) => a + b)
We could also use `counts.sortByKey()`, for example, to sort the pairs alphabetically, and finally
`counts.collect()` to bring them back to the driver program as an array of objects.

+ **Note:** when using custom objects as the key in key-value pair operations, you must be sure that a
+ custom `equals()` method is accompanied with a matching `hashCode()` method. For full details, see
+ the contract outlined in the [Object.hashCode()
+ documentation](http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html#hashCode()).

</div>
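A minimal sketch of the point made in the note, for the Scala API: either use a `case class`, which generates `equals` and `hashCode` together, or keep a hand-written pair consistent (the class names here are hypothetical):

```scala
// Keys used with reduceByKey / groupByKey must keep equals() and hashCode()
// consistent, otherwise equal keys can be hashed into different buckets.
case class UserKey(country: String, userId: Long)      // equals/hashCode generated together

class ManualKey(val country: String, val userId: Long) {
  override def equals(other: Any): Boolean = other match {
    case k: ManualKey => k.country == country && k.userId == userId
    case _            => false
  }
  // Must agree with equals: objects that compare equal return the same hash.
  override def hashCode(): Int = 31 * country.hashCode + userId.hashCode
}
```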

<div data-lang="java" markdown="1">
@@ -792,6 +799,10 @@ JavaPairRDD<String, Integer> counts = pairs.reduceByKey((a, b) -> a + b);
We could also use `counts.sortByKey()`, for example, to sort the pairs alphabetically, and finally
`counts.collect()` to bring them back to the driver program as an array of objects.

+ **Note:** when using custom objects as the key in key-value pair operations, you must be sure that a
+ custom `equals()` method is accompanied with a matching `hashCode()` method. For full details, see
+ the contract outlined in the [Object.hashCode()
+ documentation](http://docs.oracle.com/javase/7/docs/api/java/lang/Object.html#hashCode()).

</div>

@@ -888,7 +899,7 @@ for details.
</tr>
<tr>
<td> <b>reduceByKey</b>(<i>func</i>, [<i>numTasks</i>]) </td>
- <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
+ <td> When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function <i>func</i>, which must be of type (V,V) => V. Like in <code>groupByKey</code>, the number of reduce tasks is configurable through an optional second argument. </td>
</tr>
<tr>
<td> <b>aggregateByKey</b>(<i>zeroValue</i>)(<i>seqOp</i>, <i>combOp</i>, [<i>numTasks</i>]) </td>
@@ -1056,7 +1067,10 @@ storage levels is:
<td> Store RDD in serialized format in <a href="http://tachyon-project.org">Tachyon</a>.
Compared to MEMORY_ONLY_SER, OFF_HEAP reduces garbage collection overhead and allows executors
to be smaller and to share a pool of memory, making it attractive in environments with
- large heaps or multiple concurrent applications.
+ large heaps or multiple concurrent applications. Furthermore, as the RDDs reside in Tachyon,
+ the crash of an executor does not lead to losing the in-memory cache. In this mode, the memory
+ in Tachyon is discardable. Thus, Tachyon does not attempt to reconstruct a block that it evicts
+ from memory.
</td>
</tr>
</table>
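A hedged usage sketch for the expanded `OFF_HEAP` description above; it assumes a reachable Tachyon deployment (configured separately, e.g. via `spark.tachyonStore.url`), and the input path is illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Illustrative only: cache an RDD in Tachyon so that losing an executor does not
// lose the cached blocks; blocks Tachyon evicts are recomputed from lineage.
object OffHeapPersistSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("off-heap-sketch"))
    val events = sc.textFile("hdfs:///data/events.log")   // hypothetical input
    events.persist(StorageLevel.OFF_HEAP)
    println(events.count())
    sc.stop()
  }
}
```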
14 changes: 14 additions & 0 deletions docs/running-on-yarn.md
@@ -67,6 +67,20 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
The address of the Spark history server (i.e. host.com:18080). The address should not contain a scheme (http://). Defaults to not being set since the history server is an optional service. This address is given to the YARN ResourceManager when the Spark application finishes to link the application from the ResourceManager UI to the Spark history server UI.
</td>
</tr>
+ <tr>
+ <td><code>spark.yarn.executor.memoryOverhead</code></td>
+ <td>384</code></td>
+ <td>
+ The amount of off heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
+ </td>
+ </tr>
+ <tr>
+ <td><code>spark.yarn.driver.memoryOverhead</code></td>
+ <td>384</code></td>
+ <td>
+ The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc.
+ </td>
+ </tr>
</table>

By default, Spark on YARN will use a Spark jar installed locally, but the Spark JAR can also be in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn't need to be distributed each time an application runs. To point to a JAR on HDFS, `export SPARK_JAR=hdfs:///some/path`.
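A sketch of setting the two new overhead properties; the keys and the 384 MB default come from the table above, while the values and the use of plain `SparkConf` entries (read by YARN mode at submit time) are assumptions for illustration:

```scala
import org.apache.spark.SparkConf

object YarnOverheadSketch {
  // Illustrative only: reserve extra off-heap headroom on top of the executor
  // and driver heaps; values are in megabytes (the documented default is 384).
  val conf = new SparkConf()
    .setAppName("yarn-overhead-sketch")
    .set("spark.yarn.executor.memoryOverhead", "512")
    .set("spark.yarn.driver.memoryOverhead", "512")
}
```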
2 changes: 1 addition & 1 deletion docs/streaming-programming-guide.md
@@ -950,7 +950,7 @@ is 200 milliseconds.

An alternative to receiving data with multiple input streams / receivers is to explicitly repartition
the input data stream (using `inputStream.repartition(<number of partitions>)`).
- This distributes the received batches of data across all the machines in the cluster
+ This distributes the received batches of data across specified number of machines in the cluster
before further processing.

### Level of Parallelism in Data Processing
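A short sketch of the `inputStream.repartition(...)` call described above; the socket source, port, and partition count are illustrative:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative only: spread each received batch over 8 partitions before further
// processing, instead of leaving it on the machines that ran the receivers.
object RepartitionSketch {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("repartition-sketch"), Seconds(1))
    val inputStream = ssc.socketTextStream("localhost", 9999)  // hypothetical source
    inputStream.repartition(8).count().print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```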
26 changes: 24 additions & 2 deletions python/pyspark/sql.py
@@ -16,6 +16,7 @@
#

from pyspark.rdd import RDD
+ from pyspark.serializers import BatchedSerializer, PickleSerializer

from py4j.protocol import Py4JError

@@ -76,12 +77,25 @@ def inferSchema(self, rdd):
"""Infer and apply a schema to an RDD of L{dict}s.
We peek at the first row of the RDD to determine the fields names
- and types, and then use that to extract all the dictionaries.
+ and types, and then use that to extract all the dictionaries. Nested
+ collections are supported, which include array, dict, list, set, and
+ tuple.
>>> srdd = sqlCtx.inferSchema(rdd)
>>> srdd.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"},
... {"field1" : 3, "field2": "row3"}]
True
+ >>> from array import array
+ >>> srdd = sqlCtx.inferSchema(nestedRdd1)
+ >>> srdd.collect() == [{"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}},
+ ... {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}]
+ True
+ >>> srdd = sqlCtx.inferSchema(nestedRdd2)
+ >>> srdd.collect() == [{"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)},
+ ... {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}]
+ True
"""
if (rdd.__class__ is SchemaRDD):
raise ValueError("Cannot apply schema to %s" % SchemaRDD.__name__)
@@ -346,7 +360,8 @@ def _toPython(self):
# TODO: This is inefficient, we should construct the Python Row object
# in Java land in the javaToPython function. May require a custom
# pickle serializer in Pyrolite
- return RDD(jrdd, self._sc, self._sc.serializer).map(lambda d: Row(d))
+ return RDD(jrdd, self._sc, BatchedSerializer(
+ PickleSerializer())).map(lambda d: Row(d))

# We override the default cache/persist/checkpoint behavior as we want to cache the underlying
# SchemaRDD object in the JVM, not the PythonRDD checkpointed by the super class
@@ -411,6 +426,7 @@ def subtract(self, other, numPartitions=None):

def _test():
import doctest
+ from array import array
from pyspark.context import SparkContext
globs = globals().copy()
# The small batch size here ensures that we see multiple batches,
@@ -420,6 +436,12 @@ def _test():
globs['sqlCtx'] = SQLContext(sc)
globs['rdd'] = sc.parallelize([{"field1" : 1, "field2" : "row1"},
{"field1" : 2, "field2": "row2"}, {"field1" : 3, "field2": "row3"}])
+ globs['nestedRdd1'] = sc.parallelize([
+ {"f1" : array('i', [1, 2]), "f2" : {"row1" : 1.0}},
+ {"f1" : array('i', [2, 3]), "f2" : {"row2" : 2.0}}])
+ globs['nestedRdd2'] = sc.parallelize([
+ {"f1" : [[1, 2], [2, 3]], "f2" : set([1, 2]), "f3" : (1, 2)},
+ {"f1" : [[2, 3], [3, 4]], "f2" : set([2, 3]), "f3" : (2, 3)}])
(failure_count, test_count) = doctest.testmod(globs=globs,optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count: