
Commit

broke something
giwa committed Sep 20, 2014
1 parent 1e126bf commit 795b2cd
Showing 4 changed files with 132 additions and 14 deletions.
46 changes: 46 additions & 0 deletions python/pyspark/streaming/context.py
@@ -142,9 +142,49 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):

def _testInputStream(self, test_inputs, numSlices=None):
"""
<<<<<<< HEAD
This function is only for tests.
This implementation is inspired by the QueueStream implementation.
Given a list of RDDs, it generates a DStream that contains those RDDs.
=======
Generate multiple files to make a "stream" on the Scala side for testing.
The Scala side picks one of the files and generates an RDD using PythonRDD.readRDDFromFile.
QueueStream may be a good way to implement this function.
"""
numSlices = numSlices or self._sc.defaultParallelism
# Calling the Java parallelize() method with an ArrayList is too slow,
# because it sends O(n) Py4J commands. As an alternative, serialized
# objects are written to temp files and read back via PythonRDD.readRDDFromFile.

tempFiles = list()
for test_input in test_inputs:
tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)

# Make sure we distribute data evenly if it's smaller than self.batchSize
if "__len__" not in dir(test_input):
test_input = list(test_input) # Make it a list so we can compute its length
batchSize = min(len(test_input) // numSlices, self._sc._batchSize)
if batchSize > 1:
serializer = BatchedSerializer(self._sc._unbatched_serializer,
batchSize)
else:
serializer = self._sc._unbatched_serializer
serializer.dump_stream(test_input, tempFile)
tempFile.close()
tempFiles.append(tempFile.name)

jtempFiles = ListConverter().convert(tempFiles, SparkContext._gateway._gateway_client)
jinput_stream = self._jvm.PythonTestInputStream(self._jssc,
jtempFiles,
numSlices).asJavaDStream()
return DStream(jinput_stream, self, BatchedSerializer(PickleSerializer()))
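For orientation, a minimal sketch of how this helper might be driven from a test (not part of this commit). It assumes a StreamingContext named ssc has already been created for the test; only DStream.map, defined elsewhere in this fork, is used on the result.

# Hypothetical test usage of _testInputStream (sketch only).
# Assumes `ssc` is a pyspark.streaming.StreamingContext built over a local SparkContext.
test_inputs = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]   # one inner list per batch
dstream = ssc._testInputStream(test_inputs)       # each list is serialized to a temp file for the Scala side
doubled = dstream.map(lambda x: x * 2)            # transformations then apply per batch as usual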

def _testInputStream2(self, test_inputs, numSlices=None):
"""
This is inspired by the QueueStream implementation. Given a list of RDDs, it generates
a DStream that contains those RDDs.
>>>>>>> broke something
"""
test_rdds = list()
test_rdd_deserializers = list()
Expand All @@ -156,4 +196,10 @@ def _testInputStream(self, test_inputs, numSlices=None):
jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()

<<<<<<< HEAD
return DStream(jinput_stream, self, test_rdd_deserializers[0])
=======
dstream = DStream(jinput_stream, self, test_rdd_deserializers[0])
dstream._test_switch_dserializer(test_rdd_deserializers)
return dstream
>>>>>>> broke something
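A similarly hedged sketch for the RDD-backed variant; under the same assumed test context, _testInputStream2 turns each inner list into an RDD up front and switches deserializers per batch.

# Hypothetical test usage of _testInputStream2 (sketch only).
pairs = [[("a", 1), ("b", 1)], [("a", 2), ("b", 3)]]    # one inner list per batch
dstream2 = ssc._testInputStream2(pairs)                 # each list becomes an RDD before the stream starts
mapped = dstream2.map(lambda kv: (kv[0], kv[1] * 10))   # per-batch transformation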
15 changes: 1 addition & 14 deletions python/pyspark/streaming/dstream.py
@@ -17,6 +17,7 @@

from collections import defaultdict
from itertools import chain, ifilter, imap
import time
import operator

from pyspark.serializers import NoOpSerializer,\
@@ -428,20 +429,6 @@ def saveAsTextFile(rdd, time):
# TODO: implement rightOuterJoin


# TODO: implement groupByKey
# TODO: implement union
# TODO: implement cache
# TODO: implement persist
# TODO: implement repartition
# TODO: implement saveAsTextFile
# TODO: implement cogroup
# TODO: implement join
# TODO: implement countByValue
# TODO: implement leftOuterJoin
# TODO: implement rightOuterJoin



class PipelinedDStream(DStream):
def __init__(self, prev, func, preservesPartitioning=False):
if not isinstance(prev, PipelinedDStream) or not prev._is_pipelinable():
10 changes: 10 additions & 0 deletions python/pyspark/worker.py
@@ -86,6 +86,16 @@ def main(infile, outfile):
(func, deserializer, serializer) = command
init_time = time.time()
iterator = deserializer.load_stream(infile)
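# Debug instrumentation: tee the deserialized iterator so one copy can be
# printed for inspection while the original is still passed to the user function.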
print "deserializer in worker: %s" % str(deserializer)
iterator, walk = itertools.tee(iterator)
if isinstance(walk, int):
print "this is int"
print walk
else:
try:
print list(walk)
except:
print list(walk)
serializer.dump_stream(func(split_index, iterator), outfile)
except Exception:
try:
@@ -206,3 +206,78 @@ class PythonTransformedDStream(
}
*/

<<<<<<< HEAD
=======
/**
 * This is an input stream just for unit tests. It is equivalent to a checkpointable,
 * replayable, reliable message queue like Kafka. It requires a sequence as input, and
 * returns the i-th element at the i-th batch under a manual clock.
*/
class PythonTestInputStream(ssc_ : JavaStreamingContext, inputFiles: JArrayList[String], numPartitions: Int)
extends InputDStream[Array[Byte]](JavaStreamingContext.toStreamingContext(ssc_)){

def start() {}

def stop() {}

def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
logInfo("Computing RDD for time " + validTime)
inputFiles.foreach(logInfo(_))
// make a temporary file that, when read, yields an empty RDD
// (used when no input file is available for this batch)
val prefix = "spark"
val suffix = ".tmp"
val tempFile = File.createTempFile(prefix, suffix)
val index = ((validTime - zeroTime) / slideDuration - 1).toInt
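// e.g. with zeroTime = 0 and slideDuration = 1 second, validTime = 3 seconds gives index 2,
// so the third input file is replayed for the third batch.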
logInfo("Index: " + index)

val selectedInputFile: String = {
if (inputFiles.isEmpty) {
tempFile.getAbsolutePath
} else if (index < inputFiles.size()) {
inputFiles.get(index)
} else {
tempFile.getAbsolutePath
}
}
val rdd = PythonRDD.readRDDFromFile(JavaSparkContext.fromSparkContext(ssc_.sparkContext), selectedInputFile, numPartitions).rdd
logInfo("Created RDD " + rdd.id + " with " + selectedInputFile)
Some(rdd)
}

val asJavaDStream = JavaDStream.fromDStream(this)
}

/**
 * This is an input stream just for unit tests. It is equivalent to a checkpointable,
 * replayable, reliable message queue like Kafka. It requires a sequence as input, and
 * returns the i-th element at the i-th batch under a manual clock.
 * This implementation is close to QueueStream.
*/

class PythonTestInputStream2(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[JavaRDD[Array[Byte]]])
extends InputDStream[Array[Byte]](JavaStreamingContext.toStreamingContext(ssc_)) {

def start() {}

def stop() {}

def compute(validTime: Time): Option[RDD[Array[Byte]]] = {
val emptyRDD = ssc.sparkContext.emptyRDD[Array[Byte]]
val index = ((validTime - zeroTime) / slideDuration - 1).toInt
val selectedRDD = {
if (inputRDDs.isEmpty) {
emptyRDD
} else if (index < inputRDDs.size()) {
inputRDDs.get(index).rdd
} else {
emptyRDD
}
}

Some(selectedRDD)
}

val asJavaDStream = JavaDStream.fromDStream(this)
}
>>>>>>> broke something
