From 98c668ae98e6e7d3d22504b3607527ad162356fc Mon Sep 17 00:00:00 2001
From: Reynold Xin
Date: Tue, 2 Sep 2014 15:03:58 -0700
Subject: [PATCH] Added failure handling and fixed unit tests.

---
 .../spark/network/BlockFetchingListener.scala   |  2 +-
 .../spark/network/BlockTransferService.scala    | 24 ++++++++++---------
 .../network/cm/CMBlockTransferService.scala     |  6 ++++-
 .../storage/ShuffleBlockFetcherIterator.scala   | 17 +++++++------
 .../org/apache/spark/DistributedSuite.scala     |  2 --
 5 files changed, 27 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala b/core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala
index c1dfcf1c12d39..6bc123701bddf 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockFetchingListener.scala
@@ -33,5 +33,5 @@ trait BlockFetchingListener extends EventListener {
   /**
    * Called upon failures.
    */
-  def onBlockFetchFailure(exception: Exception): Unit
+  def onBlockFetchFailure(exception: Throwable): Unit
 }
diff --git a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
index 645adbce23117..3000fc74fdb01 100644
--- a/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/BlockTransferService.scala
@@ -44,7 +44,11 @@ abstract class BlockTransferService {
   def hostName: String
 
   /**
-   * Fetch a sequence of blocks from a remote node, available only after [[init]] is invoked.
+   * Fetch a sequence of blocks from a remote node asynchronously,
+   * available only after [[init]] is invoked.
+   *
+   * Note that [[BlockFetchingListener.onBlockFetchSuccess]] is called once per block,
+   * while [[BlockFetchingListener.onBlockFetchFailure]] is called once per failure.
    *
    * This takes a sequence so the implementation can batch requests.
    */
@@ -55,19 +59,17 @@ abstract class BlockTransferService {
       listener: BlockFetchingListener): Unit
 
   /**
-   * Fetch a single block from a remote node, available only after [[init]] is invoked.
-   *
-   * This is functionally equivalent to
-   * {{{
-   *   fetchBlocks(hostName, port, Seq(blockId)).iterator().next()._2
-   * }}}
+   * Fetch a single block from a remote node, synchronously,
+   * available only after [[init]] is invoked.
    */
   def fetchBlock(hostName: String, port: Int, blockId: String): ManagedBuffer = {
     // TODO(rxin): Add timeout?
+
+    // A monitor for the thread to wait on.
     val lock = new Object
-    @volatile var result: Either[ManagedBuffer, Exception] = null
+    @volatile var result: Either[ManagedBuffer, Throwable] = null
     fetchBlocks(hostName, port, Seq(blockId), new BlockFetchingListener {
-      override def onBlockFetchFailure(exception: Exception): Unit = {
+      override def onBlockFetchFailure(exception: Throwable): Unit = {
         lock.synchronized {
           result = Right(exception)
           lock.notify()
@@ -93,8 +95,8 @@ abstract class BlockTransferService {
     }
 
     result match {
-      case Left(data: ManagedBuffer) => data
-      case Right(e: Exception) => throw e
+      case Left(data) => data
+      case Right(e) => throw e
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/network/cm/CMBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/cm/CMBlockTransferService.scala
index 3b61c0ee852c5..86d6396dfdc2b 100644
--- a/core/src/main/scala/org/apache/spark/network/cm/CMBlockTransferService.scala
+++ b/core/src/main/scala/org/apache/spark/network/cm/CMBlockTransferService.scala
@@ -84,7 +84,7 @@ final class CMBlockTransferService(conf: SparkConf, securityManager: SecurityMan
 
     val future = cm.sendMessageReliably(cmId, blockMessageArray.toBufferMessage)
 
-    // If succeeds in getting blocks from a remote connection manager, put the block in results.
+    // Register callbacks on the future to handle success and failure.
     future.onSuccess { case message =>
       val bufferMessage = message.asInstanceOf[BufferMessage]
       val blockMessageArray = BlockMessageArray.fromBufferMessage(bufferMessage)
@@ -101,6 +101,10 @@ final class CMBlockTransferService(conf: SparkConf, securityManager: SecurityMan
         }
       }
     }(cm.futureExecContext)
+
+    future.onFailure { case exception =>
+      listener.onBlockFetchFailure(exception)
+    }(cm.futureExecContext)
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 5c647f9754f4d..bdba7b133f92c 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -116,18 +116,17 @@ final class ShuffleBlockFetcherIterator(
           logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime))
         }
 
-        override def onBlockFetchFailure(exception: Exception): Unit = {
-
+        override def onBlockFetchFailure(e: Throwable): Unit = {
+          logError(s"Failed to get block(s) from ${req.address.host}:${req.address.port}", e)
+          // Note that there is a chance that some blocks have been fetched successfully, but we
+          // still add them to the failed queue. This is fine because when the caller sees a
+          // FetchFailedException, it is going to fail the entire task anyway.
+          for ((blockId, size) <- req.blocks) {
+            results.put(new FetchResult(blockId, -1, null))
+          }
         }
       }
     )
-    // case Failure(exception) => {
-    //   logError("Could not get block(s) from " + cmId, exception)
-    //   for ((blockId, size) <- req.blocks) {
-    //     results.put(new FetchResult(blockId, -1, null))
-    //   }
-    // }
-    // }
   }
 
   private[this] def splitLocalRemoteBlocks(): ArrayBuffer[FetchRequest] = {
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index b54163a3bcfa7..2cd1fc1870281 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -341,7 +341,6 @@ object DistributedSuite {
   // Act like an identity function, but if the argument is true, set mark to true.
   def markNodeIfIdentity(item: Boolean): Boolean = {
     if (item) {
-      println("marking node!!!!!!!!!!!!!!!")
       assert(!amMaster)
       mark = true
     }
@@ -352,7 +351,6 @@ object DistributedSuite {
   // crashing the entire JVM.
   def failOnMarkedIdentity(item: Boolean): Boolean = {
     if (mark) {
-      println("failing node !!!!!!!!!!!!!!!")
       System.exit(42)
     }
     item
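
The sketch below (not part of the patch, and not the real Spark classes) illustrates the monitor-based pattern the new synchronous fetchBlock uses to block on the asynchronous listener. BlockingFetchSketch, Buffer, Listener, fetchAsync and fetchBlocking are simplified stand-ins invented for illustration; the only detail carried over from the patch is that the failure callback takes a Throwable, as in the updated BlockFetchingListener.

// Sketch only: simplified stand-ins for ManagedBuffer / BlockFetchingListener,
// showing how a blocking fetch can be layered on top of asynchronous callbacks.
object BlockingFetchSketch {

  type Buffer = Array[Byte]

  // Simplified listener, mirroring the Throwable-based failure callback.
  trait Listener {
    def onSuccess(blockId: String, data: Buffer): Unit
    def onFailure(exception: Throwable): Unit
  }

  // Pretend asynchronous transport: invokes the listener from another thread.
  def fetchAsync(blockId: String, listener: Listener): Unit = {
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        if (blockId.isEmpty) {
          listener.onFailure(new IllegalArgumentException("empty block id"))
        } else {
          listener.onSuccess(blockId, new Array[Byte](16))
        }
      }
    })
    worker.start()
  }

  // Blocking wrapper: park the caller on a monitor until one of the two
  // callbacks fires, then either return the data or rethrow the Throwable.
  def fetchBlocking(blockId: String): Buffer = {
    val lock = new Object
    @volatile var result: Either[Buffer, Throwable] = null

    fetchAsync(blockId, new Listener {
      override def onSuccess(id: String, data: Buffer): Unit = lock.synchronized {
        result = Left(data)
        lock.notify()
      }
      override def onFailure(exception: Throwable): Unit = lock.synchronized {
        result = Right(exception)
        lock.notify()
      }
    })

    lock.synchronized {
      while (result == null) {
        lock.wait()  // loop guards against spurious wakeups
      }
    }

    result match {
      case Left(data) => data
      case Right(e) => throw e
    }
  }

  def main(args: Array[String]): Unit = {
    val data = fetchBlocking("shuffle_0_0_0")
    println("fetched " + data.length + " bytes")
  }
}

The while loop around lock.wait() guards against spurious wakeups; a production version would also honor the timeout flagged in the TODO in fetchBlock above.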