
[SPARK-7139][Streaming] Allow received block metadata to be saved to WAL and recovered on driver failure #5732

Closed
Wants to merge 8 commits
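
For context on the feature being extended here: the receiver write ahead log is switched on with a SparkConf flag and needs checkpointing enabled, since the log files are written under the checkpoint directory. A minimal sketch of that setup, assuming the documented config key; the app name, batch interval, and checkpoint path are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("receiver-wal-example")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")  // turn on the receiver WAL

val ssc = new StreamingContext(conf, Seconds(1))
ssc.checkpoint("hdfs:///tmp/streaming-checkpoint")  // WAL files are written under this directory
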
core/src/main/scala/org/apache/spark/rdd/BlockRDD.scala (8 changes: 4 additions & 4 deletions)
@@ -31,7 +31,7 @@ private[spark]
class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds: Array[BlockId])
extends RDD[T](sc, Nil) {

@transient lazy val locations_ = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
@transient lazy val _locations = BlockManager.blockIdsToHosts(blockIds, SparkEnv.get)
@volatile private var _isValid = true

override def getPartitions: Array[Partition] = {
@@ -54,7 +54,7 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds

override def getPreferredLocations(split: Partition): Seq[String] = {
assertValid()
locations_(split.asInstanceOf[BlockRDDPartition].blockId)
_locations(split.asInstanceOf[BlockRDDPartition].blockId)
}

/**
@@ -79,14 +79,14 @@ class BlockRDD[T: ClassTag](@transient sc: SparkContext, @transient val blockIds

/** Check if this BlockRDD is valid. If not valid, exception is thrown. */
private[spark] def assertValid() {
if (!_isValid) {
if (!isValid) {
throw new SparkException(
"Attempted to use %s after its blocks have been removed!".format(toString))
}
}

protected def getBlockIdLocations(): Map[BlockId, Seq[String]] = {
locations_
_locations
}
}
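
The new protected getBlockIdLocations() accessor lets a subclass reuse the cached block-to-host mapping in its own getPreferredLocations, which is what WriteAheadLogBackedBlockRDD does further down. A self-contained sketch of that pattern, using stand-in types rather than Spark's classes:

// Base class caches block locations lazily; the WAL-backed subclass falls back to
// log-file locations when a block has no cached hosts.
abstract class BaseBlockRddSketch(val blockIds: Array[String]) {
  @transient private lazy val _locations: Map[String, Seq[String]] =
    blockIds.map(id => id -> Seq.empty[String]).toMap   // placeholder lookup

  protected def getBlockIdLocations(): Map[String, Seq[String]] = _locations
}

class WalBackedBlockRddSketch(blockIds: Array[String], walFileHosts: Array[Seq[String]])
  extends BaseBlockRddSketch(blockIds) {

  def preferredLocations(partition: Int): Seq[String] = {
    val fromBlockManager = getBlockIdLocations().getOrElse(blockIds(partition), Seq.empty)
    if (fromBlockManager.nonEmpty) fromBlockManager else walFileHosts(partition)
  }
}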

@@ -20,11 +20,11 @@ package org.apache.spark.streaming.dstream
import scala.reflect.ClassTag

import org.apache.spark.rdd.{BlockRDD, RDD}
import org.apache.spark.storage.{BlockId, StorageLevel}
import org.apache.spark.storage.BlockId
import org.apache.spark.streaming._
import org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD
import org.apache.spark.streaming.receiver.{Receiver, WriteAheadLogBasedStoreResult}
import org.apache.spark.streaming.scheduler.{InputInfo, ReceivedBlockInfo}
import org.apache.spark.streaming.receiver.Receiver
import org.apache.spark.streaming.util.WriteAheadLogUtils

/**
* Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]]
@@ -64,31 +64,30 @@ abstract class ReceiverInputDStream[T: ClassTag](@transient ssc_ : StreamingCont
} else {
// Otherwise, ask the tracker for all the blocks that have been allocated to this stream
// for this batch
val blockInfos =
ssc.scheduler.receiverTracker.getBlocksOfBatch(validTime).get(id).getOrElse(Seq.empty)
val blockStoreResults = blockInfos.map { _.blockStoreResult }
val blockIds = blockStoreResults.map { _.blockId.asInstanceOf[BlockId] }.toArray
val receiverTracker = ssc.scheduler.receiverTracker
val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

// Register the input blocks information into InputInfoTracker
val inputInfo = InputInfo(id, blockInfos.map(_.numRecords).sum)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
// Are WAL record handles present for all the blocks?
val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }
Contributor: I do wonder if this should just throw an exception if any have missing WAL records... that just means there is an exception, right?

Contributor: Or rather, can we tell whether the WAL is in use with this receiver and then just throw an exception if we see any blocks that do not have a record handle?

Contributor Author: We can do that, since it is a conf property.
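
For reference, whether the receiver WAL is in use really is just a SparkConf flag, which is what makes the conditional escalation below possible. A simplified stand-in for the WriteAheadLogUtils.enableReceiverLog check used further down (the helper name here is illustrative; the config key follows the Spark Streaming docs):

import org.apache.spark.SparkConf

// The receiver WAL is toggled by one boolean conf property, so the "some blocks are
// missing WAL records" case can be treated as an error only when the WAL is enabled.
def receiverWalEnabledSketch(conf: SparkConf): Boolean =
  conf.getBoolean("spark.streaming.receiver.writeAheadLog.enable", false)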


// Check whether all the results are of the same type
val resultTypes = blockStoreResults.map { _.getClass }.distinct
if (resultTypes.size > 1) {
logWarning("Multiple result types in block information, WAL information will be ignored.")
}

// If all the results are of type WriteAheadLogBasedStoreResult, then create
// WriteAheadLogBackedBlockRDD else create simple BlockRDD.
if (resultTypes.size == 1 && resultTypes.head == classOf[WriteAheadLogBasedStoreResult]) {
val logSegments = blockStoreResults.map {
_.asInstanceOf[WriteAheadLogBasedStoreResult].walRecordHandle
}.toArray
// Since storeInBlockManager = false, the storage level does not matter.
new WriteAheadLogBackedBlockRDD[T](ssc.sparkContext,
blockIds, logSegments, storeInBlockManager = false, StorageLevel.MEMORY_ONLY_SER)
if (areWALRecordHandlesPresent) {
// If all the blocks have a WAL record handle, then create a WALBackedBlockRDD
val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
new WriteAheadLogBackedBlockRDD[T](
ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
} else {
// Else, create a BlockRDD. However, if some blocks have WAL info but others do not,
// that is unexpected, so log accordingly.
if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
logError("Some blocks do not have Write Ahead Log information; " +
"this is unexpected and data may not be recoverable after driver failures")
} else {
logWarning("Some blocks have Write Ahead Log information; this is unexpected")
}
}
new BlockRDD[T](ssc.sc, blockIds)
}
}
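
The branch above boils down to: build a WriteAheadLogBackedBlockRDD only when every block in the batch carries a WAL record handle, otherwise fall back to a plain BlockRDD and flag the mixed case. A self-contained sketch of that decision, with a stand-in type instead of Spark's ReceivedBlockInfo:

// Stand-in block info: the WAL record handle is optional, mirroring walRecordHandleOption.
case class BlockInfoSketch(blockId: String, walHandle: Option[String])

def chooseRdd(blocks: Seq[BlockInfoSketch], walEnabled: Boolean): String = {
  if (blocks.forall(_.walHandle.nonEmpty)) {
    "WriteAheadLogBackedBlockRDD"   // every block can be replayed from the WAL
  } else {
    if (blocks.exists(_.walHandle.nonEmpty)) {
      // Mixed case is unexpected: an error if the WAL is enabled, a warning otherwise.
      val level = if (walEnabled) "ERROR" else "WARN"
      println(s"$level: only some blocks carry WAL information")
    }
    "BlockRDD"                      // block-manager-only lookup
  }
}
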
@@ -23,6 +23,8 @@ import java.util.UUID
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import org.apache.commons.io.FileUtils

import org.apache.spark._
import org.apache.spark.rdd.BlockRDD
import org.apache.spark.storage.{BlockId, StorageLevel}
@@ -31,30 +31,42 @@ import org.apache.spark.streaming.util._
/**
* Partition class for [[org.apache.spark.streaming.rdd.WriteAheadLogBackedBlockRDD]].
* It contains information about the id of the blocks having this partition's data and
* the segment of the write ahead log that backs the partition.
* the corresponding record handle in the write ahead log that backs the partition.
* @param index index of the partition
* @param blockId id of the block having the partition data
* @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark
* executors). If not, then block lookups by the block ids will be skipped.
* By default, this is an empty array signifying true for all the blocks.
* @param walRecordHandle Handle of the record in a write ahead log having the partition data
*/
private[streaming]
class WriteAheadLogBackedBlockRDDPartition(
val index: Int,
val blockId: BlockId,
val walRecordHandle: WriteAheadLogRecordHandle)
extends Partition
val isBlockIdValid: Boolean,
Contributor: Can you add this param to the scaladoc?
Contributor Author: Good catch.

val walRecordHandle: WriteAheadLogRecordHandle
) extends Partition


/**
* This class represents a special case of the BlockRDD where the data blocks in
* the block manager are also backed by data in write ahead logs. For reading
* the data, this RDD first looks up the blocks by their ids in the block manager.
* If it does not find them, it looks up the corresponding data in the write ahead log.
* If it does not find them, it looks up the WAL using the corresponding record handle.
* The lookup of the blocks from the block manager can be skipped by setting the corresponding
* element in isBlockIdValid to false. This is a performance optimization which does not affect
* correctness, and it can be used in situations where it is known that the block
* does not exist in the Spark executors (e.g. after a failed driver is restarted).
*
* @param sc SparkContext
* @param blockIds Ids of the blocks that contains this RDD's data
* @param walRecordHandles Record handles in write ahead logs that contain this RDD's data
* @param storeInBlockManager Whether to store in the block manager after reading
* from the WAL record
* @param isBlockIdValid Whether the block Ids are valid (i.e., the blocks are present in the Spark
* executors). If not, then block lookups by the block ids will be skipped.
* By default, this is an empty array signifying true for all the blocks.
* @param storeInBlockManager Whether to store a block in the block manager
* after reading it from the WAL
* @param storageLevel storage level to store when storing in block manager
* (applicable when storeInBlockManager = true)
*/
@@ -63,23 +77,32 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
@transient sc: SparkContext,
@transient blockIds: Array[BlockId],
@transient walRecordHandles: Array[WriteAheadLogRecordHandle],
storeInBlockManager: Boolean,
storageLevel: StorageLevel)
@transient isBlockIdValid: Array[Boolean] = Array.empty,
storeInBlockManager: Boolean = false,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER)
extends BlockRDD[T](sc, blockIds) {

require(
blockIds.length == walRecordHandles.length,
s"Number of block ids (${blockIds.length}) must be " +
s"the same as number of WAL record handles (${walRecordHandles.length}})!")
s"Number of block Ids (${blockIds.length}) must be the " +
s"same as the number of WAL record handles (${walRecordHandles.length})")

require(
isBlockIdValid.isEmpty || isBlockIdValid.length == blockIds.length,
s"Number of elements in isBlockIdValid (${isBlockIdValid.length}) must be " +
s"the same as the number of block Ids (${blockIds.length})")

// Hadoop configuration is not serializable, so broadcast it as a serializable.
@transient private val hadoopConfig = sc.hadoopConfiguration
private val broadcastedHadoopConf = new SerializableWritable(hadoopConfig)

override def isValid(): Boolean = true

override def getPartitions: Array[Partition] = {
assertValid()
Array.tabulate(blockIds.size) { i =>
new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), walRecordHandles(i))
Array.tabulate(blockIds.length) { i =>
val isValid = if (isBlockIdValid.length == 0) true else isBlockIdValid(i)
new WriteAheadLogBackedBlockRDDPartition(i, blockIds(i), isValid, walRecordHandles(i))
}
}

@@ -94,51 +117,57 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
val blockManager = SparkEnv.get.blockManager
val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
val blockId = partition.blockId
blockManager.get(blockId) match {
case Some(block) => // Data is in Block Manager
val iterator = block.data.asInstanceOf[Iterator[T]]
logDebug(s"Read partition data of $this from block manager, block $blockId")
iterator
case None => // Data not found in Block Manager, grab it from write ahead log file
var dataRead: ByteBuffer = null
var writeAheadLog: WriteAheadLog = null
try {
// The WriteAheadLogUtils.createLog*** method needs a directory to create a
// WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
// writing log data. However, the directory is not needed if data needs to be read, hence
// a dummy path is provided to satisfy the method parameter requirements.
// FileBasedWriteAheadLog will not create any file or directory at that path.
// FileBasedWriteAheadLog will not create any file or directory at that path. Also,
// this dummy directory should not already exist otherwise the WAL will try to recover
// past events from the directory and throw errors.
val nonExistentDirectory = new File(
System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
dataRead = writeAheadLog.read(partition.walRecordHandle)
} catch {
case NonFatal(e) =>
throw new SparkException(
s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
} finally {
if (writeAheadLog != null) {
writeAheadLog.close()
writeAheadLog = null
}
}
if (dataRead == null) {

def getBlockFromBlockManager(): Option[Iterator[T]] = {
Contributor Author: This whole thing just puts the old code within inner functions, not really a big change. The real change is that these functions are called selectively in lines 157..160.

blockManager.get(blockId).map(_.data.asInstanceOf[Iterator[T]])
}

def getBlockFromWriteAheadLog(): Iterator[T] = {
var dataRead: ByteBuffer = null
var writeAheadLog: WriteAheadLog = null
try {
// The WriteAheadLogUtils.createLog*** method needs a directory to create a
// WriteAheadLog object as the default FileBasedWriteAheadLog needs a directory for
// writing log data. However, the directory is not needed if data needs to be read, hence
// a dummy path is provided to satisfy the method parameter requirements.
// FileBasedWriteAheadLog will not create any file or directory at that path.
// FileBasedWriteAheadLog will not create any file or directory at that path. Also,
// this dummy directory should not already exist otherwise the WAL will try to recover
// past events from the directory and throw errors.
val nonExistentDirectory = new File(
System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
dataRead = writeAheadLog.read(partition.walRecordHandle)
} catch {
case NonFatal(e) =>
throw new SparkException(
s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
s"read returned null")
s"Could not read data from write ahead log record ${partition.walRecordHandle}", e)
} finally {
if (writeAheadLog != null) {
writeAheadLog.close()
writeAheadLog = null
}
logInfo(s"Read partition data of $this from write ahead log, record handle " +
partition.walRecordHandle)
if (storeInBlockManager) {
blockManager.putBytes(blockId, dataRead, storageLevel)
logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
dataRead.rewind()
}
blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
}
if (dataRead == null) {
throw new SparkException(
s"Could not read data from write ahead log record ${partition.walRecordHandle}, " +
s"read returned null")
}
logInfo(s"Read partition data of $this from write ahead log, record handle " +
partition.walRecordHandle)
if (storeInBlockManager) {
blockManager.putBytes(blockId, dataRead, storageLevel)
logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
dataRead.rewind()
}
blockManager.dataDeserialize(blockId, dataRead).asInstanceOf[Iterator[T]]
}

if (partition.isBlockIdValid) {
getBlockFromBlockManager().getOrElse { getBlockFromWriteAheadLog() }
} else {
getBlockFromWriteAheadLog()
}
}
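
The selective calls at the end of compute reduce to: consult the block manager only when the block id is known to be valid, and otherwise (or on a miss) replay the partition from the WAL. A compact sketch of that shape:

// blockIdValid = false means the block is known to be absent from the executors
// (for example after a driver restart), so the block-manager lookup is skipped entirely.
def readPartitionSketch[T](blockIdValid: Boolean,
                           fromBlockManager: () => Option[Iterator[T]],
                           fromWal: () => Iterator[T]): Iterator[T] = {
  if (blockIdValid) fromBlockManager().getOrElse(fromWal()) else fromWal()
}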

@@ -149,12 +178,23 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
*/
override def getPreferredLocations(split: Partition): Seq[String] = {
val partition = split.asInstanceOf[WriteAheadLogBackedBlockRDDPartition]
val blockLocations = getBlockIdLocations().get(partition.blockId)
val blockLocations = if (partition.isBlockIdValid) {
getBlockIdLocations().get(partition.blockId)
} else {
None
}

blockLocations.getOrElse {
partition.walRecordHandle match {
case fileSegment: FileBasedWriteAheadLogSegment =>
HdfsUtils.getFileSegmentLocations(
fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig)
try {
HdfsUtils.getFileSegmentLocations(
fileSegment.path, fileSegment.offset, fileSegment.length, hadoopConfig)
} catch {
case NonFatal(e) =>
logError("Error getting WAL file segment locations", e)
Seq.empty
}
case _ =>
Seq.empty
}
@@ -146,7 +146,7 @@ private[streaming] class ReceiverSupervisorImpl(
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")

val blockInfo = ReceivedBlockInfo(streamId, numRecords, blockStoreResult)
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))
logDebug(s"Reported block $blockId")
}
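
On the user-facing side, the metadataOption threaded through here comes from the Receiver.store variants that take a metadata argument. A hedged sketch of a receiver supplying such metadata; the class and metadata values are illustrative, and a real receiver would produce data from a background thread rather than directly in onStart:

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class ExampleReceiver extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) {
  def onStart(): Unit = {
    // Store a block together with caller-supplied metadata; with this change the metadata
    // is carried in ReceivedBlockInfo and, when the receiver WAL is enabled, written to it.
    val records = ArrayBuffer("record-1", "record-2")
    store(records, Map("sourceOffset" -> 42L))
  }

  def onStop(): Unit = { }
}
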
@@ -17,12 +17,38 @@

package org.apache.spark.streaming.scheduler

import org.apache.spark.streaming.receiver.ReceivedBlockStoreResult
import org.apache.spark.storage.StreamBlockId
import org.apache.spark.streaming.receiver.{ReceivedBlockStoreResult, WriteAheadLogBasedStoreResult}
import org.apache.spark.streaming.util.WriteAheadLogRecordHandle

/** Information about blocks received by the receiver */
private[streaming] case class ReceivedBlockInfo(
streamId: Int,
numRecords: Long,
metadataOption: Option[Any],
blockStoreResult: ReceivedBlockStoreResult
)
) {

@volatile private var _isBlockIdValid = true

def blockId: StreamBlockId = blockStoreResult.blockId

def walRecordHandleOption: Option[WriteAheadLogRecordHandle] = {
blockStoreResult match {
case walStoreResult: WriteAheadLogBasedStoreResult => Some(walStoreResult.walRecordHandle)
case _ => None
}
}

/** Is the block ID valid, that is, is the block present in the Spark executors. */
def isBlockIdValid(): Boolean = _isBlockIdValid

/**
* Set the block ID as invalid. This is useful when it is known that the block is not present
* in the Spark executors.
*/
def setBlockIdInvalid(): Unit = {
_isBlockIdValid = false
}
}
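
The walRecordHandleOption pattern match above is what lets ReceiverInputDStream drop the earlier getClass/distinct comparison of store results. A self-contained sketch of the same idea with stand-in result types (not Spark's classes):

// Only the WAL-based store result carries a record handle.
sealed trait StoreResultSketch
case class BlockManagerResultSketch(blockId: String) extends StoreResultSketch
case class WalResultSketch(blockId: String, recordHandle: String) extends StoreResultSketch

def walHandleOption(result: StoreResultSketch): Option[String] = result match {
  case WalResultSketch(_, handle) => Some(handle)
  case _                          => None
}

// Callers no longer compare classes; they just inspect the Option.
val results = Seq(WalResultSketch("b1", "log-segment-0"), BlockManagerResultSketch("b2"))
val allBackedByWal = results.forall(r => walHandleOption(r).nonEmpty)   // false here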
