Commit

[SPARK-7139] [STREAMING] Allow received block metadata to be saved to WAL and recovered on driver failure

- Enabled ReceivedBlockTracker WAL by default
- Stored block metadata in the WAL
- Optimized WALBackedBlockRDD by skipping the block fetch when the block is known not to exist in Spark (see the sketch below)
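
The sketch below is illustrative only and not part of this commit: it shows how the new optional isBlockIdValid argument of WriteAheadLogBackedBlockRDD (see the diff further down) is meant to be used. The class is internal to Spark Streaming, so the sketch assumes it is compiled inside the org.apache.spark.streaming.rdd package, and the object and method names are hypothetical.

package org.apache.spark.streaming.rdd

import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.storage.BlockId
import org.apache.spark.streaming.util.WriteAheadLogRecordHandle

// Hypothetical helper, for illustration only.
object WalRecoverySketch {
  /**
   * Build an RDD over blocks whose metadata was recovered from the WAL after a
   * driver restart. The executors no longer hold these blocks, so every block id
   * is marked invalid and each partition is read straight from the write ahead
   * log, skipping the block-manager lookup.
   */
  def recoveredRDD[T: ClassTag](
      sc: SparkContext,
      blockIds: Array[BlockId],
      walHandles: Array[WriteAheadLogRecordHandle]): WriteAheadLogBackedBlockRDD[T] = {
    val isBlockIdValid = Array.fill(blockIds.length)(false)
    // storeInBlockManager (false) and storageLevel (MEMORY_ONLY_SER) now have defaults.
    new WriteAheadLogBackedBlockRDD[T](sc, blockIds, walHandles, isBlockIdValid)
  }
}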

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5732 from tdas/SPARK-7139 and squashes the following commits:

575476e [Tathagata Das] Added more tests to get 100% coverage of the WALBackedBlockRDD
19668ba [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7139
685fab3 [Tathagata Das] Addressed comments in PR
637bc9c [Tathagata Das] Changed segment to handle
466212c [Tathagata Das] Merge remote-tracking branch 'apache-github/master' into SPARK-7139
5f67a59 [Tathagata Das] Fixed HdfsUtils to handle append in local file system
1bc5bc3 [Tathagata Das] Fixed bug on unexpected recovery
d06fa21 [Tathagata Das] Enabled ReceivedBlockTracker by default, stored block metadata and optimized block fetching in WALBackedBlockRDD
tdas committed May 5, 2015
1 parent c5790a2 commit 1854ac3
Showing 10 changed files with 281 additions and 153 deletions.
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala
@@ -31,7 +31,7 @@ private[spark]
class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds: Array[BlockId])
extends RDD[T](sc, Nil) {

@transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
@transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
@volatile private var _isValid = true

override def getPartitions: Array[Partition] = {
@@ -54,7 +54,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds

override def getPreferredLocations(split: Partition): Seq[String] = {
assertValid()
locations_(split.asInstanceOf[BlockRDDPartition].blockId)
_locations(split.asInstanceOf[BlockRDDPartition].blockId)
}

/**
@@ -79,14 +79,14 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds

/** Check if this BlockRDD is valid. If not valid, exception is thrown. */
private[spark] def assertValid() {
if (!_isValid) {
if (!isValid) {
throw new SparkException(
"Attempted to use %s after its blocks have been removed!".format(toString))
}
}

protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = {
locations_
_locations
}
}

streaming/src/main/scala/org/apache/spark/streaming/dstream/ReceiverInputDStream.scala
@@ -20,11 +20,11 @@ package org.apache.spark.streaming.dstream
import scala.reflect.ClassTag

import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.storage.BlockId
import org.apache.spark.streaming._
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.receiver.{Receiver, WriteAheadLogBasedStoreResult}
import org.apache.spark.streaming.scheduler.{InputInfo, ReceivedBlockInfo}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.util.WriteAheadLogUtils

/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -64,31 +64,30 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
} else {
// Otherwise, ask the tracker for all the blocks that have been allocated to this stream
// for this batch
val blockInfos =
ssc.scheduler.receiverTracker.getBlocksOfBatch(validTime).get(id).getOrElse(Seq.empty)
val blockStoreResults = blockInfos.map { _.blockStoreResult }
val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray
val receiverTracker = ssc.scheduler.receiverTracker
val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

// Register the input blocks information into InputInfoTracker
val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
// Are WAL record handles present for all the blocks?
val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

// Check whether all the results are of the same type
val resultTypes = blockStoreResults.map { _.getClass }.distinct
if (resultTypes.size > 1) {
logWarning("Multiple result types in block information, WAL information will be ignored.")
}

// If all the results are of type WriteAheadLogBasedStoreResult, then create
// WriteAheadLogBackedBlockRDD else create simple BlockRDD.
if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) {
val logSegments = blockStoreResults.map {
_.asInstanceOf[WriteAheadLogBasedStoreResult].walRecordHandle
}.toArray
// Since storeInBlockManager = false, the storage level does not matter.
new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
blockIds, logSegments, storeInBlockManager = false, StorageLevel.MEMORY_ONLY_SER)
if (areWALRecordHandlesPresent) {
// If all the blocks have WAL record handle, then create a WALBackedBlockRDD
val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
new WriteAheadLogBackedBlockRDD[T](
ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
} else {
// Else, create a BlockRDD. However, if some blocks have WAL info and others do not,
// that is unexpected; log accordingly.
if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
logError("Some blocks do not have Write Ahead Log information; " +
"this is unexpected and data may not be recoverable after driver failures")
} else {
logWarning("Some blocks have Write Ahead Log information; this is unexpected")
}
}
new BlockRDD[T](ssc.sc, blockIds)
}
}
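
For context, a hedged sketch of the configuration that the WriteAheadLogUtils.enableReceiverLog check above looks at. The key is the standard Spark Streaming setting for the receiver write ahead log; a checkpoint directory must also be set for recovery to work across driver restarts, and the application name and paths below are placeholders.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative configuration only: enable the receiver write ahead log and set a
// checkpoint directory so received blocks and their metadata can be recovered
// after a driver failure.
val conf = new SparkConf()
  .setAppName("wal-enabled-app")                                  // placeholder name
  .setMaster("local[2]")                                          // for local testing
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("/path/to/checkpoint/dir")                         // placeholder path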
streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -23,6 +23,8 @@ import java.util.UUID
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import org.apache.commons.io.FileUtils

import org.apache.spark._
import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.{BlockId, StorageLevel}
@@ -31,30 +31,42 @@ import org.apache.spark.streaming.util._
/**
* Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
* It contains information about the id of the blocks having this partition's data and
* the segment of the write ahead log that backs the partition.
* the corresponding record handle in the write ahead log that backs the partition.
* @param index index of the partition
* @param blockId id of the block having the partition data
* @param isBlockIdValid Whether the block id is valid (i.e., the block is present in the Spark
* executors). If not, the lookup by block id is skipped and the data is
* read from the write ahead log.
* @param walRecordHandle Handle of the record in a write ahead log having the partition data
*/
private[streaming]
class WriteAheadLogBackedBlockRDDPartition(
val index: Int,
val blockId: BlockId,
val walRecordHandle: WriteAheadLogRecordHandle)
extends Partition
val isBlockIdValid: Boolean,
val walRecordHandle: WriteAheadLogRecordHandle
) extends Partition


/**
* This class represents a special case of the BlockRDD where the data blocks in
* the block manager are also backed by data in write ahead logs. For reading
* the data, this RDD first looks up the blocks by their ids in the block manager.
* If it does not find them, it looks up the corresponding data in the write ahead log.
* If it does not find them, it looks up the WAL using the corresponding record handle.
* The lookup of the blocks from the block manager can be skipped by setting the corresponding
* element in isBlockIdValid to false. This is a performance optimization which does not affect
* correctness, and it can be used in situations where it is known that the block
* does not exist in the Spark executors (e.g. after a failed driver is restarted).
*
*
* @param sc SparkContext
* @param blockIds Ids of the blocks that contains this RDD's data
* @param walRecordHandles Record handles in write ahead logs that contain this RDD's data
* @param storeInBlockManager Whether to store in the block manager after reading
* from the WAL record
* @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark
* executors). If not, then block lookups by the block ids will be skipped.
* By default, this is an empty array signifying true for all the blocks.
* @param storeInBlockManager Whether to store a block in the block manager
* after reading it from the WAL
* @param storageLevel storage level to store when storing in block manager
* (applicable when storeInBlockManager = true)
*/
@@ -63,23 +77,32 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
@transient sc: SparkContext,
@transient blockIds: Array[BlockId],
@transient walRecordHandles: Array[WriteAheadLogRecordHandle],
storeInBlockManager: Boolean,
storageLevel: StorageLevel)
@transient isBlockIdValid: Array[Boolean] = Array.empty,
storeInBlockManager: Boolean = false,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER)
extends BlockRDD[T](sc, blockIds) {

require(
blockIds.length == walRecordHandles.length,
s"Number of block ids (${blockIds.length}) must be " +
s"the same as number of WAL record handles (${walRecordHandles.length}})!")
s"Number of block Ids (${blockIds.length}) must be " +
s" same as number of WAL record handles (${walRecordHandles.length}})")

require(
isBlockIdValid.isEmpty || isBlockIdValid.length == blockIds.length,
s"Number of elements in isBlockIdValid (${isBlockIdValid.length}) must be " +
s" same as number of block Ids (${blockIds.length})")

// Hadoop configuration is not serializable, so wrap it in a SerializableWritable so it can be shipped to tasks.
@transient private val hadoopConfig = sc.hadoopConfiguration
private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig)

override def isValid(): Boolean = true

override def getPartitions: Array[Partition] = {
assertValid()
Array.tabulate(blockIds.size) { i =>
new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), walRecordHandles(i))
Array.tabulate(blockIds.length) { i =>
val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i)
new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), isValid, walRecordHandles(i))
}
}

@@ -94,51 +117,57 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
val blockManager = SparkEnv.get.blockManager
val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
val blockId = partition.blockId
blockManager.get(blockId) match {
case Some(block) => // Data is in Block Manager
val iterator = block.data.asInstanceOf[Iterator[T]]
logDebug(s"Read partition data of $this from block manager, block $blockId")
iterator
case None => // Data not found in Block Manager, grab it from write ahead log file
var dataRead: ByteBuffer = null
var writeAheadLog: WriteAheadLog = null
try {
// The WriteAheadLogUtils.createLog*** method needs a directory to create a
// WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
// writing log data. However, the directory is not needed if data needs to be read, hence
// a dummy path is provided to satisfy the method parameter requirements.
// FileBasedWriteAheadLog will not create any file or directory at that path.
// FileBasedWriteAheadLog will not create any file or directory at that path. Also,
// this dummy directory should not already exist otherwise the WAL will try to recover
// past events from the directory and throw errors.
val nonExistentDirectory = new File(
System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
dataRead = writeAheadLog.read(partition.walRecordHandle)
} catch {
case NonFatal(e) =>
throw new SparkException(
s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
} finally {
if (writeAheadLog != null) {
writeAheadLog.close()
writeAheadLog = null
}
}
if (dataRead == null) {

def getBlockFromBlockManager(): Option[Iterator[T]] = {
blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
}

def getBlockFromWriteAheadLog(): Iterator[T] = {
var dataRead: ByteBuffer = null
var writeAheadLog: WriteAheadLog = null
try {
// The WriteAheadLogUtils.createLog*** method needs a directory to create a
// WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
// writing log data. However, the directory is not needed if data needs to be read, hence
// a dummy path is provided to satisfy the method parameter requirements.
// FileBasedWriteAheadLog will not create any file or directory at that path.
// FileBasedWriteAheadLog will not create any file or directory at that path. Also,
// this dummy directory should not already exist otherwise the WAL will try to recover
// past events from the directory and throw errors.
val nonExistentDirectory = new File(
System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
dataRead = writeAheadLog.read(partition.walRecordHandle)
} catch {
case NonFatal(e) =>
throw new SparkException(
s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
s"read returned null")
s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
} finally {
if (writeAheadLog != null) {
writeAheadLog.close()
writeAheadLog = null
}
logInfo(s"Read partition data of $this from write ahead log, record handle " +
partition.walRecordHandle)
if (storeInBlockManager) {
blockManager.putBytes(blockId, dataRead, storageLevel)
logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
dataRead.rewind()
}
blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
}
if (dataRead == null) {
throw new SparkException(
s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
s"read returned null")
}
logInfo(s"Read partition data of $this from write ahead log, record handle " +
partition.walRecordHandle)
if (storeInBlockManager) {
blockManager.putBytes(blockId, dataRead, storageLevel)
logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
dataRead.rewind()
}
blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
}

if (partition.isBlockIdValid) {
getBlockFromBlockManager().getOrElse { getBlockFromWriteAheadLog() }
} else {
getBlockFromWriteAheadLog()
}
}

@@ -149,12 +178,23 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
*/
override def getPreferredLocations(split: Partition): Seq[String] = {
val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
val blockLocations = getBlockIdLocations().get(partition.blockId)
val blockLocations = if (partition.isBlockIdValid) {
getBlockIdLocations().get(partition.blockId)
} else {
None
}

blockLocations.getOrElse {
partition.walRecordHandle match {
case fileSegment: FileBasedWriteAheadLogSegment =>
HdfsUtils.getFileSegmentLocations(
fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig)
try {
HdfsUtils.getFileSegmentLocations(
fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig)
} catch {
case NonFatal(e) =>
logError("Error getting WAL file segment locations", e)
Seq.empty
}
case _ =>
Seq.empty
}
streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisorImpl.scala
@@ -146,7 +146,7 @@ private[streaming] class ReceiverSupervisorImpl(
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")

val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
}
streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceivedBlockInfo.scala
@@ -17,12 +17,38 @@

package org.apache.spark.streaming.scheduler

import org.apache.spark.streaming.receiver.ReceivedBlockStoreResult
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.{ReceivedBlockStoreResult, WriteAheadLogBasedStoreResult}
import org.apache.spark.streaming.util.WriteAheadLogRecordHandle

/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
streamId: Int,
numRecords: Long,
metadataOption: Option[Any],
blockStoreResult: ReceivedBlockStoreResult
)
) {

@volatile private var _isBlockIdValid = true

def blockId: StreamBlockId = blockStoreResult.blockId

def walRecordHandleOption: Option[WriteAheadLogRecordHandle] = {
blockStoreResult match {
case walStoreResult: WriteAheadLogBasedStoreResult => Some(walStoreResult.walRecordHandle)
case _ => None
}
}

/** Is the block ID valid, that is, is the block present in the Spark executors. */
def isBlockIdValid(): Boolean = _isBlockIdValid

/**
* Set the block ID as invalid. This is useful when it is known that the block is not present
* in the Spark executors.
*/
def setBlockIdInvalid(): Unit = {
_isBlockIdValid = false
}
}
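
A hedged sketch of how the new accessors fit together; the tracker-side code that marks recovered blocks in the real commit is not shown in this excerpt, and the object and method names below are hypothetical. ReceivedBlockInfo is private[streaming], so the sketch assumes it is compiled in the org.apache.spark.streaming.scheduler package.

package org.apache.spark.streaming.scheduler

// Hypothetical recovery-time helpers, for illustration only.
private[streaming] object ReceivedBlockInfoSketch {

  // After a driver restart the executors that held these blocks are gone, so
  // mark every recovered block id as invalid; readers then skip the block
  // manager and go straight to the write ahead log.
  def markRecovered(recovered: Seq[ReceivedBlockInfo]): Unit = {
    recovered.foreach(_.setBlockIdInvalid())
  }

  // True only if every block can be re-read from the WAL; this mirrors the
  // check ReceiverInputDStream uses when choosing between a
  // WriteAheadLogBackedBlockRDD and a plain BlockRDD.
  def allBackedByWal(infos: Seq[ReceivedBlockInfo]): Boolean =
    infos.forall(_.walRecordHandleOption.nonEmpty)
}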

(5 more changed files not shown)
