[SPARK-47963][CORE] Make the external Spark ecosystem able to use structured logging mechanisms

### What changes were proposed in this pull request?
This PR aims to make `the external Spark ecosystem` able to use `structured logging mechanisms`.

### Why are the changes needed?
Currently, the type of an MDC key is `LogKey`, and `LogKey` is an `enumeration`. A `third-party Spark ecosystem` project therefore cannot use the structured logging mechanism, because there is no way to extend the enumeration and `add a new LogKey value`.
So I propose changing `LogKey` from an `enumeration` to the following,
<img width="550" alt="image" src="https://github.com/apache/spark/assets/15246973/ffb0f41e-4f2c-4537-b43f-d55353fe03f7">
A similar style is already used in the Spark code, e.g.:
https://github.com/apache/spark/blob/6f01982094f6cb6e55a9edfbe06f9bfa9e3bce01/core/src/main/scala/org/apache/spark/TaskEndReason.scala#L39-L46

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Added a new UT (it provides an example for developers who use the `LogKey` mechanism in external systems built on Spark).
- Manual testing.
- Passed GA.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#46193 from panbingkun/SPARK-47963.

Lead-authored-by: panbingkun <panbingkun@baidu.com>
Co-authored-by: panbingkun <pbk1982@gmail.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
2 people authored and gengliangwang committed Apr 26, 2024
1 parent 675f5f0 commit 9cf6dc8
Showing 310 changed files with 1,163 additions and 1,109 deletions.
875 changes: 439 additions & 436 deletions common/utils/src/main/scala/org/apache/spark/internal/LogKey.scala

Large diffs are not rendered by default.

@@ -29,7 +29,6 @@ import org.apache.logging.log4j.core.filter.AbstractFilter
import org.slf4j.{Logger, LoggerFactory}

import org.apache.spark.internal.Logging.SparkShellLoggingFilter
import org.apache.spark.internal.LogKey.LogKey
import org.apache.spark.util.SparkClassUtils

/**
@@ -2,7 +2,7 @@

## LogKey

LogKeys serve as identifiers for mapped diagnostic contexts (MDC) within logs. Follow these guidelines when adding new LogKeys:
`LogKey`s serve as identifiers for mapped diagnostic contexts (MDC) within logs. Follow these guidelines when adding a new LogKey:
* Define all structured logging keys in `LogKey.scala`, and sort them alphabetically for ease of search.
* Use `UPPER_SNAKE_CASE` for key names.
* Key names should be both simple and broad, yet include specific identifiers like `STAGE_ID`, `TASK_ID`, and `JOB_ID` when needed for clarity. For instance, use `MAX_ATTEMPTS` as a general key instead of creating separate keys for each scenario such as `EXECUTOR_STATE_SYNC_MAX_ATTEMPTS` and `MAX_TASK_FAILURES`. This balances simplicity with the detail needed for effective logging.
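As a quick illustration of these guidelines (a sketch only, not part of the diff above; `MyScheduler` and `taskId` are illustrative names, and `TASK_ID` is assumed to be one of the keys defined in `LogKey.scala`, as the guidelines suggest):

```scala
import org.apache.spark.internal.{Logging, LogKeys, MDC}

class MyScheduler extends Logging {
  def onTaskFinished(taskId: Long): Unit = {
    // The key is referenced via the LogKeys object and rendered as an MDC entry.
    logInfo(log"Finished task ${MDC(LogKeys.TASK_ID, taskId)}.")
  }
}
```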
33 changes: 20 additions & 13 deletions common/utils/src/test/scala/org/apache/spark/util/LogKeySuite.scala
@@ -22,16 +22,16 @@ import java.nio.file.{Files, Path}
import java.util.{ArrayList => JList}

import scala.jdk.CollectionConverters._
import scala.reflect.runtime.universe._

import org.apache.commons.io.FileUtils
import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite

import org.apache.spark.internal.{Logging, LogKey}
import org.apache.spark.internal.LogKey.LogKey
import org.apache.spark.internal.{Logging, LogKeys}

// scalastyle:off line.size.limit
/**
* To re-generate the LogKey class file, run:
* To re-generate the file `LogKey.scala`, run:
* {{{
* SPARK_GENERATE_GOLDEN_FILES=1 build/sbt "common-utils/testOnly org.apache.spark.util.LogKeySuite"
* }}}
@@ -57,20 +57,20 @@ class LogKeySuite
private val logKeyFilePath = getWorkspaceFilePath("common", "utils", "src", "main", "scala",
"org", "apache", "spark", "internal", "LogKey.scala")

// regenerate the file `LogKey.scala` with its enumeration fields sorted alphabetically
// regenerate the file `LogKey.scala` with its members sorted alphabetically
private def regenerateLogKeyFile(
originalKeys: Seq[LogKey], sortedKeys: Seq[LogKey]): Unit = {
originalKeys: Seq[String], sortedKeys: Seq[String]): Unit = {
if (originalKeys != sortedKeys) {
val logKeyFile = logKeyFilePath.toFile
logInfo(s"Regenerating LogKey file $logKeyFile")
logInfo(s"Regenerating the file $logKeyFile")
val originalContents = FileUtils.readLines(logKeyFile, StandardCharsets.UTF_8)
val sortedContents = new JList[String]()
var firstMatch = false
originalContents.asScala.foreach { line =>
if (line.trim.startsWith("val ") && line.trim.endsWith(" = Value")) {
if (line.trim.startsWith("case object ") && line.trim.endsWith(" extends LogKey")) {
if (!firstMatch) {
sortedKeys.foreach { logKey =>
sortedContents.add(s" val ${logKey.toString} = Value")
sortedKeys.foreach { key =>
sortedContents.add(s" case object $key extends LogKey")
}
firstMatch = true
}
@@ -83,14 +83,21 @@ }
}
}

test("LogKey enumeration fields are correctly sorted") {
val originalKeys = LogKey.values.toSeq
val sortedKeys = originalKeys.sortBy(_.toString)
test("The members of LogKeys are correctly sorted") {
val originalKeys = getAllLogKeys.reverse
val sortedKeys = originalKeys.sorted
if (regenerateGoldenFiles) {
regenerateLogKeyFile(originalKeys, sortedKeys)
} else {
assert(originalKeys === sortedKeys,
"LogKey enumeration fields must be sorted alphabetically")
"The members of LogKeys must be sorted alphabetically")
}
}

private def getAllLogKeys: Seq[String] = {
val logKeysType = typeOf[LogKeys.type]
val classSymbol = logKeysType.typeSymbol.asClass
val members = classSymbol.typeSignature.members
members.filter(m => m.isTerm && !m.isMethod).map(_.name.toString).toSeq
}
}
@@ -22,7 +22,7 @@ import scala.jdk.CollectionConverters._
import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{EXIT_CODE, OFFSET, RANGE}
import org.apache.spark.internal.LogKeys.{EXIT_CODE, OFFSET, RANGE}

class MDCSuite
extends AnyFunSuite // scalastyle:ignore funsuite
@@ -47,6 +47,10 @@ class PatternLoggingSuite extends LoggingSuiteBase with BeforeAndAfterAll {
override def expectedPatternForMsgWithMDCAndException(level: Level): String =
s""".*$level $className: Error in executor 1.\njava.lang.RuntimeException: OOM\n[\\s\\S]*"""

override def expectedPatternForExternalSystemCustomLogKey(level: Level): String = {
s""".*$level $className: External system custom log message.\n"""
}

override def verifyMsgWithConcat(level: Level, logOutput: String): Unit = {
val pattern =
s""".*$level $className: Min Size: 2, Max Size: 4. Please double check.\n"""
@@ -25,8 +25,7 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.logging.log4j.Level
import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite

import org.apache.spark.internal.{LogEntry, Logging, MDC}
import org.apache.spark.internal.LogKey.{EXECUTOR_ID, MAX_SIZE, MIN_SIZE}
import org.apache.spark.internal.{LogEntry, Logging, LogKey, LogKeys, MDC}

trait LoggingSuiteBase
extends AnyFunSuite // scalastyle:ignore funsuite
@@ -54,14 +53,14 @@ trait LoggingSuiteBase

def basicMsg: String = "This is a log message"

def msgWithMDC: LogEntry = log"Lost executor ${MDC(EXECUTOR_ID, "1")}."
def msgWithMDC: LogEntry = log"Lost executor ${MDC(LogKeys.EXECUTOR_ID, "1")}."

def msgWithMDCValueIsNull: LogEntry = log"Lost executor ${MDC(EXECUTOR_ID, null)}."
def msgWithMDCValueIsNull: LogEntry = log"Lost executor ${MDC(LogKeys.EXECUTOR_ID, null)}."

def msgWithMDCAndException: LogEntry = log"Error in executor ${MDC(EXECUTOR_ID, "1")}."
def msgWithMDCAndException: LogEntry = log"Error in executor ${MDC(LogKeys.EXECUTOR_ID, "1")}."

def msgWithConcat: LogEntry = log"Min Size: ${MDC(MIN_SIZE, "2")}, " +
log"Max Size: ${MDC(MAX_SIZE, "4")}. " +
def msgWithConcat: LogEntry = log"Min Size: ${MDC(LogKeys.MIN_SIZE, "2")}, " +
log"Max Size: ${MDC(LogKeys.MAX_SIZE, "4")}. " +
log"Please double check."

// test for basic message (without any mdc)
@@ -79,6 +78,9 @@ trait LoggingSuiteBase
// test for message and exception
def expectedPatternForMsgWithMDCAndException(level: Level): String

// test for external system custom LogKey
def expectedPatternForExternalSystemCustomLogKey(level: Level): String

def verifyMsgWithConcat(level: Level, logOutput: String): Unit

test("Basic logging") {
@@ -144,6 +146,21 @@ trait LoggingSuiteBase
}
}

private val externalSystemCustomLog =
log"${MDC(CUSTOM_LOG_KEY, "External system custom log message.")}"
test("Logging with external system custom LogKey") {
Seq(
(Level.ERROR, () => logError(externalSystemCustomLog)),
(Level.WARN, () => logWarning(externalSystemCustomLog)),
(Level.INFO, () => logInfo(externalSystemCustomLog)),
(Level.DEBUG, () => logDebug(externalSystemCustomLog)),
(Level.TRACE, () => logTrace(externalSystemCustomLog))).foreach {
case (level, logFunc) =>
val logOutput = captureLogOutput(logFunc)
assert(expectedPatternForExternalSystemCustomLogKey(level).r.matches(logOutput))
}
}

test("Logging with concat") {
Seq(
(Level.ERROR, () => logError(msgWithConcat)),
@@ -244,6 +261,21 @@ class StructuredLoggingSuite extends LoggingSuiteBase {
}""")
}

override def expectedPatternForExternalSystemCustomLogKey(level: Level): String = {
compactAndToRegexPattern(
s"""
{
"ts": "<timestamp>",
"level": "$level",
"msg": "External system custom log message.",
"context": {
"custom_log_key": "External system custom log message."
},
"logger": "$className"
}"""
)
}

override def verifyMsgWithConcat(level: Level, logOutput: String): Unit = {
val pattern1 = compactAndToRegexPattern(
s"""
@@ -273,3 +305,6 @@ class StructuredLoggingSuite extends LoggingSuiteBase {
assert(pattern1.r.matches(logOutput) || pattern2.r.matches(logOutput))
}
}

// External system custom LogKey must be `extends LogKey`
case object CUSTOM_LOG_KEY extends LogKey
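
Taken together, a hypothetical external project could combine a custom key with the logging API roughly as follows (a sketch only; `TENANT_ID` and `MyExternalService` are illustrative names, and the imports mirror those used in the suites above):

```scala
import org.apache.spark.internal.{Logging, LogKey, MDC}

// The external project defines its own key by extending LogKey ...
case object TENANT_ID extends LogKey

// ... and logs through the same interpolator that Spark itself uses.
class MyExternalService extends Logging {
  def start(tenant: String): Unit = {
    logInfo(log"Starting service for tenant ${MDC(TENANT_ID, tenant)}.")
  }
}
```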
@@ -32,7 +32,7 @@ import org.apache.hadoop.mapreduce.Job

import org.apache.spark.{SparkException, SparkIllegalArgumentException}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{CODEC_LEVEL, CODEC_NAME, CONFIG, PATH}
import org.apache.spark.internal.LogKeys.{CODEC_LEVEL, CODEC_NAME, CONFIG, PATH}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.avro.AvroCompressionCodec._
import org.apache.spark.sql.avro.AvroOptions.IGNORE_EXTENSION
@@ -22,7 +22,7 @@ import scala.util.control.NonFatal
import io.grpc.stub.StreamObserver

import org.apache.spark.internal.Logging
import org.apache.spark.internal.LogKey.{ERROR, POLICY, RETRY_COUNT, WAIT_TIME}
import org.apache.spark.internal.LogKeys.{ERROR, POLICY, RETRY_COUNT, WAIT_TIME}
import org.apache.spark.internal.MDC

private[sql] class GrpcRetryHandler(
@@ -25,7 +25,7 @@ import io.grpc.stub.{ServerCallStreamObserver, StreamObserver}
import org.apache.spark.{SparkEnv, SparkSQLException}
import org.apache.spark.connect.proto.ExecutePlanResponse
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey._
import org.apache.spark.internal.LogKeys._
import org.apache.spark.sql.catalyst.util.DateTimeConstants.NANOS_PER_MILLIS
import org.apache.spark.sql.connect.common.ProtoUtils
import org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION, CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE, CONNECT_PROGRESS_REPORT_INTERVAL}
@@ -27,7 +27,7 @@ import io.grpc.stub.StreamObserver
import org.apache.spark.{SparkEnv, SparkSQLException}
import org.apache.spark.connect.proto
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey
import org.apache.spark.internal.LogKeys
import org.apache.spark.sql.connect.config.Connect.CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE
import org.apache.spark.sql.connect.service.ExecuteHolder

@@ -245,13 +245,13 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder:
removeResponsesUntilIndex(lastProducedIndex)
// scalastyle:off line.size.limit
logInfo(
log"Release all for opId=${MDC(LogKey.OP_ID, executeHolder.operationId)}. Execution stats: " +
log"total=${MDC(LogKey.TOTAL, totalSize)} " +
log"autoRemoved=${MDC(LogKey.CACHE_AUTO_REMOVED_SIZE, autoRemovedSize)} " +
log"cachedUntilConsumed=${MDC(LogKey.CACHE_UNTIL_HIGHEST_CONSUMED_SIZE, cachedSizeUntilHighestConsumed)} " +
log"cachedUntilProduced=${MDC(LogKey.CACHE_UNTIL_LAST_PRODUCED_SIZE, cachedSizeUntilLastProduced)} " +
log"maxCachedUntilConsumed=${MDC(LogKey.MAX_CACHE_UNTIL_HIGHEST_CONSUMED_SIZE, cachedSizeUntilHighestConsumed.max)} " +
log"maxCachedUntilProduced=${MDC(LogKey.MAX_CACHE_UNTIL_LAST_PRODUCED_SIZE, cachedSizeUntilLastProduced.max)}")
log"Release all for opId=${MDC(LogKeys.OP_ID, executeHolder.operationId)}. Execution stats: " +
log"total=${MDC(LogKeys.TOTAL, totalSize)} " +
log"autoRemoved=${MDC(LogKeys.CACHE_AUTO_REMOVED_SIZE, autoRemovedSize)} " +
log"cachedUntilConsumed=${MDC(LogKeys.CACHE_UNTIL_HIGHEST_CONSUMED_SIZE, cachedSizeUntilHighestConsumed)} " +
log"cachedUntilProduced=${MDC(LogKeys.CACHE_UNTIL_LAST_PRODUCED_SIZE, cachedSizeUntilLastProduced)} " +
log"maxCachedUntilConsumed=${MDC(LogKeys.MAX_CACHE_UNTIL_HIGHEST_CONSUMED_SIZE, cachedSizeUntilHighestConsumed.max)} " +
log"maxCachedUntilProduced=${MDC(LogKeys.MAX_CACHE_UNTIL_LAST_PRODUCED_SIZE, cachedSizeUntilLastProduced.max)}")
// scalastyle:on line.size.limit
}

@@ -39,7 +39,7 @@ import org.apache.spark.connect.proto.Parse.ParseFormat
import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult.StreamingQueryInstance
import org.apache.spark.connect.proto.WriteStreamOperationStart.TriggerCase
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.SESSION_ID
import org.apache.spark.internal.LogKeys.SESSION_ID
import org.apache.spark.ml.{functions => MLFunctions}
import org.apache.spark.resource.{ExecutorResourceRequest, ResourceProfile, TaskResourceProfile, TaskResourceRequest}
import org.apache.spark.sql.{Column, Dataset, Encoders, ForeachWriter, Observation, RelationalGroupedDataset, SparkSession}
@@ -27,7 +27,7 @@ import scala.util.control.NonFatal
import org.apache.spark.SparkException
import org.apache.spark.api.python.{PythonException, PythonWorkerUtils, SimplePythonFunction, SpecialLengths, StreamingPythonRunner}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{DATAFRAME_ID, QUERY_ID, RUN_ID, SESSION_ID}
import org.apache.spark.internal.LogKeys.{DATAFRAME_ID, QUERY_ID, RUN_ID, SESSION_ID}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.connect.service.SessionHolder
import org.apache.spark.sql.connect.service.SparkConnectService
@@ -22,7 +22,7 @@ import java.io.EOFException
import org.apache.spark.SparkException
import org.apache.spark.api.python.{PythonException, PythonWorkerUtils, SimplePythonFunction, SpecialLengths, StreamingPythonRunner}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.FUNCTION_NAME
import org.apache.spark.internal.LogKeys.FUNCTION_NAME
import org.apache.spark.sql.connect.service.{SessionHolder, SparkConnectService}
import org.apache.spark.sql.streaming.StreamingQueryListener

@@ -29,7 +29,7 @@ import io.grpc.ServerCallHandler
import io.grpc.ServerInterceptor

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{DESCRIPTION, MESSAGE}
import org.apache.spark.internal.LogKeys.{DESCRIPTION, MESSAGE}

/**
* A gRPC interceptor to log RPC requests and responses. It logs the protobufs as JSON. Useful for
@@ -33,7 +33,7 @@ import org.apache.spark.{SparkEnv, SparkException, SparkSQLException}
import org.apache.spark.api.python.PythonFunction.PythonAccumulator
import org.apache.spark.connect.proto
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey._
import org.apache.spark.internal.LogKeys._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -29,7 +29,7 @@ import com.google.common.cache.CacheBuilder

import org.apache.spark.{SparkEnv, SparkSQLException}
import org.apache.spark.connect.proto
import org.apache.spark.internal.{Logging, LogKey, MDC}
import org.apache.spark.internal.{Logging, LogKeys, MDC}
import org.apache.spark.sql.connect.config.Connect.{CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE, CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT, CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL}
import org.apache.spark.util.ThreadUtils

@@ -95,7 +95,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
sessionHolder.addExecuteHolder(executeHolder)
executions.put(executeHolder.key, executeHolder)
lastExecutionTimeMs = None
logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_KEY, executeHolder.key)} is created.")
logInfo(log"ExecuteHolder ${MDC(LogKeys.EXECUTE_KEY, executeHolder.key)} is created.")
}

schedulePeriodicChecks() // Starts the maintenance thread if it hasn't started.
@@ -122,7 +122,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
if (executions.isEmpty) {
lastExecutionTimeMs = Some(System.currentTimeMillis())
}
logInfo(log"ExecuteHolder ${MDC(LogKey.EXECUTE_KEY, key)} is removed.")
logInfo(log"ExecuteHolder ${MDC(LogKeys.EXECUTE_KEY, key)} is removed.")
}
// close the execution outside the lock
executeHolder.foreach { e =>
@@ -147,7 +147,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
sessionExecutionHolders.foreach { case (_, executeHolder) =>
val info = executeHolder.getExecuteInfo
logInfo(
log"Execution ${MDC(LogKey.EXECUTE_INFO, info)} removed in removeSessionExecutions.")
log"Execution ${MDC(LogKeys.EXECUTE_INFO, info)} removed in removeSessionExecutions.")
removeExecuteHolder(executeHolder.key, abandoned = true)
}
}
@@ -202,7 +202,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
val interval = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL)
logInfo(
log"Starting thread for cleanup of abandoned executions every " +
log"${MDC(LogKey.INTERVAL, interval)} ms")
log"${MDC(LogKeys.INTERVAL, interval)} ms")
scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
scheduledExecutor.get.scheduleAtFixedRate(
() => {
@@ -242,7 +242,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
toRemove.foreach { executeHolder =>
val info = executeHolder.getExecuteInfo
logInfo(
log"Found execution ${MDC(LogKey.EXECUTE_INFO, info)} that was abandoned " +
log"Found execution ${MDC(LogKeys.EXECUTE_INFO, info)} that was abandoned " +
log"and expired and will be removed.")
removeExecuteHolder(executeHolder.key, abandoned = true)
}
@@ -22,7 +22,7 @@ import java.net.InetSocketAddress
import scala.jdk.CollectionConverters._

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{HOST, PORT}
import org.apache.spark.internal.LogKeys.{HOST, PORT}
import org.apache.spark.sql.SparkSession

/**
@@ -36,7 +36,7 @@ import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse, SparkConnectServiceGrpc}
import org.apache.spark.connect.proto.SparkConnectServiceGrpc.AsyncService
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.HOST
import org.apache.spark.internal.LogKeys.HOST
import org.apache.spark.internal.config.UI.UI_ENABLED
import org.apache.spark.sql.connect.config.Connect.{CONNECT_GRPC_BINDING_ADDRESS, CONNECT_GRPC_BINDING_PORT, CONNECT_GRPC_MARSHALLER_RECURSION_LIMIT, CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE}
import org.apache.spark.sql.connect.execution.ConnectProgressExecutionListener
@@ -30,7 +30,7 @@ import com.google.common.cache.CacheBuilder

import org.apache.spark.{SparkEnv, SparkSQLException}
import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{INTERVAL, SESSION_HOLD_INFO}
import org.apache.spark.internal.LogKeys.{INTERVAL, SESSION_HOLD_INFO}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connect.config.Connect.{CONNECT_SESSION_MANAGER_CLOSED_SESSIONS_TOMBSTONES_SIZE, CONNECT_SESSION_MANAGER_DEFAULT_SESSION_TIMEOUT, CONNECT_SESSION_MANAGER_MAINTENANCE_INTERVAL}
import org.apache.spark.util.ThreadUtils
@@ -27,7 +27,7 @@ import scala.concurrent.duration.{Duration, DurationInt, FiniteDuration}
import scala.util.control.NonFatal

import org.apache.spark.internal.{Logging, MDC}
import org.apache.spark.internal.LogKey.{DURATION, NEW_VALUE, OLD_VALUE, QUERY_CACHE_VALUE, QUERY_ID, SESSION_ID}
import org.apache.spark.internal.LogKeys.{DURATION, NEW_VALUE, OLD_VALUE, QUERY_CACHE_VALUE, QUERY_ID, SESSION_ID}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQuery
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils}