From 4a204b846a8ce2b1cfbab9ed1ec42e8a2f082184 Mon Sep 17 00:00:00 2001 From: Aaron Davidson Date: Tue, 28 Oct 2014 17:59:31 -0700 Subject: [PATCH] Fail block fetches if client connection fails --- .../network/netty/NettyBlockTransferService.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index 501a2d123d456..38a3e945155e8 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -58,8 +58,14 @@ class NettyBlockTransferService(conf: SparkConf) extends BlockTransferService { port: Int, blockIds: Seq[String], listener: BlockFetchingListener): Unit = { - val client = clientFactory.createClient(hostname, port) - new NettyBlockFetcher(serializer, client, blockIds, listener).start() + try { + val client = clientFactory.createClient(hostname, port) + new NettyBlockFetcher(serializer, client, blockIds, listener).start() + } catch { + case e: Exception => + logError("Exception while beginning fetchBlocks", e) + blockIds.foreach(listener.onBlockFetchFailure(_, e)) + } } override def hostName: String = Utils.localHostName()