[SPARK-26443][CORE] Use ConfigEntry for hardcoded configs for history category. #23384

Status: Closed. Changes from 1 commit.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -682,11 +682,11 @@ private[spark] object SparkConf extends Logging {
   private val configsWithAlternatives = Map[String, Seq[AlternateConfig]](
     "spark.executor.userClassPathFirst" -> Seq(
       AlternateConfig("spark.files.userClassPathFirst", "1.3")),
-    "spark.history.fs.update.interval" -> Seq(
+    UPDATE_INTERVAL_S.key -> Seq(
       AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
       AlternateConfig("spark.history.fs.updateInterval", "1.3"),
       AlternateConfig("spark.history.updateInterval", "1.3")),
-    "spark.history.fs.cleaner.interval" -> Seq(
+    CLEANER_INTERVAL_S.key -> Seq(
       AlternateConfig("spark.history.fs.cleaner.interval.seconds", "1.4")),
     MAX_LOG_AGE_S.key -> Seq(
       AlternateConfig("spark.history.fs.cleaner.maxAge.seconds", "1.4")),
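The ConfigEntry migration above stays backward compatible for users on old key names because configsWithAlternatives maps each current key to its deprecated spellings: a lookup under the current key falls back, in order, to any deprecated alternate still set. A minimal, self-contained Scala sketch of that fallback resolution follows; AlternateConfig, DeprecatedKeyDemo, and resolve here are simplified stand-ins for illustration, not Spark's actual SparkConf internals.

// Simplified stand-in for Spark's private AlternateConfig(key, version).
case class AlternateConfig(key: String, version: String)

object DeprecatedKeyDemo {
  // Mirrors the shape of configsWithAlternatives in the hunk above.
  val configsWithAlternatives: Map[String, Seq[AlternateConfig]] = Map(
    "spark.history.fs.update.interval" -> Seq(
      AlternateConfig("spark.history.fs.update.interval.seconds", "1.4"),
      AlternateConfig("spark.history.fs.updateInterval", "1.3"),
      AlternateConfig("spark.history.updateInterval", "1.3")))

  // Prefer the current key; otherwise take the first deprecated alternate that is set.
  def resolve(settings: Map[String, String], key: String): Option[String] =
    settings.get(key).orElse(
      configsWithAlternatives.getOrElse(key, Nil)
        .collectFirst { case alt if settings.contains(alt.key) => settings(alt.key) })

  def main(args: Array[String]): Unit = {
    val userSettings = Map("spark.history.updateInterval" -> "30s")
    // Prints Some(30s): a value set under the Spark 1.3-era key still resolves.
    println(resolve(userSettings, "spark.history.fs.update.interval"))
  }
}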
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

@@ -42,7 +42,7 @@ import org.fusesource.leveldbjni.internal.NativeDB
 import org.apache.spark.{SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
-import org.apache.spark.internal.config.DRIVER_LOG_DFS_DIR
+import org.apache.spark.internal.config.{DRIVER_LOG_DFS_DIR, History}
 import org.apache.spark.internal.config.History._
 import org.apache.spark.internal.config.Status._
 import org.apache.spark.io.CompressionCodec
@@ -91,24 +91,22 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
   import FsHistoryProvider._
 
   // Interval between safemode checks.
-  private val SAFEMODE_CHECK_INTERVAL_S = conf.getTimeAsSeconds(
-    "spark.history.fs.safemodeCheck.interval", "5s")
+  private val SAFEMODE_CHECK_INTERVAL_S = conf.get(History.SAFEMODE_CHECK_INTERVAL_S)
 
   // Interval between each check for event log updates
-  private val UPDATE_INTERVAL_S = conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")
+  private val UPDATE_INTERVAL_S = conf.get(History.UPDATE_INTERVAL_S)
 
   // Interval between each cleaner checks for event logs to delete
-  private val CLEAN_INTERVAL_S = conf.get(CLEANER_INTERVAL_S)
+  private val CLEAN_INTERVAL_S = conf.get(History.CLEANER_INTERVAL_S)
 
   // Number of threads used to replay event logs.
-  private val NUM_PROCESSING_THREADS = conf.getInt(SPARK_HISTORY_FS_NUM_REPLAY_THREADS,
-    Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+  private val NUM_PROCESSING_THREADS = conf.get(History.NUM_REPLAY_THREADS)
 
-  private val logDir = conf.get(EVENT_LOG_DIR)
+  private val logDir = conf.get(History.EVENT_LOG_DIR)
 
-  private val HISTORY_UI_ACLS_ENABLE = conf.getBoolean("spark.history.ui.acls.enable", false)
-  private val HISTORY_UI_ADMIN_ACLS = conf.get("spark.history.ui.admin.acls", "")
-  private val HISTORY_UI_ADMIN_ACLS_GROUPS = conf.get("spark.history.ui.admin.acls.groups", "")
+  private val HISTORY_UI_ACLS_ENABLE = conf.get(History.UI_ACLS_ENABLE)
+  private val HISTORY_UI_ADMIN_ACLS = conf.get(History.UI_ADMIN_ACLS)
+  private val HISTORY_UI_ADMIN_ACLS_GROUPS = conf.get(History.UI_ADMIN_ACLS_GROUPS)
   logInfo(s"History server ui acls " + (if (HISTORY_UI_ACLS_ENABLE) "enabled" else "disabled") +
     "; users with admin permissions: " + HISTORY_UI_ADMIN_ACLS.toString +
     "; groups with admin permissions" + HISTORY_UI_ADMIN_ACLS_GROUPS.toString)
@@ -1089,7 +1087,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
 }
 
 private[history] object FsHistoryProvider {
-  private val SPARK_HISTORY_FS_NUM_REPLAY_THREADS = "spark.history.fs.numReplayThreads"
 
   private val APPL_START_EVENT_PREFIX = "{\"Event\":\"SparkListenerApplicationStart\""
 
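A way to read what this hunk buys: the old getTimeAsSeconds calls restate the key and the default at every call site, while a ConfigEntry-style read keeps key, type, and default in one place. The sketch below contrasts the two styles; MiniConf and TimeEntry are illustrative stand-ins, not Spark's real SparkConf/ConfigEntry API.

final case class TimeEntry(key: String, defaultSeconds: Long)

final class MiniConf(settings: Map[String, String]) {
  // Old style: the caller supplies the key and the default string each time.
  def getTimeAsSeconds(key: String, default: String): Long =
    settings.getOrElse(key, default).stripSuffix("s").toLong

  // New style: the entry owns its key, type, and default.
  def get(entry: TimeEntry): Long =
    settings.get(entry.key).map(_.stripSuffix("s").toLong)
      .getOrElse(entry.defaultSeconds)
}

object FsProviderDemo {
  val UPDATE_INTERVAL_S = TimeEntry("spark.history.fs.update.interval", 10L)

  def main(args: Array[String]): Unit = {
    val conf = new MiniConf(Map.empty)
    // Both reads agree on the default today, but only the first can silently drift.
    println(conf.getTimeAsSeconds("spark.history.fs.update.interval", "10s")) // 10
    println(conf.get(UPDATE_INTERVAL_S))                                      // 10
  }
}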
core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

@@ -30,7 +30,7 @@ import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config._
-import org.apache.spark.internal.config.History.HISTORY_SERVER_UI_PORT
+import org.apache.spark.internal.config.History
 import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationInfo, UIRoot}
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
@@ -56,7 +56,7 @@ class HistoryServer(
   with Logging with UIRoot with ApplicationCacheOperations {
 
   // How many applications to retain
-  private val retainedApplications = conf.getInt("spark.history.retainedApplications", 50)
+  private val retainedApplications = conf.get(History.RETAINED_APPLICATIONS)
 
   // How many applications the summary ui displays
   private[history] val maxApplications = conf.get(HISTORY_UI_MAX_APPS);
@@ -273,14 +273,14 @@ object HistoryServer extends Logging {
     initSecurity()
     val securityManager = createSecurityManager(conf)
 
-    val providerName = conf.getOption("spark.history.provider")
+    val providerName = conf.get(History.PROVIDER)
       .getOrElse(classOf[FsHistoryProvider].getName())
     val provider = Utils.classForName(providerName)
       .getConstructor(classOf[SparkConf])
       .newInstance(conf)
       .asInstanceOf[ApplicationHistoryProvider]
 
-    val port = conf.get(HISTORY_SERVER_UI_PORT)
+    val port = conf.get(History.HISTORY_SERVER_UI_PORT)
 
     val server = new HistoryServer(conf, provider, securityManager, port)
     server.bind()
@@ -319,10 +319,12 @@ object HistoryServer extends Logging {
     // from a keytab file so that we can access HDFS beyond the kerberos ticket expiration.
     // As long as it is using Hadoop rpc (hdfs://), a relogin will automatically
     // occur from the keytab.
-    if (conf.getBoolean("spark.history.kerberos.enabled", false)) {
+    if (conf.get(History.KERBEROS_ENABLED)) {
       // if you have enabled kerberos the following 2 params must be set
-      val principalName = conf.get("spark.history.kerberos.principal")
-      val keytabFilename = conf.get("spark.history.kerberos.keytab")
+      val principalName = conf.get(History.KERBEROS_PRINCIPAL)
+        .getOrElse(throw new NoSuchElementException(History.KERBEROS_PRINCIPAL.key))
+      val keytabFilename = conf.get(History.KERBEROS_KEYTAB)
+        .getOrElse(throw new NoSuchElementException(History.KERBEROS_KEYTAB.key))
       SparkHadoopUtil.get.loginUserFromKeytab(principalName, keytabFilename)
     }
   }
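The kerberos hunk above also changes two formerly raw conf.get(String) lookups into optional entries read with getOrElse(throw ...), so a missing setting still fails fast and the exception names the offending key. A small sketch of that required-optional pattern follows; required and the settings map are hypothetical stand-ins for SparkConf, not Spark's API.

// The entry has no default, so a read yields Option[String]; getOrElse(throw ...)
// turns a missing value into a fail-fast error that names the key.
object RequiredConfDemo {
  def required(settings: Map[String, String], key: String): String =
    settings.getOrElse(key, throw new NoSuchElementException(key))

  def main(args: Array[String]): Unit = {
    val settings = Map("spark.history.kerberos.principal" -> "spark/host@REALM")
    println(required(settings, "spark.history.kerberos.principal"))
    // required(settings, "spark.history.kerberos.keytab") would throw
    // java.util.NoSuchElementException: spark.history.kerberos.keytab
  }
}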
core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala

@@ -79,7 +79,7 @@ private[history] class HistoryServerArguments(conf: SparkConf, args: Array[String])
       |
       | spark.history.fs.logDirectory      Directory where app logs are stored
       |                                    (default: file:/tmp/spark-events)
-      | spark.history.fs.updateInterval    How often to reload log data from storage

Contributor (@mgaido91):
do we need to change this? Seems this pattern is quite used too...

Member Author:
Seems like spark.history.fs.updateInterval is deprecated, but let me revert it for now.

Member (@dongjoon-hyun):
Ur, it is good to remove the deprecated one as many as possible in Spark 3.0. When I searched this string during reviews, there were only two instances; here and deprecation checking test case. @mgaido91, what do you mean by quite used? We already accepted this deprecated one and new one.

Contributor (@mgaido91):
I see only now that it was deprecated, sorry. Then, this is ok, sorry for the comment.

@dongjoon-hyun I meant that camel case notation for the last element is quite widespread also for other configs, so I didn't see the need for changing the notation.

Member Author:
Then should I update to include this change back?

Member (@HyukjinKwon, Dec 29, 2018):
I think it's okie. It's deprecated and we should get rid of it in the doc basically. I see no issue. Yea, I think it can be included back. LGTM without that change as well.

+      | spark.history.fs.update.interval  How often to reload log data from storage
       |                                    (in seconds, default: 10)
       |""".stripMargin)
     // scalastyle:on println
44 changes: 44 additions & 0 deletions core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -29,6 +29,14 @@ private[spark] object History {
     .stringConf

Contributor:
not related to this PR, but EVENT_LOG_DIR doesn't seem a great name to me for this property, as we have a spark.eventLog.dir. Shall we update it to HISTORY_LOG_DIR or something similar in order to disambiguate?

Member Author:
I'm fine with the name HISTORY_LOG_DIR. Let me update it.
cc @HyukjinKwon @srowen @dongjoon-hyun @vanzin

Member:
Yep. HISTORY_LOG_DIR sounds good to me, too.

     .createWithDefault(DEFAULT_LOG_DIR)
+
+  val SAFEMODE_CHECK_INTERVAL_S = ConfigBuilder("spark.history.fs.safemodeCheck.interval")
+    .timeConf(TimeUnit.SECONDS)
+    .createWithDefaultString("5s")
+
+  val UPDATE_INTERVAL_S = ConfigBuilder("spark.history.fs.update.interval")
+    .timeConf(TimeUnit.SECONDS)
+    .createWithDefaultString("10s")
 
   val CLEANER_ENABLED = ConfigBuilder("spark.history.fs.cleaner.enabled")
     .booleanConf
     .createWithDefault(false)
@@ -79,4 +87,40 @@

   val MAX_DRIVER_LOG_AGE_S = ConfigBuilder("spark.history.fs.driverlog.cleaner.maxAge")
     .fallbackConf(MAX_LOG_AGE_S)
+
+  val UI_ACLS_ENABLE = ConfigBuilder("spark.history.ui.acls.enable")
+    .booleanConf
+    .createWithDefault(false)
+
+  val UI_ADMIN_ACLS = ConfigBuilder("spark.history.ui.admin.acls")
+    .stringConf
+    .createWithDefault("")
+
+  val UI_ADMIN_ACLS_GROUPS = ConfigBuilder("spark.history.ui.admin.acls.groups")
+    .stringConf
+    .createWithDefault("")
+
+  val NUM_REPLAY_THREADS = ConfigBuilder("spark.history.fs.numReplayThreads")
+    .intConf
+    .createWithDefaultFunction(() => Math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)
+
+  val RETAINED_APPLICATIONS = ConfigBuilder("spark.history.retainedApplications")
+    .intConf
+    .createWithDefault(50)
+
+  val PROVIDER = ConfigBuilder("spark.history.provider")
+    .stringConf
+    .createOptional
+
+  val KERBEROS_ENABLED = ConfigBuilder("spark.history.kerberos.enabled")
+    .booleanConf
+    .createWithDefault(false)
+
+  val KERBEROS_PRINCIPAL = ConfigBuilder("spark.history.kerberos.principal")
+    .stringConf
+    .createOptional
+
+  val KERBEROS_KEYTAB = ConfigBuilder("spark.history.kerberos.keytab")
+    .stringConf
+    .createOptional
 }
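History.scala now shows three flavors of default used by the builder pattern: a fixed default (createWithDefault), a default computed at read time from the host machine (createWithDefaultFunction, as in NUM_REPLAY_THREADS), and no default at all (createOptional). The sketch below mimics the first two; IntEntry and IntBuilder are simplified stand-ins for Spark's ConfigEntry/ConfigBuilder, not the real API.

final case class IntEntry(key: String, default: () => Option[Int])

final class IntBuilder(key: String) {
  def createWithDefault(v: Int): IntEntry = IntEntry(key, () => Some(v))
  def createWithDefaultFunction(f: () => Int): IntEntry = IntEntry(key, () => Some(f()))
  def createOptional: IntEntry = IntEntry(key, () => None)
}

object BuilderDemo {
  val RETAINED_APPLICATIONS = new IntBuilder("spark.history.retainedApplications")
    .createWithDefault(50)
  // Mirrors the default expression in the diff: ceil(available cores / 4).
  val NUM_REPLAY_THREADS = new IntBuilder("spark.history.fs.numReplayThreads")
    .createWithDefaultFunction(
      () => math.ceil(Runtime.getRuntime.availableProcessors() / 4f).toInt)

  def main(args: Array[String]): Unit = {
    println(RETAINED_APPLICATIONS.default())  // Some(50)
    println(NUM_REPLAY_THREADS.default())     // depends on this machine's core count
  }
}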
2 changes: 1 addition & 1 deletion core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -232,7 +232,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSystemProperties

test("deprecated configs") {
val conf = new SparkConf()
val newName = "spark.history.fs.update.interval"
val newName = UPDATE_INTERVAL_S.key

assert(!conf.contains(newName))

core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

@@ -294,7 +294,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     val maxAge = TimeUnit.SECONDS.toMillis(10)
     val clock = new ManualClock(maxAge / 2)
     val provider = new FsHistoryProvider(
-      createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
+      createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock)
 
     val log1 = newLogFile("app1", Some("attempt1"), inProgress = false)
     writeFile(log1, true, None,
@@ -379,7 +379,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     val maxAge = TimeUnit.SECONDS.toMillis(40)
     val clock = new ManualClock(0)
     val provider = new FsHistoryProvider(
-      createTestConf().set("spark.history.fs.cleaner.maxAge", s"${maxAge}ms"), clock)
+      createTestConf().set(MAX_LOG_AGE_S.key, s"${maxAge}ms"), clock)
 
     val log1 = newLogFile("inProgressApp1", None, inProgress = true)
     writeFile(log1, true, None,
@@ -462,8 +462,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     val maxAge = TimeUnit.SECONDS.toSeconds(40)
     val clock = new ManualClock(0)
     val testConf = new SparkConf()
-    testConf.set("spark.history.fs.logDirectory",
-      Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath())
+    testConf.set(EVENT_LOG_DIR, Utils.createTempDir(namePrefix = "eventLog").getAbsolutePath())

Contributor:
ditto

     testConf.set(DRIVER_LOG_DFS_DIR, testDir.getAbsolutePath())
     testConf.set(DRIVER_LOG_CLEANER_ENABLED, true)
     testConf.set(DRIVER_LOG_CLEANER_INTERVAL, maxAge / 4)
@@ -645,9 +644,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers
 
     // Test both history ui admin acls and application acls are configured.
     val conf1 = createTestConf()
-      .set("spark.history.ui.acls.enable", "true")
-      .set("spark.history.ui.admin.acls", "user1,user2")
-      .set("spark.history.ui.admin.acls.groups", "group1")
+      .set(UI_ACLS_ENABLE, true)
+      .set(UI_ADMIN_ACLS, "user1,user2")
+      .set(UI_ADMIN_ACLS_GROUPS, "group1")
       .set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
 
     createAndCheck(conf1, ("spark.admin.acls", "user"), ("spark.admin.acls.groups", "group")) {
@@ -667,9 +666,9 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers
 
     // Test only history ui admin acls are configured.
     val conf2 = createTestConf()
-      .set("spark.history.ui.acls.enable", "true")
-      .set("spark.history.ui.admin.acls", "user1,user2")
-      .set("spark.history.ui.admin.acls.groups", "group1")
+      .set(UI_ACLS_ENABLE, true)
+      .set(UI_ADMIN_ACLS, "user1,user2")
+      .set(UI_ADMIN_ACLS_GROUPS, "group1")
       .set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
     createAndCheck(conf2) { securityManager =>
       // Test whether user has permission to access UI.
@@ -687,7 +686,7 @@ class FsHistoryProviderSuite extends SparkFunSuite with BeforeAndAfter with Matchers
 
     // Test neither history ui admin acls nor application acls are configured.
     val conf3 = createTestConf()
-      .set("spark.history.ui.acls.enable", "true")
+      .set(UI_ACLS_ENABLE, true)
       .set("spark.user.groups.mapping", classOf[TestGroupsMappingProvider].getName)
     createAndCheck(conf3) { securityManager =>
       // Test whether user has permission to access UI.
core/src/test/scala/org/apache/spark/deploy/history/HistoryServerArgumentsSuite.scala

@@ -22,21 +22,22 @@ import java.nio.charset.StandardCharsets._
 import com.google.common.io.Files
 
 import org.apache.spark._
+import org.apache.spark.internal.config.History._
 import org.apache.spark.util.Utils
 
 class HistoryServerArgumentsSuite extends SparkFunSuite {
 
   private val logDir = new File("src/test/resources/spark-events")
   private val conf = new SparkConf()
-    .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
-    .set("spark.history.fs.updateInterval", "1")
+    .set(EVENT_LOG_DIR, logDir.getAbsolutePath)

Contributor:
nit: .key?

+    .set(UPDATE_INTERVAL_S, 1L)

Contributor:
ditto

.set("spark.testing", "true")

test("No Arguments Parsing") {
val argStrings = Array.empty[String]
val hsa = new HistoryServerArguments(conf, argStrings)
assert(conf.get("spark.history.fs.logDirectory") === logDir.getAbsolutePath)
assert(conf.get("spark.history.fs.updateInterval") === "1")
assert(conf.get(EVENT_LOG_DIR) === logDir.getAbsolutePath)
assert(conf.get(UPDATE_INTERVAL_S) === 1L)
assert(conf.get("spark.testing") === "true")
}

core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala

@@ -78,8 +78,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     Utils.deleteRecursively(storeDir)
     assert(storeDir.mkdir())
     val conf = new SparkConf()
-      .set("spark.history.fs.logDirectory", logDir)
-      .set("spark.history.fs.update.interval", "0")
+      .set(EVENT_LOG_DIR, logDir)

Contributor:
EVENT_LOG_DIR.key?

Member Author:
I think we can use the config entry as a key directly except for configs the string expression of which is more readable like timeConf or bytesConf. WDYT?

Contributor:
actually I am fine with both, this was rather a nit, I just meant to have one notation instead of both them if possible, in order to avoid confusion. But if you prefer this way, I am fine with it, thanks.

+      .set(UPDATE_INTERVAL_S.key, "0")
       .set("spark.testing", "true")
       .set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
       .set("spark.eventLog.logStageExecutorMetrics.enabled", "true")
@@ -416,11 +416,10 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     // allowed refresh rate (1Hz)
     stop()
     val myConf = new SparkConf()
-      .set("spark.history.fs.logDirectory", logDir.getAbsolutePath)
+      .set(EVENT_LOG_DIR, logDir.getAbsolutePath)

Contributor:
ditto

.set("spark.eventLog.dir", logDir.getAbsolutePath)
.set("spark.history.fs.update.interval", "1s")
.set(UPDATE_INTERVAL_S.key, "1s")
.set("spark.eventLog.enabled", "true")
.set("spark.history.cache.window", "250ms")
.set(LOCAL_STORE_DIR, storeDir.getAbsolutePath())
.remove("spark.testing")
val provider = new FsHistoryProvider(myConf)
@@ -613,8 +612,8 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
     stop()
     init(
       "spark.ui.filters" -> classOf[FakeAuthFilter].getName(),
-      "spark.history.ui.acls.enable" -> "true",
-      "spark.history.ui.admin.acls" -> admin)
+      UI_ACLS_ENABLE.key -> "true",
+      UI_ADMIN_ACLS.key -> admin)
 
     val tests = Seq(
       (owner, HttpServletResponse.SC_OK),
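A closing note on the notation question raised in the review above: both set(entry, typedValue) and set(entry.key, "string") remain available, the former for compile-time type checking and the latter when the string form ("1s", "250ms") is the more readable one. Below is a self-contained sketch of the two notations; TypedConf and BoolEntry are illustrative stand-ins, not Spark's SparkConf/ConfigEntry.

final case class BoolEntry(key: String, default: Boolean)

final class TypedConf {
  private var settings = Map.empty[String, String]
  // Typed notation: the compiler checks the value's type.
  def set(entry: BoolEntry, value: Boolean): TypedConf = set(entry.key, value.toString)
  // String notation: handy when the string form is the readable one.
  def set(key: String, value: String): TypedConf = { settings += key -> value; this }
  def get(entry: BoolEntry): Boolean =
    settings.get(entry.key).map(_.toBoolean).getOrElse(entry.default)
}

object NotationDemo {
  val UI_ACLS_ENABLE = BoolEntry("spark.history.ui.acls.enable", default = false)

  def main(args: Array[String]): Unit = {
    val c1 = new TypedConf().set(UI_ACLS_ENABLE, true)        // typed
    val c2 = new TypedConf().set(UI_ACLS_ENABLE.key, "true")  // stringly typed
    println(c1.get(UI_ACLS_ENABLE) == c2.get(UI_ACLS_ENABLE)) // true
  }
}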