Skip to content

Commit

Permalink
[SPARK-6980] Cleaned up import ordering, comments, spacing from PR feedback

Browse files Browse the repository at this point in the history
  • Loading branch information
BryanCutler committed Jun 11, 2015
1 parent 7774d56 commit 995d196
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 29 deletions.
10 changes: 3 additions & 7 deletions core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,8 @@ package org.apache.spark.rpc
import java.net.URI
import java.util.concurrent.TimeoutException

import scala.concurrent.duration.FiniteDuration
import scala.concurrent.duration._
import scala.concurrent.{Awaitable, Await, Future}
import scala.concurrent.duration._
import scala.language.postfixOps

import org.apache.spark.{SecurityManager, SparkConf}
Expand Down Expand Up @@ -229,7 +228,8 @@ private[spark] class RpcTimeout(timeout: FiniteDuration, description: String) {
}

/**
* Waits for a completed result to catch and amend a TimeoutException message
* Wait for the completed result and return it. If the result is not available within this
* timeout, throw a [[RpcTimeoutException]] to indicate which configuration controls the timeout.
* @param awaitable the `Awaitable` to be awaited
* @throws RpcTimeoutException if after waiting for the specified time `awaitable`
* is still not ready
Expand All @@ -242,10 +242,6 @@ private[spark] class RpcTimeout(timeout: FiniteDuration, description: String) {
}


/**
* Create an RpcTimeout using a configuration property that controls the timeout duration so when
* a TimeoutException is thrown, the property key will be indicated in the message.
*/
object RpcTimeout {

private[this] val messagePrefix = "This timeout is controlled by "
Expand Down
22 changes: 11 additions & 11 deletions core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala
Original file line number Diff line number Diff line change
Expand Up @@ -299,17 +299,17 @@ private[akka] class AkkaRpcEndpointRef(

override def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T] = {
actorRef.ask(AkkaMessage(message, true))(timeout.duration).flatMap {
// The function will run in the calling thread, so it should be short and never block.
case msg @ AkkaMessage(message, reply) =>
if (reply) {
logError(s"Receive $msg but the sender cannot reply")
Future.failed(new SparkException(s"Receive $msg but the sender cannot reply"))
} else {
Future.successful(message)
}
case AkkaFailure(e) =>
Future.failed(e)
}(ThreadUtils.sameThread).mapTo[T].
// The function will run in the calling thread, so it should be short and never block.
case msg @ AkkaMessage(message, reply) =>
if (reply) {
logError(s"Receive $msg but the sender cannot reply")
Future.failed(new SparkException(s"Receive $msg but the sender cannot reply"))
} else {
Future.successful(message)
}
case AkkaFailure(e) =>
Future.failed(e)
}(ThreadUtils.sameThread).mapTo[T].
recover(timeout.addMessageIfTimeout)(ThreadUtils.sameThread)
}

Expand Down
9 changes: 4 additions & 5 deletions core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,17 @@

package org.apache.spark.util

import org.apache.spark.rpc.RpcTimeout

import scala.collection.JavaConversions.mapAsJavaMap

import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem}
import akka.pattern.ask

import com.typesafe.config.ConfigFactory
import org.apache.log4j.{Level, Logger}

import org.apache.log4j.{Level, Logger}
import org.apache.spark.rpc.RpcTimeout
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv, SparkException}

import scala.collection.JavaConversions.mapAsJavaMap

/**
* Various utility classes for working with Akka.
*/
Expand Down
7 changes: 2 additions & 5 deletions core/src/main/scala/org/apache/spark/util/RpcUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.spark.util

import scala.language.postfixOps
import scala.concurrent.duration._

import org.apache.spark.{SparkEnv, SparkConf}
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv, RpcTimeout}
Expand Down Expand Up @@ -48,13 +47,11 @@ object RpcUtils {

/** Returns the default Spark timeout to use for RPC ask operations. */
def askTimeout(conf: SparkConf): RpcTimeout = {
RpcTimeout(conf, Seq("spark.rpc.askTimeout",
"spark.network.timeout"), "120s")
RpcTimeout(conf, Seq("spark.rpc.askTimeout", "spark.network.timeout"), "120s")
}

/** Returns the default Spark timeout to use for RPC remote endpoint lookup. */
def lookupTimeout(conf: SparkConf): RpcTimeout = {
RpcTimeout(conf, Seq("spark.rpc.lookupTimeout",
"spark.network.timeout"), "120s")
RpcTimeout(conf, Seq("spark.rpc.lookupTimeout", "spark.network.timeout"), "120s")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import akka.pattern.ask
import org.apache.spark.rpc._
import org.apache.spark.{SecurityManager, SparkConf}


class AkkaRpcEnvSuite extends RpcEnvSuite {

override def createRpcEnv(conf: SparkConf, name: String, port: Int): RpcEnv = {
Expand Down

0 comments on commit 995d196

Please sign in to comment.