
Commit 637bc9c: Changed segment to handle

tdas committed Apr 30, 2015
1 parent 466212c
Showing 3 changed files with 14 additions and 13 deletions.
streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala

@@ -71,15 +71,15 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
     val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
     val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray
 
-    // Is WAL segment info present with all the blocks
-    val isWALSegmentInfoPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
+    // Are WAL record handles present with all the blocks
+    val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
 
-    if (isWALSegmentInfoPresent) {
-      // If all the blocks have WAL segment info, then create a WALBackedBlockRDD
+    if (areWALRecordHandlesPresent) {
+      // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
       val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
-      val blockWALSegments = blockInfos.map { _.walRecordHandleOption.get }.toArray
+      val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
       new WriteAheadLogBackedBlockRDD[T](
-        ssc.sparkContext, blockIds, blockWALSegments, isBlockIdValid)
+        ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
     } else {
       // Else, create a BlockRDD. However, if there are some blocks with WAL info but not others
       // then that is unexpected and log a warning accordingly.
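The rename above is mechanical, but the logic it touches is worth spelling out: a batch becomes a WAL-backed RDD only when every block in it carries a WAL record handle. Below is a minimal, self-contained sketch of that all-or-nothing check, using simplified stand-in types (`WALRecordHandle`, `BlockInfo`) rather than Spark's actual classes:

```scala
// Stand-in types for illustration; Spark's real classes are richer.
case class WALRecordHandle(path: String, offset: Long, length: Int)
case class BlockInfo(blockId: String, walRecordHandleOption: Option[WALRecordHandle])

def chooseRDD(blockInfos: Seq[BlockInfo]): String = {
  // The WAL-backed RDD can only be built if *every* block is recoverable
  // from the log, hence forall rather than exists.
  if (blockInfos.forall(_.walRecordHandleOption.nonEmpty)) {
    "WriteAheadLogBackedBlockRDD"
  } else {
    // A mix of WAL-backed and non-WAL-backed blocks in one batch is
    // unexpected, so the real code logs a warning before falling back.
    if (blockInfos.exists(_.walRecordHandleOption.nonEmpty)) {
      Console.err.println("Warning: some blocks are missing WAL record handles")
    }
    "BlockRDD"
  }
}

// Usage: one handle-less block makes the whole batch fall back to a BlockRDD.
val infos = Seq(
  BlockInfo("input-0-1", Some(WALRecordHandle("/wal/log-0", 0L, 100))),
  BlockInfo("input-0-2", None))
assert(chooseRDD(infos) == "BlockRDD")
```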
streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala

@@ -31,7 +31,7 @@ import org.apache.spark.streaming.util._
 /**
  * Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
  * It contains information about the id of the blocks having this partition's data and
- * the segment of the write ahead log that backs the partition.
+ * the corresponding record handle in the write ahead log that backs the partition.
  * @param index index of the partition
  * @param blockId id of the block having the partition data
  * @param walRecordHandle Handle of the record in a write ahead log having the partition data
@@ -49,9 +49,9 @@ class WriteAheadLogBackedBlockRDDPartition(
  * This class represents a special case of the BlockRDD where the data blocks in
  * the block manager are also backed by data in write ahead logs. For reading
  * the data, this RDD first looks up the blocks by their ids in the block manager.
- * If it does not find them, it looks up the corresponding file segment. The finding
- * of the blocks by their ids can be skipped by setting the corresponding element in
- * isBlockIdValid to false. This is a performance optimization which does not affect
+ * If it does not find them, it looks up the WAL using the corresponding record handle.
+ * The lookup of the blocks from the block manager can be skipped by setting the corresponding
+ * element in isBlockIdValid to false. This is a performance optimization which does not affect
  * correctness, and it can be used in situations where it is known that the block
  * does not exist in the Spark executors (e.g. after a failed driver is restarted).
  *
@@ -62,7 +62,8 @@ class WriteAheadLogBackedBlockRDDPartition(
  * @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark
  *                       executors). If not, then block lookups by the block ids will be skipped.
  *                       By default, this is an empty array signifying true for all the blocks.
- * @param storeInBlockManager Whether to store in the block manager after reading from the segment
+ * @param storeInBlockManager Whether to store a block in the block manager
+ *                            after reading it from the WAL
  * @param storageLevel storage level to store when storing in block manager
  *                     (applicable when storeInBlockManager = true)
  */
@@ -79,7 +80,7 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
   require(
     blockIds.length == walRecordHandles.length,
     s"Number of block Ids (${blockIds.length}) must be " +
-      s" same as number of segments (${walRecordHandles.length}})")
+      s" same as number of WAL record handles (${walRecordHandles.length}})")
 
   require(
     isBlockIdValid.isEmpty || isBlockIdValid.length == blockIds.length,
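The class doc above describes the read path in prose. Here is a hedged sketch of that flow with the collaborators abstracted into function parameters; `getFromBlockManager`, `readFromWAL`, and `putInBlockManager` are hypothetical stand-ins, not Spark's actual BlockManager or WriteAheadLog APIs:

```scala
// Hypothetical concrete record handle for the sketch.
case class RecordHandle(path: String, offset: Long, length: Int)

def readPartition[T](
    blockId: String,
    walRecordHandle: RecordHandle,
    isBlockIdValid: Boolean,          // false => skip the block-manager lookup
    storeInBlockManager: Boolean,     // true  => cache WAL reads back as blocks
    getFromBlockManager: String => Option[Seq[T]],
    readFromWAL: RecordHandle => Seq[T],
    putInBlockManager: (String, Seq[T]) => Unit): Iterator[T] = {
  // Fast path: the block may still live in an executor's block manager.
  // When isBlockIdValid is false (e.g. after a driver restart), the lookup
  // is skipped entirely; this is the optimization the doc comment notes.
  val fromBlockManager = if (isBlockIdValid) getFromBlockManager(blockId) else None
  fromBlockManager match {
    case Some(data) => data.iterator
    case None =>
      // Slow path: recover the partition's data from the write ahead log
      // using the record handle, optionally re-inserting it as a block.
      val data = readFromWAL(walRecordHandle)
      if (storeInBlockManager) putInBlockManager(blockId, data)
      data.iterator
  }
}
```

Skipping the lookup costs nothing in correctness because the WAL path is always available; the flag only avoids a lookup that is known in advance to miss.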
streaming/src/test/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDDSuite.scala

@@ -100,7 +100,7 @@ class WriteAheadLogBackedBlockRDDSuite
       blockManager.putIterator(blockId, block.iterator, StorageLevel.MEMORY_ONLY_SER)
     }
 
-    // Generate write ahead log file segments
+    // Generate write ahead log record handles
     val recordHandles = generateFakeRecordHandles(numPartitionsInBM) ++
       generateWALRecordHandles(data.takeRight(numPartitionsInWAL),
         blockIds.takeRight(numPartitionsInWAL))
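`generateFakeRecordHandles` and `generateWALRecordHandles` are defined elsewhere in the suite and not shown in this diff. One plausible shape for the fake variant, sketched with a hypothetical handle type, is handles that deliberately point nowhere, so a test fails loudly if the WAL path is taken for blocks that should be served from the block manager:

```scala
// Hypothetical stand-in for a file-based WAL record handle.
case class FileSegmentHandle(path: String, offset: Long, length: Int)

// Fake handles for blocks already in the block manager: if the RDD ever
// tries to read the WAL through one of these, the read fails, which is
// exactly what the test wants to detect.
def generateFakeRecordHandles(count: Int): Seq[FileSegmentHandle] =
  Seq.fill(count)(FileSegmentHandle("/this/path/does/not/exist", 0L, -1))
```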
