Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/spark
Browse files Browse the repository at this point in the history
  • Loading branch information
zhzhan committed Oct 1, 2014
2 parents d10bf00 + c5414b6 commit adf4924
Show file tree
Hide file tree
Showing 30 changed files with 609 additions and 514 deletions.
17 changes: 11 additions & 6 deletions core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
/**
* Group the values for each key in the RDD into a single sequence. Allows controlling the
* partitioning of the resulting key-value pair RDD by passing a Partitioner.
* The ordering of elements within each group is not guaranteed, and may even differ
* each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
Expand All @@ -439,7 +441,8 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with into `numPartitions` partitions.
* resulting RDD with into `numPartitions` partitions. The ordering of elements within
* each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
Expand Down Expand Up @@ -535,7 +538,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])

/**
* Group the values for each key in the RDD into a single sequence. Hash-partitions the
* resulting RDD with the existing partitioner/parallelism level.
* resulting RDD with the existing partitioner/parallelism level. The ordering of elements
* within each group is not guaranteed, and may even differ each time the resulting RDD is
* evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
Expand Down Expand Up @@ -951,9 +956,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
val attemptNumber = (context.getAttemptId % Int.MaxValue).toInt
/* "reduce task" <split #> <attempt # = spark task #> */
val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId,
val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.getPartitionId,
attemptNumber)
val hadoopContext = newTaskAttemptContext(wrappedConf.value, attemptId)
val format = outfmt.newInstance
Expand Down Expand Up @@ -1022,9 +1027,9 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
// Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
// around by taking a mod. We expect that no task will be attempted 2 billion times.
val attemptNumber = (context.attemptId % Int.MaxValue).toInt
val attemptNumber = (context.getAttemptId % Int.MaxValue).toInt

