diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 7f2756391fd97..4361ccc391bd0 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -21,12 +21,10 @@ import java.io._
 import java.net.URI
 import java.util.{Properties, UUID}
 import java.util.concurrent.atomic.AtomicInteger
-
 import scala.collection.{Map, Set}
 import scala.collection.generic.Growable
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.reflect.{ClassTag, classTag}
-
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable, FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
@@ -34,7 +32,6 @@ import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, Sequence
 import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHadoopJob}
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
 import org.apache.mesos.MesosNativeLibrary
-
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
@@ -45,6 +42,7 @@ import org.apache.spark.scheduler.local.LocalBackend
 import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
 import org.apache.spark.ui.SparkUI
 import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
+import java.util.Random
 
 /**
  * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -127,6 +125,11 @@ class SparkContext(
 
   val master = conf.get("spark.master")
   val appName = conf.get("spark.app.name")
+
+  // Generate the random name for a temp folder in Tachyon
+  // Add a timestamp as the suffix here to make it more safe
+  val tachyonFolderName = new Random().nextInt() + "_" + System.currentTimeMillis()
+  conf.set("spark.tachyonstore.foldername", tachyonFolderName)
 
   val isLocal = (master == "local" || master.startsWith("local["))
 
@@ -139,8 +142,7 @@ class SparkContext(
     conf.get("spark.driver.host"),
     conf.get("spark.driver.port").toInt,
     isDriver = true,
-    isLocal = isLocal,
-    "" + appName)
+    isLocal = isLocal)
   SparkEnv.set(env)
 
   // Used to store a URL for each static file/jar together with the file's local timestamp
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index fd74081b0491e..5e43b5198422c 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -40,7 +40,6 @@ import org.apache.spark.util.{AkkaUtils, Utils}
  */
 class SparkEnv private[spark] (
     val executorId: String,
-    val appId: String,
     val actorSystem: ActorSystem,
     val serializerManager: SerializerManager,
     val serializer: Serializer,
@@ -122,8 +121,7 @@ object SparkEnv extends Logging {
       hostname: String,
       port: Int,
       isDriver: Boolean,
-      isLocal: Boolean,
-      appId: String = null): SparkEnv = {
+      isLocal: Boolean): SparkEnv = {
 
     val securityManager = new SecurityManager(conf)
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
@@ -171,7 +169,7 @@ object SparkEnv extends Logging {
       "BlockManagerMaster",
       new BlockManagerMasterActor(isLocal, conf)), conf)
     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
-      serializer, conf, securityManager, appId)
+      serializer, conf, securityManager)
 
     val connectionManager = blockManager.connectionManager
 
@@ -221,7 +219,6 @@ object SparkEnv extends Logging {
 
     new SparkEnv(
       executorId,
-      appId,
       actorSystem,
       serializerManager,
       serializer,
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 0e758a7e1f147..2edd921066876 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -92,7 +92,6 @@ private[spark] class ExecutorRunner(
   def substituteVariables(argument: String): String = argument match {
     case "{{WORKER_URL}}" => workerUrl
     case "{{EXECUTOR_ID}}" => execId.toString
-    case "{{APP_ID}}" => appId.toString
     case "{{HOSTNAME}}" => host
     case "{{CORES}}" => cores.toString
     case other => other
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 6d5963296d480..16887d8892b31 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -31,7 +31,6 @@ import org.apache.spark.util.{AkkaUtils, Utils}
 private[spark] class CoarseGrainedExecutorBackend(
     driverUrl: String,
     executorId: String,
-    appId: String,
     hostPort: String,
     cores: Int)
   extends Actor
@@ -55,7 +54,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       logInfo("Successfully registered with driver")
       // Make this host instead of hostPort ?
       executor = new Executor(executorId, Utils.parseHostPort(hostPort)._1, sparkProperties,
-        false, appId)
+        false)
 
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
@@ -94,7 +93,7 @@ private[spark] class CoarseGrainedExecutorBackend(
 }
 
 private[spark] object CoarseGrainedExecutorBackend {
-  def run(driverUrl: String, appId: String, executorId: String, hostname: String, cores: Int,
+  def run(driverUrl: String, executorId: String, hostname: String, cores: Int,
     workerUrl: Option[String]) {
     // Debug code
     Utils.checkHost(hostname)
@@ -107,7 +106,7 @@ private[spark] object CoarseGrainedExecutorBackend {
     // set it
     val sparkHostPort = hostname + ":" + boundPort
     actorSystem.actorOf(
-      Props(classOf[CoarseGrainedExecutorBackend], driverUrl, appId, executorId,
+      Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
         sparkHostPort, cores),
       name = "Executor")
     workerUrl.foreach{ url =>
@@ -121,13 +120,13 @@ private[spark] object CoarseGrainedExecutorBackend {
       case x if x < 4 =>
         System.err.println(
           // Worker url is used in spark standalone mode to enforce fate-sharing with worker
-          "Usage: CoarseGrainedExecutorBackend <driverUrl> <appId> <executorId> <hostname> " +
+          "Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
           "<cores> [<workerUrl>]")
         System.exit(1)
       case 4 =>
-        run(args(0), args(1), args(2), args(3), args(4).toInt, None)
+        run(args(0), args(1), args(2), args(3).toInt, None)
       case x if x > 4 =>
-        run(args(0), args(1), args(2), args(3), args(4).toInt, Some(args(5)))
+        run(args(0), args(1), args(2), args(3).toInt, Some(args(4)))
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index fae96ff245bcb..e69f6f72d3275 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -38,8 +38,7 @@ private[spark] class Executor(
     executorId: String,
     slaveHostname: String,
     properties: Seq[(String, String)],
-    isLocal: Boolean = false,
-    appId: String = null)
+    isLocal: Boolean = false)
   extends Logging
 {
   // Application dependencies (added through SparkContext) that we've fetched so far on this node.
@@ -104,7 +103,7 @@ private[spark] class Executor(
   private val env = {
     if (!isLocal) {
       val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
-        isDriver = false, isLocal = false, appId)
+        isDriver = false, isLocal = false)
       SparkEnv.set(_env)
       _env.metricsSystem.registerSource(executorSource)
       _env
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index 1152ff9e55acd..ee4b65e312abc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -45,8 +45,7 @@ private[spark] class SparkDeploySchedulerBackend(
     val driverUrl = "akka.tcp://spark@%s:%s/user/%s".format(
       conf.get("spark.driver.host"), conf.get("spark.driver.port"),
       CoarseGrainedSchedulerBackend.ACTOR_NAME)
-    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{APP_ID}}", "{{HOSTNAME}}",
-      "{{CORES}}", "{{WORKER_URL}}")
+    val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
     val command = Command(
       "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
     val sparkHome = sc.getSparkHome()
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index d72f81720dc37..55be03a22c027 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -48,8 +48,7 @@ private[spark] class BlockManager(
     val defaultSerializer: Serializer,
     maxMemory: Long,
     val conf: SparkConf,
-    securityManager: SecurityManager,
-    appId: String = "test")
+    securityManager: SecurityManager)
   extends Logging {
 
   val shuffleBlockManager = new ShuffleBlockManager(this)
@@ -63,8 +62,9 @@ private[spark] class BlockManager(
   var tachyonInitialized = false
   private[storage] lazy val tachyonStore : TachyonStore = {
     val storeDir = conf.get("spark.tachyonstore.dir", System.getProperty("java.io.tmpdir"))
-    val tachyonStorePath = s"${storeDir}/${appId}/${this.executorId}"
-    val tachyonMaster = conf.get("spark.tachyonmaster.address", "localhost:19998")
+    val appFolderName = conf.get("spark.tachyonstore.foldername")
+    val tachyonStorePath = s"${storeDir}/${appFolderName}/${this.executorId}"
+    val tachyonMaster = conf.get("spark.tachyonmaster.address", "tachyon://localhost:19998")
     val tachyonBlockManager = new TachyonBlockManager(
       shuffleBlockManager, tachyonStorePath, tachyonMaster)
     tachyonInitialized = true
@@ -134,9 +134,9 @@ private[spark] class BlockManager(
    * Construct a BlockManager with a memory limit set based on system properties.
    */
   def this(execId: String, actorSystem: ActorSystem, master: BlockManagerMaster,
-      serializer: Serializer, conf: SparkConf, securityManager: SecurityManager, appId: String) = {
+      serializer: Serializer, conf: SparkConf, securityManager: SecurityManager) = {
     this(execId, actorSystem, master, serializer, BlockManager.getMaxMemory(conf), conf,
-      securityManager, appId)
+      securityManager)
   }
 
   /**
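
Taken together, the patch drops the appId plumbing and routes everything through a single configuration entry: SparkContext generates a random, timestamped folder name, publishes it as spark.tachyonstore.foldername, and BlockManager reads it back to build each executor's Tachyon store path. The standalone Scala sketch below illustrates that naming scheme and the resulting path layout; the object name, method names, and example values are invented for illustration and are not part of the patch.

import java.util.Random

// Simplified sketch (not Spark source): shows how the patch derives the
// per-application Tachyon folder name and the per-executor store path.
object TachyonPathSketch {

  // Mirrors the SparkContext change: a random int plus a timestamp suffix,
  // computed once per application so every executor resolves the same folder.
  def tachyonFolderName(): String =
    new Random().nextInt() + "_" + System.currentTimeMillis()

  // Mirrors the BlockManager change: <storeDir>/<appFolderName>/<executorId>
  // replaces the old <storeDir>/<appId>/<executorId> layout.
  def tachyonStorePath(storeDir: String, appFolderName: String, executorId: String): String =
    s"${storeDir}/${appFolderName}/${executorId}"

  def main(args: Array[String]): Unit = {
    val folder = tachyonFolderName()
    // Example values only; in Spark they come from SparkConf and the executor id.
    println(tachyonStorePath("/tmp_spark_tachyon", folder, "0"))
  }
}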