Use ConfigEntry for hardcoded configs for history category.
ueshin committed Dec 26, 2018
1 parent 827383a commit 613dad0
Showing 9 changed files with 87 additions and 45 deletions.
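The pattern across all nine files is the same: a key string and default repeated at each call site become a single typed ConfigEntry in org.apache.spark.internal.config.History. A minimal sketch of the before/after (the config API is private[spark], so the sketch assumes placement under the org.apache.spark package; the demo package and object names are illustrative only):

package org.apache.spark.demo

import java.util.concurrent.TimeUnit

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.ConfigBuilder

object ConfigEntrySketch {
  // Before: every reader repeats the key and the default.
  // conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")

  // After: key, type and default are declared once as an entry...
  val UPDATE_INTERVAL_S = ConfigBuilder("spark.history.fs.update.interval")
    .timeConf(TimeUnit.SECONDS)
    .createWithDefaultString("10s")

  // ...and each reader gets a typed value (a Long, in seconds).
  val conf = new SparkConf()
  val interval: Long = conf.get(UPDATE_INTERVAL_S)
}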
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -682,11 +682,11 @@ private[spark] object SparkConf extends Logging {
private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
"spark.executor.userClassPathFirst" -> Seq(
AlternateConfig("spark.files.userClassPathFirst", "1.3")),
"spark.history.fs.update.interval" -> Seq(
UPDATE_INTERVAL_S.key -> Seq(
AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
AlternateConfig("spark.history.fs.updateInterval", "1.3"),
AlternateConfig("spark.history.updateInterval", "1.3")),
"spark.history.fs.cleaner.interval" -> Seq(
CLEANER_INTERVAL_S.key -> Seq(
AlternateConfig("spark.history.fs.cleaner.interval.seconds", "1.4")),
MAX_LOG_AGE_S.key -> Seq(
AlternateConfig("spark.history.fs.cleaner.maxAge.seconds", "1.4")),
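The configsWithAlternatives entries above keep the 1.3/1.4-era keys working after the switch to ConfigEntry keys. A rough sketch of the expected behavior (package and object names are illustrative; the fallback itself is handled inside SparkConf's deprecated-config resolution):

package org.apache.spark.demo

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.History

object DeprecatedKeySketch {
  // A user config that still sets the key deprecated since Spark 1.4.
  val conf = new SparkConf().set("spark.history.fs.update.interval.seconds", "30")

  // Reading the new entry is expected to fall back to the alternate key
  // (with a deprecation warning) and resolve to 30 seconds.
  val interval: Long = conf.get(History.UPDATE_INTERVAL_S)
}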
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -42,7 +42,7 @@ import org.fusesource.leveldbjni.internal.NativeDB
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
- import org.apache.spark.internal.config.DRIVER_LOG_DFS_DIR
+ import org.apache.spark.internal.config.{DRIVER_LOG_DFS_DIR, History}
import org.apache.spark.internal.config.History._
import org.apache.spark.internal.config.Status._
import org.apache.spark.io.CompressionCodec
@@ -91,24 +91,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
import FsHistoryProvider._

// Interval between safemode checks.
- private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
-   "spark.history.fs.safemodeCheck.interval", "5s")
+ private val SAFEMODE_CHECK_INTERVAL_S = conf.get(History.SAFEMODE_CHECK_INTERVAL_S)

// Interval between each check for event log updates
- private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
+ private val UPDATE_INTERVAL_S = conf.get(History.UPDATE_INTERVAL_S)

// Interval between each cleaner checks for event logs to delete
- private val CLEAN_INTERVAL_S = conf.get(CLEANER_INTERVAL_S)
+ private val CLEAN_INTERVAL_S = conf.get(History.CLEANER_INTERVAL_S)

// Number of threads used to replay event logs.
- private val NUM_PROCESSING_THREADS = conf.getInt(SPARK_HISTORY_FS_NUM_REPLAY_THREADS,
-   Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+ private val NUM_PROCESSING_THREADS = conf.get(History.NUM_REPLAY_THREADS)

- private val logDir = conf.get(EVENT_LOG_DIR)
+ private val logDir = conf.get(History.EVENT_LOG_DIR)

- private val HISTORY_UI_ACLS_ENABLE = conf.getBoolean("spark.history.ui.acls.enable", false)
- private val HISTORY_UI_ADMIN_ACLS = conf.get("spark.history.ui.admin.acls", "")
- private val HISTORY_UI_ADMIN_ACLS_GROUPS = conf.get("spark.history.ui.admin.acls.groups", "")
+ private val HISTORY_UI_ACLS_ENABLE = conf.get(History.UI_ACLS_ENABLE)
+ private val HISTORY_UI_ADMIN_ACLS = conf.get(History.UI_ADMIN_ACLS)
+ private val HISTORY_UI_ADMIN_ACLS_GROUPS = conf.get(History.UI_ADMIN_ACLS_GROUPS)
logInfo(s"History server ui acls " + (if (HISTORY_UI_ACLS_ENABLE) "enabled" else "disabled") +
"; users with admin permissions: " + HISTORY_UI_ADMIN_ACLS.toString +
"; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)
@@ -1089,7 +1087,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}

private[history] object FsHistoryProvider {
- private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"

private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\""

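In FsHistoryProvider the replay-thread default moves from the call site into the entry itself (createWithDefaultFunction, declared in History.scala below). A sketch of the equivalence this relies on, assuming nothing is set in the conf (illustrative names, private[spark] API):

package org.apache.spark.demo

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.History

object ReplayThreadsSketch {
  val conf = new SparkConf()

  // Old style: the default is recomputed wherever the key is read.
  val oldStyle = conf.getInt("spark.history.fs.numReplayThreads",
    Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)

  // New style: the default function is attached to the entry itself.
  val newStyle: Int = conf.get(History.NUM_REPLAY_THREADS)

  assert(oldStyle == newStyle)
}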
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
- import org.apache.spark.internal.config.History.HISTORY_SERVER_UI_PORT
+ import org.apache.spark.internal.config.History
import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, UIRoot}
import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
import org.apache.spark.ui.JettyUtils._
@@ -56,7 +56,7 @@ class HistoryServer(
with Logging with UIRoot with ApplicationCacheOperations {

// How many applications to retain
- private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
+ private val retainedApplications = conf.get(History.RETAINED_APPLICATIONS)

// How many applications the summary ui displays
private[history] val maxApplications = conf.get(HISTORY_UI_MAX_APPS);
@@ -273,14 +273,14 @@ object HistoryServer extends Logging {
initSecurity()
val securityManager = createSecurityManager(conf)

- val providerName = conf.getOption("spark.history.provider")
+ val providerName = conf.get(History.PROVIDER)
.getOrElse(classOf[FsHistoryProvider].getName())
val provider = Utils.classForName(providerName)
.getConstructor(classOf[SparkConf])
.newInstance(conf)
.asInstanceOf[ApplicationHistoryProvider]

- val port = conf.get(HISTORY_SERVER_UI_PORT)
+ val port = conf.get(History.HISTORY_SERVER_UI_PORT)

val server = new HistoryServer(conf, provider, securityManager, port)
server.bind()
@@ -319,10 +319,12 @@
// from a keytab file so that we can access HDFS beyond the kerberos ticket expiration.
// As long as it is using Hadoop rpc (hdfs://), a relogin will automatically
// occur from the keytab.
- if (conf.getBoolean("spark.history.kerberos.enabled", false)) {
+ if (conf.get(History.KERBEROS_ENABLED)) {
// if you have enabled kerberos the following 2 params must be set
- val principalName = conf.get("spark.history.kerberos.principal")
- val keytabFilename = conf.get("spark.history.kerberos.keytab")
+ val principalName = conf.get(History.KERBEROS_PRINCIPAL)
+   .getOrElse(throw new NoSuchElementException(History.KERBEROS_PRINCIPAL.key))
+ val keytabFilename = conf.get(History.KERBEROS_KEYTAB)
+   .getOrElse(throw new NoSuchElementException(History.KERBEROS_KEYTAB.key))
SparkHadoopUtil.get.loginUserFromKeytab(principalName, keytabFilename)
}
}
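KERBEROS_PRINCIPAL and KERBEROS_KEYTAB are declared with createOptional, so reading them yields an Option[String]; the explicit getOrElse above reproduces the old behavior of conf.get("spark.history.kerberos.principal") throwing when unset. A sketch, assuming only the enable flag is set (illustrative placement under org.apache.spark, since the entries are private[spark]):

package org.apache.spark.demo

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.History

object KerberosConfigSketch {
  val conf = new SparkConf().set(History.KERBEROS_ENABLED, true)

  // An optional entry turns absence into None instead of an exception.
  val principal: Option[String] = conf.get(History.KERBEROS_PRINCIPAL)

  // Recreate the old missing-key failure explicitly where the value is required.
  val principalName: String = principal.getOrElse(
    throw new NoSuchElementException(History.KERBEROS_PRINCIPAL.key))
}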
core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala
@@ -79,7 +79,7 @@ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[Strin
|
| spark.history.fs.logDirectory Directory where app logs are stored
| (default: file:/tmp/spark-events)
- | spark.history.fs.updateInterval How often to reload log data from storage
+ | spark.history.fs.update.interval How often to reload log data from storage
| (in seconds, default: 10)
|""".stripMargin)
// scalastyle:on println
44 changes: 44 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -29,6 +29,14 @@ private[spark] object History {
.stringConf
.createWithDefault(DEFAULT_LOG_DIR)

+ val SAFEMODE_CHECK_INTERVAL_S = ConfigBuilder("spark.history.fs.safemodeCheck.interval")
+ .timeConf(TimeUnit.SECONDS)
+ .createWithDefaultString("5s")
+
+ val UPDATE_INTERVAL_S = ConfigBuilder("spark.history.fs.update.interval")
+ .timeConf(TimeUnit.SECONDS)
+ .createWithDefaultString("10s")
+
val CLEANER_ENABLED = ConfigBuilder("spark.history.fs.cleaner.enabled")
.booleanConf
.createWithDefault(false)
@@ -79,4 +87,40 @@

val MAX_DRIVER_LOG_AGE_S = ConfigBuilder("spark.history.fs.driverlog.cleaner.maxAge")
.fallbackConf(MAX_LOG_AGE_S)

+ val UI_ACLS_ENABLE = ConfigBuilder("spark.history.ui.acls.enable")
+ .booleanConf
+ .createWithDefault(false)
+
+ val UI_ADMIN_ACLS = ConfigBuilder("spark.history.ui.admin.acls")
+ .stringConf
+ .createWithDefault("")
+
+ val UI_ADMIN_ACLS_GROUPS = ConfigBuilder("spark.history.ui.admin.acls.groups")
+ .stringConf
+ .createWithDefault("")
+
+ val NUM_REPLAY_THREADS = ConfigBuilder("spark.history.fs.numReplayThreads")
+ .intConf
+ .createWithDefaultFunction(() => Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+
+ val RETAINED_APPLICATIONS = ConfigBuilder("spark.history.retainedApplications")
+ .intConf
+ .createWithDefault(50)
+
+ val PROVIDER = ConfigBuilder("spark.history.provider")
+ .stringConf
+ .createOptional
+
+ val KERBEROS_ENABLED = ConfigBuilder("spark.history.kerberos.enabled")
+ .booleanConf
+ .createWithDefault(false)
+
+ val KERBEROS_PRINCIPAL = ConfigBuilder("spark.history.kerberos.principal")
+ .stringConf
+ .createOptional
+
+ val KERBEROS_KEYTAB = ConfigBuilder("spark.history.kerberos.keytab")
+ .stringConf
+ .createOptional
}
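Between them the new entries exercise most of the ConfigBuilder terminals. Roughly what each resolves to on an empty SparkConf (a sketch for orientation, not an exhaustive contract; illustrative package and object names):

package org.apache.spark.demo

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.History

object HistoryDefaultsSketch {
  val conf = new SparkConf()

  conf.get(History.SAFEMODE_CHECK_INTERVAL_S) // Long: 5 (seconds, from "5s")
  conf.get(History.UI_ACLS_ENABLE)            // Boolean: false
  conf.get(History.UI_ADMIN_ACLS)             // String: ""
  conf.get(History.RETAINED_APPLICATIONS)     // Int: 50
  conf.get(History.PROVIDER)                  // Option[String]: None
  conf.get(History.NUM_REPLAY_THREADS)        // Int: ceil(availableProcessors / 4)
}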
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -232,7 +232,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst

test("deprecated configs") {
val conf = new SparkConf()
val newName = "spark.history.fs.update.interval"
val newName = UPDATE_INTERVAL_S.key

assert(!conf.contains(newName))

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -294,7 +294,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
val maxAge = TimeUnit.SECONDS.toMillis(10)
val clock = new ManualClock(maxAge / 2)
val provider = new FsHistoryProvider(
createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock)

val log1 = newLogFile("app1", Some("attempt1"), inProgress = false)
writeFile(log1, true, None,
@@ -379,7 +379,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
val maxAge = TimeUnit.SECONDS.toMillis(40)
val clock = new ManualClock(0)
val provider = new FsHistoryProvider(
createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock)

val log1 = newLogFile("inProgressApp1", None, inProgress = true)
writeFile(log1, true, None,
@@ -462,8 +462,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
val maxAge = TimeUnit.SECONDS.toSeconds(40)
val clock = new ManualClock(0)
val testConf = new SparkConf()
testConf.set("spark.history.fs.logDirectory",
Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath())
testConf.set(EVENT_LOG_DIR, Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath())
testConf.set(DRIVER_LOG_DFS_DIR, testDir.getAbsolutePath())
testConf.set(DRIVER_LOG_CLEANER_ENABLED, true)
testConf.set(DRIVER_LOG_CLEANER_INTERVAL, maxAge / 4)
@@ -645,9 +644,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc

// Test both history ui admin acls and application acls are configured.
val conf1 = createTestConf()
.set("spark.history.ui.acls.enable", "true")
.set("spark.history.ui.admin.acls", "user1,user2")
.set("spark.history.ui.admin.acls.groups", "group1")
.set(UI_ACLS_ENABLE, true)
.set(UI_ADMIN_ACLS, "user1,user2")
.set(UI_ADMIN_ACLS_GROUPS, "group1")
.set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)

createAndCheck(conf1, ("spark.admin.acls", "user"), ("spark.admin.acls.groups", "group")) {
@@ -667,9 +666,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc

// Test only history ui admin acls are configured.
val conf2 = createTestConf()
.set("spark.history.ui.acls.enable", "true")
.set("spark.history.ui.admin.acls", "user1,user2")
.set("spark.history.ui.admin.acls.groups", "group1")
.set(UI_ACLS_ENABLE, true)
.set(UI_ADMIN_ACLS, "user1,user2")
.set(UI_ADMIN_ACLS_GROUPS, "group1")
.set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
createAndCheck(conf2) { securityManager =>
// Test whether user has permission to access UI.
@@ -687,7 +686,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc

// Test neither history ui admin acls nor application acls are configured.
val conf3 = createTestConf()
.set("spark.history.ui.acls.enable", "true")
.set(UI_ACLS_ENABLE, true)
.set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
createAndCheck(conf3) { securityManager =>
// Test whether user has permission to access UI.
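The suite keeps setting MAX_LOG_AGE_S.key with an explicit "ms" suffix even though the entry is declared in seconds; that works because timeConf parses unit suffixes and converts on read. A sketch of that assumption (illustrative names, private[spark] API):

package org.apache.spark.demo

import java.util.concurrent.TimeUnit

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.History

object MaxLogAgeSketch {
  val maxAge = TimeUnit.SECONDS.toMillis(10)
  val conf = new SparkConf().set(History.MAX_LOG_AGE_S.key, s"${maxAge}ms")

  // "10000ms" should come back converted to the entry's unit (seconds).
  assert(conf.get(History.MAX_LOG_AGE_S) == 10L)
}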
core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
@@ -22,21 +22,22 @@ import java.nio.charset.StandardCharsets._
import com.google.common.io.Files

import org.apache.spark._
+ import org.apache.spark.internal.config.History._
import org.apache.spark.util.Utils

class HistoryServerArgumentsSuite extends SparkFunSuite {

private val logDir = new File("src/test/resources/spark-events")
private val conf = new SparkConf()
.set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
.set("spark.history.fs.updateInterval", "1")
.set(EVENT_LOG_DIR, logDir.getAbsolutePath)
.set(UPDATE_INTERVAL_S, 1L)
.set("spark.testing", "true")

test("No Arguments Parsing") {
val argStrings = Array.empty[String]
val hsa = new HistoryServerArguments(conf, argStrings)
assert(conf.get("spark.history.fs.logDirectory") === logDir.getAbsolutePath)
assert(conf.get("spark.history.fs.updateInterval") === "1")
assert(conf.get(EVENT_LOG_DIR) === logDir.getAbsolutePath)
assert(conf.get(UPDATE_INTERVAL_S) === 1L)
assert(conf.get("spark.testing") === "true")
}

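Note the two equivalent ways of populating an entry in these suites: a typed .set(UPDATE_INTERVAL_S, 1L) here, and a string .set(UPDATE_INTERVAL_S.key, "...") elsewhere. Assuming the usual time parsing, both read back the same (a sketch only; names are illustrative):

package org.apache.spark.demo

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.History.UPDATE_INTERVAL_S

object TypedVsStringSetSketch {
  val typed    = new SparkConf().set(UPDATE_INTERVAL_S, 1L)
  val stringly = new SparkConf().set(UPDATE_INTERVAL_S.key, "1")

  // A bare "1" is interpreted in the entry's declared unit (seconds).
  assert(typed.get(UPDATE_INTERVAL_S) == stringly.get(UPDATE_INTERVAL_S))
}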
core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -78,8 +78,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
Utils.deleteRecursively(storeDir)
assert(storeDir.mkdir())
val conf = new SparkConf()
.set("spark.history.fs.logDirectory", logDir)
.set("spark.history.fs.update.interval", "0")
.set(EVENT_LOG_DIR, logDir)
.set(UPDATE_INTERVAL_S.key, "0")
.set("spark.testing", "true")
.set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
.set("spark.eventLog.logStageExecutorMetrics.enabled", "true")
@@ -416,11 +416,10 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
// allowed refresh rate (1Hz)
stop()
val myConf = new SparkConf()
.set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
.set(EVENT_LOG_DIR, logDir.getAbsolutePath)
.set("spark.eventLog.dir", logDir.getAbsolutePath)
.set("spark.history.fs.update.interval", "1s")
.set(UPDATE_INTERVAL_S.key, "1s")
.set("spark.eventLog.enabled", "true")
.set("spark.history.cache.window", "250ms")
.set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
.remove("spark.testing")
val provider = new FsHistoryProvider(myConf)
@@ -613,8 +612,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
stop()
init(
"spark.ui.filters" -> classOf[FakeAuthFilter].getName(),
"spark.history.ui.acls.enable" -> "true",
"spark.history.ui.admin.acls" -> admin)
UI_ACLS_ENABLE.key -> "true",
UI_ADMIN_ACLS.key -> admin)

val tests = Seq(
(owner, HttpServletResponse.SC_OK),

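Finally, where a helper only accepts key/value strings, as with HistoryServerSuite's init(...) above, the typed entries are still threaded through via .key so the literal is not duplicated. A minimal sketch of that usage (applyPairs is a hypothetical stand-in for such a helper; the entries themselves are private[spark]):

package org.apache.spark.demo

import org.apache.spark.SparkConf
import org.apache.spark.internal.config.History.{UI_ACLS_ENABLE, UI_ADMIN_ACLS}

object KeyBasedHelperSketch {
  // Stand-in for a string-only helper like the suite's init(...).
  def applyPairs(conf: SparkConf, pairs: (String, String)*): SparkConf = {
    pairs.foreach { case (k, v) => conf.set(k, v) }
    conf
  }

  val conf = applyPairs(new SparkConf(),
    UI_ACLS_ENABLE.key -> "true",
    UI_ADMIN_ACLS.key -> "admin")

  assert(conf.get(UI_ACLS_ENABLE)) // parsed back as Boolean true
}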