[SPARK-26443][CORE] Use ConfigEntry for hardcoded configs for history category. #23384

Closed

Changes from 3 commits
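The change is mechanical but worth seeing in miniature: every hardcoded `spark.history.*` string, together with its per-call-site default, becomes a single typed `ConfigEntry` in `org.apache.spark.internal.config.History`. The following self-contained sketch models the idea; `MiniEntry` and `MiniConf` are hypothetical stand-ins, not Spark's actual (package-private) classes:

```scala
// Illustrative sketch of the ConfigEntry pattern this PR adopts.
// MiniEntry and MiniConf are hypothetical stand-ins, not Spark's classes.
final case class MiniEntry[T](key: String, default: T,
    parse: String => T, render: T => String)

class MiniConf {
  private val settings = scala.collection.mutable.Map[String, String]()

  // Typed accessors: the default and the parsing logic live with the
  // entry definition, not at every call site.
  def set[T](entry: MiniEntry[T], value: T): this.type = {
    settings(entry.key) = entry.render(value); this
  }
  def get[T](entry: MiniEntry[T]): T =
    settings.get(entry.key).map(entry.parse).getOrElse(entry.default)
}

object ConfigEntryDemo extends App {
  val RetainedApps = MiniEntry[Int]("spark.history.retainedApplications",
    default = 50, parse = _.toInt, render = _.toString)

  val conf = new MiniConf
  assert(conf.get(RetainedApps) == 50)  // default applied in one place
  conf.set(RetainedApps, 100)           // value is type-checked at compile time
  assert(conf.get(RetainedApps) == 100)
  println("ok")
}
```

The payoff shows up in every hunk below: call sites shrink to `conf.get(History.X)`, and each default lives in exactly one place.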
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -682,11 +682,11 @@ private[spark] object SparkConf extends Logging {
   private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
     "spark.executor.userClassPathFirst" -> Seq(
       AlternateConfig("spark.files.userClassPathFirst", "1.3")),
-    "spark.history.fs.update.interval" -> Seq(
+    UPDATE_INTERVAL_S.key -> Seq(
       AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
       AlternateConfig("spark.history.fs.updateInterval", "1.3"),
       AlternateConfig("spark.history.updateInterval", "1.3")),
-    "spark.history.fs.cleaner.interval" -> Seq(
+    CLEANER_INTERVAL_S.key -> Seq(
       AlternateConfig("spark.history.fs.cleaner.interval.seconds", "1.4")),
     MAX_LOG_AGE_S.key -> Seq(
       AlternateConfig("spark.history.fs.cleaner.maxAge.seconds", "1.4")),
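For background, `configsWithAlternatives` is how `SparkConf` keeps deprecated spellings of a key working. Keying the map by `UPDATE_INTERVAL_S.key` instead of a second copy of the string literal means the mapping cannot silently diverge from the `ConfigEntry` definition. A simplified sketch of the fallback lookup (Spark's real `AlternateConfig` also supports value translation, omitted here):

```scala
// Simplified model of SparkConf's deprecated-key fallback; not Spark's exact code.
object AlternateKeyDemo extends App {
  final case class AltKey(key: String, sinceVersion: String)

  val alternatives: Map[String, Seq[AltKey]] = Map(
    "spark.history.fs.update.interval" -> Seq(
      AltKey("spark.history.fs.update.interval.seconds", "1.4"),
      AltKey("spark.history.fs.updateInterval", "1.3"),
      AltKey("spark.history.updateInterval", "1.3")))

  // Prefer the current key; otherwise fall back to the first deprecated
  // spelling the user actually set.
  def lookup(settings: Map[String, String], key: String): Option[String] =
    settings.get(key).orElse {
      alternatives.getOrElse(key, Nil)
        .collectFirst { case alt if settings.contains(alt.key) => settings(alt.key) }
    }

  // A job still using the Spark 1.3-era key keeps working:
  val userConf = Map("spark.history.updateInterval" -> "30s")
  assert(lookup(userConf, "spark.history.fs.update.interval").contains("30s"))
  println("ok")
}
```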
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -42,7 +42,7 @@ import org.fusesource.leveldbjni.internal.NativeDB
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.DRIVER_LOG_DFS_DIR
+import org.apache.spark.internal.config.{DRIVER_LOG_DFS_DIR, History}
 import org.apache.spark.internal.config.History._
 import org.apache.spark.internal.config.Status._
 import org.apache.spark.io.CompressionCodec
@@ -91,24 +91,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   import FsHistoryProvider._

   // Interval between safemode checks.
-  private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
-    "spark.history.fs.safemodeCheck.interval", "5s")
+  private val SAFEMODE_CHECK_INTERVAL_S = conf.get(History.SAFEMODE_CHECK_INTERVAL_S)

   // Interval between each check for event log updates
-  private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
+  private val UPDATE_INTERVAL_S = conf.get(History.UPDATE_INTERVAL_S)

   // Interval between each cleaner checks for event logs to delete
-  private val CLEAN_INTERVAL_S = conf.get(CLEANER_INTERVAL_S)
+  private val CLEAN_INTERVAL_S = conf.get(History.CLEANER_INTERVAL_S)

   // Number of threads used to replay event logs.
-  private val NUM_PROCESSING_THREADS = conf.getInt(SPARK_HISTORY_FS_NUM_REPLAY_THREADS,
-    Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+  private val NUM_PROCESSING_THREADS = conf.get(History.NUM_REPLAY_THREADS)

-  private val logDir = conf.get(EVENT_LOG_DIR)
+  private val logDir = conf.get(History.HISTORY_LOG_DIR)

-  private val HISTORY_UI_ACLS_ENABLE = conf.getBoolean("spark.history.ui.acls.enable", false)
-  private val HISTORY_UI_ADMIN_ACLS = conf.get("spark.history.ui.admin.acls", "")
-  private val HISTORY_UI_ADMIN_ACLS_GROUPS = conf.get("spark.history.ui.admin.acls.groups", "")
+  private val HISTORY_UI_ACLS_ENABLE = conf.get(History.UI_ACLS_ENABLE)
+  private val HISTORY_UI_ADMIN_ACLS = conf.get(History.UI_ADMIN_ACLS)
+  private val HISTORY_UI_ADMIN_ACLS_GROUPS = conf.get(History.UI_ADMIN_ACLS_GROUPS)
   logInfo(s"History server ui acls " + (if (HISTORY_UI_ACLS_ENABLE) "enabled" else "disabled") +
     "; users with admin permissions: " + HISTORY_UI_ADMIN_ACLS.toString +
     "; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)
@@ -1089,7 +1087,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 }

 private[history] object FsHistoryProvider {
-  private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"

   private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\""

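One subtlety in this hunk: the replay-thread default (a quarter of the available cores, rounded up) moves from the call site into the entry itself via `createWithDefaultFunction`, so the default is computed whenever the config is read rather than baked into one caller. A rough standalone model of that behavior (names here are illustrative, not Spark's):

```scala
// Sketch of a default computed at read time (cf. createWithDefaultFunction).
// LazyDefaultEntry is illustrative, not a Spark class.
object LazyDefaultDemo extends App {
  final case class LazyDefaultEntry[T](key: String, defaultFn: () => T,
      parse: String => T)

  def get[T](settings: Map[String, String], e: LazyDefaultEntry[T]): T =
    settings.get(e.key).map(e.parse).getOrElse(e.defaultFn())

  val NumReplayThreads = LazyDefaultEntry[Int](
    "spark.history.fs.numReplayThreads",
    // Evaluated only when the key is unset, on the host doing the read.
    () => math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt,
    _.toInt)

  println(get(Map.empty, NumReplayThreads))                        // e.g. 2 on 8 cores
  println(get(Map(NumReplayThreads.key -> "6"), NumReplayThreads)) // 6
}
```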
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.internal.config.History.HISTORY_SERVER_UI_PORT
+import org.apache.spark.internal.config.History
 import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, UIRoot}
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
@@ -56,7 +56,7 @@ class HistoryServer(
     with Logging with UIRoot with ApplicationCacheOperations {

   // How many applications to retain
-  private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
+  private val retainedApplications = conf.get(History.RETAINED_APPLICATIONS)

   // How many applications the summary ui displays
   private[history] val maxApplications = conf.get(HISTORY_UI_MAX_APPS);
@@ -273,14 +273,14 @@ object HistoryServer extends Logging {
     initSecurity()
     val securityManager = createSecurityManager(conf)

-    val providerName = conf.getOption("spark.history.provider")
+    val providerName = conf.get(History.PROVIDER)
       .getOrElse(classOf[FsHistoryProvider].getName())
     val provider = Utils.classForName(providerName)
       .getConstructor(classOf[SparkConf])
       .newInstance(conf)
       .asInstanceOf[ApplicationHistoryProvider]

-    val port = conf.get(HISTORY_SERVER_UI_PORT)
+    val port = conf.get(History.HISTORY_SERVER_UI_PORT)

     val server = new HistoryServer(conf, provider, securityManager, port)
     server.bind()
@@ -319,10 +319,12 @@ object HistoryServer extends Logging {
     // from a keytab file so that we can access HDFS beyond the kerberos ticket expiration.
     // As long as it is using Hadoop rpc (hdfs://), a relogin will automatically
     // occur from the keytab.
-    if (conf.getBoolean("spark.history.kerberos.enabled", false)) {
+    if (conf.get(History.KERBEROS_ENABLED)) {
       // if you have enabled kerberos the following 2 params must be set
-      val principalName = conf.get("spark.history.kerberos.principal")
-      val keytabFilename = conf.get("spark.history.kerberos.keytab")
+      val principalName = conf.get(History.KERBEROS_PRINCIPAL)
+        .getOrElse(throw new NoSuchElementException(History.KERBEROS_PRINCIPAL.key))
+      val keytabFilename = conf.get(History.KERBEROS_KEYTAB)
+        .getOrElse(throw new NoSuchElementException(History.KERBEROS_KEYTAB.key))
       SparkHadoopUtil.get.loginUserFromKeytab(principalName, keytabFilename)
     }
   }
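The Kerberos hunk is the one place the migration changes API shape: `KERBEROS_PRINCIPAL` and `KERBEROS_KEYTAB` are defined with `createOptional` (see History.scala below), so `conf.get` returns an `Option[String]` instead of throwing from inside `SparkConf.get(key)` when unset. The explicit `getOrElse(throw new NoSuchElementException(...))` preserves the old fail-fast contract and names the missing key. A standalone sketch of that contract (not Spark's classes):

```scala
// Illustrative model of createOptional semantics; not Spark's classes.
import scala.util.Try

object OptionalEntryDemo extends App {
  final case class OptionalEntry(key: String)

  def get(settings: Map[String, String], e: OptionalEntry): Option[String] =
    settings.get(e.key)

  val Principal = OptionalEntry("spark.history.kerberos.principal")

  // Reading an unset optional entry yields None; the caller decides whether
  // that is fatal, and the error message carries the exact key.
  val result = Try {
    get(Map.empty, Principal)
      .getOrElse(throw new NoSuchElementException(Principal.key))
  }
  println(result) // Failure(java.util.NoSuchElementException: spark.history.kerberos.principal)
}
```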
core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -25,10 +25,18 @@ private[spark] object History {

   val DEFAULT_LOG_DIR = "file:/tmp/spark-events"

-  val EVENT_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory")
+  val HISTORY_LOG_DIR = ConfigBuilder("spark.history.fs.logDirectory")
     .stringConf
Review comment (Contributor):
not related to this PR, but EVENT_LOG_DIR doesn't seem a great name to me for this property, as we have a spark.eventLog.dir. Shall we update it to HISTORY_LOG_DIR or something similar in order to disambiguate?

Review comment (Member, author):
I'm fine with the name HISTORY_LOG_DIR. Let me update it.
cc @HyukjinKwon @srowen @dongjoon-hyun @vanzin

Review comment (Member):
Yep. HISTORY_LOG_DIR sounds good to me, too.

     .createWithDefault(DEFAULT_LOG_DIR)

+  val SAFEMODE_CHECK_INTERVAL_S = ConfigBuilder("spark.history.fs.safemodeCheck.interval")
+    .timeConf(TimeUnit.SECONDS)
+    .createWithDefaultString("5s")
+
+  val UPDATE_INTERVAL_S = ConfigBuilder("spark.history.fs.update.interval")
+    .timeConf(TimeUnit.SECONDS)
+    .createWithDefaultString("10s")
+
   val CLEANER_ENABLED = ConfigBuilder("spark.history.fs.cleaner.enabled")
     .booleanConf
     .createWithDefault(false)
@@ -79,4 +87,40 @@ private[spark] object History {

   val MAX_DRIVER_LOG_AGE_S = ConfigBuilder("spark.history.fs.driverlog.cleaner.maxAge")
     .fallbackConf(MAX_LOG_AGE_S)
+
+  val UI_ACLS_ENABLE = ConfigBuilder("spark.history.ui.acls.enable")
+    .booleanConf
+    .createWithDefault(false)
+
+  val UI_ADMIN_ACLS = ConfigBuilder("spark.history.ui.admin.acls")
+    .stringConf
+    .createWithDefault("")
+
+  val UI_ADMIN_ACLS_GROUPS = ConfigBuilder("spark.history.ui.admin.acls.groups")
+    .stringConf
+    .createWithDefault("")
+
+  val NUM_REPLAY_THREADS = ConfigBuilder("spark.history.fs.numReplayThreads")
+    .intConf
+    .createWithDefaultFunction(() => Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+
+  val RETAINED_APPLICATIONS = ConfigBuilder("spark.history.retainedApplications")
+    .intConf
+    .createWithDefault(50)
+
+  val PROVIDER = ConfigBuilder("spark.history.provider")
+    .stringConf
+    .createOptional
+
+  val KERBEROS_ENABLED = ConfigBuilder("spark.history.kerberos.enabled")
+    .booleanConf
+    .createWithDefault(false)
+
+  val KERBEROS_PRINCIPAL = ConfigBuilder("spark.history.kerberos.principal")
+    .stringConf
+    .createOptional
+
+  val KERBEROS_KEYTAB = ConfigBuilder("spark.history.kerberos.keytab")
+    .stringConf
+    .createOptional
 }
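A note on the `timeConf(TimeUnit.SECONDS)` entries: they accept the usual Spark duration strings and normalize to whole seconds, which is why `createWithDefaultString("5s")` pairs naturally with call sites that previously used `conf.getTimeAsSeconds(key, "5s")`. A rough sketch of the normalization, assuming Spark's documented suffix set (us/ms/s/m/h/d) and truncation toward zero; Spark's real parser lives elsewhere (in its network-common utilities):

```scala
import java.util.concurrent.TimeUnit

object TimeConfDemo extends App {
  private val units = Map(
    "us" -> TimeUnit.MICROSECONDS, "ms" -> TimeUnit.MILLISECONDS,
    "s" -> TimeUnit.SECONDS, "m" -> TimeUnit.MINUTES,
    "h" -> TimeUnit.HOURS, "d" -> TimeUnit.DAYS)

  // Parse "10s", "2m", "250ms", or a bare "5" into whole seconds.
  def timeAsSeconds(v: String): Long = {
    val m = "([0-9]+)([a-z]+)?".r.findFirstMatchIn(v.trim.toLowerCase)
      .getOrElse(throw new NumberFormatException(v))
    val unit = Option(m.group(2)).map(units).getOrElse(TimeUnit.SECONDS)
    TimeUnit.SECONDS.convert(m.group(1).toLong, unit)
  }

  assert(timeAsSeconds("10s") == 10L)
  assert(timeAsSeconds("5") == 5L)     // bare numbers use the entry's own unit
  assert(timeAsSeconds("2m") == 120L)
  assert(timeAsSeconds("250ms") == 0L) // sub-second values truncate toward zero
  println("ok")
}
```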
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -232,7 +232,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst

   test("deprecated configs") {
     val conf = new SparkConf()
-    val newName = "spark.history.fs.update.interval"
+    val newName = UPDATE_INTERVAL_S.key

     assert(!conf.contains(newName))

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -294,7 +294,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     val maxAge = TimeUnit.SECONDS.toMillis(10)
     val clock = new ManualClock(maxAge / 2)
     val provider = new FsHistoryProvider(
-      createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
+      createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock)

     val log1 = newLogFile("app1", Some("attempt1"), inProgress = false)
     writeFile(log1, true, None,
@@ -379,7 +379,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     val maxAge = TimeUnit.SECONDS.toMillis(40)
     val clock = new ManualClock(0)
     val provider = new FsHistoryProvider(
-      createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
+      createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock)

     val log1 = newLogFile("inProgressApp1", None, inProgress = true)
     writeFile(log1, true, None,
@@ -462,8 +462,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc
     val maxAge = TimeUnit.SECONDS.toSeconds(40)
     val clock = new ManualClock(0)
     val testConf = new SparkConf()
-    testConf.set("spark.history.fs.logDirectory",
-      Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath())
+    testConf.set(HISTORY_LOG_DIR, Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath())
     testConf.set(DRIVER_LOG_DFS_DIR, testDir.getAbsolutePath())
     testConf.set(DRIVER_LOG_CLEANER_ENABLED, true)
     testConf.set(DRIVER_LOG_CLEANER_INTERVAL, maxAge / 4)
@@ -645,9 +644,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc

     // Test both history ui admin acls and application acls are configured.
     val conf1 = createTestConf()
-      .set("spark.history.ui.acls.enable", "true")
-      .set("spark.history.ui.admin.acls", "user1,user2")
-      .set("spark.history.ui.admin.acls.groups", "group1")
+      .set(UI_ACLS_ENABLE, true)
+      .set(UI_ADMIN_ACLS, "user1,user2")
+      .set(UI_ADMIN_ACLS_GROUPS, "group1")
       .set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)

     createAndCheck(conf1, ("spark.admin.acls", "user"), ("spark.admin.acls.groups", "group")) {

// Test only history ui admin acls are configured.
val conf2 = createTestConf()
.set("spark.history.ui.acls.enable", "true")
.set("spark.history.ui.admin.acls", "user1,user2")
.set("spark.history.ui.admin.acls.groups", "group1")
.set(UI_ACLS_ENABLE, true)
.set(UI_ADMIN_ACLS, "user1,user2")
.set(UI_ADMIN_ACLS_GROUPS, "group1")
.set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
createAndCheck(conf2) { securityManager =>
// Test whether user has permission to access UI.
@@ -687,7 +686,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc

     // Test neither history ui admin acls nor application acls are configured.
     val conf3 = createTestConf()
-      .set("spark.history.ui.acls.enable", "true")
+      .set(UI_ACLS_ENABLE, true)
       .set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
     createAndCheck(conf3) { securityManager =>
       // Test whether user has permission to access UI.
@@ -1036,7 +1035,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matc

   private def createTestConf(inMemory: Boolean = false): SparkConf = {
     val conf = new SparkConf()
-      .set(EVENT_LOG_DIR, testDir.getAbsolutePath())
+      .set(HISTORY_LOG_DIR, testDir.getAbsolutePath())
       .set(FAST_IN_PROGRESS_PARSING, true)

     if (!inMemory) {
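The test hunks show both setter styles on purpose: `.set(UI_ACLS_ENABLE, true)` is type-checked against the entry, while `.set(MAX_LOG_AGE_S.key, s"${maxAge}ms")` deliberately keeps the raw string key, because the typed setter for a seconds-based time entry can only express whole seconds. A small sketch of the trade-off (illustrative names, not Spark's API):

```scala
// Why some tests keep `.set(entry.key, "...")` for time entries.
object SetterDemo extends App {
  final case class SecondsEntry(key: String) // stands in for a timeConf(SECONDS) entry
  type Settings = Map[String, String]

  // A typed setter can only take whole seconds, so "10ms" is inexpressible:
  def setTyped(s: Settings, e: SecondsEntry, seconds: Long): Settings =
    s + (e.key -> s"${seconds}s")

  // The raw-key setter keeps the full duration syntax available:
  def setRaw(s: Settings, key: String, value: String): Settings =
    s + (key -> value)

  val MaxLogAge = SecondsEntry("spark.history.fs.cleaner.maxAge")
  println(setTyped(Map.empty, MaxLogAge, 10L))      // Map(... -> 10s)
  println(setRaw(Map.empty, MaxLogAge.key, "10ms")) // Map(... -> 10ms)
}
```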
core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala
@@ -22,21 +22,22 @@ import java.nio.charset.StandardCharsets._

 import com.google.common.io.Files

 import org.apache.spark._
+import org.apache.spark.internal.config.History._
 import org.apache.spark.util.Utils

 class HistoryServerArgumentsSuite extends SparkFunSuite {

   private val logDir = new File("src/test/resources/spark-events")
   private val conf = new SparkConf()
-    .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
-    .set("spark.history.fs.updateInterval", "1")
+    .set(HISTORY_LOG_DIR, logDir.getAbsolutePath)
+    .set(UPDATE_INTERVAL_S, 1L)
Review comment (Contributor):
ditto

.set("spark.testing", "true")

test("No Arguments Parsing") {
val argStrings = Array.empty[String]
val hsa = new HistoryServerArguments(conf, argStrings)
assert(conf.get("spark.history.fs.logDirectory") === logDir.getAbsolutePath)
assert(conf.get("spark.history.fs.updateInterval") === "1")
assert(conf.get(HISTORY_LOG_DIR) === logDir.getAbsolutePath)
assert(conf.get(UPDATE_INTERVAL_S) === 1L)
assert(conf.get("spark.testing") === "true")
}

core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -78,8 +78,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     Utils.deleteRecursively(storeDir)
     assert(storeDir.mkdir())
     val conf = new SparkConf()
-      .set("spark.history.fs.logDirectory", logDir)
-      .set("spark.history.fs.update.interval", "0")
+      .set(HISTORY_LOG_DIR, logDir)
+      .set(UPDATE_INTERVAL_S.key, "0")
       .set("spark.testing", "true")
       .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
       .set("spark.eventLog.logStageExecutorMetrics.enabled", "true")
@@ -416,11 +416,10 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     // allowed refresh rate (1Hz)
     stop()
     val myConf = new SparkConf()
-      .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
+      .set(HISTORY_LOG_DIR, logDir.getAbsolutePath)
       .set("spark.eventLog.dir", logDir.getAbsolutePath)
-      .set("spark.history.fs.update.interval", "1s")
+      .set(UPDATE_INTERVAL_S.key, "1s")
       .set("spark.eventLog.enabled", "true")
-      .set("spark.history.cache.window", "250ms")
       .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
       .remove("spark.testing")
     val provider = new FsHistoryProvider(myConf)
@@ -613,8 +612,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     stop()
     init(
       "spark.ui.filters" -> classOf[FakeAuthFilter].getName(),
-      "spark.history.ui.acls.enable" -> "true",
-      "spark.history.ui.admin.acls" -> admin)
+      UI_ACLS_ENABLE.key -> "true",
+      UI_ADMIN_ACLS.key -> admin)

     val tests = Seq(
       (owner, HttpServletResponse.SC_OK),