Commit d74fd8f

Merge remote-tracking branch 'upstream/master' into ldaRefactor

hhbyyh committed Apr 23, 2015
2 parents 0bb8400 + a7d65d3

Showing 91 changed files with 616 additions and 439 deletions.
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -44,10 +44,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
     blockManager.get(key) match {
       case Some(blockResult) =>
         // Partition is already materialized, so just return its values
-        val inputMetrics = blockResult.inputMetrics
         val existingMetrics = context.taskMetrics
-          .getInputMetricsForReadMethod(inputMetrics.readMethod)
-        existingMetrics.incBytesRead(inputMetrics.bytesRead)
+          .getInputMetricsForReadMethod(blockResult.readMethod)
+        existingMetrics.incBytesRead(blockResult.bytes)
 
         val iter = blockResult.data.asInstanceOf[Iterator[T]]
         new InterruptibleIterator[T](context, iter) {
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -17,12 +17,12 @@
 
 package org.apache.spark
 
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.TimeUnit
 
 import scala.collection.mutable
 
 import org.apache.spark.scheduler._
-import org.apache.spark.util.{Clock, SystemClock, Utils}
+import org.apache.spark.util.{ThreadUtils, Clock, SystemClock, Utils}
 
 /**
  * An agent that dynamically allocates and removes executors based on the workload.
@@ -132,8 +132,8 @@ private[spark] class ExecutorAllocationManager(
   private val listener = new ExecutorAllocationListener
 
   // Executor that handles the scheduling task.
-  private val executor = Executors.newSingleThreadScheduledExecutor(
-    Utils.namedThreadFactory("spark-dynamic-executor-allocation"))
+  private val executor =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("spark-dynamic-executor-allocation")
 
   /**
    * Verify that the settings specified through the config are valid.
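The pattern that repeats through the rest of this commit starts here: a raw Executors factory wrapped in Utils.namedThreadFactory collapses into one named ThreadUtils constructor. ThreadUtils itself is not part of this diff; the following is a minimal sketch of what it plausibly contains, assuming Guava's ThreadFactoryBuilder (the builder the replaced Utils.namedThreadFactory calls suggest):

import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory}

import com.google.common.util.concurrent.ThreadFactoryBuilder

object ThreadUtils {

  // Factory for daemon threads named "<prefix>-0", "<prefix>-1", ... Named
  // daemon threads keep thread dumps readable and never block JVM shutdown.
  def namedThreadFactory(prefix: String): ThreadFactory =
    new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build()

  // A single daemon thread for delayed and periodic tasks.
  def newDaemonSingleThreadScheduledExecutor(threadName: String): ScheduledExecutorService =
    Executors.newSingleThreadScheduledExecutor(namedThreadFactory(threadName))
}

Centralizing the daemon flag is the point of the refactor: previously, any call site that forgot the named factory got non-daemon pool threads that could keep the JVM alive on shutdown.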
11 changes: 5 additions & 6 deletions core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
@@ -17,15 +17,15 @@
 
 package org.apache.spark
 
-import java.util.concurrent.{ScheduledFuture, TimeUnit, Executors}
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
 
 import scala.collection.mutable
 
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEnv, RpcCallContext}
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.scheduler.{SlaveLost, TaskScheduler}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * A heartbeat from executors to the driver. This is a shared message used by several internal
@@ -76,11 +76,10 @@ private[spark] class HeartbeatReceiver(sc: SparkContext)
 
   private var timeoutCheckingTask: ScheduledFuture[_] = null
 
-  private val timeoutCheckingThread = Executors.newSingleThreadScheduledExecutor(
-    Utils.namedThreadFactory("heartbeat-timeout-checking-thread"))
+  private val timeoutCheckingThread =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("heartbeat-timeout-checking-thread")
 
-  private val killExecutorThread = Executors.newSingleThreadExecutor(
-    Utils.namedThreadFactory("kill-executor-thread"))
+  private val killExecutorThread = ThreadUtils.newDaemonSingleThreadExecutor("kill-executor-thread")
 
   override def onStart(): Unit = {
     timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable {
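HeartbeatReceiver needs both the scheduled variant and a plain serial executor for kill requests. Continuing the hedged ThreadUtils sketch above, the non-scheduled helper would be the obvious sibling (the java.util.concurrent.ExecutorService return type is assumed):

  // Sketch: one named daemon worker for tasks that must run one at a time
  // but need no delay or periodic scheduling, such as kill-executor requests.
  def newDaemonSingleThreadExecutor(threadName: String): ExecutorService =
    Executors.newSingleThreadExecutor(namedThreadFactory(threadName))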
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.scheduler._
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 
 
@@ -99,7 +99,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
    */
   private val replayExecutor: ExecutorService = {
     if (!conf.contains("spark.testing")) {
-      Executors.newSingleThreadExecutor(Utils.namedThreadFactory("log-replay-executor"))
+      ThreadUtils.newDaemonSingleThreadExecutor("log-replay-executor")
     } else {
       MoreExecutors.sameThreadExecutor()
     }
7 changes: 3 additions & 4 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.lang.management.ManagementFactory
 import java.net.URL
 import java.nio.ByteBuffer
-import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -76,7 +76,7 @@ private[spark] class Executor(
   }
 
   // Start worker thread pool
-  private val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")
+  private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor task launch worker")
   private val executorSource = new ExecutorSource(threadPool, executorId)
 
   if (!isLocal) {
@@ -110,8 +110,7 @@ private[spark] class Executor(
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
   // Executor for the heartbeat task.
-  private val heartbeater = Executors.newSingleThreadScheduledExecutor(
-    Utils.namedThreadFactory("driver-heartbeater"))
+  private val heartbeater = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-heartbeater")
 
   startDriverHeartbeater()
 
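Executor's task-launch pool uses the cached variant, which grows with the number of concurrently running tasks and reaps idle threads after the JDK default 60-second keep-alive. A sketch under the same assumptions as above; the ThreadPoolExecutor return type is a guess (it would let ExecutorSource poll pool statistics), and the cast is safe because Executors.newCachedThreadPool always returns one:

  import java.util.concurrent.ThreadPoolExecutor

  // Sketch: unbounded cached pool of named daemon threads.
  def newDaemonCachedThreadPool(prefix: String): ThreadPoolExecutor =
    Executors.newCachedThreadPool(namedThreadFactory(prefix))
      .asInstanceOf[ThreadPoolExecutor]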
core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -36,7 +36,7 @@ import io.netty.util.{Timeout, TimerTask, HashedWheelTimer}
 
 import org.apache.spark._
 import org.apache.spark.network.sasl.{SparkSaslClient, SparkSaslServer}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 import scala.util.Try
 import scala.util.control.NonFatal
@@ -79,7 +79,7 @@ private[nio] class ConnectionManager(
 
   private val selector = SelectorProvider.provider.openSelector()
   private val ackTimeoutMonitor =
-    new HashedWheelTimer(Utils.namedThreadFactory("AckTimeoutMonitor"))
+    new HashedWheelTimer(ThreadUtils.namedThreadFactory("AckTimeoutMonitor"))
 
   private val ackTimeout =
     conf.getTimeAsSeconds("spark.core.connection.ack.wait.timeout",
@@ -102,7 +102,7 @@ private[nio] class ConnectionManager(
     handlerThreadCount,
     conf.getInt("spark.core.connection.handler.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable](),
-    Utils.namedThreadFactory("handle-message-executor")) {
+    ThreadUtils.namedThreadFactory("handle-message-executor")) {
 
     override def afterExecute(r: Runnable, t: Throwable): Unit = {
       super.afterExecute(r, t)
@@ -117,7 +117,7 @@ private[nio] class ConnectionManager(
     ioThreadCount,
     conf.getInt("spark.core.connection.io.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable](),
-    Utils.namedThreadFactory("handle-read-write-executor")) {
+    ThreadUtils.namedThreadFactory("handle-read-write-executor")) {
 
     override def afterExecute(r: Runnable, t: Throwable): Unit = {
       super.afterExecute(r, t)
@@ -134,7 +134,7 @@ private[nio] class ConnectionManager(
     connectThreadCount,
     conf.getInt("spark.core.connection.connect.threads.keepalive", 60), TimeUnit.SECONDS,
     new LinkedBlockingDeque[Runnable](),
-    Utils.namedThreadFactory("handle-connect-executor")) {
+    ThreadUtils.namedThreadFactory("handle-connect-executor")) {
 
     override def afterExecute(r: Runnable, t: Throwable): Unit = {
       super.afterExecute(r, t)
@@ -160,7 +160,7 @@ private[nio] class ConnectionManager(
   private val registerRequests = new SynchronizedQueue[SendingConnection]
 
   implicit val futureExecContext = ExecutionContext.fromExecutor(
-    Utils.newDaemonCachedThreadPool("Connection manager future execution context"))
+    ThreadUtils.newDaemonCachedThreadPool("Connection manager future execution context"))
 
   @volatile
   private var onReceiveCallback: (BufferMessage, ConnectionManagerId) => Option[Message] = null
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.io.NotSerializableException
 import java.util.Properties
-import java.util.concurrent.{TimeUnit, Executors}
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map, Stack}
@@ -129,7 +129,7 @@ class DAGScheduler(
   private val disallowStageRetryForTest = sc.getConf.getBoolean("spark.test.noStageRetry", false)
 
   private val messageScheduler =
-    Executors.newScheduledThreadPool(1, Utils.namedThreadFactory("dag-scheduler-message"))
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")
 
   private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
   taskScheduler.setDAGScheduler(this)
core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -26,7 +26,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.serializer.SerializerInstance
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * Runs a thread pool that deserializes and remotely fetches (if necessary) task results.
@@ -35,7 +35,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedulerImpl)
   extends Logging {
 
   private val THREADS = sparkEnv.conf.getInt("spark.resultGetter.threads", 4)
-  private val getTaskResultExecutor = Utils.newDaemonFixedThreadPool(
+  private val getTaskResultExecutor = ThreadUtils.newDaemonFixedThreadPool(
     THREADS, "task-result-getter")
 
   protected val serializer = new ThreadLocal[SerializerInstance] {
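TaskResultGetter caps result-fetch concurrency with spark.resultGetter.threads (default 4), so it wants a fixed pool rather than a cached one. The assumed fixed-size sibling of the sketches above:

  // Sketch: exactly nThreads named daemon workers; extra tasks wait in the
  // executor's unbounded work queue.
  def newDaemonFixedThreadPool(nThreads: Int, prefix: String): ThreadPoolExecutor =
    Executors.newFixedThreadPool(nThreads, namedThreadFactory(prefix))
      .asInstanceOf[ThreadPoolExecutor]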
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster
 
-import java.util.concurrent.{TimeUnit, Executors}
+import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
@@ -26,7 +26,7 @@ import org.apache.spark.rpc._
 import org.apache.spark.{ExecutorAllocationClient, Logging, SparkEnv, SparkException, TaskState}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
-import org.apache.spark.util.{SerializableBuffer, AkkaUtils, Utils}
+import org.apache.spark.util.{ThreadUtils, SerializableBuffer, AkkaUtils, Utils}
 
 /**
  * A scheduler backend that waits for coarse grained executors to connect to it through Akka.
@@ -73,7 +73,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
   private val addressToExecutorId = new HashMap[RpcAddress, String]
 
   private val reviveThread =
-    Executors.newSingleThreadScheduledExecutor(Utils.namedThreadFactory("driver-revive-thread"))
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
 
   override def onStart() {
     // Periodically revive offers to allow delay scheduling to work
core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -24,7 +24,7 @@ import org.apache.spark.rpc._
 import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._
 import org.apache.spark.scheduler.TaskSchedulerImpl
 import org.apache.spark.ui.JettyUtils
-import org.apache.spark.util.{RpcUtils, Utils}
+import org.apache.spark.util.{ThreadUtils, RpcUtils}
 
 import scala.util.control.NonFatal
 
@@ -97,7 +97,7 @@ private[spark] abstract class YarnSchedulerBackend(
   private var amEndpoint: Option[RpcEndpointRef] = None
 
   private val askAmThreadPool =
-    Utils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
+    ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
   implicit val askAmExecutor = ExecutionContext.fromExecutor(askAmThreadPool)
 
   override def receive: PartialFunction[Any, Unit] = {
core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -18,14 +18,14 @@
 package org.apache.spark.scheduler.local
 
 import java.nio.ByteBuffer
-import java.util.concurrent.{Executors, TimeUnit}
+import java.util.concurrent.TimeUnit
 
 import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
 import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 private case class ReviveOffers()
 
@@ -47,8 +47,8 @@ private[spark] class LocalEndpoint(
   private val totalCores: Int)
   extends ThreadSafeRpcEndpoint with Logging {
 
-  private val reviveThread = Executors.newSingleThreadScheduledExecutor(
-    Utils.namedThreadFactory("local-revive-thread"))
+  private val reviveThread =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("local-revive-thread")
 
   private var freeCores = totalCores
 
core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -29,7 +29,7 @@ import scala.util.Random
 
 import sun.nio.ch.DirectBuffer
 
 import org.apache.spark._
-import org.apache.spark.executor._
+import org.apache.spark.executor.{DataReadMethod, ShuffleWriteMetrics}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.network._
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
@@ -50,11 +50,8 @@ private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues
 /* Class for returning a fetched block and associated metrics. */
 private[spark] class BlockResult(
     val data: Iterator[Any],
-    readMethod: DataReadMethod.Value,
-    bytes: Long) {
-  val inputMetrics = new InputMetrics(readMethod)
-  inputMetrics.incBytesRead(bytes)
-}
+    val readMethod: DataReadMethod.Value,
+    val bytes: Long)
 
 /**
  * Manager running on every node (driver and executors) which provides interfaces for putting and
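The BlockResult change inverts responsibility for metrics: instead of eagerly allocating an InputMetrics in its body, the class becomes a plain carrier of readMethod and bytes, and each caller folds those fields into the metrics object it already owns, exactly as the CacheManager hunk at the top of this commit does. A hypothetical caller-side sketch (the InputMetrics API is taken from the removed lines; the payload is illustrative):

  // Construct the slimmed-down BlockResult...
  val blockResult = new BlockResult(
    data = Iterator[Any]("a", "b", "c"),
    readMethod = DataReadMethod.Memory,
    bytes = 3L)

  // ...and do the metrics accounting at the call site.
  val inputMetrics = new InputMetrics(blockResult.readMethod)
  inputMetrics.incBytesRead(blockResult.bytes)

Besides saving an allocation on every cache hit, this stops a mutable metrics object from being embedded in a value that is handed between the block manager and its callers.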
core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -28,7 +28,7 @@ import org.apache.spark.{Logging, SparkConf}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.BlockManagerMessages._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * BlockManagerMasterEndpoint is an [[ThreadSafeRpcEndpoint]] on the master node to track statuses
@@ -51,7 +51,7 @@ class BlockManagerMasterEndpoint(
   // Mapping from block id to the set of block managers that have the block.
   private val blockLocations = new JHashMap[BlockId, mutable.HashSet[BlockManagerId]]
 
-  private val askThreadPool = Utils.newDaemonCachedThreadPool("block-manager-ask-thread-pool")
+  private val askThreadPool = ThreadUtils.newDaemonCachedThreadPool("block-manager-ask-thread-pool")
   private implicit val askExecutionContext = ExecutionContext.fromExecutorService(askThreadPool)
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala
@@ -20,7 +20,7 @@ package org.apache.spark.storage
 import scala.concurrent.{ExecutionContext, Future}
 
 import org.apache.spark.rpc.{RpcEnv, RpcCallContext, RpcEndpoint}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.ThreadUtils
 import org.apache.spark.{Logging, MapOutputTracker, SparkEnv}
 import org.apache.spark.storage.BlockManagerMessages._
 
@@ -36,7 +36,7 @@ class BlockManagerSlaveEndpoint(
   extends RpcEndpoint with Logging {
 
   private val asyncThreadPool =
-    Utils.newDaemonCachedThreadPool("block-manager-slave-async-thread-pool")
+    ThreadUtils.newDaemonCachedThreadPool("block-manager-slave-async-thread-pool")
   private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool)
 
   // Operations that involve removing blocks may be slow and should be done asynchronously