From 675f5d9e3e7e8b4f2e0b954131703e270d017175 Mon Sep 17 00:00:00 2001
From: zsxwing
Date: Wed, 6 May 2015 12:58:51 -0700
Subject: [PATCH] Fix the bug that ReceiverInputDStream doesn't report InputInfo

The bug was introduced when SPARK-7139 unintentionally removed some code
added by SPARK-7112; see:
https://github.com/apache/spark/commit/1854ac326a9cc6014817d8df30ed0458eee5d7d1#diff-5c8651dd78abd20439b8eb938175075dL72
---
 .../spark/streaming/dstream/ReceiverInputDStream.scala  | 5 +++++
 .../org/apache/spark/streaming/InputStreamsSuite.scala  | 7 +++++++
 2 files changed, 12 insertions(+)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
index 15d9710d37cd4..5cfe43a1ce726 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -24,6 +24,7 @@ import org.apache.spark.storage.BlockId
 import org.apache.spark.streaming._
 import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
 import org.apache.spark.streaming.receiver.Receiver
+import org.apache.spark.streaming.scheduler.InputInfo
 import org.apache.spark.streaming.util.WriteAheadLogUtils
 
 /**
@@ -68,6 +69,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
         val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
         val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
 
+        // Register the input blocks information into InputInfoTracker
+        val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
+        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
+
         // Are WAL record handles present with all the blocks
         val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
 
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
index 60745325029f6..93e6b0cd7c661 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/InputStreamsSuite.scala
@@ -50,6 +50,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
 
     // Set up the streaming context and input streams
     withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
+      ssc.addStreamingListener(ssc.progressListener)
+
       val input = Seq(1, 2, 3, 4, 5)
       // Use "batchCount" to make sure we check the result after all batches finish
       val batchCounter = new BatchCounter(ssc)
@@ -72,6 +74,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
       if (!batchCounter.waitUntilBatchesCompleted(input.size, 30000)) {
         fail("Timeout: cannot finish all batches in 30 seconds")
       }
+
+      // Verify all "InputInfo"s have been reported
+      assert(ssc.progressListener.numTotalReceivedRecords === input.size)
+      assert(ssc.progressListener.numTotalProcessedRecords === input.size)
+
       logInfo("Stopping server")
       testServer.stop()
       logInfo("Stopping context")
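
Note for reviewers: the sketch below is a minimal, self-contained model of the
reporting path this patch restores. InputInfo, InputInfoTracker, and BlockInfo
here are simplified stand-ins, not Spark's real classes (the real InputInfo
lives in org.apache.spark.streaming.scheduler); only the two lines marked as
mirroring ReceiverInputDStream.compute correspond to the re-added code.

import scala.collection.mutable

// One input stream's record count for one batch (simplified stand-in).
case class InputInfo(inputStreamId: Int, numRecords: Long)

// Collects one InputInfo per (batch time, input stream), in the spirit of
// ssc.scheduler.inputInfoTracker (simplified stand-in).
class InputInfoTracker {
  private val infos = mutable.Map.empty[Long, mutable.Map[Int, InputInfo]]

  def reportInfo(batchTime: Long, info: InputInfo): Unit = synchronized {
    infos.getOrElseUpdate(batchTime, mutable.Map.empty)
      .update(info.inputStreamId, info)
  }

  def getInfo(batchTime: Long): Map[Int, InputInfo] = synchronized {
    infos.getOrElse(batchTime, mutable.Map.empty).toMap
  }
}

// Stand-in for the per-block metadata a receiver produces.
case class BlockInfo(numRecords: Long)

object ReportingSketch {
  def main(args: Array[String]): Unit = {
    val tracker = new InputInfoTracker
    val streamId = 0
    val batchTime = 1000L
    val blockInfos = Seq(BlockInfo(numRecords = 2), BlockInfo(numRecords = 3))

    // Mirrors the two restored lines in ReceiverInputDStream.compute:
    // sum the records in this batch's blocks and report them for the
    // batch currently being computed.
    val inputInfo = InputInfo(streamId, blockInfos.map(_.numRecords).sum)
    tracker.reportInfo(batchTime, inputInfo)

    // Without the fix, nothing is reported and a listener would see zero
    // records for the batch even though the receiver stored data.
    assert(tracker.getInfo(batchTime)(streamId).numRecords == 5)
  }
}

This is also why the test registers ssc.progressListener as a streaming
listener and asserts on numTotalReceivedRecords/numTotalProcessedRecords:
those counters only become non-zero once reportInfo is called for each batch.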