Commit 71cf25e

[SPARK-48623][CORE] Structured logging migrations [Part 3]

### What changes were proposed in this pull request?
This PR makes additional Scala logging migrations to comply with the Scala style changes introduced in #46947.
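
For context, each migrated call site replaces plain `s"..."` string interpolation with Spark's structured `log"..."` interpolator and wraps logged values in `MDC(...)` tagged with a `LogKey`, as the diff below shows. A minimal before/after sketch of the pattern, using a hypothetical class and value name (only the imports, the `LogKeys.CACHE_SIZE` key, and the logging pattern are taken from this PR):

```scala
import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Hypothetical example class; only the logging pattern mirrors this PR.
class CacheReporter extends Logging {
  def reportCacheSize(cacheSize: Long): Unit = {
    // Before the migration: plain string interpolation, no structured fields.
    // logInfo(s"Current cache size is $cacheSize entries.")

    // After the migration: the log"..." interpolator plus MDC tags the value with
    // LogKeys.CACHE_SIZE, so structured log backends can index the field by key.
    logInfo(log"Current cache size is ${MDC(LogKeys.CACHE_SIZE, cacheSize)} entries.")
  }
}
```

The diff below repeats this pattern across the changed files, adding new `LogKey` entries (for example `CACHE_SIZE`, `BOOT_TIME`, `INIT_TIME`, `FINISH_TIME`, `LOSSES`) where no suitable key existed yet.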

### Why are the changes needed?
This makes development and PR review of the structured logging migration easier.

### Does this PR introduce any user-facing change?
No

### How was this patch tested?
Tested by ensuring that the `dev/scalastyle` checks pass.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #47275 from asl3/formatstructuredlogmigrations.

Lead-authored-by: Amanda Liu <amanda.liu@databricks.com>
Co-authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
asl3 and gengliangwang committed Jul 11, 2024
1 parent 4501285 commit 71cf25e
Showing 17 changed files with 81 additions and 58 deletions.

@@ -101,6 +101,7 @@ private[spark] object LogKeys {
case object BLOCK_TYPE extends LogKey
case object BOOT extends LogKey
case object BOOTSTRAP_TIME extends LogKey
case object BOOT_TIME extends LogKey
case object BROADCAST extends LogKey
case object BROADCAST_ID extends LogKey
case object BROADCAST_OUTPUT_STATUS_SIZE extends LogKey
@@ -110,6 +111,7 @@ private[spark] object LogKeys {
case object BYTE_SIZE extends LogKey
case object CACHED_TABLE_PARTITION_METADATA_SIZE extends LogKey
case object CACHE_AUTO_REMOVED_SIZE extends LogKey
case object CACHE_SIZE extends LogKey
case object CACHE_UNTIL_HIGHEST_CONSUMED_SIZE extends LogKey
case object CACHE_UNTIL_LAST_PRODUCED_SIZE extends LogKey
case object CALL_SITE_LONG_FORM extends LogKey
@@ -282,6 +284,7 @@ private[spark] object LogKeys {
case object FINAL_CONTEXT extends LogKey
case object FINAL_OUTPUT_PATH extends LogKey
case object FINAL_PATH extends LogKey
case object FINISH_TIME extends LogKey
case object FINISH_TRIGGER_DURATION extends LogKey
case object FREE_MEMORY_SIZE extends LogKey
case object FROM_OFFSET extends LogKey
@@ -320,10 +323,12 @@ private[spark] object LogKeys {
case object INITIAL_CAPACITY extends LogKey
case object INITIAL_HEARTBEAT_INTERVAL extends LogKey
case object INIT_MODE extends LogKey
case object INIT_TIME extends LogKey
case object INPUT extends LogKey
case object INPUT_SPLIT extends LogKey
case object INTEGRAL extends LogKey
case object INTERVAL extends LogKey
case object INVALID_PARAMS extends LogKey
case object ISOLATION_LEVEL extends LogKey
case object ISSUE_DATE extends LogKey
case object IS_NETWORK_REQUEST_DONE extends LogKey
@@ -369,6 +374,7 @@ private[spark] object LogKeys {
case object LOG_LEVEL extends LogKey
case object LOG_OFFSET extends LogKey
case object LOG_TYPE extends LogKey
case object LOSSES extends LogKey
case object LOWER_BOUND extends LogKey
case object MALFORMATTED_STRING extends LogKey
case object MAP_ID extends LogKey
@@ -566,6 +572,7 @@ private[spark] object LogKeys {
case object OS_NAME extends LogKey
case object OS_VERSION extends LogKey
case object OUTPUT extends LogKey
case object OUTPUT_BUFFER extends LogKey
case object OVERHEAD_MEMORY_SIZE extends LogKey
case object PAGE_SIZE extends LogKey
case object PARENT_STAGES extends LogKey

@@ -650,8 +650,9 @@ private[spark] object MavenUtils extends Logging {
val invalidParams = groupedParams.keys.filterNot(validParams.contains).toSeq
if (invalidParams.nonEmpty) {
logWarning(
s"Invalid parameters `${invalidParams.sorted.mkString(",")}` found " +
s"in Ivy URI query `$uriQuery`.")
log"Invalid parameters `${MDC(LogKeys.INVALID_PARAMS,
invalidParams.sorted.mkString(","))}` " +
log"found in Ivy URI query `${MDC(LogKeys.URI, uriQuery)}`.")
}

(transitive, exclusionList, repos)

@@ -33,8 +33,7 @@ import com.google.common.cache.{Cache, CacheBuilder}
import org.apache.spark.{SparkEnv, SparkException, SparkSQLException}
import org.apache.spark.api.python.PythonFunction.PythonAccumulator
import org.apache.spark.connect.proto
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -63,9 +62,11 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
private lazy val planCache: Option[Cache[proto.Relation, LogicalPlan]] = {
if (SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE) <= 0) {
logWarning(
s"Session plan cache is disabled due to non-positive cache size." +
s" Current value of '${Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key}' is" +
s" ${SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE)}.")
log"Session plan cache is disabled due to non-positive cache size." +
log" Current value of " +
log"'${MDC(LogKeys.CONFIG, Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key)}' is ${MDC(
LogKeys.CACHE_SIZE,
SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE))}")
None
} else {
Some(
@@ -248,15 +249,17 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
private[connect] def updateAccessTime(): Unit = {
lastAccessTimeMs = System.currentTimeMillis()
logInfo(
log"Session ${MDC(SESSION_KEY, key)} accessed, " +
log"time ${MDC(LAST_ACCESS_TIME, lastAccessTimeMs)} ms.")
log"Session with userId: ${MDC(LogKeys.USER_ID, userId)} and " +
log"sessionId: ${MDC(LogKeys.SESSION_ID, sessionId)} accessed," +
log"time ${MDC(LogKeys.LAST_ACCESS_TIME, lastAccessTimeMs)} ms.")
}

private[connect] def setCustomInactiveTimeoutMs(newInactiveTimeoutMs: Option[Long]): Unit = {
customInactiveTimeoutMs = newInactiveTimeoutMs
logInfo(
log"Session ${MDC(SESSION_KEY, key)} " +
log"inactive timeout set to ${MDC(TIMEOUT, customInactiveTimeoutMs)} ms.")
log"Session with userId: ${MDC(LogKeys.USER_ID, userId)} and " +
log"sessionId: ${MDC(LogKeys.SESSION_ID, sessionId)} inactive timeout set to " +
log"${MDC(LogKeys.TIMEOUT, customInactiveTimeoutMs)} ms")
}

/**
@@ -282,8 +285,8 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
throw new IllegalStateException(s"Session $key is already closed.")
}
logInfo(
log"Closing session with userId: ${MDC(USER_ID, userId)} and " +
log"sessionId: ${MDC(SESSION_ID, sessionId)}")
log"Closing session with userId: ${MDC(LogKeys.USER_ID, userId)} and " +
log"sessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}")
closedTimeMs = Some(System.currentTimeMillis())

if (Utils.isTesting && eventManager.status == SessionStatus.Pending) {

@@ -28,7 +28,7 @@ import org.apache.spark.connect.proto.ExecutePlanResponse
import org.apache.spark.connect.proto.StreamingQueryEventType
import org.apache.spark.connect.proto.StreamingQueryListenerEvent
import org.apache.spark.connect.proto.StreamingQueryListenerEventsResult
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.util.ArrayImplicits._

@@ -131,10 +131,10 @@ private[sql] class SparkConnectListenerBusListener(
.build())
} catch {
case NonFatal(e) =>
logError(
s"[SessionId: ${sessionHolder.sessionId}][UserId: ${sessionHolder.userId}] " +
s"Removing SparkConnectListenerBusListener and terminating the long-running thread " +
s"because of exception: $e")
logError(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionHolder.sessionId)}]" +
log"[UserId: ${MDC(LogKeys.USER_ID, sessionHolder.userId)}] " +
log"Removing SparkConnectListenerBusListener and terminating the long-running thread " +
log"because of exception: ${MDC(LogKeys.EXCEPTION, e)}")
// This likely means that the client is not responsive even with retry, we should
// remove this listener and cleanup resources.
serverSideListenerHolder.cleanUp()

13 changes: 7 additions & 6 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -122,7 +122,7 @@ private[spark] class SecurityManager(
*/
def setViewAcls(defaultUsers: Set[String], allowedUsers: Seq[String]): Unit = {
viewAcls = adminAcls ++ defaultUsers ++ allowedUsers
logInfo("Changing view acls to: " + viewAcls.mkString(","))
logInfo(log"Changing view acls to: ${MDC(LogKeys.VIEW_ACLS, viewAcls.mkString(","))}")
}

def setViewAcls(defaultUser: String, allowedUsers: Seq[String]): Unit = {
@@ -135,7 +135,7 @@ private[spark] class SecurityManager(
*/
def setViewAclsGroups(allowedUserGroups: Seq[String]): Unit = {
viewAclsGroups = adminAclsGroups ++ allowedUserGroups
logInfo("Changing view acls groups to: " + viewAclsGroups.mkString(","))
logInfo(log"Changing view acls groups to: ${MDC(LogKeys.VIEW_ACLS, viewAcls.mkString(","))}")
}

/**
@@ -163,7 +163,7 @@ private[spark] class SecurityManager(
*/
def setModifyAcls(defaultUsers: Set[String], allowedUsers: Seq[String]): Unit = {
modifyAcls = adminAcls ++ defaultUsers ++ allowedUsers
logInfo("Changing modify acls to: " + modifyAcls.mkString(","))
logInfo(log"Changing modify acls to: ${MDC(LogKeys.MODIFY_ACLS, modifyAcls.mkString(","))}")
}

/**
@@ -172,7 +172,8 @@ private[spark] class SecurityManager(
*/
def setModifyAclsGroups(allowedUserGroups: Seq[String]): Unit = {
modifyAclsGroups = adminAclsGroups ++ allowedUserGroups
logInfo("Changing modify acls groups to: " + modifyAclsGroups.mkString(","))
logInfo(log"Changing modify acls groups to: ${MDC(LogKeys.MODIFY_ACLS,
modifyAcls.mkString(","))}")
}

/**
@@ -200,7 +201,7 @@ private[spark] class SecurityManager(
*/
def setAdminAcls(adminUsers: Seq[String]): Unit = {
adminAcls = adminUsers.toSet
logInfo("Changing admin acls to: " + adminAcls.mkString(","))
logInfo(log"Changing admin acls to: ${MDC(LogKeys.ADMIN_ACLS, adminAcls.mkString(","))}")
}

/**
@@ -209,7 +210,7 @@ private[spark] class SecurityManager(
*/
def setAdminAclsGroups(adminUserGroups: Seq[String]): Unit = {
adminAclsGroups = adminUserGroups.toSet
logInfo("Changing admin acls groups to: " + adminAclsGroups.mkString(","))
logInfo(log"Changing admin acls groups to: ${MDC(LogKeys.ADMIN_ACLS, adminAcls.mkString(","))}")
}

def setAcls(aclSetting: Boolean): Unit = {

24 changes: 15 additions & 9 deletions core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -31,7 +31,7 @@ import scala.util.control.NonFatal

import org.apache.spark._
import org.apache.spark.api.python.PythonFunction.PythonAccumulator
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.TASK_NAME
import org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES, Python}
import org.apache.spark.internal.config.Python._
@@ -131,19 +131,23 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
private val daemonModule =
conf.get(PYTHON_DAEMON_MODULE).map { value =>
logInfo(
s"Python daemon module in PySpark is set to [$value] in '${PYTHON_DAEMON_MODULE.key}', " +
"using this to start the daemon up. Note that this configuration only has an effect when " +
s"'${PYTHON_USE_DAEMON.key}' is enabled and the platform is not Windows.")
log"Python daemon module in PySpark is set to " +
log"[${MDC(LogKeys.VALUE, value)}] in '${MDC(LogKeys.CONFIG,
PYTHON_DAEMON_MODULE.key)}', using this to start the daemon up. Note that this " +
log"configuration only has an effect when '${MDC(LogKeys.CONFIG2,
PYTHON_USE_DAEMON.key)}' is enabled and the platform is not Windows.")
value
}.getOrElse("pyspark.daemon")

// This configuration indicates the module to run each Python worker.
private val workerModule =
conf.get(PYTHON_WORKER_MODULE).map { value =>
logInfo(
s"Python worker module in PySpark is set to [$value] in '${PYTHON_WORKER_MODULE.key}', " +
"using this to start the worker up. Note that this configuration only has an effect when " +
s"'${PYTHON_USE_DAEMON.key}' is disabled or the platform is Windows.")
log"Python worker module in PySpark is set to ${MDC(LogKeys.VALUE, value)} " +
log"in ${MDC(LogKeys.CONFIG, PYTHON_WORKER_MODULE.key)}, " +
log"using this to start the worker up. Note that this configuration only has " +
log"an effect when ${MDC(LogKeys.CONFIG2, PYTHON_USE_DAEMON.key)} " +
log"is disabled or the platform is Windows.")
value
}.getOrElse("pyspark.worker")

@@ -509,8 +513,10 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
val init = initTime - bootTime
val finish = finishTime - initTime
val total = finishTime - startTime
logInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total, boot,
init, finish))
logInfo(log"Times: total = ${MDC(LogKeys.TOTAL_TIME, total)}, " +
log"boot = ${MDC(LogKeys.BOOT_TIME, boot)}, " +
log"init = ${MDC(LogKeys.INIT_TIME, init)}, " +
log"finish = ${MDC(LogKeys.FINISH_TIME, finish)}")
val memoryBytesSpilled = stream.readLong()
val diskBytesSpilled = stream.readLong()
context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)

@@ -36,7 +36,7 @@ import org.apache.spark._
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.errors.SparkCoreErrors
import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
import org.apache.spark.internal.{config, Logging, MDC}
import org.apache.spark.internal.{config, Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.internal.config.{LEGACY_ABORT_STAGE_AFTER_KILL_TASKS, RDD_CACHE_VISIBILITY_TRACKING_ENABLED}
import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
@@ -998,11 +998,13 @@ private[spark] class DAGScheduler(
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
waiter.completionFuture.value.get match {
case scala.util.Success(_) =>
logInfo("Job %d finished: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
logInfo(log"Job ${MDC(LogKeys.JOB_ID, waiter.jobId)} finished: " +
log"${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}, took " +
log"${MDC(LogKeys.TIME, (System.nanoTime - start) / 1e6)} ms")
case scala.util.Failure(exception) =>
logInfo("Job %d failed: %s, took %f s".format
(waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
logInfo(log"Job ${MDC(LogKeys.JOB_ID, waiter.jobId)} failed: " +
log"${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}, took " +
log"${MDC(LogKeys.TIME, (System.nanoTime - start) / 1e6)} ms")
// SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
val callerStackTrace = Thread.currentThread().getStackTrace.tail
exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)

@@ -110,7 +110,7 @@ private[spark] object ShutdownHookManager extends Logging {
}
}
if (retval) {
logInfo("path = " + file + ", already present as root for deletion.")
logInfo(log"path = ${MDC(LogKeys.FILE_NAME, file)}, already present as root for deletion.")
}
retval
}

@@ -18,7 +18,7 @@
package org.apache.spark.util.collection

import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.config._
import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager}

@@ -143,8 +143,9 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
*/
@inline private def logSpillage(size: Long): Unit = {
val threadId = Thread.currentThread().getId
logInfo("Thread %d spilling in-memory map of %s to disk (%d time%s so far)"
.format(threadId, org.apache.spark.util.Utils.bytesToString(size),
_spillCount, if (_spillCount > 1) "s" else ""))
logInfo(log"Thread ${MDC(LogKeys.THREAD_ID, threadId)} " +
log"spilling in-memory map of ${MDC(LogKeys.BYTE_SIZE,
org.apache.spark.util.Utils.bytesToString(size))} to disk " +
log"(${MDC(LogKeys.SPILL_TIMES, _spillCount)} times so far)")
}
}

@@ -21,7 +21,7 @@ import scala.reflect.ClassTag

import org.apache.spark.annotation.Since
import org.apache.spark.api.java.JavaSparkContext._
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys.{CLUSTER_CENTROIDS, CLUSTER_LABEL, CLUSTER_WEIGHT, LARGEST_CLUSTER_INDEX, SMALLEST_CLUSTER_INDEX}
import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors}
import org.apache.spark.rdd.RDD
@@ -222,7 +222,7 @@ class StreamingKMeans @Since("1.2.0") (
throw new IllegalArgumentException("Invalid time unit for decay: " + timeUnit)
}
this.decayFactor = math.exp(math.log(0.5) / halfLife)
logInfo("Setting decay factor to: %g ".format (this.decayFactor))
logInfo(log"Setting decay factor to: ${MDC(LogKeys.VALUE, this.decayFactor)}")
this.timeUnit = timeUnit
this
}

@@ -299,8 +299,8 @@ object GradientDescent extends Logging {
i += 1
}

logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
stochasticLossHistory.takeRight(10).mkString(", ")))
logInfo(log"GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses " +
log"${MDC(LogKeys.LOSSES, stochasticLossHistory.takeRight(10).mkString(", "))}")

(weights, stochasticLossHistory.toArray)
}

@@ -22,7 +22,7 @@ import scala.collection.mutable
import breeze.linalg.{DenseVector => BDV}
import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS}

import org.apache.spark.internal.Logging
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.linalg.BLAS.axpy
import org.apache.spark.rdd.RDD
@@ -217,8 +217,8 @@ object LBFGS extends Logging {

val lossHistoryArray = lossHistory.result()

logInfo("LBFGS.runLBFGS finished. Last 10 losses %s".format(
lossHistoryArray.takeRight(10).mkString(", ")))
logInfo(log"LBFGS.runLBFGS finished. Last 10 losses ${MDC(LogKeys.LOSSES,
lossHistoryArray.takeRight(10).mkString(", "))}")

(weights, lossHistoryArray)
}

@@ -46,7 +46,7 @@ import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.SparkHadoopUtil.SOURCE_SPARK
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.internal.LogKeys._
import org.apache.spark.metrics.source.HiveCatalogMetrics
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -906,11 +906,11 @@ private[hive] class HiveClientImpl(
} catch {
case e: Exception =>
logError(
s"""
log"""
|======================
|HIVE FAILURE OUTPUT
|======================
|${outputBuffer.toString}
|${MDC(LogKeys.OUTPUT_BUFFER, outputBuffer.toString)}
|======================
|END HIVE FAILURE OUTPUT
|======================

@@ -117,7 +117,8 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
// Create RDDs from the checkpoint data
currentCheckpointFiles.foreach {
case(time, file) =>
logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'")
logInfo(log"Restoring checkpointed RDD for time ${MDC(LogKeys.TIME, time)} from file " +
log"'${MDC(LogKeys.FILE_NAME, file)}'")
dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file)))
}
}