From 4a5866fb19fcd4fde7551294a30bd740500c834a Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 30 Oct 2014 03:42:37 -0700 Subject: [PATCH] Addressed one more comment. --- .../WriteAheadLogBackedBlockRDDSuite.scala | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala index 458c0ee7ca530..10160244bcc91 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala @@ -76,31 +76,27 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll { * and the rest to a write ahead log, and then reading reading it all back using the RDD. * It can also test if the partitions that were read from the log were again stored in * block manager. - * @param numPartitionssInBM Number of partitions to write to the Block Manager + * @param numPartitionsInBM Number of partitions to write to the Block Manager * @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log * @param testStoreInBM Test whether blocks read from log are stored back into block manager */ - private def testRDD( - numPartitionssInBM: Int, - numPartitionsInWAL: Int, - testStoreInBM: Boolean = false - ) { - val numBlocks = numPartitionssInBM + numPartitionsInWAL + private def testRDD(numPartitionsInBM: Int, numPartitionsInWAL: Int, testStoreInBM: Boolean = false) { + val numBlocks = numPartitionsInBM + numPartitionsInWAL val data = Seq.fill(numBlocks, 10)(scala.util.Random.nextString(50)) // Put the necessary blocks in the block manager val blockIds = Array.fill(numBlocks)(StreamBlockId(Random.nextInt(), Random.nextInt())) - data.zip(blockIds).take(numPartitionssInBM).foreach { case(block, blockId) => + data.zip(blockIds).take(numPartitionsInBM).foreach { case(block, blockId) => blockManager.putIterator(blockId, block.iterator, StorageLevel.MEMORY_ONLY_SER) } // Generate write ahead log segments - val segments = generateFakeSegments(numPartitionssInBM) ++ + val segments = generateFakeSegments(numPartitionsInBM) ++ writeLogSegments(data.takeRight(numPartitionsInWAL), blockIds.takeRight(numPartitionsInWAL)) - // Make sure that the left `numPartitionssInBM` blocks are in block manager, and others are not + // Make sure that the left `numPartitionsInBM` blocks are in block manager, and others are not require( - blockIds.take(numPartitionssInBM).forall(blockManager.get(_).nonEmpty), + blockIds.take(numPartitionsInBM).forall(blockManager.get(_).nonEmpty), "Expected blocks not in BlockManager" ) require( @@ -115,7 +111,7 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll { "Expected blocks not in write ahead log" ) require( - segments.take(numPartitionssInBM).forall(s => + segments.take(numPartitionsInBM).forall(s => !new File(s.path.stripPrefix("file://")).exists()), "Unexpected blocks in write ahead log" )