all tests pass if numSlices is 2 and the number of elements in each input is over 4
giwa committed Sep 20, 2014
1 parent 795b2cd commit 8dcda84
Showing 2 changed files with 18 additions and 47 deletions.
python/pyspark/streaming/context.py (46 changes: 0 additions & 46 deletions)
@@ -142,49 +142,9 @@ def stop(self, stopSparkContext=True, stopGraceFully=False):

    def _testInputStream(self, test_inputs, numSlices=None):
        """
-<<<<<<< HEAD
        This function is only for test.
        This implementation is inspired by the QueueStream implementation.
        Give a list of RDDs to generate a DStream which contains those RDDs.
-=======
-        Generate multiple files to make a "stream" on the Scala side for test.
-        Scala chooses one of the files and generates an RDD using PythonRDD.readRDDFromFile.
-        QueueStream may be a good way to implement this function.
-        """
-        numSlices = numSlices or self._sc.defaultParallelism
-        # Calling the Java parallelize() method with an ArrayList is too slow,
-        # because it sends O(n) Py4J commands. As an alternative, serialized
-        # objects are written to a file and loaded through textFile().
-
-        tempFiles = list()
-        for test_input in test_inputs:
-            tempFile = NamedTemporaryFile(delete=False, dir=self._sc._temp_dir)
-
-            # Make sure we distribute data evenly if it's smaller than self.batchSize
-            if "__len__" not in dir(test_input):
-                test_input = list(test_input)  # Make it a list so we can compute its length
-            batchSize = min(len(test_input) // numSlices, self._sc._batchSize)
-            if batchSize > 1:
-                serializer = BatchedSerializer(self._sc._unbatched_serializer,
-                                               batchSize)
-            else:
-                serializer = self._sc._unbatched_serializer
-            serializer.dump_stream(test_input, tempFile)
-            tempFile.close()
-            tempFiles.append(tempFile.name)
-
-        jtempFiles = ListConverter().convert(tempFiles, SparkContext._gateway._gateway_client)
-        jinput_stream = self._jvm.PythonTestInputStream(self._jssc,
-                                                        jtempFiles,
-                                                        numSlices).asJavaDStream()
-        return DStream(jinput_stream, self, BatchedSerializer(PickleSerializer()))
-
-    def _testInputStream2(self, test_inputs, numSlices=None):
-        """
-        This is inspired by the QueueStream implementation. Give a list of RDDs
-        and generate a DStream which contains those RDDs.
->>>>>>> broke something
        """
        test_rdds = list()
        test_rdd_deserializers = list()
@@ -196,10 +156,4 @@ def _testInputStream2(self, test_inputs, numSlices=None):
        jtest_rdds = ListConverter().convert(test_rdds, SparkContext._gateway._gateway_client)
        jinput_stream = self._jvm.PythonTestInputStream(self._jssc, jtest_rdds).asJavaDStream()

-<<<<<<< HEAD
        return DStream(jinput_stream, self, test_rdd_deserializers[0])
-=======
-        dstream = DStream(jinput_stream, self, test_rdd_deserializers[0])
-        dstream._test_switch_dserializer(test_rdd_deserializers)
-        return dstream
->>>>>>> broke something
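
For orientation, here is a minimal sketch of how a test might drive the surviving RDD-based _testInputStream; the StreamingContext ssc, the two-batch input, and the map step are illustrative assumptions rather than code from this commit:

# Hypothetical driver for the resolved _testInputStream (names are illustrative).
# Each element of test_inputs becomes one RDD, and the resulting DStream
# emits one of those RDDs per batch interval.
test_inputs = [[1, 2, 3, 4, 5], [6, 7, 8, 9, 10]]  # more than 4 elements per batch
input_stream = ssc._testInputStream(test_inputs, numSlices=2)
doubled = input_stream.map(lambda x: x * 2)

This mirrors the condition in the commit message: numSlices of 2 with more than 4 elements in each input.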
Scala source for the Python test input streams (18 additions & 1 deletion)
@@ -280,4 +280,21 @@ class PythonTestInputStream2(ssc_ : JavaStreamingContext, inputRDDs: JArrayList[

  val asJavaDStream = JavaDStream.fromDStream(this)
}
->>>>>>> broke something
+
+
+class PythonTestInputStream3(ssc_ : JavaStreamingContext)
+  extends InputDStream[Any](JavaStreamingContext.toStreamingContext(ssc_)) {
+
+  def start() {}
+
+  def stop() {}
+
+  def compute(validTime: Time): Option[RDD[Any]] = {
+    val index = ((validTime - zeroTime) / slideDuration - 1).toInt
+    val selectedInput = ArrayBuffer(1, 2, 3).toSeq
+    val rdd: RDD[Any] = ssc.sc.makeRDD(selectedInput, 2)
+    Some(rdd)
+  }
+
+  val asJavaDStream = JavaDStream.fromDStream(this)
+}
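
The compute() method above derives a batch index from the batch time, although this stub does not yet use it to pick the input (selectedInput is hardcoded). A small Python sketch of the index arithmetic, with assumed zeroTime and slideDuration values in milliseconds:

# Sketch of the batch-index arithmetic from compute(); zero_time and
# slide_duration are assumed values, not taken from this commit.
zero_time = 0
slide_duration = 1000

def batch_index(valid_time):
    # validTime marks the end of a batch window, so the first batch
    # (ending at zero_time + slide_duration) maps to index 0.
    return (valid_time - zero_time) // slide_duration - 1

assert batch_index(1000) == 0
assert batch_index(3000) == 2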
