From 995d196d7e1c45c7e6804f3621692d75fb73613d Mon Sep 17 00:00:00 2001
From: Bryan Cutler
Date: Thu, 11 Jun 2015 15:09:33 -0700
Subject: [PATCH] [SPARK-6980] Cleaned up import ordering, comments, spacing
 from PR feedback

---
 .../scala/org/apache/spark/rpc/RpcEnv.scala      | 10 +++-------
 .../apache/spark/rpc/akka/AkkaRpcEnv.scala       | 22 +++++++++++-----------
 .../org/apache/spark/util/AkkaUtils.scala        |  9 ++++-----
 .../org/apache/spark/util/RpcUtils.scala         |  7 ++-----
 .../spark/rpc/akka/AkkaRpcEnvSuite.scala         |  1 -
 5 files changed, 20 insertions(+), 29 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 8efe690e78216..5149cf14684ac 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -20,9 +20,8 @@ package org.apache.spark.rpc
 import java.net.URI
 import java.util.concurrent.TimeoutException
 
-import scala.concurrent.duration.FiniteDuration
-import scala.concurrent.duration._
 import scala.concurrent.{Awaitable, Await, Future}
+import scala.concurrent.duration._
 import scala.language.postfixOps
 
 import org.apache.spark.{SecurityManager, SparkConf}
@@ -229,7 +228,8 @@ private[spark] class RpcTimeout(timeout: FiniteDuration, description: String) {
   }
 
   /**
-   * Waits for a completed result to catch and amend a TimeoutException message
+   * Wait for the completed result and return it. If the result is not available within this
+   * timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
    * @param awaitable the `Awaitable` to be awaited
    * @throws RpcTimeoutException if after waiting for the specified time `awaitable`
    *         is still not ready
@@ -242,10 +242,6 @@ private[spark] class RpcTimeout(timeout: FiniteDuration, description: String) {
 }
 
 
-/**
- * Create an RpcTimeout using a configuration property that controls the timeout duration so when
- * a TimeoutException is thrown, the property key will be indicated in the message.
- */
 object RpcTimeout {
 
   private[this] val messagePrefix = "This timeout is controlled by "
diff --git a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
index cd032a3301b95..b089a6cdc9473 100644
--- a/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
@@ -299,17 +299,17 @@ private[akka] class AkkaRpcEndpointRef(
 
   override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
     actorRef.ask(AkkaMessage(message, true))(timeout.duration).flatMap {
-        // The function will run in the calling thread, so it should be short and never block.
-        case msg @ AkkaMessage(message, reply) =>
-          if (reply) {
-            logError(s"Receive $msg but the sender cannot reply")
-            Future.failed(new SparkException(s"Receive $msg but the sender cannot reply"))
-          } else {
-            Future.successful(message)
-          }
-        case AkkaFailure(e) =>
-          Future.failed(e)
-      }(ThreadUtils.sameThread).mapTo[T].
+      // The function will run in the calling thread, so it should be short and never block.
+      case msg @ AkkaMessage(message, reply) =>
+        if (reply) {
+          logError(s"Receive $msg but the sender cannot reply")
+          Future.failed(new SparkException(s"Receive $msg but the sender cannot reply"))
+        } else {
+          Future.successful(message)
+        }
+      case AkkaFailure(e) =>
+        Future.failed(e)
+    }(ThreadUtils.sameThread).mapTo[T].
     recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
   }
 
diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
index 02e0aa0e51878..42e216906ae33 100644
--- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
@@ -17,18 +17,17 @@
 
 package org.apache.spark.util
 
-import org.apache.spark.rpc.RpcTimeout
-
-import scala.collection.JavaConversions.mapAsJavaMap
-
 import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
 import akka.pattern.ask
 
 import com.typesafe.config.ConfigFactory
-import org.apache.log4j.{Level, Logger}
 
+import org.apache.log4j.{Level, Logger}
+import org.apache.spark.rpc.RpcTimeout
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException}
 
+import scala.collection.JavaConversions.mapAsJavaMap
+
 /**
  * Various utility classes for working with Akka.
  */
diff --git a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
index b028dc1e3a031..e8950ecdfbd5a 100644
--- a/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/RpcUtils.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.util
 
 import scala.language.postfixOps
-import scala.concurrent.duration._
 
 import org.apache.spark.{SparkEnv, SparkConf}
 import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout}
@@ -48,13 +47,11 @@ object RpcUtils {
 
   /** Returns the default Spark timeout to use for RPC ask operations. */
   def askTimeout(conf: SparkConf): RpcTimeout = {
-    RpcTimeout(conf, Seq("spark.rpc.askTimeout",
-      "spark.network.timeout"), "120s")
+    RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s")
   }
 
   /** Returns the default Spark timeout to use for RPC remote endpoint lookup. */
   def lookupTimeout(conf: SparkConf): RpcTimeout = {
-    RpcTimeout(conf, Seq("spark.rpc.lookupTimeout",
-      "spark.network.timeout"), "120s")
+    RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s")
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala
index 06183fab628c0..b72b15bd68102 100644
--- a/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rpc/akka/AkkaRpcEnvSuite.scala
@@ -28,7 +28,6 @@ import akka.pattern.ask
 import org.apache.spark.rpc._
 import org.apache.spark.{SecurityManager, SparkConf}
 
-
 class AkkaRpcEnvSuite extends RpcEnvSuite {
 
   override def createRpcEnv(conf: SparkConf, name: String, port: Int): RpcEnv = {
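For readers skimming the patch, here is a minimal, self-contained Scala sketch of the behaviour the updated awaitResult scaladoc and the RpcTimeout companion describe: block on an Awaitable and, if it times out, rethrow with a message naming the configuration property that controls the timeout. It is not part of the patch; DemoRpcTimeout and DemoRpcTimeoutExample are illustrative names, and only the message prefix and the "spark.rpc.askTimeout" key are taken from the diff above.

import java.util.concurrent.TimeoutException

import scala.concurrent.{Await, Awaitable, Promise}
import scala.concurrent.duration._

// Illustrative stand-in for the RpcTimeout behaviour documented in the patch:
// wait on an Awaitable and, on timeout, amend the exception message with the
// configuration property that controls the timeout.
class DemoRpcTimeout(val duration: FiniteDuration, timeoutProp: String) {
  def awaitResult[T](awaitable: Awaitable[T]): T = {
    try {
      Await.result(awaitable, duration)
    } catch {
      case te: TimeoutException =>
        // Rethrow with the controlling property appended, so callers know which setting to raise.
        throw new TimeoutException(
          s"${te.getMessage}. This timeout is controlled by $timeoutProp")
    }
  }
}

object DemoRpcTimeoutExample {
  def main(args: Array[String]): Unit = {
    val timeout = new DemoRpcTimeout(2.seconds, "spark.rpc.askTimeout")
    val never = Promise[String]().future // a future that never completes
    try {
      timeout.awaitResult(never)
    } catch {
      // Prints the original timeout message plus the configuration key.
      case te: TimeoutException => println(te.getMessage)
    }
  }
}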