From 613dad04e7922654519eef35237176e110d1b136 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Wed, 26 Dec 2018 14:50:55 +0900 Subject: [PATCH 1/4] Use ConfigEntry for hardcoded configs for history category. --- .../scala/org/apache/spark/SparkConf.scala | 4 +- .../deploy/history/FsHistoryProvider.scala | 21 ++++----- .../spark/deploy/history/HistoryServer.scala | 16 ++++--- .../history/HistoryServerArguments.scala | 2 +- .../spark/internal/config/History.scala | 44 +++++++++++++++++++ .../org/apache/spark/SparkConfSuite.scala | 2 +- .../history/FsHistoryProviderSuite.scala | 21 +++++---- .../history/HistoryServerArgumentsSuite.scala | 9 ++-- .../deploy/history/HistoryServerSuite.scala | 13 +++--- 9 files changed, 87 insertions(+), 45 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 8d135d3e083d7..0b47da12b5b42 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -682,11 +682,11 @@ private[spark] object SparkConf extends Logging { private val configsWithAlternatives = Map[String, Seq[AlternateConfig]]( "spark.executor.userClassPathFirst" -> Seq( AlternateConfig("spark.files.userClassPathFirst", "1.3")), - "spark.history.fs.update.interval" -> Seq( + UPDATE_INTERVAL_S.key -> Seq( AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"), AlternateConfig("spark.history.fs.updateInterval", "1.3"), AlternateConfig("spark.history.updateInterval", "1.3")), - "spark.history.fs.cleaner.interval" -> Seq( + CLEANER_INTERVAL_S.key -> Seq( AlternateConfig("spark.history.fs.cleaner.interval.seconds", "1.4")), MAX_LOG_AGE_S.key -> Seq( AlternateConfig("spark.history.fs.cleaner.maxAge.seconds", "1.4")), diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index da6e5f03aabb5..b5eb2bcaa2f2c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -42,7 +42,7 @@ import org.fusesource.leveldbjni.internal.NativeDB import org.apache.spark.{SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging -import org.apache.spark.internal.config.DRIVER_LOG_DFS_DIR +import org.apache.spark.internal.config.{DRIVER_LOG_DFS_DIR, History} import org.apache.spark.internal.config.History._ import org.apache.spark.internal.config.Status._ import org.apache.spark.io.CompressionCodec @@ -91,24 +91,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) import FsHistoryProvider._ // Interval between safemode checks. - private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds( - "spark.history.fs.safemodeCheck.interval", "5s") + private val SAFEMODE_CHECK_INTERVAL_S = conf.get(History.SAFEMODE_CHECK_INTERVAL_S) // Interval between each check for event log updates - private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s") + private val UPDATE_INTERVAL_S = conf.get(History.UPDATE_INTERVAL_S) // Interval between each cleaner checks for event logs to delete - private val CLEAN_INTERVAL_S = conf.get(CLEANER_INTERVAL_S) + private val CLEAN_INTERVAL_S = conf.get(History.CLEANER_INTERVAL_S) // Number of threads used to replay event logs. - private val NUM_PROCESSING_THREADS = conf.getInt(SPARK_HISTORY_FS_NUM_REPLAY_THREADS, - Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt) + private val NUM_PROCESSING_THREADS = conf.get(History.NUM_REPLAY_THREADS) - private val logDir = conf.get(EVENT_LOG_DIR) + private val logDir = conf.get(History.EVENT_LOG_DIR) - private val HISTORY_UI_ACLS_ENABLE = conf.getBoolean("spark.history.ui.acls.enable", false) - private val HISTORY_UI_ADMIN_ACLS = conf.get("spark.history.ui.admin.acls", "") - private val HISTORY_UI_ADMIN_ACLS_GROUPS = conf.get("spark.history.ui.admin.acls.groups", "") + private val HISTORY_UI_ACLS_ENABLE = conf.get(History.UI_ACLS_ENABLE) + private val HISTORY_UI_ADMIN_ACLS = conf.get(History.UI_ADMIN_ACLS) + private val HISTORY_UI_ADMIN_ACLS_GROUPS = conf.get(History.UI_ADMIN_ACLS_GROUPS) logInfo(s"History server ui acls " + (if (HISTORY_UI_ACLS_ENABLE) "enabled" else "disabled") + "; users with admin permissions: " + HISTORY_UI_ADMIN_ACLS.toString + "; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString) @@ -1089,7 +1087,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } private[history] object FsHistoryProvider { - private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads" private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\"" diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 5856c7057b745..b9303388638fd 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -30,7 +30,7 @@ import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.internal.Logging import org.apache.spark.internal.config._ -import org.apache.spark.internal.config.History.HISTORY_SERVER_UI_PORT +import org.apache.spark.internal.config.History import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, UIRoot} import org.apache.spark.ui.{SparkUI, UIUtils, WebUI} import org.apache.spark.ui.JettyUtils._ @@ -56,7 +56,7 @@ class HistoryServer( with Logging with UIRoot with ApplicationCacheOperations { // How many applications to retain - private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50) + private val retainedApplications = conf.get(History.RETAINED_APPLICATIONS) // How many applications the summary ui displays private[history] val maxApplications = conf.get(HISTORY_UI_MAX_APPS); @@ -273,14 +273,14 @@ object HistoryServer extends Logging { initSecurity() val securityManager = createSecurityManager(conf) - val providerName = conf.getOption("spark.history.provider") + val providerName = conf.get(History.PROVIDER) .getOrElse(classOf[FsHistoryProvider].getName()) val provider = Utils.classForName(providerName) .getConstructor(classOf[SparkConf]) .newInstance(conf) .asInstanceOf[ApplicationHistoryProvider] - val port = conf.get(HISTORY_SERVER_UI_PORT) + val port = conf.get(History.HISTORY_SERVER_UI_PORT) val server = new HistoryServer(conf, provider, securityManager, port) server.bind() @@ -319,10 +319,12 @@ object HistoryServer extends Logging { // from a keytab file so that we can access HDFS beyond the kerberos ticket expiration. // As long as it is using Hadoop rpc (hdfs://), a relogin will automatically // occur from the keytab. - if (conf.getBoolean("spark.history.kerberos.enabled", false)) { + if (conf.get(History.KERBEROS_ENABLED)) { // if you have enabled kerberos the following 2 params must be set - val principalName = conf.get("spark.history.kerberos.principal") - val keytabFilename = conf.get("spark.history.kerberos.keytab") + val principalName = conf.get(History.KERBEROS_PRINCIPAL) + .getOrElse(throw new NoSuchElementException(History.KERBEROS_PRINCIPAL.key)) + val keytabFilename = conf.get(History.KERBEROS_KEYTAB) + .getOrElse(throw new NoSuchElementException(History.KERBEROS_KEYTAB.key)) SparkHadoopUtil.get.loginUserFromKeytab(principalName, keytabFilename) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala index 49f00cb10179e..dec89769c030b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala @@ -79,7 +79,7 @@ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[Strin | | spark.history.fs.logDirectory Directory where app logs are stored | (default: file:/tmp/spark-events) - | spark.history.fs.updateInterval How often to reload log data from storage + | spark.history.fs.update.interval How often to reload log data from storage | (in seconds, default: 10) |""".stripMargin) // scalastyle:on println diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala index b7d8061d26d21..860e7529718bf 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/History.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala @@ -29,6 +29,14 @@ private[spark] object History { .stringConf .createWithDefault(DEFAULT_LOG_DIR) + val SAFEMODE_CHECK_INTERVAL_S = ConfigBuilder("spark.history.fs.safemodeCheck.interval") + .timeConf(TimeUnit.SECONDS) + .createWithDefaultString("5s") + + val UPDATE_INTERVAL_S = ConfigBuilder("spark.history.fs.update.interval") + .timeConf(TimeUnit.SECONDS) + .createWithDefaultString("10s") + val CLEANER_ENABLED = ConfigBuilder("spark.history.fs.cleaner.enabled") .booleanConf .createWithDefault(false) @@ -79,4 +87,40 @@ private[spark] object History { val MAX_DRIVER_LOG_AGE_S = ConfigBuilder("spark.history.fs.driverlog.cleaner.maxAge") .fallbackConf(MAX_LOG_AGE_S) + + val UI_ACLS_ENABLE = ConfigBuilder("spark.history.ui.acls.enable") + .booleanConf + .createWithDefault(false) + + val UI_ADMIN_ACLS = ConfigBuilder("spark.history.ui.admin.acls") + .stringConf + .createWithDefault("") + + val UI_ADMIN_ACLS_GROUPS = ConfigBuilder("spark.history.ui.admin.acls.groups") + .stringConf + .createWithDefault("") + + val NUM_REPLAY_THREADS = ConfigBuilder("spark.history.fs.numReplayThreads") + .intConf + .createWithDefaultFunction(() => Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt) + + val RETAINED_APPLICATIONS = ConfigBuilder("spark.history.retainedApplications") + .intConf + .createWithDefault(50) + + val PROVIDER = ConfigBuilder("spark.history.provider") + .stringConf + .createOptional + + val KERBEROS_ENABLED = ConfigBuilder("spark.history.kerberos.enabled") + .booleanConf + .createWithDefault(false) + + val KERBEROS_PRINCIPAL = ConfigBuilder("spark.history.kerberos.principal") + .stringConf + .createOptional + + val KERBEROS_KEYTAB = ConfigBuilder("spark.history.kerberos.keytab") + .stringConf + .createOptional } diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 7cb03deae1391..e14a5dcb5ef84 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -232,7 +232,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst test("deprecated configs") { val conf = new SparkConf() - val newName = "spark.history.fs.update.interval" + val newName = UPDATE_INTERVAL_S.key assert(!conf.contains(newName)) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index c1ae27aa940f6..04abf93729c74 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -294,7 +294,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc val maxAge = TimeUnit.SECONDS.toMillis(10) val clock = new ManualClock(maxAge / 2) val provider = new FsHistoryProvider( - createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock) + createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock) val log1 = newLogFile("app1", Some("attempt1"), inProgress = false) writeFile(log1, true, None, @@ -379,7 +379,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc val maxAge = TimeUnit.SECONDS.toMillis(40) val clock = new ManualClock(0) val provider = new FsHistoryProvider( - createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock) + createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock) val log1 = newLogFile("inProgressApp1", None, inProgress = true) writeFile(log1, true, None, @@ -462,8 +462,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc val maxAge = TimeUnit.SECONDS.toSeconds(40) val clock = new ManualClock(0) val testConf = new SparkConf() - testConf.set("spark.history.fs.logDirectory", - Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath()) + testConf.set(EVENT_LOG_DIR, Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath()) testConf.set(DRIVER_LOG_DFS_DIR, testDir.getAbsolutePath()) testConf.set(DRIVER_LOG_CLEANER_ENABLED, true) testConf.set(DRIVER_LOG_CLEANER_INTERVAL, maxAge / 4) @@ -645,9 +644,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc // Test both history ui admin acls and application acls are configured. val conf1 = createTestConf() - .set("spark.history.ui.acls.enable", "true") - .set("spark.history.ui.admin.acls", "user1,user2") - .set("spark.history.ui.admin.acls.groups", "group1") + .set(UI_ACLS_ENABLE, true) + .set(UI_ADMIN_ACLS, "user1,user2") + .set(UI_ADMIN_ACLS_GROUPS, "group1") .set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName) createAndCheck(conf1, ("spark.admin.acls", "user"), ("spark.admin.acls.groups", "group")) { @@ -667,9 +666,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc // Test only history ui admin acls are configured. val conf2 = createTestConf() - .set("spark.history.ui.acls.enable", "true") - .set("spark.history.ui.admin.acls", "user1,user2") - .set("spark.history.ui.admin.acls.groups", "group1") + .set(UI_ACLS_ENABLE, true) + .set(UI_ADMIN_ACLS, "user1,user2") + .set(UI_ADMIN_ACLS_GROUPS, "group1") .set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName) createAndCheck(conf2) { securityManager => // Test whether user has permission to access UI. @@ -687,7 +686,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc // Test neither history ui admin acls nor application acls are configured. val conf3 = createTestConf() - .set("spark.history.ui.acls.enable", "true") + .set(UI_ACLS_ENABLE, true) .set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName) createAndCheck(conf3) { securityManager => // Test whether user has permission to access UI. diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala index e89733a144cfa..53e92d705bca2 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala @@ -22,21 +22,22 @@ import java.nio.charset.StandardCharsets._ import com.google.common.io.Files import org.apache.spark._ +import org.apache.spark.internal.config.History._ import org.apache.spark.util.Utils class HistoryServerArgumentsSuite extends SparkFunSuite { private val logDir = new File("src/test/resources/spark-events") private val conf = new SparkConf() - .set("spark.history.fs.logDirectory", logDir.getAbsolutePath) - .set("spark.history.fs.updateInterval", "1") + .set(EVENT_LOG_DIR, logDir.getAbsolutePath) + .set(UPDATE_INTERVAL_S, 1L) .set("spark.testing", "true") test("No Arguments Parsing") { val argStrings = Array.empty[String] val hsa = new HistoryServerArguments(conf, argStrings) - assert(conf.get("spark.history.fs.logDirectory") === logDir.getAbsolutePath) - assert(conf.get("spark.history.fs.updateInterval") === "1") + assert(conf.get(EVENT_LOG_DIR) === logDir.getAbsolutePath) + assert(conf.get(UPDATE_INTERVAL_S) === 1L) assert(conf.get("spark.testing") === "true") } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 2a2d013bacbda..7e32a975d25c7 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -78,8 +78,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers Utils.deleteRecursively(storeDir) assert(storeDir.mkdir()) val conf = new SparkConf() - .set("spark.history.fs.logDirectory", logDir) - .set("spark.history.fs.update.interval", "0") + .set(EVENT_LOG_DIR, logDir) + .set(UPDATE_INTERVAL_S.key, "0") .set("spark.testing", "true") .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) .set("spark.eventLog.logStageExecutorMetrics.enabled", "true") @@ -416,11 +416,10 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers // allowed refresh rate (1Hz) stop() val myConf = new SparkConf() - .set("spark.history.fs.logDirectory", logDir.getAbsolutePath) + .set(EVENT_LOG_DIR, logDir.getAbsolutePath) .set("spark.eventLog.dir", logDir.getAbsolutePath) - .set("spark.history.fs.update.interval", "1s") + .set(UPDATE_INTERVAL_S.key, "1s") .set("spark.eventLog.enabled", "true") - .set("spark.history.cache.window", "250ms") .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) .remove("spark.testing") val provider = new FsHistoryProvider(myConf) @@ -613,8 +612,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers stop() init( "spark.ui.filters" -> classOf[FakeAuthFilter].getName(), - "spark.history.ui.acls.enable" -> "true", - "spark.history.ui.admin.acls" -> admin) + UI_ACLS_ENABLE.key -> "true", + UI_ADMIN_ACLS.key -> admin) val tests = Seq( (owner, HttpServletResponse.SC_OK), From a3640178480054dca894102ff7a95e52c8edb15e Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Fri, 28 Dec 2018 21:40:44 +0900 Subject: [PATCH 2/4] Revert a change. --- .../apache/spark/deploy/history/HistoryServerArguments.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala index dec89769c030b..49f00cb10179e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala @@ -79,7 +79,7 @@ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[Strin | | spark.history.fs.logDirectory Directory where app logs are stored | (default: file:/tmp/spark-events) - | spark.history.fs.update.interval How often to reload log data from storage + | spark.history.fs.updateInterval How often to reload log data from storage | (in seconds, default: 10) |""".stripMargin) // scalastyle:on println From a9208efe24942774c363d963e31fdc99c0d2bb7b Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Fri, 28 Dec 2018 21:45:17 +0900 Subject: [PATCH 3/4] Rename. --- .../org/apache/spark/deploy/history/FsHistoryProvider.scala | 2 +- .../main/scala/org/apache/spark/internal/config/History.scala | 2 +- .../apache/spark/deploy/history/FsHistoryProviderSuite.scala | 4 ++-- .../spark/deploy/history/HistoryServerArgumentsSuite.scala | 4 ++-- .../org/apache/spark/deploy/history/HistoryServerSuite.scala | 4 ++-- 5 files changed, 8 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index b5eb2bcaa2f2c..709a380dfb636 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -102,7 +102,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) // Number of threads used to replay event logs. private val NUM_PROCESSING_THREADS = conf.get(History.NUM_REPLAY_THREADS) - private val logDir = conf.get(History.EVENT_LOG_DIR) + private val logDir = conf.get(History.HISTORY_LOG_DIR) private val HISTORY_UI_ACLS_ENABLE = conf.get(History.UI_ACLS_ENABLE) private val HISTORY_UI_ADMIN_ACLS = conf.get(History.UI_ADMIN_ACLS) diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala index 860e7529718bf..f984dd385344b 100644 --- a/core/src/main/scala/org/apache/spark/internal/config/History.scala +++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala @@ -25,7 +25,7 @@ private[spark] object History { val DEFAULT_LOG_DIR = "file:/tmp/spark-events" - val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory") + val HISTORY_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory") .stringConf .createWithDefault(DEFAULT_LOG_DIR) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 04abf93729c74..6d2e329094ae2 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -462,7 +462,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc val maxAge = TimeUnit.SECONDS.toSeconds(40) val clock = new ManualClock(0) val testConf = new SparkConf() - testConf.set(EVENT_LOG_DIR, Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath()) + testConf.set(HISTORY_LOG_DIR, Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath()) testConf.set(DRIVER_LOG_DFS_DIR, testDir.getAbsolutePath()) testConf.set(DRIVER_LOG_CLEANER_ENABLED, true) testConf.set(DRIVER_LOG_CLEANER_INTERVAL, maxAge / 4) @@ -1035,7 +1035,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc private def createTestConf(inMemory: Boolean = false): SparkConf = { val conf = new SparkConf() - .set(EVENT_LOG_DIR, testDir.getAbsolutePath()) + .set(HISTORY_LOG_DIR, testDir.getAbsolutePath()) .set(FAST_IN_PROGRESS_PARSING, true) if (!inMemory) { diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala index 53e92d705bca2..6b479873f69f2 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala @@ -29,14 +29,14 @@ class HistoryServerArgumentsSuite extends SparkFunSuite { private val logDir = new File("src/test/resources/spark-events") private val conf = new SparkConf() - .set(EVENT_LOG_DIR, logDir.getAbsolutePath) + .set(HISTORY_LOG_DIR, logDir.getAbsolutePath) .set(UPDATE_INTERVAL_S, 1L) .set("spark.testing", "true") test("No Arguments Parsing") { val argStrings = Array.empty[String] val hsa = new HistoryServerArguments(conf, argStrings) - assert(conf.get(EVENT_LOG_DIR) === logDir.getAbsolutePath) + assert(conf.get(HISTORY_LOG_DIR) === logDir.getAbsolutePath) assert(conf.get(UPDATE_INTERVAL_S) === 1L) assert(conf.get("spark.testing") === "true") } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 7e32a975d25c7..a9dee67ae9383 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -78,7 +78,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers Utils.deleteRecursively(storeDir) assert(storeDir.mkdir()) val conf = new SparkConf() - .set(EVENT_LOG_DIR, logDir) + .set(HISTORY_LOG_DIR, logDir) .set(UPDATE_INTERVAL_S.key, "0") .set("spark.testing", "true") .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath()) @@ -416,7 +416,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers // allowed refresh rate (1Hz) stop() val myConf = new SparkConf() - .set(EVENT_LOG_DIR, logDir.getAbsolutePath) + .set(HISTORY_LOG_DIR, logDir.getAbsolutePath) .set("spark.eventLog.dir", logDir.getAbsolutePath) .set(UPDATE_INTERVAL_S.key, "1s") .set("spark.eventLog.enabled", "true") From 100902f5239d7663c74d7dc15310268e6b45fe2c Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sun, 30 Dec 2018 05:43:13 +0900 Subject: [PATCH 4/4] Revert "Revert a change." This reverts commit a3640178480054dca894102ff7a95e52c8edb15e. --- .../apache/spark/deploy/history/HistoryServerArguments.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala index 49f00cb10179e..dec89769c030b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala @@ -79,7 +79,7 @@ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[Strin | | spark.history.fs.logDirectory Directory where app logs are stored | (default: file:/tmp/spark-events) - | spark.history.fs.updateInterval How often to reload log data from storage + | spark.history.fs.update.interval How often to reload log data from storage | (in seconds, default: 10) |""".stripMargin) // scalastyle:on println