Merge pull request #1572 from apache/master
GulajavaMinistudio authored Oct 17, 2023
2 parents cff2ac8 + 28961a6 commit 47a0ddf
Showing 13 changed files with 148 additions and 60 deletions.
@@ -133,7 +133,7 @@ private[spark] case class ExecutorDeadException(message: String)
/**
* Exception thrown when Spark returns different result after upgrading to a new version.
*/
-private[spark] class SparkUpgradeException private(
+private[spark] class SparkUpgradeException(
message: String,
cause: Option[Throwable],
errorClass: Option[String],
@@ -169,7 +169,7 @@ private[spark] class SparkUpgradeException private(
/**
* Arithmetic exception thrown from Spark with an error class.
*/
-private[spark] class SparkArithmeticException private(
+private[spark] class SparkArithmeticException(
message: String,
errorClass: Option[String],
messageParameters: Map[String, String],
@@ -207,7 +207,7 @@ private[spark] class SparkArithmeticException private(
/**
* Unsupported operation exception thrown from Spark with an error class.
*/
-private[spark] class SparkUnsupportedOperationException private(
+private[spark] class SparkUnsupportedOperationException(
message: String,
errorClass: Option[String],
messageParameters: Map[String, String])
@@ -271,7 +271,7 @@ private[spark] class SparkConcurrentModificationException(
/**
* Datetime exception thrown from Spark with an error class.
*/
-private[spark] class SparkDateTimeException private(
+private[spark] class SparkDateTimeException(
message: String,
errorClass: Option[String],
messageParameters: Map[String, String],
@@ -324,7 +324,7 @@ private[spark] class SparkFileNotFoundException(
/**
* Number format exception thrown from Spark with an error class.
*/
-private[spark] class SparkNumberFormatException private(
+private[spark] class SparkNumberFormatException private[spark](
message: String,
errorClass: Option[String],
messageParameters: Map[String, String],
@@ -363,7 +363,7 @@ private[spark] class SparkNumberFormatException private(
/**
* Illegal argument exception thrown from Spark with an error class.
*/
-private[spark] class SparkIllegalArgumentException private(
+private[spark] class SparkIllegalArgumentException(
message: String,
cause: Option[Throwable],
errorClass: Option[String],
@@ -403,7 +403,7 @@ private[spark] class SparkIllegalArgumentException private(
override def getQueryContext: Array[QueryContext] = context
}

-private[spark] class SparkRuntimeException private(
+private[spark] class SparkRuntimeException(
message: String,
cause: Option[Throwable],
errorClass: Option[String],
@@ -480,7 +480,7 @@ private[spark] class SparkSecurityException(
/**
* Array index out of bounds exception thrown from Spark with an error class.
*/
-private[spark] class SparkArrayIndexOutOfBoundsException private(
+private[spark] class SparkArrayIndexOutOfBoundsException(
message: String,
errorClass: Option[String],
messageParameters: Map[String, String],
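Every hunk above makes the same change: the constructor loses its private modifier (SparkNumberFormatException instead narrows to private[spark]) so that code outside the companion object, in this commit the Spark Connect client, can build these exceptions with explicit error-class metadata. A hedged sketch of the before/after shape, using a made-up DemoException rather than any of the Spark classes:

// DemoException is illustrative only; it mirrors the constructor shape above.
// Before: class DemoException private ( ... )  -- only the companion could call it.
class DemoException(
    message: String,
    errorClass: Option[String],
    messageParameters: Map[String, String])
  extends ArithmeticException(message)

// With the constructor public, any caller can supply the error-class fields directly:
val e = new DemoException("div by zero", Some("DIVIDE_BY_ZERO"), Map.empty)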
@@ -70,7 +70,8 @@ class StreamingQueryException private[sql](
/** Time when the exception occurred */
val time: Long = System.currentTimeMillis

-override def getMessage: String = s"${message}\n${queryDebugString}"
+override def getMessage: String =
+if (queryDebugString.isEmpty) message else s"${message}\n${queryDebugString}"

override def toString(): String =
s"""${classOf[StreamingQueryException].getName}: ${cause.getMessage}
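The getMessage change above keeps an empty query debug string from leaving a dangling newline after the message. A minimal sketch of the new behavior, with renderMessage as a hypothetical stand-in for the overridden method:

// Hypothetical rendition of the fixed logic.
def renderMessage(message: String, queryDebugString: String): String =
  if (queryDebugString.isEmpty) message else s"$message\n$queryDebugString"

assert(renderMessage("boom", "") == "boom")           // previously "boom\n"
assert(renderMessage("boom", "plan") == "boom\nplan") // non-empty case unchanged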
@@ -27,10 +27,11 @@ import io.grpc.netty.NettyServerBuilder
import io.grpc.stub.StreamObserver
import org.scalatest.BeforeAndAfterEach

-import org.apache.spark.SparkException
+import org.apache.spark.{SparkException, SparkThrowable}
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.{AddArtifactsRequest, AddArtifactsResponse, AnalyzePlanRequest, AnalyzePlanResponse, ArtifactStatusesRequest, ArtifactStatusesResponse, ExecutePlanRequest, ExecutePlanResponse, SparkConnectServiceGrpc}
import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.connect.common.config.ConnectCommon
import org.apache.spark.sql.test.ConnectFunSuite

@@ -208,6 +209,31 @@ class SparkConnectClientSuite extends ConnectFunSuite with BeforeAndAfterEach {
}
}

+for ((name, constructor) <- GrpcExceptionConverter.errorFactory) {
+test(s"error framework parameters - ${name}") {
+val testParams = GrpcExceptionConverter.ErrorParams(
+message = "test message",
+cause = None,
+errorClass = Some("test error class"),
+messageParameters = Map("key" -> "value"),
+queryContext = Array.empty)
+val error = constructor(testParams)
+if (!error.isInstanceOf[ParseException]) {
+assert(error.getMessage == testParams.message)
+} else {
+assert(error.getMessage == s"\n${testParams.message}")
+}
+assert(error.getCause == null)
+error match {
+case sparkThrowable: SparkThrowable =>
+assert(sparkThrowable.getErrorClass == testParams.errorClass.get)
+assert(sparkThrowable.getMessageParameters.asScala == testParams.messageParameters)
+assert(sparkThrowable.getQueryContext.isEmpty)
+case _ =>
+}
+}
+}

private case class TestPackURI(
connectionString: String,
isCorrect: Boolean,
@@ -162,9 +162,9 @@ private[client] class GrpcExceptionConverter(grpcStub: SparkConnectServiceBlocki
}
}

-private object GrpcExceptionConverter {
+private[client] object GrpcExceptionConverter {

-private case class ErrorParams(
+private[client] case class ErrorParams(
message: String,
cause: Option[Throwable],
// errorClass will only be set if the error is both enriched and SparkThrowable.
@@ -180,7 +180,7 @@ private object GrpcExceptionConverter {
(className, throwableCtr)
}

-private val errorFactory = Map(
+private[client] val errorFactory = Map(
errorConstructor(params =>
new StreamingQueryException(
params.message,
@@ -203,23 +203,84 @@ private object GrpcExceptionConverter {
errorClass = params.errorClass,
messageParameters = params.messageParameters,
context = params.queryContext)),
-errorConstructor(params => new NamespaceAlreadyExistsException(params.message)),
-errorConstructor(params => new TableAlreadyExistsException(params.message, params.cause)),
-errorConstructor(params => new TempTableAlreadyExistsException(params.message, params.cause)),
-errorConstructor(params => new NoSuchDatabaseException(params.message, params.cause)),
-errorConstructor(params => new NoSuchTableException(params.message, params.cause)),
+errorConstructor(params =>
+new NamespaceAlreadyExistsException(
+params.message,
+params.errorClass,
+params.messageParameters)),
+errorConstructor(params =>
+new TableAlreadyExistsException(
+params.message,
+params.cause,
+params.errorClass,
+params.messageParameters)),
+errorConstructor(params =>
+new TempTableAlreadyExistsException(
+params.message,
+params.cause,
+params.errorClass,
+params.messageParameters)),
+errorConstructor(params =>
+new NoSuchDatabaseException(
+params.message,
+params.cause,
+params.errorClass,
+params.messageParameters)),
+errorConstructor(params =>
+new NoSuchTableException(
+params.message,
+params.cause,
+params.errorClass,
+params.messageParameters)),
errorConstructor[NumberFormatException](params =>
-new SparkNumberFormatException(params.message)),
+new SparkNumberFormatException(
+params.message,
+params.errorClass,
+params.messageParameters,
+params.queryContext)),
errorConstructor[IllegalArgumentException](params =>
-new SparkIllegalArgumentException(params.message, params.cause)),
-errorConstructor[ArithmeticException](params => new SparkArithmeticException(params.message)),
+new SparkIllegalArgumentException(
+params.message,
+params.cause,
+params.errorClass,
+params.messageParameters,
+params.queryContext)),
+errorConstructor[ArithmeticException](params =>
+new SparkArithmeticException(
+params.message,
+params.errorClass,
+params.messageParameters,
+params.queryContext)),
errorConstructor[UnsupportedOperationException](params =>
-new SparkUnsupportedOperationException(params.message)),
+new SparkUnsupportedOperationException(
+params.message,
+params.errorClass,
+params.messageParameters)),
errorConstructor[ArrayIndexOutOfBoundsException](params =>
-new SparkArrayIndexOutOfBoundsException(params.message)),
-errorConstructor[DateTimeException](params => new SparkDateTimeException(params.message)),
-errorConstructor(params => new SparkRuntimeException(params.message, params.cause)),
-errorConstructor(params => new SparkUpgradeException(params.message, params.cause)),
+new SparkArrayIndexOutOfBoundsException(
+params.message,
+params.errorClass,
+params.messageParameters,
+params.queryContext)),
+errorConstructor[DateTimeException](params =>
+new SparkDateTimeException(
+params.message,
+params.errorClass,
+params.messageParameters,
+params.queryContext)),
+errorConstructor(params =>
+new SparkRuntimeException(
+params.message,
+params.cause,
+params.errorClass,
+params.messageParameters,
+params.queryContext)),
+errorConstructor(params =>
+new SparkUpgradeException(
+params.message,
+params.cause,
+params.errorClass,
+params.messageParameters)),
errorConstructor(params =>
new SparkException(
message = params.message,
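The factory entries above all follow one registration pattern: errorConstructor pairs a JVM class name (reified from the constructed exception's type) with a function from ErrorParams to a Throwable, and errorFactory collects those pairs into a Map, so that an exception class name reported by the server can be replayed as the matching client-side exception. A simplified sketch of the pattern; the signatures here are assumptions that only approximate the real ones (the real ErrorParams also carries queryContext):

import scala.reflect.ClassTag

// Illustrative ErrorParams, trimmed relative to the case class above.
case class ErrorParams(
    message: String,
    cause: Option[Throwable],
    errorClass: Option[String],
    messageParameters: Map[String, String])

// Pair the runtime class name with its rebuild function.
def errorConstructor[T <: Throwable: ClassTag](
    ctor: ErrorParams => T): (String, ErrorParams => Throwable) =
  (implicitly[ClassTag[T]].runtimeClass.getName, ctor)

val errorFactory: Map[String, ErrorParams => Throwable] = Map(
  errorConstructor[IllegalArgumentException](p =>
    new IllegalArgumentException(p.message, p.cause.orNull)))

// A server-reported class name selects the constructor to replay client-side.
val rebuilt: Throwable = errorFactory("java.lang.IllegalArgumentException")(
  ErrorParams("bad input", None, None, Map.empty))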
@@ -28,6 +28,7 @@ import scala.io.Source
import scala.jdk.CollectionConverters._

import com.google.common.io.Files
+import kafka.api.Request
import kafka.server.{HostedPartition, KafkaConfig, KafkaServer}
import kafka.server.checkpoints.OffsetCheckpointFile
import kafka.zk.KafkaZkClient
@@ -39,7 +40,6 @@ import org.apache.kafka.clients.producer._
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.SaslConfigs
import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.security.auth.SecurityProtocol.{PLAINTEXT, SASL_PLAINTEXT}
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.utils.SystemTime
@@ -597,7 +597,7 @@ class KafkaTestUtils(
.getPartitionInfo(topic, partition) match {
case Some(partitionState) =>
zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined &&
-FetchRequest.isValidBrokerId(partitionState.leader) &&
+Request.isValidBrokerId(partitionState.leader) &&
!partitionState.replicas.isEmpty

case _ =>
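Both KafkaTestUtils changes in this commit swap FetchRequest.isValidBrokerId for kafka.api.Request.isValidBrokerId, tracking the Kafka client change; the rule the two helpers encode is the same. A hedged stand-in (not Kafka's own code): leader ids are non-negative, and sentinel values such as -1 for "no leader" must not count as a live leader.

// Illustrative stand-in for the broker-id validity rule assumed above.
def isValidBrokerId(brokerId: Int): Boolean = brokerId >= 0

assert(!isValidBrokerId(-1)) // "no leader" sentinel is rejected
assert(isValidBrokerId(0))   // real broker ids start at 0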
@@ -24,14 +24,12 @@ import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.util.Random

-import kafka.log.{LogCleaner, UnifiedLog}
-import kafka.server.BrokerTopicStats
+import kafka.log.{CleanerConfig, LogCleaner, LogConfig, ProducerStateManagerConfig, UnifiedLog}
+import kafka.server.{BrokerTopicStats, LogDirFailureChannel}
import kafka.utils.Pool
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.serialization.StringDeserializer
-import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, LogDirFailureChannel, ProducerStateManagerConfig}
import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}

import org.apache.spark._
@@ -92,13 +90,13 @@ class KafkaRDDSuite extends SparkFunSuite {
val dir = new File(logDir, topic + "-" + partition)
dir.mkdirs()
val logProps = new ju.Properties()
-logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
-logProps.put(TopicConfig.MIN_CLEANABLE_DIRTY_RATIO_CONFIG, java.lang.Float.valueOf(0.1f))
+logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+logProps.put(LogConfig.MinCleanableDirtyRatioProp, java.lang.Float.valueOf(0.1f))
val logDirFailureChannel = new LogDirFailureChannel(1)
val topicPartition = new TopicPartition(topic, partition)
val producerIdExpirationMs = Int.MaxValue
-val producerStateManagerConfig = new ProducerStateManagerConfig(producerIdExpirationMs, false)
-val logConfig = new LogConfig(logProps)
+val producerStateManagerConfig = new ProducerStateManagerConfig(producerIdExpirationMs)
+val logConfig = LogConfig(logProps)
val log = UnifiedLog(
dir,
logConfig,
Expand All @@ -122,7 +120,7 @@ class KafkaRDDSuite extends SparkFunSuite {
log.roll()
logs.put(topicPartition, log)

-val cleaner = new LogCleaner(new CleanerConfig(false), Array(dir), logs, logDirFailureChannel)
+val cleaner = new LogCleaner(CleanerConfig(), Array(dir), logs, logDirFailureChannel)
cleaner.startup()
cleaner.awaitCleaned(new TopicPartition(topic, partition), log.activeSegment.baseOffset, 1000)

@@ -27,12 +27,12 @@ import scala.annotation.tailrec
import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

+import kafka.api.Request
import kafka.server.{KafkaConfig, KafkaServer}
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.network.ListenerName
-import org.apache.kafka.common.requests.FetchRequest
import org.apache.kafka.common.serialization.StringSerializer
import org.apache.kafka.common.utils.{Time => KTime}
import org.apache.zookeeper.client.ZKClientConfig
@@ -304,7 +304,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
val leader = partitionState.leader
val isr = partitionState.isr
zkClient.getLeaderForPartition(new TopicPartition(topic, partition)).isDefined &&
-FetchRequest.isValidBrokerId(leader) && !isr.isEmpty
+Request.isValidBrokerId(leader) && !isr.isEmpty
case _ =>
false
}
@@ -19,8 +19,8 @@ package org.apache.spark.streaming.kafka010.mocks

import java.util.concurrent.{ScheduledFuture, TimeUnit}

+import kafka.utils.Scheduler
import org.apache.kafka.common.utils.Time
-import org.apache.kafka.server.util.Scheduler
import org.jmock.lib.concurrent.DeterministicScheduler

/**
@@ -42,6 +42,8 @@ private[kafka010] class MockScheduler(val time: Time) extends Scheduler {

val scheduler = new DeterministicScheduler()

+def isStarted: Boolean = true
+
def startup(): Unit = {}

def shutdown(): Unit = synchronized {
@@ -54,18 +56,17 @@ private[kafka010] class MockScheduler(val time: Time) extends Scheduler {

def schedule(
name: String,
-task: Runnable,
-delayMs: Long = 0,
-periodMs: Long = -1): ScheduledFuture[_] = synchronized {
-if (periodMs >= 0) {
-scheduler.scheduleAtFixedRate(task, delayMs, periodMs, TimeUnit.MILLISECONDS)
+fun: () => Unit,
+delay: Long = 0,
+period: Long = -1,
+unit: TimeUnit = TimeUnit.MILLISECONDS): ScheduledFuture[_] = synchronized {
+val runnable = new Runnable {
+override def run(): Unit = fun()
+}
+if (period >= 0) {
+scheduler.scheduleAtFixedRate(runnable, delay, period, unit)
} else {
-scheduler.schedule(task, delayMs, TimeUnit.MILLISECONDS)
+scheduler.schedule(runnable, delay, unit)
}
}

-override def resizeThreadPool(i: Int): Unit = {
-
-}

}
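The reverted MockScheduler implements the Scala kafka.utils.Scheduler trait, whose schedule takes a () => Unit plus an explicit TimeUnit, rather than the Java org.apache.kafka.server.util.Scheduler, whose schedule takes a Runnable with millisecond arguments. Either way the work lands on a jmock DeterministicScheduler, which runs tasks only when the test advances its clock. A hedged usage sketch of that deterministic core (the wiring is assumed, not taken from this commit):

import java.util.concurrent.TimeUnit
import org.jmock.lib.concurrent.DeterministicScheduler

val ds = new DeterministicScheduler()
var fired = 0
// Schedule a task every 10 ms; nothing runs until the clock is ticked.
ds.scheduleAtFixedRate(() => fired += 1, 10, 10, TimeUnit.MILLISECONDS)
ds.tick(30, TimeUnit.MILLISECONDS)
assert(fired == 3) // deterministic: exactly three periods elapsed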
4 changes: 2 additions & 2 deletions dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -129,8 +129,8 @@ jersey-container-servlet/2.40//jersey-container-servlet-2.40.jar
jersey-hk2/2.40//jersey-hk2-2.40.jar
jersey-server/2.40//jersey-server-2.40.jar
jettison/1.5.4//jettison-1.5.4.jar
-jetty-util-ajax/9.4.52.v20230823//jetty-util-ajax-9.4.52.v20230823.jar
-jetty-util/9.4.52.v20230823//jetty-util-9.4.52.v20230823.jar
+jetty-util-ajax/9.4.53.v20231009//jetty-util-ajax-9.4.53.v20231009.jar
+jetty-util/9.4.53.v20231009//jetty-util-9.4.53.v20231009.jar
jline/2.14.6//jline-2.14.6.jar
jline/3.22.0//jline-3.22.0.jar
jna/5.13.0//jna-5.13.0.jar