Renamed segment to record handle everywhere
tdas committed Apr 29, 2015
1 parent b65e155 commit bde26b1
Showing 7 changed files with 42 additions and 39 deletions.
@@ -29,15 +29,16 @@
@org.apache.spark.annotation.DeveloperApi
public abstract class WriteAheadLog {
/**
* Write the record to the log and return the segment information that is necessary to read
* back the written record. The time is used to the index the record, such that it can be
* cleaned later. Note that the written data must be durable and readable (using the
* segment info) by the time this function returns.
* Write the record to the log and return a record handle, which contains all the information
* necessary to read back the written record. The time is used to index the record,
* such that it can be cleaned later. Note that implementations of this abstract class must
* ensure that the written data is durable and readable (using the record handle) by the
* time this function returns.
*/
abstract public WriteAheadLogRecordHandle write(ByteBuffer record, long time);

/**
* Read a written record based on the given segment information.
* Read a written record based on the given record handle.
*/
abstract public ByteBuffer read(WriteAheadLogRecordHandle handle);
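For reference, a minimal sketch of what an implementation of this contract could look like — an illustrative, in-memory log with made-up names; a real implementation must make the data durable before write() returns, as the javadoc above requires:

```scala
import java.nio.ByteBuffer
import java.util.{Iterator => JIterator}

import scala.collection.JavaConverters._
import scala.collection.mutable

import org.apache.spark.streaming.util.{WriteAheadLog, WriteAheadLogRecordHandle}

// Hypothetical handle: remembers only the slot a record was written to, which is
// all the information needed to read that record back.
case class InMemoryRecordHandle(slot: Long) extends WriteAheadLogRecordHandle

// Toy, non-durable log kept in memory, purely to illustrate the contract.
class InMemoryWriteAheadLog extends WriteAheadLog {
  private var nextSlot = 0L
  // slot -> (write time used for cleanup, record bytes)
  private val records = mutable.LinkedHashMap[Long, (Long, ByteBuffer)]()

  override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = synchronized {
    val slot = nextSlot
    nextSlot += 1
    records(slot) = (time, record)
    InMemoryRecordHandle(slot)
  }

  override def read(handle: WriteAheadLogRecordHandle): ByteBuffer = synchronized {
    records(handle.asInstanceOf[InMemoryRecordHandle].slot)._2
  }

  override def readAll(): JIterator[ByteBuffer] = synchronized {
    records.values.map(_._2).toList.iterator.asJava
  }

  override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = synchronized {
    // The write time indexes each record, so entries older than threshTime can be dropped.
    val expired = records.collect { case (slot, (time, _)) if time < threshTime => slot }
    expired.foreach(records.remove)
  }

  override def close(): Unit = synchronized { records.clear() }
}
```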

@@ -82,7 +82,7 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
// WriteAheadLogBackedBlockRDD else create simple BlockRDD.
if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) {
val logSegments = blockStoreResults.map {
_.asInstanceOf[WriteAheadLogBasedStoreResult].segment
_.asInstanceOf[WriteAheadLogBasedStoreResult].walRecordHandle
}.toArray
// Since storeInBlockManager = false, the storage level does not matter.
new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
@@ -34,42 +34,43 @@ import org.apache.spark.streaming.util._
* the segment of the write ahead log that backs the partition.
* @param index index of the partition
* @param blockId id of the block having the partition data
* @param segment segment of the write ahead log having the partition data
* @param walRecordHandle Handle of the record in a write ahead log having the partition data
*/
private[streaming]
class WriteAheadLogBackedBlockRDDPartition(
val index: Int,
val blockId: BlockId,
val segment: WriteAheadLogRecordHandle)
val walRecordHandle: WriteAheadLogRecordHandle)
extends Partition


/**
* This class represents a special case of the BlockRDD where the data blocks in
* the block manager are also backed by segments in write ahead logs. For reading
* the block manager are also backed by data in write ahead logs. For reading
* the data, this RDD first looks up the blocks by their ids in the block manager.
* If it does not find them, it looks up the corresponding file segment.
* If it does not find them, it looks up the corresponding data in the write ahead log.
*
* @param sc SparkContext
* @param blockIds Ids of the blocks that contains this RDD's data
* @param segments Segments in write ahead logs that contain this RDD's data
* @param storeInBlockManager Whether to store in the block manager after reading from the segment
* @param walRecordHandles Record handles in write ahead logs that contain this RDD's data
* @param storeInBlockManager Whether to store in the block manager after reading
* from the WAL record
* @param storageLevel storage level to store when storing in block manager
* (applicable when storeInBlockManager = true)
*/
private[streaming]
class WriteAheadLogBackedBlockRDD[T: ClassTag](
@transient sc: SparkContext,
@transient blockIds: Array[BlockId],
@transient segments: Array[WriteAheadLogRecordHandle],
@transient walRecordHandles: Array[WriteAheadLogRecordHandle],
storeInBlockManager: Boolean,
storageLevel: StorageLevel)
extends BlockRDD[T](sc, blockIds) {

require(
blockIds.length == segments.length,
blockIds.length == walRecordHandles.length,
s"Number of block ids (${blockIds.length}) must be " +
s"the same as number of segments (${segments.length}})!")
s"the same as number of WAL record handles (${walRecordHandles.length}})!")

// Hadoop configuration is not serializable, so broadcast it as a serializable.
@transient private val hadoopConfig = sc.hadoopConfiguration
@@ -78,13 +79,13 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
override def getPartitions: Array[Partition] = {
assertValid()
Array.tabulate(blockIds.size) { i =>
new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), segments(i))
new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), walRecordHandles(i))
}
}

/**
* Gets the partition data by getting the corresponding block from the block manager.
* If the block does not exist, then the data is read from the corresponding segment
* If the block does not exist, then the data is read from the corresponding record
* in write ahead log files.
*/
override def compute(split: Partition, context: TaskContext): Iterator[T] = {
@@ -105,22 +106,23 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
val dummyDirectory = FileUtils.getTempDirectoryPath()
writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
SparkEnv.get.conf, dummyDirectory, hadoopConf)
dataRead = writeAheadLog.read(partition.segment)
dataRead = writeAheadLog.read(partition.walRecordHandle)
} catch {
case NonFatal(e) =>
throw new SparkException(
s"Could not read data from write ahead log segment ${partition.segment}", e)
s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
} finally {
if (writeAheadLog != null) {
writeAheadLog.close()
}
}
if (dataRead == null) {
throw new SparkException(
s"Could not read data from write ahead log segment ${partition.segment}, " +
s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
s"read returned null")
}
logInfo(s"Read partition data of $this from write ahead log, segment ${partition.segment}")
logInfo(s"Read partition data of $this from write ahead log, record handle " +
partition.walRecordHandle)
if (storeInBlockManager) {
blockManager.putBytes(blockId, dataRead, storageLevel)
logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
@@ -132,14 +134,14 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](

/**
* Get the preferred location of the partition. This returns the locations of the block
* if it is present in the block manager, else it returns the location of the
* corresponding segment in HDFS.
* if it is present in the block manager, else if FileBasedWriteAheadLogSegment is used,
* it returns the location of the corresponding file segment in HDFS.
*/
override def getPreferredLocations(split: Partition): Seq[String] = {
val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
val blockLocations = getBlockIdLocations().get(partition.blockId)
blockLocations.getOrElse {
partition.segment match {
partition.walRecordHandle match {
case fileSegment: FileBasedWriteAheadLogSegment =>
HdfsUtils.getFileSegmentLocations(
fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig)
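To show how the renamed constructor parameter lines up with the block ids, here is a hedged sketch of building this RDD the way the updated test suite below does. The path, ids, and sizes are made up, and since these classes are private[streaming], code like this would live inside that package:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.storage.{BlockId, StorageLevel, StreamBlockId}
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.util.{FileBasedWriteAheadLogSegment, WriteAheadLogRecordHandle}

// One block id and one WAL record handle per partition; the two arrays must be the same length.
def recoveredRDD(sc: SparkContext): WriteAheadLogBackedBlockRDD[String] = {
  val blockIds: Array[BlockId] = Array(StreamBlockId(0, 1L))
  val recordHandles: Array[WriteAheadLogRecordHandle] = Array(
    // Hypothetical log file; offset and length locate this record within it.
    FileBasedWriteAheadLogSegment("hdfs://nn:8020/checkpoints/receivedData/0/log-1", 0L, 1024))
  new WriteAheadLogBackedBlockRDD[String](
    sc, blockIds, recordHandles, storeInBlockManager = false, StorageLevel.MEMORY_ONLY)
}
```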
@@ -96,7 +96,7 @@ private[streaming] class BlockManagerBasedBlockHandler(
*/
private[streaming] case class WriteAheadLogBasedStoreResult(
blockId: StreamBlockId,
segment: WriteAheadLogRecordHandle
walRecordHandle: WriteAheadLogRecordHandle
) extends ReceivedBlockStoreResult


@@ -178,10 +178,10 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
writeAheadLog.write(serializedBlock, clock.getTimeMillis())
}

// Combine the futures, wait for both to complete, and return the write ahead log segment
// Combine the futures, wait for both to complete, and return the write ahead log record handle
val combinedFuture = storeInBlockManagerFuture.zip(storeInWriteAheadLogFuture).map(_._2)
val segment = Await.result(combinedFuture, blockStoreTimeout)
WriteAheadLogBasedStoreResult(blockId, segment)
val walRecordHandle = Await.result(combinedFuture, blockStoreTimeout)
WriteAheadLogBasedStoreResult(blockId, walRecordHandle)
}

def cleanupOldBlocks(threshTime: Long) {
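The store path above zips two futures so the block manager write and the WAL write run in parallel, and only the WAL result (now the record handle) is returned. A stripped-down sketch of that pattern with hypothetical store functions:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Run the block-manager store and the WAL write in parallel; return only the WAL
// record handle once both have completed. The store functions here are made up.
def storeBoth[Handle](storeInBlockManager: () => Unit, storeInWriteAheadLog: () => Handle)
                     (implicit ec: ExecutionContext): Handle = {
  val blockManagerFuture = Future { storeInBlockManager() }
  val writeAheadLogFuture = Future { storeInWriteAheadLog() }
  // zip forces both futures to succeed; map(_._2) keeps only the record handle.
  val combinedFuture = blockManagerFuture.zip(writeAheadLogFuture).map(_._2)
  Await.result(combinedFuture, 30.seconds)
}
```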
@@ -130,7 +130,7 @@ class ReceivedBlockHandlerSuite extends FunSuite with BeforeAndAfter with Matche
"Unexpected store result type"
)
// Verify the data in write ahead log files is correct
val walSegments = storeResults.map { _.asInstanceOf[WriteAheadLogBasedStoreResult].segment}
val walSegments = storeResults.map { _.asInstanceOf[WriteAheadLogBasedStoreResult].walRecordHandle}
val loggedData = walSegments.flatMap { walSegment =>
val fileSegment = walSegment.asInstanceOf[FileBasedWriteAheadLogSegment]
val reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf)
@@ -100,9 +100,9 @@ class WriteAheadLogBackedBlockRDDSuite
blockManager.putIterator(blockId, block.iterator, StorageLevel.MEMORY_ONLY_SER)
}

// Generate write ahead log segments
val segments = generateFakeSegments(numPartitionsInBM) ++
writeLogSegments(data.takeRight(numPartitionsInWAL), blockIds.takeRight(numPartitionsInWAL))
// Generate write ahead log file segments
val recordHandles = generateFakeRecordHandles(numPartitionsInBM) ++
generateWALRecordHandles(data.takeRight(numPartitionsInWAL), blockIds.takeRight(numPartitionsInWAL))

// Make sure that the left `numPartitionsInBM` blocks are in block manager, and others are not
require(
@@ -116,24 +116,24 @@ class WriteAheadLogBackedBlockRDDSuite

// Make sure that the right `numPartitionsInWAL` blocks are in WALs, and other are not
require(
segments.takeRight(numPartitionsInWAL).forall(s =>
recordHandles.takeRight(numPartitionsInWAL).forall(s =>
new File(s.path.stripPrefix("file://")).exists()),
"Expected blocks not in write ahead log"
)
require(
segments.take(numPartitionsInBM).forall(s =>
recordHandles.take(numPartitionsInBM).forall(s =>
!new File(s.path.stripPrefix("file://")).exists()),
"Unexpected blocks in write ahead log"
)

// Create the RDD and verify whether the returned data is correct
val rdd = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
segments.toArray, storeInBlockManager = false, StorageLevel.MEMORY_ONLY)
recordHandles.toArray, storeInBlockManager = false, StorageLevel.MEMORY_ONLY)
assert(rdd.collect() === data.flatten)

if (testStoreInBM) {
val rdd2 = new WriteAheadLogBackedBlockRDD[String](sparkContext, blockIds.toArray,
segments.toArray, storeInBlockManager = true, StorageLevel.MEMORY_ONLY)
recordHandles.toArray, storeInBlockManager = true, StorageLevel.MEMORY_ONLY)
assert(rdd2.collect() === data.flatten)
assert(
blockIds.forall(blockManager.get(_).nonEmpty),
@@ -142,7 +142,7 @@ class WriteAheadLogBackedBlockRDDSuite
}
}

private def writeLogSegments(
private def generateWALRecordHandles(
blockData: Seq[Seq[String]],
blockIds: Seq[BlockId]
): Seq[FileBasedWriteAheadLogSegment] = {
@@ -155,7 +155,7 @@ class WriteAheadLogBackedBlockRDDSuite
segments
}

private def generateFakeSegments(count: Int): Seq[FileBasedWriteAheadLogSegment] = {
private def generateFakeRecordHandles(count: Int): Seq[FileBasedWriteAheadLogSegment] = {
Array.fill(count)(new FileBasedWriteAheadLogSegment("random", 0L, 0))
}
}
@@ -294,7 +294,7 @@ object WriteAheadLogSuite {

class MockWriteAheadLog0() extends WriteAheadLog {
override def write(record: ByteBuffer, time: Long): WriteAheadLogRecordHandle = { null }
override def read(segment: WriteAheadLogRecordHandle): ByteBuffer = { null }
override def read(handle: WriteAheadLogRecordHandle): ByteBuffer = { null }
override def readAll(): util.Iterator[ByteBuffer] = { null }
override def clean(threshTime: Long, waitForCompletion: Boolean): Unit = { }
override def close(): Unit = { }
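The renamed contract is easy to exercise end to end against a real (non-mock) implementation: write a record, keep only the handle, and read the record back through it. A hedged sketch; the helper name and payload are illustrative:

```scala
import java.nio.ByteBuffer
import java.util.Arrays

import org.apache.spark.streaming.util.WriteAheadLog

// Exercise any WriteAheadLog implementation through the handle-based contract alone.
def roundTrip(wal: WriteAheadLog): Boolean = {
  val payload = "some received data".getBytes("UTF-8")
  // The returned handle is all that is needed to read the record back later.
  val handle = wal.write(ByteBuffer.wrap(payload), System.currentTimeMillis())
  val readBack = wal.read(handle)
  val bytes = new Array[Byte](readBack.remaining())
  readBack.duplicate().get(bytes)
  Arrays.equals(bytes, payload)
}
```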
