address mateiz's comment about the temp folder name problem. The implementation followed mateiz's advice.
RongGu committed Mar 23, 2014
1 parent 1dcadf9 commit 77be7e8
Showing 7 changed files with 24 additions and 29 deletions.
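
In short: instead of threading an application id through SparkEnv and the executor-side classes to name the Tachyon temp folder, the driver now generates a random folder name once in SparkContext (a random int plus a timestamp suffix) and hands it to executors through SparkConf. Below is a minimal sketch of that scheme, using only the names and config keys that appear in the hunks that follow; it is an illustration, not the patch itself:

import java.util.Random
import org.apache.spark.SparkConf

object TachyonFolderNameSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()

    // Driver side (SparkContext): pick a name that is unlikely to collide and
    // append the current time in milliseconds to make clashes even less likely.
    val tachyonFolderName = new Random().nextInt() + "_" + System.currentTimeMillis()
    conf.set("spark.tachyonstore.foldername", tachyonFolderName)

    // Executor side (BlockManager): read the folder name back from the conf
    // instead of receiving an appId constructor argument.
    val appFolderName = conf.get("spark.tachyonstore.foldername")
    println(appFolderName) // e.g. <randomInt>_<timestampMillis>
  }
}
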
12 changes: 7 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -21,20 +21,17 @@ import java.io._
import java.net.URI
import java.util.{Properties, UUID}
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.{Map, Set}
import scala.collection.generic.Growable
import scala.collection.mutable.{ArrayBuffer, HashMap}
import scala.reflect.{ClassTag, classTag}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat, TextInputFormat}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
@@ -45,6 +42,7 @@ import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
import java.util.Random

/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -127,6 +125,11 @@ class SparkContext(

val master = conf.get("spark.master")
val appName = conf.get("spark.app.name")

// Generate the random name for a temp folder in Tachyon
// Add a timestamp as the suffix here to make it more safe
val tachyonFolderName = new Random().nextInt() + "_" + System.currentTimeMillis()
conf.set("spark.tachyonstore.foldername", tachyonFolderName)

val isLocal = (master == "local" || master.startsWith("local["))

@@ -139,8 +142,7 @@ class SparkContext(
conf.get("spark.driver.host"),
conf.get("spark.driver.port").toInt,
isDriver = true,
isLocal = isLocal,
"<driver>" + appName)
isLocal = isLocal)
SparkEnv.set(env)

// Used to store a URL for each static file/jar together with the file's local timestamp
7 changes: 2 additions & 5 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -40,7 +40,6 @@ import org.apache.spark.util.{AkkaUtils, Utils}
*/
class SparkEnv private[spark] (
val executorId: String,
val appId: String,
val actorSystem: ActorSystem,
val serializerManager: SerializerManager,
val serializer: Serializer,
@@ -122,8 +121,7 @@ object SparkEnv extends Logging {
hostname: String,
port: Int,
isDriver: Boolean,
isLocal: Boolean,
appId: String = null): SparkEnv = {
isLocal: Boolean): SparkEnv = {

val securityManager = new SecurityManager(conf)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
@@ -171,7 +169,7 @@
"BlockManagerMaster",
new BlockManagerMasterActor(isLocal, conf)), conf)
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
serializer, conf, securityManager, appId)
serializer, conf, securityManager)

val connectionManager = blockManager.connectionManager

@@ -221,7 +219,6 @@

new SparkEnv(
executorId,
appId,
actorSystem,
serializerManager,
serializer,
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -92,7 +92,6 @@ private[spark] class ExecutorRunner(
def substituteVariables(argument: String): String = argument match {
case "{{WORKER_URL}}" => workerUrl
case "{{EXECUTOR_ID}}" => execId.toString
case "{{APP_ID}}" => appId.toString
case "{{HOSTNAME}}" => host
case "{{CORES}}" => cores.toString
case other => other
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -31,7 +31,6 @@ import org.apache.spark.util.{AkkaUtils, Utils}
private[spark] class CoarseGrainedExecutorBackend(
driverUrl: String,
executorId: String,
appId: String,
hostPort: String,
cores: Int)
extends Actor
@@ -55,7 +54,7 @@ private[spark] class CoarseGrainedExecutorBackend(
logInfo("Successfully registered with driver")
// Make this host instead of hostPort ?
executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
false, appId)
false)

case RegisterExecutorFailed(message) =>
logError("Slave registration failed: " + message)
@@ -94,7 +93,7 @@ private[spark] class CoarseGrainedExecutorBackend(
}

private[spark] object CoarseGrainedExecutorBackend {
def run(driverUrl: String, appId: String, executorId: String, hostname: String, cores: Int,
def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
workerUrl: Option[String]) {
// Debug code
Utils.checkHost(hostname)
@@ -107,7 +106,7 @@ private[spark] object CoarseGrainedExecutorBackend {
// set it
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, appId, executorId,
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
sparkHostPort, cores),
name = "Executor")
workerUrl.foreach{ url =>
@@ -121,13 +120,13 @@ private[spark] object CoarseGrainedExecutorBackend {
case x if x < 4 =>
System.err.println(
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
"Usage: CoarseGrainedExecutorBackend <driverUrl> <appId> <executorId> <hostname> " +
"Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
"<cores> [<workerUrl>]")
System.exit(1)
case 4 =>
run(args(0), args(1), args(2), args(3), args(4).toInt, None)
run(args(0), args(1), args(2), args(3).toInt, None)
case x if x > 4 =>
run(args(0), args(1), args(2), args(3), args(4).toInt, Some(args(5)))
run(args(0), args(1), args(2), args(3).toInt, Some(args(4)))
}
}
}
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -38,8 +38,7 @@ private[spark] class Executor(
executorId: String,
slaveHostname: String,
properties: Seq[(String, String)],
isLocal: Boolean = false,
appId: String = null)
isLocal: Boolean = false)
extends Logging
{
// Application dependencies (added through SparkContext) that we've fetched so far on this node.
@@ -104,7 +103,7 @@
private val env = {
if (!isLocal) {
val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
isDriver = false, isLocal = false, appId)
isDriver = false, isLocal = false)
SparkEnv.set(_env)
_env.metricsSystem.registerSource(executorSource)
_env
core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -45,8 +45,7 @@ private[spark] class SparkDeploySchedulerBackend(
val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
conf.get("spark.driver.host"), conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{APP_ID}}", "{{HOSTNAME}}",
"{{CORES}}", "{{WORKER_URL}}")
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
val command = Command(
"org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome()
12 changes: 6 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -48,8 +48,7 @@ private[spark] class BlockManager(
val defaultSerializer: Serializer,
maxMemory: Long,
val conf: SparkConf,
securityManager: SecurityManager,
appId: String = "test")
securityManager: SecurityManager)
extends Logging {

val shuffleBlockManager = new ShuffleBlockManager(this)
@@ -63,8 +62,9 @@ private[spark] class BlockManager(
var tachyonInitialized = false
private[storage] lazy val tachyonStore : TachyonStore = {
val storeDir = conf.get("spark.tachyonstore.dir", System.getProperty("java.io.tmpdir"))
val tachyonStorePath = s"${storeDir}/${appId}/${this.executorId}"
val tachyonMaster = conf.get("spark.tachyonmaster.address", "localhost:19998")
val appFolderName = conf.get("spark.tachyonstore.foldername")
val tachyonStorePath = s"${storeDir}/${appFolderName}/${this.executorId}"
val tachyonMaster = conf.get("spark.tachyonmaster.address", "tachyon://localhost:19998")
val tachyonBlockManager = new TachyonBlockManager(
shuffleBlockManager, tachyonStorePath, tachyonMaster)
tachyonInitialized = true
@@ -134,9 +134,9 @@ private[spark] class BlockManager(
* Construct a BlockManager with a memory limit set based on system properties.
*/
def this(execId: String, actorSystem: ActorSystem, master: BlockManagerMaster,
serializer: Serializer, conf: SparkConf, securityManager: SecurityManager, appId: String) = {
serializer: Serializer, conf: SparkConf, securityManager: SecurityManager) = {
this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf), conf,
securityManager, appId)
securityManager)
}

/**
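
For illustration, here is how the per-executor Tachyon store path from the BlockManager hunk above works out. The defaults come from the diff; everything else (the tmpdir, the executor id, the concrete folder name) is a hypothetical placeholder:

// spark.tachyonstore.dir defaults to java.io.tmpdir, assumed here to be /tmp
// spark.tachyonstore.foldername is set by the driver, e.g. <randomInt>_<timestampMillis>
// the executor id is assumed to be "2" for this example
storeDir         = /tmp
appFolderName    = <randomInt>_<timestampMillis>
tachyonStorePath = s"${storeDir}/${appFolderName}/${this.executorId}"
                 = /tmp/<randomInt>_<timestampMillis>/2

Each application therefore gets its own folder under the Tachyon store directory, and each executor its own subfolder, without an appId having to be passed through SparkEnv and BlockManager.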
