Skip to content

Commit

Permalink
Fix the bug that ReceiverInputDStream doesn't report InputInfo
Browse files Browse the repository at this point in the history
The bug is because SPARK-7139 removed some codes from SPARK-7112 unintentionally here: 1854ac3#diff-5c8651dd78abd20439b8eb938175075dL72
  • Loading branch information
zsxwing committed May 6, 2015
1 parent 845d1d4 commit 675f5d9
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.storage.BlockId
import org.apache.spark.streaming._
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.scheduler.InputInfo
import org.apache.spark.streaming.util.WriteAheadLogUtils

/**
Expand Down Expand Up @@ -68,6 +69,10 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

// Register the input blocks information into InputInfoTracker
val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

// Are WAL record handles present with all the blocks
val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {

// Set up the streaming context and input streams
withStreamingContext(new StreamingContext(conf, batchDuration)) { ssc =>
ssc.addStreamingListener(ssc.progressListener)

val input = Seq(1, 2, 3, 4, 5)
// Use "batchCount" to make sure we check the result after all batches finish
val batchCounter = new BatchCounter(ssc)
Expand All @@ -72,6 +74,11 @@ class InputStreamsSuite extends TestSuiteBase with BeforeAndAfter {
if (!batchCounter.waitUntilBatchesCompleted(input.size, 30000)) {
fail("Timeout: cannot finish all batches in 30 seconds")
}

// Verify all "InputInfo"s have been reported
assert(ssc.progressListener.numTotalReceivedRecords === input.size)
assert(ssc.progressListener.numTotalProcessedRecords === input.size)

logInfo("Stopping server")
testServer.stop()
logInfo("Stopping context")
Expand Down

0 comments on commit 675f5d9

Please sign in to comment.