Commit 29aa099
Fixed line length issues.
tdas committed Oct 28, 2014
1 parent 9e47b5b commit 29aa099
Showing 2 changed files with 43 additions and 26 deletions.
@@ -41,41 +41,47 @@ class WriteAheadLogBackedBlockRDDPartition(


/**
- * This class represents a special case of the BlockRDD where the data blocks in the block manager are also
- * backed by segments in write ahead logs. For reading the data, this RDD first looks up the blocks by their ids
- * in the block manager. If it does not find them, it looks up the corresponding file segment.
+ * This class represents a special case of the BlockRDD where the data blocks in
+ * the block manager are also backed by segments in write ahead logs. For reading
+ * the data, this RDD first looks up the blocks by their ids in the block manager.
+ * If it does not find them, it looks up the corresponding file segment.
*
* @param sc SparkContext
- * @param hadoopConfiguration Hadoop configuration
+ * @param hadoopConfig Hadoop configuration
* @param blockIds Ids of the blocks that contain this RDD's data
* @param segments Segments in write ahead logs that contain this RDD's data
- * @param storeInBlockManager Whether to store in the block manager after reading from the log segment
- * @param storageLevel storage level to store when storing in block manager (applicable when storeInBlockManager = true)
+ * @param storeInBlockManager Whether to store in the block manager after reading from the segment
+ * @param storageLevel storage level to store when storing in block manager
+ *        (applicable when storeInBlockManager = true)
*/
private[streaming]
class WriteAheadLogBackedBlockRDD[T: ClassTag](
@transient sc: SparkContext,
- @transient hadoopConfiguration: Configuration,
+ @transient hadoopConfig: Configuration,
@transient override val blockIds: Array[BlockId],
@transient val segments: Array[WriteAheadLogFileSegment],
val storeInBlockManager: Boolean,
val storageLevel: StorageLevel
) extends BlockRDD[T](sc, blockIds) {

- require(blockIds.length == segments.length,
-   s"Number of block ids (${blockIds.length}) must be the same as number of segments (${segments.length}})!")
+ require(
+   blockIds.length == segments.length,
+   s"Number of block ids (${blockIds.length}) must be " +
+     s"the same as number of segments (${segments.length})!")

// Hadoop configuration is not serializable, so wrap it in a serializable wrapper before shipping it to tasks.
- private val broadcastedHadoopConf = new SerializableWritable(hadoopConfiguration)
+ private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig)

override def getPartitions: Array[Partition] = {
assertValid()
- Array.tabulate(blockIds.size){ i => new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i)) }
+ Array.tabulate(blockIds.size) { i =>
+   new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i)) }
}

/**
- * Gets the partition data by getting the corresponding block from the block manager. If the block does not
- * exist, then the data is read from the corresponding segment in write ahead log files.
+ * Gets the partition data by getting the corresponding block from the block manager.
+ * If the block does not exist, then the data is read from the corresponding segment
+ * in write ahead log files.
*/
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
assertValid()
@@ -86,31 +92,32 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
blockManager.get(blockId) match {
case Some(block) => // Data is in Block Manager
val iterator = block.data.asInstanceOf[Iterator[T]]
logDebug(s"Read partition data of RDD $this from block manager, block $blockId")
logDebug(s"Read partition data of $this from block manager, block $blockId")
iterator
case None => // Data not found in Block Manager, grab it from write ahead log file
val reader = new WriteAheadLogRandomReader(partition.segment.path, hadoopConf)
val dataRead = reader.read(partition.segment)
reader.close()
logInfo(s"Read partition data of RDD $this from write ahead log, segment ${partition.segment}")
logInfo(s"Read partition data of $this from write ahead log, segment ${partition.segment}")
if (storeInBlockManager) {
blockManager.putBytes(blockId, dataRead, storageLevel)
logDebug(s"Stored partition data of RDD $this into block manager with level $storageLevel")
logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
dataRead.rewind()
}
blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
}
}

/**
- * Get the preferred location of the partition. This returns the locations of the block if it is present in the
- * block manager, else it returns the location of the corresponding segment in HDFS.
+ * Get the preferred location of the partition. This returns the locations of the block
+ * if it is present in the block manager, else it returns the location of the
+ * corresponding segment in HDFS.
*/
override def getPreferredLocations(split: Partition): Seq[String] = {
val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
val blockLocations = getBlockIdLocations().get(partition.blockId)
lazy val segmentLocations = HdfsUtils.getBlockLocations(
-   partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfiguration)
+   partition.segment.path, partition.segment.offset, partition.segment.length, hadoopConfig)
blockLocations.orElse(segmentLocations).getOrElse(Seq.empty)
}
}
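
For orientation, here is a hedged sketch of how such an RDD might be constructed and read back. It is not part of this commit: the helper recoverBlocks, the use of sc.hadoopConfiguration, and the storage level are illustrative assumptions, the package paths are assumed to match the Spark source tree at the time, and since the class is private[streaming] the code would have to live inside that package (for example, in a test).

import org.apache.spark.SparkContext
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.util.WriteAheadLogFileSegment

// Sketch only: `blockIds` and `segments` are assumed to have been recorded
// pairwise when a receiver wrote each block to both the block manager and
// the write ahead log (the require above enforces equal lengths).
def recoverBlocks(
    sc: SparkContext,
    blockIds: Array[BlockId],
    segments: Array[WriteAheadLogFileSegment]): Long = {
  val rdd = new WriteAheadLogBackedBlockRDD[String](
    sc,
    sc.hadoopConfiguration,               // wrapped in a SerializableWritable internally
    blockIds,
    segments,
    storeInBlockManager = true,           // re-cache any partition recovered from the log
    storageLevel = StorageLevel.MEMORY_ONLY_SER)
  // compute() tries blockManager.get(blockId) first; on a miss it reads the
  // segment back via WriteAheadLogRandomReader and optionally re-caches it.
  rdd.count()
}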
@@ -71,14 +71,19 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
}

/**
- * Test the WriteAheadLogBackedRDD, by writing some partitions of the data to Block Manager and the rest
- * to a WriteAheadLog, and then reading reading it all back using the RDD.
- * It can also test if the partitions that were read from the log were again stored in block manager.
+ * Test the WriteAheadLogBackedRDD by writing some partitions of the data to block manager
+ * and the rest to a write ahead log, and then reading it all back using the RDD.
+ * It can also test if the partitions that were read from the log were again stored in
+ * block manager.
* @param numPartitionssInBM Number of partitions to write to the Block Manager
* @param numPartitionsInWAL Number of partitions to write to the Write Ahead Log
* @param testStoreInBM Test whether blocks read from log are stored back into block manager
*/
- private def testRDD(numPartitionssInBM: Int, numPartitionsInWAL: Int, testStoreInBM: Boolean = false) {
+ private def testRDD(
+     numPartitionssInBM: Int,
+     numPartitionsInWAL: Int,
+     testStoreInBM: Boolean = false
+   ) {
val numBlocks = numPartitionssInBM + numPartitionsInWAL
val data = Seq.tabulate(numBlocks) { _ => Seq.fill(10) { scala.util.Random.nextString(50) } }

@@ -104,11 +109,13 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {

// Make sure that the right `numPartitionsInWAL` blocks are in the write ahead logs, and the others are not
require(
-   segments.takeRight(numPartitionsInWAL).forall(s => new File(s.path.stripPrefix("file://")).exists()),
+   segments.takeRight(numPartitionsInWAL).forall(s =>
+     new File(s.path.stripPrefix("file://")).exists()),
"Expected blocks not in write ahead log"
)
require(
-   segments.take(numPartitionssInBM).forall(s => !new File(s.path.stripPrefix("file://")).exists()),
+   segments.take(numPartitionssInBM).forall(s =>
+     !new File(s.path.stripPrefix("file://")).exists()),
"Unexpected blocks in write ahead log"
)

@@ -128,7 +135,10 @@ class WriteAheadLogBackedBlockRDDSuite extends FunSuite with BeforeAndAfterAll {
}
}

- private def writeLogSegments(blockData: Seq[Seq[String]], blockIds: Seq[BlockId]): Seq[WriteAheadLogFileSegment] = {
+ private def writeLogSegments(
+     blockData: Seq[Seq[String]],
+     blockIds: Seq[BlockId]
+   ): Seq[WriteAheadLogFileSegment] = {
require(blockData.size === blockIds.size)
val writer = new WriteAheadLogWriter(new File(dir, Random.nextString(10)).toString, hadoopConf)
val segments = blockData.zip(blockIds).map { case (data, id) =>
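
For reference, a hedged sketch of how the suite might drive the testRDD helper above; these invocations are assumed for illustration (the actual tests are not visible in this truncated diff) and are grounded only in the signature shown:

// One call per read path the helper can exercise:
testRDD(numPartitionssInBM = 5, numPartitionsInWAL = 0)   // everything already in the block manager
testRDD(numPartitionssInBM = 0, numPartitionsInWAL = 5)   // everything only in the write ahead log
testRDD(numPartitionssInBM = 3, numPartitionsInWAL = 2)   // mixed: some cached, some in the log
testRDD(numPartitionssInBM = 0, numPartitionsInWAL = 5, testStoreInBM = true)  // re-caching path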
