Skip to content

Commit

Permalink
Addressed one more comment.
Browse files Browse the repository at this point in the history
  • Loading branch information
tdas committed Oct 30, 2014
1 parent ed5fbf0 commit 4a5866f
Showing 1 changed file with 8 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -76,31 +76,27 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
* and the rest to a write ahead log, and then reading reading it all back using the RDD.
* It can also test if the partitions that were read from the log were again stored in
* block manager.
* @param numPartitionssInBM Number of partitions to write to the Block Manager
* @param numPartitionsInBM Number of partitions to write to the Block Manager
* @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log
* @param testStoreInBM Test whether blocks read from log are stored back into block manager
*/
private def testRDD(
numPartitionssInBM: Int,
numPartitionsInWAL: Int,
testStoreInBM: Boolean = false
) {
val numBlocks = numPartitionssInBM + numPartitionsInWAL
private def testRDD(numPartitionsInBM: Int, numPartitionsInWAL: Int, testStoreInBM: Boolean = false) {
val numBlocks = numPartitionsInBM + numPartitionsInWAL
val data = Seq.fill(numBlocks, 10)(scala.util.Random.nextString(50))

// Put the necessary blocks in the block manager
val blockIds = Array.fill(numBlocks)(StreamBlockId(Random.nextInt(), Random.nextInt()))
data.zip(blockIds).take(numPartitionssInBM).foreach { case(block, blockId) =>
data.zip(blockIds).take(numPartitionsInBM).foreach { case(block, blockId) =>
blockManager.putIterator(blockId, block.iterator, StorageLevel.MEMORY_ONLY_SER)
}

// Generate write ahead log segments
val segments = generateFakeSegments(numPartitionssInBM) ++
val segments = generateFakeSegments(numPartitionsInBM) ++
writeLogSegments(data.takeRight(numPartitionsInWAL), blockIds.takeRight(numPartitionsInWAL))

// Make sure that the left `numPartitionssInBM` blocks are in block manager, and others are not
// Make sure that the left `numPartitionsInBM` blocks are in block manager, and others are not
require(
blockIds.take(numPartitionssInBM).forall(blockManager.get(_).nonEmpty),
blockIds.take(numPartitionsInBM).forall(blockManager.get(_).nonEmpty),
"Expected blocks not in BlockManager"
)
require(
Expand All @@ -115,7 +111,7 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
"Expected blocks not in write ahead log"
)
require(
segments.take(numPartitionssInBM).forall(s =>
segments.take(numPartitionsInBM).forall(s =>
!new File(s.path.stripPrefix("file://")).exists()),
"Unexpected blocks in write ahead log"
)
Expand Down

0 comments on commit 4a5866f

Please sign in to comment.