From 71cf25ec7b0a234fe5570e952a76afb8eb1d1704 Mon Sep 17 00:00:00 2001
From: Amanda Liu
Date: Thu, 11 Jul 2024 14:12:32 -0700
Subject: [PATCH] [SPARK-48623][CORE] Structured logging migrations [Part 3]

### What changes were proposed in this pull request?

This PR migrates additional Scala logging calls to the structured logging framework so that they comply with the Scala style changes introduced in https://github.com/apache/spark/pull/46947.

### Why are the changes needed?

This makes development and PR review of the structured logging migration easier.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Tested by ensuring the dev/scalastyle checks pass.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #47275 from asl3/formatstructuredlogmigrations.

Lead-authored-by: Amanda Liu
Co-authored-by: Gengliang Wang
Signed-off-by: Gengliang Wang
---
 .../org/apache/spark/internal/LogKey.scala    |  7 ++++++
 .../org/apache/spark/util/MavenUtils.scala    |  5 ++--
 .../sql/connect/service/SessionHolder.scala   | 25 +++++++++++--------
 .../SparkConnectListenerBusListener.scala     | 10 ++++----
 .../org/apache/spark/SecurityManager.scala    | 13 +++++-----
 .../spark/api/python/PythonRunner.scala       | 24 +++++++++++-------
 .../apache/spark/scheduler/DAGScheduler.scala | 12 +++++----
 .../spark/util/ShutdownHookManager.scala      |  2 +-
 .../spark/util/collection/Spillable.scala     |  9 ++++---
 .../mllib/clustering/StreamingKMeans.scala    |  4 +--
 .../mllib/optimization/GradientDescent.scala  |  4 +--
 .../spark/mllib/optimization/LBFGS.scala      |  6 ++---
 .../sql/hive/client/HiveClientImpl.scala      |  6 ++---
 .../dstream/DStreamCheckpointData.scala       |  3 ++-
 .../streaming/dstream/FileInputDStream.scala  |  3 ++-
 .../streaming/dstream/RawInputDStream.scala   |  2 +-
 .../streaming/scheduler/JobGenerator.scala    |  4 +--
 17 files changed, 81 insertions(+), 58 deletions(-)
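For reviewers who have not worked with the structured logging framework yet, here is a minimal sketch (not part of the patch itself) of the pattern every hunk below applies: a plain interpolated message built with s"..." becomes a log"..." message in which each dynamic value is wrapped in MDC(LogKeys.X, value). The class and method names below are hypothetical, and the sketch assumes Spark's internal Logging, LogKeys, and MDC types, which is why it sits under the org.apache.spark package.

```scala
package org.apache.spark.example

import org.apache.spark.internal.{Logging, LogKeys, MDC}

// Hypothetical class, for illustration only -- not part of this patch.
class SessionJanitor extends Logging {
  def close(sessionId: String, timeoutMs: Long): Unit = {
    // Before: plain string interpolation, values are lost inside the message text.
    //   logInfo(s"Closing session $sessionId after $timeoutMs ms")
    // After: each value is tagged with a LogKey via MDC, so structured output
    // can surface it as a queryable field.
    logInfo(
      log"Closing session ${MDC(LogKeys.SESSION_ID, sessionId)} after " +
        log"${MDC(LogKeys.TIMEOUT, timeoutMs)} ms")
  }
}
```

Wrapping values in MDC rather than interpolating them directly is what lets the structured (JSON) log output expose SESSION_ID and TIMEOUT as stable fields instead of burying them in free text.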
diff --git a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
index 59e467402aa8d..51ef112a677d4 100644
--- a/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
+++ b/common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala
@@ -101,6 +101,7 @@ private[spark] object LogKeys {
   case object BLOCK_TYPE extends LogKey
   case object BOOT extends LogKey
   case object BOOTSTRAP_TIME extends LogKey
+  case object BOOT_TIME extends LogKey
   case object BROADCAST extends LogKey
   case object BROADCAST_ID extends LogKey
   case object BROADCAST_OUTPUT_STATUS_SIZE extends LogKey
@@ -110,6 +111,7 @@ private[spark] object LogKeys {
   case object BYTE_SIZE extends LogKey
   case object CACHED_TABLE_PARTITION_METADATA_SIZE extends LogKey
   case object CACHE_AUTO_REMOVED_SIZE extends LogKey
+  case object CACHE_SIZE extends LogKey
   case object CACHE_UNTIL_HIGHEST_CONSUMED_SIZE extends LogKey
   case object CACHE_UNTIL_LAST_PRODUCED_SIZE extends LogKey
   case object CALL_SITE_LONG_FORM extends LogKey
@@ -282,6 +284,7 @@ private[spark] object LogKeys {
   case object FINAL_CONTEXT extends LogKey
   case object FINAL_OUTPUT_PATH extends LogKey
   case object FINAL_PATH extends LogKey
+  case object FINISH_TIME extends LogKey
   case object FINISH_TRIGGER_DURATION extends LogKey
   case object FREE_MEMORY_SIZE extends LogKey
   case object FROM_OFFSET extends LogKey
@@ -320,10 +323,12 @@ private[spark] object LogKeys {
   case object INITIAL_CAPACITY extends LogKey
   case object INITIAL_HEARTBEAT_INTERVAL extends LogKey
   case object INIT_MODE extends LogKey
+  case object INIT_TIME extends LogKey
   case object INPUT extends LogKey
   case object INPUT_SPLIT extends LogKey
   case object INTEGRAL extends LogKey
   case object INTERVAL extends LogKey
+  case object INVALID_PARAMS extends LogKey
   case object ISOLATION_LEVEL extends LogKey
   case object ISSUE_DATE extends LogKey
   case object IS_NETWORK_REQUEST_DONE extends LogKey
@@ -369,6 +374,7 @@ private[spark] object LogKeys {
   case object LOG_LEVEL extends LogKey
   case object LOG_OFFSET extends LogKey
   case object LOG_TYPE extends LogKey
+  case object LOSSES extends LogKey
   case object LOWER_BOUND extends LogKey
   case object MALFORMATTED_STRING extends LogKey
   case object MAP_ID extends LogKey
@@ -566,6 +572,7 @@ private[spark] object LogKeys {
   case object OS_NAME extends LogKey
   case object OS_VERSION extends LogKey
   case object OUTPUT extends LogKey
+  case object OUTPUT_BUFFER extends LogKey
   case object OVERHEAD_MEMORY_SIZE extends LogKey
   case object PAGE_SIZE extends LogKey
   case object PARENT_STAGES extends LogKey
diff --git a/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala
index 546981c8b5435..42a1d1612aeeb 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/MavenUtils.scala
@@ -650,8 +650,9 @@ private[spark] object MavenUtils extends Logging {
     val invalidParams = groupedParams.keys.filterNot(validParams.contains).toSeq
     if (invalidParams.nonEmpty) {
       logWarning(
-        s"Invalid parameters `${invalidParams.sorted.mkString(",")}` found " +
-          s"in Ivy URI query `$uriQuery`.")
+        log"Invalid parameters `${MDC(LogKeys.INVALID_PARAMS,
+          invalidParams.sorted.mkString(","))}` " +
+          log"found in Ivy URI query `${MDC(LogKeys.URI, uriQuery)}`.")
     }

     (transitive, exclusionList, repos)
diff --git a/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala b/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
index 681f7e29630ff..fbae94afc43df 100644
--- a/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
+++ b/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala
@@ -33,8 +33,7 @@ import com.google.common.cache.{Cache, CacheBuilder}
 import org.apache.spark.{SparkEnv, SparkException, SparkSQLException}
 import org.apache.spark.api.python.PythonFunction.PythonAccumulator
 import org.apache.spark.connect.proto
-import org.apache.spark.internal.{Logging, MDC}
-import org.apache.spark.internal.LogKeys._
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -63,9 +62,11 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   private lazy val planCache: Option[Cache[proto.Relation, LogicalPlan]] = {
     if (SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE) <= 0) {
       logWarning(
-        s"Session plan cache is disabled due to non-positive cache size." +
-          s" Current value of '${Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key}' is" +
-          s" ${SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE)}.")
+        log"Session plan cache is disabled due to non-positive cache size." +
+          log" Current value of " +
+          log"'${MDC(LogKeys.CONFIG, Connect.CONNECT_SESSION_PLAN_CACHE_SIZE.key)}' is ${MDC(
+            LogKeys.CACHE_SIZE,
+            SparkEnv.get.conf.get(Connect.CONNECT_SESSION_PLAN_CACHE_SIZE))}")
       None
     } else {
       Some(
@@ -248,15 +249,17 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   private[connect] def updateAccessTime(): Unit = {
     lastAccessTimeMs = System.currentTimeMillis()
     logInfo(
-      log"Session ${MDC(SESSION_KEY, key)} accessed, " +
-        log"time ${MDC(LAST_ACCESS_TIME, lastAccessTimeMs)} ms.")
+      log"Session with userId: ${MDC(LogKeys.USER_ID, userId)} and " +
+        log"sessionId: ${MDC(LogKeys.SESSION_ID, sessionId)} accessed, " +
+        log"time ${MDC(LogKeys.LAST_ACCESS_TIME, lastAccessTimeMs)} ms.")
   }

   private[connect] def setCustomInactiveTimeoutMs(newInactiveTimeoutMs: Option[Long]): Unit = {
     customInactiveTimeoutMs = newInactiveTimeoutMs
     logInfo(
-      log"Session ${MDC(SESSION_KEY, key)} " +
-        log"inactive timeout set to ${MDC(TIMEOUT, customInactiveTimeoutMs)} ms.")
+      log"Session with userId: ${MDC(LogKeys.USER_ID, userId)} and " +
+        log"sessionId: ${MDC(LogKeys.SESSION_ID, sessionId)} inactive timeout set to " +
+        log"${MDC(LogKeys.TIMEOUT, customInactiveTimeoutMs)} ms")
   }

   /**
@@ -282,8 +285,8 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
       throw new IllegalStateException(s"Session $key is already closed.")
     }
     logInfo(
-      log"Closing session with userId: ${MDC(USER_ID, userId)} and " +
-        log"sessionId: ${MDC(SESSION_ID, sessionId)}")
+      log"Closing session with userId: ${MDC(LogKeys.USER_ID, userId)} and " +
+        log"sessionId: ${MDC(LogKeys.SESSION_ID, sessionId)}")
     closedTimeMs = Some(System.currentTimeMillis())

     if (Utils.isTesting && eventManager.status == SessionStatus.Pending) {
diff --git a/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala b/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala
index 56d0d920e95b4..c6baad72ee181 100644
--- a/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala
+++ b/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectListenerBusListener.scala
@@ -28,7 +28,7 @@ import org.apache.spark.connect.proto.ExecutePlanResponse
 import org.apache.spark.connect.proto.StreamingQueryEventType
 import org.apache.spark.connect.proto.StreamingQueryListenerEvent
 import org.apache.spark.connect.proto.StreamingQueryListenerEventsResult
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.sql.streaming.StreamingQueryListener
 import org.apache.spark.util.ArrayImplicits._

@@ -131,10 +131,10 @@ private[sql] class SparkConnectListenerBusListener(
           .build())
     } catch {
       case NonFatal(e) =>
-        logError(
-          s"[SessionId: ${sessionHolder.sessionId}][UserId: ${sessionHolder.userId}] " +
-            s"Removing SparkConnectListenerBusListener and terminating the long-running thread " +
-            s"because of exception: $e")
+        logError(log"[SessionId: ${MDC(LogKeys.SESSION_ID, sessionHolder.sessionId)}]" +
+          log"[UserId: ${MDC(LogKeys.USER_ID, sessionHolder.userId)}] " +
+          log"Removing SparkConnectListenerBusListener and terminating the long-running thread " +
+          log"because of exception: ${MDC(LogKeys.EXCEPTION, e)}")
         // This likely means that the client is not responsive even with retry, we should
         // remove this listener and cleanup resources.
         serverSideListenerHolder.cleanUp()
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 5a40afc76728e..c951876e62034 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -122,7 +122,7 @@ private[spark] class SecurityManager(
    */
   def setViewAcls(defaultUsers: Set[String], allowedUsers: Seq[String]): Unit = {
     viewAcls = adminAcls ++ defaultUsers ++ allowedUsers
-    logInfo("Changing view acls to: " + viewAcls.mkString(","))
+    logInfo(log"Changing view acls to: ${MDC(LogKeys.VIEW_ACLS, viewAcls.mkString(","))}")
   }

   def setViewAcls(defaultUser: String, allowedUsers: Seq[String]): Unit = {
@@ -135,7 +135,7 @@ private[spark] class SecurityManager(
    */
   def setViewAclsGroups(allowedUserGroups: Seq[String]): Unit = {
     viewAclsGroups = adminAclsGroups ++ allowedUserGroups
-    logInfo("Changing view acls groups to: " + viewAclsGroups.mkString(","))
+    logInfo(log"Changing view acls groups to: ${MDC(LogKeys.VIEW_ACLS, viewAclsGroups.mkString(","))}")
   }

   /**
@@ -163,7 +163,7 @@ private[spark] class SecurityManager(
    */
   def setModifyAcls(defaultUsers: Set[String], allowedUsers: Seq[String]): Unit = {
     modifyAcls = adminAcls ++ defaultUsers ++ allowedUsers
-    logInfo("Changing modify acls to: " + modifyAcls.mkString(","))
+    logInfo(log"Changing modify acls to: ${MDC(LogKeys.MODIFY_ACLS, modifyAcls.mkString(","))}")
   }

   /**
@@ -172,7 +172,8 @@ private[spark] class SecurityManager(
    */
   def setModifyAclsGroups(allowedUserGroups: Seq[String]): Unit = {
     modifyAclsGroups = adminAclsGroups ++ allowedUserGroups
-    logInfo("Changing modify acls groups to: " + modifyAclsGroups.mkString(","))
+    logInfo(log"Changing modify acls groups to: ${MDC(LogKeys.MODIFY_ACLS,
+      modifyAclsGroups.mkString(","))}")
   }

   /**
@@ -200,7 +201,7 @@ private[spark] class SecurityManager(
    */
   def setAdminAcls(adminUsers: Seq[String]): Unit = {
     adminAcls = adminUsers.toSet
-    logInfo("Changing admin acls to: " + adminAcls.mkString(","))
+    logInfo(log"Changing admin acls to: ${MDC(LogKeys.ADMIN_ACLS, adminAcls.mkString(","))}")
   }

   /**
@@ -209,7 +210,7 @@ private[spark] class SecurityManager(
    */
   def setAdminAclsGroups(adminUserGroups: Seq[String]): Unit = {
     adminAclsGroups = adminUserGroups.toSet
-    logInfo("Changing admin acls groups to: " + adminAclsGroups.mkString(","))
+    logInfo(log"Changing admin acls groups to: ${MDC(LogKeys.ADMIN_ACLS, adminAclsGroups.mkString(","))}")
   }

   def setAcls(aclSetting: Boolean): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index b2571ffddc577..6a67587fbd80c 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -31,7 +31,7 @@ import scala.util.control.NonFatal

 import org.apache.spark._
 import org.apache.spark.api.python.PythonFunction.PythonAccumulator
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.TASK_NAME
 import org.apache.spark.internal.config.{BUFFER_SIZE, EXECUTOR_CORES, Python}
 import org.apache.spark.internal.config.Python._
@@ -131,9 +131,11 @@ private[spark] abstract class BasePythonRunner[IN, OUT](

   private val daemonModule = conf.get(PYTHON_DAEMON_MODULE).map { value =>
     logInfo(
-      s"Python daemon module in PySpark is set to [$value] in '${PYTHON_DAEMON_MODULE.key}', " +
-        "using this to start the daemon up. Note that this configuration only has an effect when " +
-        s"'${PYTHON_USE_DAEMON.key}' is enabled and the platform is not Windows.")
+      log"Python daemon module in PySpark is set to " +
+        log"[${MDC(LogKeys.VALUE, value)}] in '${MDC(LogKeys.CONFIG,
+          PYTHON_DAEMON_MODULE.key)}', using this to start the daemon up. Note that this " +
+        log"configuration only has an effect when '${MDC(LogKeys.CONFIG2,
+          PYTHON_USE_DAEMON.key)}' is enabled and the platform is not Windows.")
     value
   }.getOrElse("pyspark.daemon")

@@ -141,9 +143,11 @@ private[spark] abstract class BasePythonRunner[IN, OUT](

   private val workerModule = conf.get(PYTHON_WORKER_MODULE).map { value =>
     logInfo(
-      s"Python worker module in PySpark is set to [$value] in '${PYTHON_WORKER_MODULE.key}', " +
-        "using this to start the worker up. Note that this configuration only has an effect when " +
-        s"'${PYTHON_USE_DAEMON.key}' is disabled or the platform is Windows.")
+      log"Python worker module in PySpark is set to ${MDC(LogKeys.VALUE, value)} " +
+        log"in ${MDC(LogKeys.CONFIG, PYTHON_WORKER_MODULE.key)}, " +
+        log"using this to start the worker up. Note that this configuration only has " +
+        log"an effect when ${MDC(LogKeys.CONFIG2, PYTHON_USE_DAEMON.key)} " +
+        log"is disabled or the platform is Windows.")
     value
   }.getOrElse("pyspark.worker")

@@ -509,8 +513,10 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
         val init = initTime - bootTime
         val finish = finishTime - initTime
         val total = finishTime - startTime
-        logInfo("Times: total = %s, boot = %s, init = %s, finish = %s".format(total, boot,
-          init, finish))
+        logInfo(log"Times: total = ${MDC(LogKeys.TOTAL_TIME, total)}, " +
+          log"boot = ${MDC(LogKeys.BOOT_TIME, boot)}, " +
+          log"init = ${MDC(LogKeys.INIT_TIME, init)}, " +
+          log"finish = ${MDC(LogKeys.FINISH_TIME, finish)}")
         val memoryBytesSpilled = stream.readLong()
         val diskBytesSpilled = stream.readLong()
         context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index f50e8bd25fec8..78ae5bd39d230 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -36,7 +36,7 @@ import org.apache.spark._
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.errors.SparkCoreErrors
 import org.apache.spark.executor.{ExecutorMetrics, TaskMetrics}
-import org.apache.spark.internal.{config, Logging, MDC}
+import org.apache.spark.internal.{config, Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.internal.config.{LEGACY_ABORT_STAGE_AFTER_KILL_TASKS, RDD_CACHE_VISIBILITY_TRACKING_ENABLED}
 import org.apache.spark.internal.config.Tests.TEST_NO_STAGE_RETRY
@@ -998,11 +998,13 @@ private[spark] class DAGScheduler(
     ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
     waiter.completionFuture.value.get match {
       case scala.util.Success(_) =>
-        logInfo("Job %d finished: %s, took %f s".format
-          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
+        logInfo(log"Job ${MDC(LogKeys.JOB_ID, waiter.jobId)} finished: " +
+          log"${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}, took " +
+          log"${MDC(LogKeys.TIME, (System.nanoTime - start) / 1e6)} ms")
       case scala.util.Failure(exception) =>
-        logInfo("Job %d failed: %s, took %f s".format
-          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
+        logInfo(log"Job ${MDC(LogKeys.JOB_ID, waiter.jobId)} failed: " +
+          log"${MDC(LogKeys.CALL_SITE_SHORT_FORM, callSite.shortForm)}, took " +
+          log"${MDC(LogKeys.TIME, (System.nanoTime - start) / 1e6)} ms")
         // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
         val callerStackTrace = Thread.currentThread().getStackTrace.tail
         exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 02556528ce38b..ad9c3d3f10cfe 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -110,7 +110,7 @@ private[spark] object ShutdownHookManager extends Logging {
       }
     }
     if (retval) {
-      logInfo("path = " + file + ", already present as root for deletion.")
+      logInfo(log"path = ${MDC(LogKeys.FILE_NAME, file)}, already present as root for deletion.")
     }
     retval
   }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
index fe488f9cf0daf..c3d648dccea73 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/Spillable.scala
@@ -18,7 +18,7 @@ package org.apache.spark.util.collection

 import org.apache.spark.SparkEnv
-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.config._
 import org.apache.spark.memory.{MemoryConsumer, MemoryMode, TaskMemoryManager}

@@ -143,8 +143,9 @@ private[spark] abstract class Spillable[C](taskMemoryManager: TaskMemoryManager)
    */
   @inline private def logSpillage(size: Long): Unit = {
     val threadId = Thread.currentThread().getId
-    logInfo("Thread %d spilling in-memory map of %s to disk (%d time%s so far)"
-      .format(threadId, org.apache.spark.util.Utils.bytesToString(size),
-        _spillCount, if (_spillCount > 1) "s" else ""))
+    logInfo(log"Thread ${MDC(LogKeys.THREAD_ID, threadId)} " +
+      log"spilling in-memory map of ${MDC(LogKeys.BYTE_SIZE,
+        org.apache.spark.util.Utils.bytesToString(size))} to disk " +
+      log"(${MDC(LogKeys.SPILL_TIMES, _spillCount)} times so far)")
   }
 }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
index 85a7350078101..641b4fa4048a6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/StreamingKMeans.scala
@@ -21,7 +21,7 @@ import scala.reflect.ClassTag

 import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.JavaSparkContext._
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys.{CLUSTER_CENTROIDS, CLUSTER_LABEL, CLUSTER_WEIGHT, LARGEST_CLUSTER_INDEX, SMALLEST_CLUSTER_INDEX}
 import org.apache.spark.mllib.linalg.{BLAS, Vector, Vectors}
 import org.apache.spark.rdd.RDD
@@ -222,7 +222,7 @@ class StreamingKMeans @Since("1.2.0") (
       throw new IllegalArgumentException("Invalid time unit for decay: " + timeUnit)
     }
     this.decayFactor = math.exp(math.log(0.5) / halfLife)
-    logInfo("Setting decay factor to: %g ".format (this.decayFactor))
+    logInfo(log"Setting decay factor to: ${MDC(LogKeys.VALUE, this.decayFactor)}")
     this.timeUnit = timeUnit
     this
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
index a288d13e57f7b..8da2b0e27fb87 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -299,8 +299,8 @@ object GradientDescent extends Logging {
       i += 1
     }

-    logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
-      stochasticLossHistory.takeRight(10).mkString(", ")))
+    logInfo(log"GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses " +
+      log"${MDC(LogKeys.LOSSES, stochasticLossHistory.takeRight(10).mkString(", "))}")

     (weights, stochasticLossHistory.toArray)
   }
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
index 4fc297560c088..28c997f5301c6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/LBFGS.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable
 import breeze.linalg.{DenseVector => BDV}
 import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS => BreezeLBFGS}

-import org.apache.spark.internal.Logging
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.linalg.BLAS.axpy
 import org.apache.spark.rdd.RDD
@@ -217,8 +217,8 @@ object LBFGS extends Logging {
     val lossHistoryArray = lossHistory.result()

-    logInfo("LBFGS.runLBFGS finished. Last 10 losses %s".format(
-      lossHistoryArray.takeRight(10).mkString(", ")))
+    logInfo(log"LBFGS.runLBFGS finished. Last 10 losses ${MDC(LogKeys.LOSSES,
+      lossHistoryArray.takeRight(10).mkString(", "))}")

     (weights, lossHistoryArray)
   }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 11e077e891bd7..735814c9ae084 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -46,7 +46,7 @@ import org.apache.hadoop.security.UserGroupInformation

 import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil.SOURCE_SPARK
-import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.metrics.source.HiveCatalogMetrics
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -906,11 +906,11 @@ private[hive] class HiveClientImpl(
     } catch {
       case e: Exception =>
         logError(
-          s"""
+          log"""
             |======================
             |HIVE FAILURE OUTPUT
             |======================
-            |${outputBuffer.toString}
+            |${MDC(LogKeys.OUTPUT_BUFFER, outputBuffer.toString)}
             |======================
             |END HIVE FAILURE OUTPUT
             |======================
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
index 6fd6597d4f14c..128a5fded49a9 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStreamCheckpointData.scala
@@ -117,7 +117,8 @@ class DStreamCheckpointData[T: ClassTag](dstream: DStream[T])
     // Create RDDs from the checkpoint data
     currentCheckpointFiles.foreach {
       case(time, file) =>
-        logInfo("Restoring checkpointed RDD for time " + time + " from file '" + file + "'")
+        logInfo(log"Restoring checkpointed RDD for time ${MDC(LogKeys.TIME, time)} from file " +
+          log"'${MDC(LogKeys.FILE_NAME, file)}'")
         dstream.generatedRDDs += ((time, dstream.context.sparkContext.checkpointFile[T](file)))
     }
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index b39cd43ce1a66..b067c505da0dd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -147,7 +147,8 @@ class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
   override def compute(validTime: Time): Option[RDD[(K, V)]] = {
     // Find new files
     val newFiles = findNewFiles(validTime.milliseconds)
-    logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
+    logInfo(log"New files at time ${MDC(LogKeys.BATCH_TIMESTAMP, validTime)}:\n" +
+      log"${MDC(LogKeys.FILE_NAME, newFiles.mkString("\n"))}")
     batchTimeToSelectedFiles.synchronized {
       batchTimeToSelectedFiles += ((validTime, newFiles))
     }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
index efda751c1e43e..de7882032122a 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/RawInputDStream.scala
@@ -87,7 +87,7 @@ class RawNetworkReceiver(host: String, port: Int, storageLevel: StorageLevel)
       val dataBuffer = ByteBuffer.allocate(length)
       readFully(channel, dataBuffer)
       dataBuffer.flip()
-      logInfo("Read a block with " + length + " bytes")
+      logInfo(log"Read a block with ${MDC(LogKeys.BYTE_SIZE, length)} bytes")
       queue.put(dataBuffer)
     }
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
index 91d15e7956a65..1dde435a913c8 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala
@@ -221,8 +221,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
     val checkpointTime = ssc.initialCheckpoint.checkpointTime
     val restartTime = new Time(timer.getRestartTime(graph.zeroTime.milliseconds))
     val downTimes = checkpointTime.until(restartTime, batchDuration)
-    logInfo("Batches during down time (" + downTimes.size + " batches): "
-      + downTimes.mkString(", "))
+    logInfo(log"Batches during down time (${MDC(LogKeys.NUM_BATCHES, downTimes.size)} batches): " +
+      log"${MDC(LogKeys.BATCH_TIMES, downTimes.mkString(","))}")

     // Batches that were unprocessed before failure
     val pendingTimes = ssc.initialCheckpoint.pendingTimes.sorted(Time.ordering)