writer.setup(context.stageId, context.partitionId, attemptNumber)
writer.setup(context.getStageId, context.getPartitionId, attemptNumber)
writer.open()
try {
var count = 0
Expand Down
20 changes: 17 additions & 3 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,8 @@ abstract class RDD[T: ClassTag](

/**
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
* mapping to that key.
* mapping to that key. The ordering of elements within each group is not guaranteed, and
* may even differ each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
Expand All @@ -520,7 +521,8 @@ abstract class RDD[T: ClassTag](

/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
* mapping to that key. The ordering of elements within each group is not guaranteed, and
* may even differ each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
Expand All @@ -531,7 +533,8 @@ abstract class RDD[T: ClassTag](

/**
* Return an RDD of grouped items. Each group consists of a key and a sequence of elements
* mapping to that key.
* mapping to that key. The ordering of elements within each group is not guaranteed, and
* may even differ each time the resulting RDD is evaluated.
*
* Note: This operation may be very expensive. If you are grouping in order to perform an
* aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
Expand Down Expand Up @@ -1028,15 +1031,26 @@ abstract class RDD[T: ClassTag](
* Zips this RDD with its element indices. The ordering is first based on the partition index
* and then the ordering of items within each partition. So the first item in the first
* partition gets index 0, and the last item in the last partition receives the largest index.
*
* This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
* This method needs to trigger a spark job when this RDD contains more than one partitions.
*
* Note that some RDDs, such as those returned by groupBy(), do not guarantee order of
* elements in a partition. The index assigned to each element is therefore not guaranteed,
* and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee
* the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
*/
def zipWithIndex(): RDD[(T, Long)] = new ZippedWithIndexRDD(this)

/**
* Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
* 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
* won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
*
* Note that some RDDs, such as those returned by groupBy(), do not guarantee order of
* elements in a partition. The unique ID assigned to each element is therefore not guaranteed,
* and may even change if the RDD is reevaluated. If a fixed ordering is required to guarantee
* the same index assignments, you should sort the RDD with sortByKey() or save it to a file.
*/
def zipWithUniqueId(): RDD[(T, Long)] = {
val n = this.partitions.size.toLong
Expand Down
15 changes: 8 additions & 7 deletions dev/check-license
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,28 @@

acquire_rat_jar () {

URL1="http://search.maven.org/remotecontent?filepath=org/apache/rat/apache-rat/${RAT_VERSION}/apache-rat-${RAT_VERSION}.jar"
URL2="http://repo1.maven.org/maven2/org/apache/rat/apache-rat/${RAT_VERSION}/apache-rat-${RAT_VERSION}.jar"
URL="http://repo1.maven.org/maven2/org/apache/rat/apache-rat/${RAT_VERSION}/apache-rat-${RAT_VERSION}.jar"

JAR="$rat_jar"

if [[ ! -f "$rat_jar" ]]; then
# Download rat launch jar if it hasn't been downloaded yet
if [ ! -f "$JAR" ]; then
# Download
printf "Attempting to fetch rat\n"
JAR_DL="${JAR}.part"
if hash curl 2>/dev/null; then
(curl --silent "${URL1}" > "$JAR_DL" || curl --silent "${URL2}" > "$JAR_DL") && mv "$JAR_DL" "$JAR"
curl --silent "${URL}" > "$JAR_DL" && mv "$JAR_DL" "$JAR"
elif hash wget 2>/dev/null; then
(wget --quiet ${URL1} -O "$JAR_DL" || wget --quiet ${URL2} -O "$JAR_DL") && mv "$JAR_DL" "$JAR"
wget --quiet ${URL} -O "$JAR_DL" && mv "$JAR_DL" "$JAR"
else
printf "You do not have curl or wget installed, please install rat manually.\n"
exit -1
fi
fi
if [ ! -f "$JAR" ]; then

unzip -tq $JAR &> /dev/null
if [ $? -ne 0 ]; then
# We failed to download
printf "Our attempt to download rat locally to ${JAR} failed. Please install rat manually.\n"
exit -1
Expand All @@ -55,7 +56,7 @@ cd "$FWDIR"

if test -x "$JAVA_HOME/bin/java"; then
declare java_cmd="$JAVA_HOME/bin/java"
else
else
declare java_cmd=java
fi

Expand Down
6 changes: 4 additions & 2 deletions dev/run-tests-jenkins
Original file line number Diff line number Diff line change
Expand Up @@ -141,8 +141,10 @@ function post_message () {
test_result="$?"

if [ "$test_result" -eq "124" ]; then
fail_message="**[Tests timed out](${BUILD_URL}consoleFull)** after \
a configured wait of \`${TESTS_TIMEOUT}\`."
fail_message="**[Tests timed out](${BUILD_URL}consoleFull)** \
for PR $ghprbPullId at commit [\`${SHORT_COMMIT_HASH}\`](${COMMIT_URL}) \
after a configured wait of \`${TESTS_TIMEOUT}\`."

post_message "$fail_message"
exit $test_result
else
Expand Down
19 changes: 19 additions & 0 deletions docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,25 @@ Apart from these, the following properties are also available, and may be useful
used during aggregation goes above this amount, it will spill the data into disks.
</td>
</tr>
<tr>
<td><code>spark.python.profile</code></td>
<td>false</td>
<td>
Enable profiling in Python worker, the profile result will show up by `sc.show_profiles()`,
or it will be displayed before the driver exiting. It also can be dumped into disk by
`sc.dump_profiles(path)`. If some of the profile results had been displayed maually,
they will not be displayed automatically before driver exiting.
</td>
</tr>
<tr>
<td><code>spark.python.profile.dump</code></td>
<td>(none)</td>
<td>
The directory which is used to dump the profile result before driver exiting.
The results will be dumped as separated file for each RDD. They can be loaded
by ptats.Stats(). If this is specified, the profile result will not be displayed
automatically.
</tr>
<tr>
<td><code>spark.python.worker.reuse</code></td>
<td>true</td>
Expand Down
2 changes: 1 addition & 1 deletion docs/programming-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -883,7 +883,7 @@ for details.
<tr>
<td> <b>groupByKey</b>([<i>numTasks</i>]) </td>
<td> When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable&lt;V&gt;) pairs. <br />
<b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or
<b>Note:</b> If you are grouping in order to perform an aggregation (such as a sum or
average) over each key, using <code>reduceByKey</code> or <code>combineByKey</code> will yield much better
performance.
<br />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import org.apache.flume.source.avro.{AvroFlumeEvent, AvroSourceProtocol}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{TestOutputStream, StreamingContext, TestSuiteBase}
import org.apache.spark.streaming.util.ManualClock
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
import org.apache.spark.util.Utils

import org.jboss.netty.channel.ChannelPipeline
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
Expand All @@ -41,35 +41,40 @@ import org.jboss.netty.handler.codec.compression._
class FlumeStreamSuite extends TestSuiteBase {

test("flume input stream") {
runFlumeStreamTest(false, 9998)
runFlumeStreamTest(false)
}

test("flume input compressed stream") {
runFlumeStreamTest(true, 9997)
runFlumeStreamTest(true)
}

def runFlumeStreamTest(enableDecompression: Boolean, testPort: Int) {
def runFlumeStreamTest(enableDecompression: Boolean) {
// Set up the streaming context and input streams
val ssc = new StreamingContext(conf, batchDuration)
val flumeStream: JavaReceiverInputDStream[SparkFlumeEvent] =
FlumeUtils.createStream(ssc, "localhost", testPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
val (flumeStream, testPort) =
Utils.startServiceOnPort(9997, (trialPort: Int) => {
val dstream = FlumeUtils.createStream(
ssc, "localhost", trialPort, StorageLevel.MEMORY_AND_DISK, enableDecompression)
(dstream, trialPort)
})

val outputBuffer = new ArrayBuffer[Seq[SparkFlumeEvent]]
with SynchronizedBuffer[Seq[SparkFlumeEvent]]
val outputStream = new TestOutputStream(flumeStream.receiverInputDStream, outputBuffer)
val outputStream = new TestOutputStream(flumeStream, outputBuffer)
outputStream.register()
ssc.start()

val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val input = Seq(1, 2, 3, 4, 5)
Thread.sleep(1000)
val transceiver = new NettyTransceiver(new InetSocketAddress("localhost", testPort))
var client: AvroSourceProtocol = null;
var client: AvroSourceProtocol = null

if (enableDecompression) {
client = SpecificRequestor.getClient(
classOf[AvroSourceProtocol],
new NettyTransceiver(new InetSocketAddress("localhost", testPort),
new CompressionChannelFactory(6)));
new CompressionChannelFactory(6)))
} else {
client = SpecificRequestor.getClient(
classOf[AvroSourceProtocol], transceiver)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ sealed trait Matrix extends Serializable {
}

/**
* Column-majored dense matrix.
* Column-major dense matrix.
* The entry values are stored in a single array of doubles with columns listed in sequence.
* For example, the following matrix
* {{{
Expand Down Expand Up @@ -128,7 +128,7 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double])
}

/**
* Column-majored sparse matrix.
* Column-major sparse matrix.
* The entry values are stored in Compressed Sparse Column (CSC) format.
* For example, the following matrix
* {{{
Expand Down Expand Up @@ -207,7 +207,7 @@ class SparseMatrix(
object Matrices {

/**
* Creates a column-majored dense matrix.
* Creates a column-major dense matrix.
*
* @param numRows number of rows
* @param numCols number of columns
Expand All @@ -218,7 +218,7 @@ object Matrices {
}

/**
* Creates a column-majored sparse matrix in Compressed Sparse Column (CSC) format.
* Creates a column-major sparse matrix in Compressed Sparse Column (CSC) format.
*
* @param numRows number of rows
* @param numCols number of columns
Expand Down
15 changes: 15 additions & 0 deletions python/pyspark/accumulators.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,21 @@ def addInPlace(self, value1, value2):
COMPLEX_ACCUMULATOR_PARAM = AddingAccumulatorParam(0.0j)


class PStatsParam(AccumulatorParam):
"""PStatsParam is used to merge pstats.Stats"""

@staticmethod
def zero(value):
return None

@staticmethod
def addInPlace(value1, value2):
if value1 is None:
return value2
value1.add(value2)
return value1


class _UpdateRequestHandler(SocketServer.StreamRequestHandler):

"""
Expand Down
39 changes: 38 additions & 1 deletion python/pyspark/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import sys
from threading import Lock
from tempfile import NamedTemporaryFile
import atexit

from pyspark import accumulators
from pyspark.accumulators import Accumulator
Expand All @@ -30,7 +31,6 @@
from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
PairDeserializer, CompressedSerializer
from pyspark.storagelevel import StorageLevel
from pyspark import rdd
from pyspark.rdd import RDD
from pyspark.traceback_utils import CallSite, first_spark_call

Expand Down Expand Up @@ -192,6 +192,9 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
self._temp_dir = \
self._jvm.org.apache.spark.util.Utils.createTempDir(local_dir).getAbsolutePath()

# profiling stats collected for each PythonRDD
self._profile_stats = []

def _initialize_context(self, jconf):
"""
Initialize SparkContext in function to allow subclass specific initialization
Expand Down Expand Up @@ -792,6 +795,40 @@ def runJob(self, rdd, partitionFunc, partitions=None, allowLocal=False):
it = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, javaPartitions, allowLocal)
return list(mappedRDD._collect_iterator_through_file(it))

def _add_profile(self, id, profileAcc):
if not self._profile_stats:
dump_path = self._conf.get("spark.python.profile.dump")
if dump_path:
atexit.register(self.dump_profiles, dump_path)
else:
atexit.register(self.show_profiles)

self._profile_stats.append([id, profileAcc, False])

def show_profiles(self):
""" Print the profile stats to stdout """
for i, (id, acc, showed) in enumerate(self._profile_stats):
stats = acc.value
if not showed and stats:
print "=" * 60
print "Profile of RDD<id=%d>" % id
print "=" * 60
stats.sort_stats("time", "cumulative").print_stats()
# mark it as showed
self._profile_stats[i][2] = True

def dump_profiles(self, path):
""" Dump the profile stats into directory `path`
"""
if not os.path.exists(path):
os.makedirs(path)
for id, acc, _ in self._profile_stats:
stats = acc.value
if stats:
p = os.path.join(path, "rdd_%d.pstats" % id)
stats.dump_stats(p)
self._profile_stats = []


def _test():
import atexit
Expand Down
Loading

0 comments on commit adf4924

Please sign in to comment.