Skip to content

Commit

Permalink
Added failure handling and fixed unit tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
rxin committed Sep 2, 2014
1 parent ae05fcd commit 98c668a
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ trait BlockFetchingListener extends EventListener {
/**
* Called upon failures.
*/
def onBlockFetchFailure(exception: Exception): Unit
def onBlockFetchFailure(exception: Throwable): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ abstract class BlockTransferService {
def hostName: String

/**
* Fetch a sequence of blocks from a remote node, available only after [[init]] is invoked.
* Fetch a sequence of blocks from a remote node asynchronously,
* available only after [[init]] is invoked.
*
* Note that [[BlockFetchingListener.onBlockFetchSuccess]] is called once per block,
* while [[BlockFetchingListener.onBlockFetchSuccess]] is called once per failure.
*
* This takes a sequence so the implementation can batch requests.
*/
Expand All @@ -55,19 +59,17 @@ abstract class BlockTransferService {
listener: BlockFetchingListener): Unit

/**
* Fetch a single block from a remote node, available only after [[init]] is invoked.
*
* This is functionally equivalent to
* {{{
* fetchBlocks(hostName, port, Seq(blockId)).iterator().next()._2
* }}}
* Fetch a single block from a remote node, synchronously,
* available only after [[init]] is invoked.
*/
def fetchBlock(hostName: String, port: Int, blockId: String): ManagedBuffer = {
// TODO(rxin): Add timeout?

// A monitor for the thread to wait on.
val lock = new Object
@volatile var result: Either[ManagedBuffer, Exception] = null
@volatile var result: Either[ManagedBuffer, Throwable] = null
fetchBlocks(hostName, port, Seq(blockId), new BlockFetchingListener {
override def onBlockFetchFailure(exception: Exception): Unit = {
override def onBlockFetchFailure(exception: Throwable): Unit = {
lock.synchronized {
result = Right(exception)
lock.notify()
Expand All @@ -93,8 +95,8 @@ abstract class BlockTransferService {
}

result match {
case Left(data: ManagedBuffer) => data
case Right(e: Exception) => throw e
case Left(data) => data
case Right(e) => throw e
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ final class CMBlockTransferService(conf: SparkConf, securityManager: SecurityMan

val future = cm.sendMessageReliably(cmId, blockMessageArray.toBufferMessage)

// If succeeds in getting blocks from a remote connection manager, put the block in results.
// Register the listener on success/failure future callback.
future.onSuccess { case message =>
val bufferMessage = message.asInstanceOf[BufferMessage]
val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
Expand All @@ -101,6 +101,10 @@ final class CMBlockTransferService(conf: SparkConf, securityManager: SecurityMan
}
}
}(cm.futureExecContext)

future.onFailure { case exception =>
listener.onBlockFetchFailure(exception)
}(cm.futureExecContext)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,18 +116,17 @@ final class ShuffleBlockFetcherIterator(
logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
}

override def onBlockFetchFailure(exception: Exception): Unit = {

override def onBlockFetchFailure(e: Throwable): Unit = {
logError("Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
// Note that there is a chance that some blocks have been fetched successfully, but we
// still add them to the failed queue. This is fine because when the caller see a
// FetchFailedException, it is going to fail the entire task anyway.
for ((blockId, size) <- req.blocks) {
results.put(new FetchResult(blockId, -1, null))
}
}
}
)
// case Failure(exception) => {
// logError("Could not get block(s) from " + cmId, exception)
// for ((blockId, size) <- req.blocks) {
// results.put(new FetchResult(blockId, -1, null))
// }
// }
// }
}

private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
Expand Down
2 changes: 0 additions & 2 deletions core/src/test/scala/org/apache/spark/DistributedSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,6 @@ object DistributedSuite {
// Act like an identity function, but if the argument is true, set mark to true.
def markNodeIfIdentity(item: Boolean): Boolean = {
if (item) {
println("marking node!!!!!!!!!!!!!!!")
assert(!amMaster)
mark = true
}
Expand All @@ -352,7 +351,6 @@ object DistributedSuite {
// crashing the entire JVM.
def failOnMarkedIdentity(item: Boolean): Boolean = {
if (mark) {
println("failing node !!!!!!!!!!!!!!!")
System.exit(42)
}
item
Expand Down

0 comments on commit 98c668a

Please sign in to comment.