Merge branch 'master' of https://github.com/apache/spark into SPARK-43665
itholic committed Jul 11, 2023
2 parents 36eb6c6 + eb07110 commit 02204d3
Showing 93 changed files with 2,953 additions and 1,222 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_and_test.yml
@@ -345,7 +345,7 @@ jobs:
- ${{ inputs.java }}
modules:
- >-
pyspark-sql, pyspark-mllib, pyspark-resource
pyspark-sql, pyspark-mllib, pyspark-resource, pyspark-testing
- >-
pyspark-core, pyspark-streaming, pyspark-ml
- >-
126 changes: 75 additions & 51 deletions common/utils/src/main/resources/error/error-classes.json
@@ -221,6 +221,38 @@
"Failed to set permissions on created path <path> back to <permission>."
]
},
"CANNOT_UPDATE_FIELD" : {
"message" : [
"Cannot update <table> field <fieldName> type:"
],
"subClass" : {
"ARRAY_TYPE" : {
"message" : [
"Update the element by updating <fieldName>.element."
]
},
"INTERVAL_TYPE" : {
"message" : [
"Update an interval by updating its fields."
]
},
"MAP_TYPE" : {
"message" : [
"Update a map by updating <fieldName>.key or <fieldName>.value."
]
},
"STRUCT_TYPE" : {
"message" : [
"Update a struct by updating its fields."
]
},
"USER_DEFINED_TYPE" : {
"message" : [
"Update a UserDefinedType[<udtSql>] by updating its fields."
]
}
}
},
"CANNOT_UP_CAST_DATATYPE" : {
"message" : [
"Cannot up cast <expression> from <sourceType> to <targetType>.",
@@ -338,6 +370,28 @@
],
"sqlState" : "42710"
},
"CREATE_VIEW_COLUMN_ARITY_MISMATCH" : {
"message" : [
"Cannot create view <viewName>, the reason is"
],
"subClass" : {
"NOT_ENOUGH_DATA_COLUMNS" : {
"message" : [
"not enough data columns:",
"View columns: <viewColumns>.",
"Data columns: <dataColumns>."
]
},
"TOO_MANY_DATA_COLUMNS" : {
"message" : [
"too many data columns:",
"View columns: <viewColumns>.",
"Data columns: <dataColumns>."
]
}
},
"sqlState" : "21S01"
},
"DATATYPE_MISMATCH" : {
"message" : [
"Cannot resolve <sqlExpr> due to data type mismatch:"
@@ -1215,7 +1269,7 @@
},
"MISMATCH_INPUT" : {
"message" : [
"The input <inputType> '<input>' does not match the format."
"The input <inputType> <input> does not match the format."
]
},
"THOUSANDS_SEPS_MUST_BEFORE_DEC" : {
@@ -1740,6 +1794,11 @@
"The join condition <joinCondition> has the invalid type <conditionType>, expected \"BOOLEAN\"."
]
},
"LOAD_DATA_PATH_NOT_EXISTS" : {
"message" : [
"LOAD DATA input path does not exist: <path>."
]
},
"LOCAL_MUST_WITH_SCHEMA_FILE" : {
"message" : [
"LOCAL must be used together with the schema of `file`, but got: `<actualSchema>`."
@@ -2512,6 +2571,11 @@
"The direct query on files does not support the data source type: <className>. Please try a different data source type or consider using a different query method."
]
},
"UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE" : {
"message" : [
"The <format> datasource doesn't support the column <columnName> of the type <columnType>."
]
},
"UNSUPPORTED_DEFAULT_VALUE" : {
"message" : [
"DEFAULT column values is not supported."
@@ -2619,11 +2683,21 @@
"DESC TABLE COLUMN for a specific partition."
]
},
"DROP_DATABASE" : {
"message" : [
"Drop the default database <database>."
]
},
"DROP_NAMESPACE" : {
"message" : [
"Drop the namespace <namespace>."
]
},
"HIVE_TABLE_TYPE" : {
"message" : [
"The <tableName> is hive <tableType>."
]
},
"HIVE_WITH_ANSI_INTERVALS" : {
"message" : [
"Hive table <tableName> with ANSI intervals."
@@ -2965,11 +3039,6 @@
"3. set \"spark.sql.legacy.allowUntypedScalaUDF\" to \"true\" and use this API with caution."
]
},
"UPDATE_FIELD_WITH_STRUCT_UNSUPPORTED" : {
"message" : [
"Cannot update <table> field <fieldName> type: update a struct by updating its fields."
]
},
"VIEW_ALREADY_EXISTS" : {
"message" : [
"Cannot create view <relationName> because it already exists.",
@@ -3465,11 +3534,6 @@
"<database> is a system preserved database, you cannot create a database with this name."
]
},
"_LEGACY_ERROR_TEMP_1067" : {
"message" : [
"Can not drop default database."
]
},
"_LEGACY_ERROR_TEMP_1068" : {
"message" : [
"<database> is a system preserved database, you cannot use it as current database. To access global temporary views, you should use qualified name with the GLOBAL_TEMP_DATABASE, e.g. SELECT * FROM <database>.viewName."
@@ -3770,11 +3834,6 @@
"Fail to rebuild expression: missing key <filter> in `translatedFilterToExpr`."
]
},
"_LEGACY_ERROR_TEMP_1150" : {
"message" : [
"Column `<field>` has a data type of <fieldType>, which is not supported by <format>."
]
},
"_LEGACY_ERROR_TEMP_1151" : {
"message" : [
"Fail to resolve data source for the table <table> since the table serde property has the duplicated key <key> with extra options specified for this scan operation. To fix this, you can rollback to the legacy behavior of ignoring the extra options by setting the config <config> to `false`, or address the conflicts of the same config."
@@ -4040,11 +4099,6 @@
"Hive metastore does not support altering database location."
]
},
"_LEGACY_ERROR_TEMP_1220" : {
"message" : [
"Hive <tableType> is not supported."
]
},
"_LEGACY_ERROR_TEMP_1221" : {
"message" : [
"Hive 0.12 doesn't support creating permanent functions. Please use Hive 0.13 or higher."
@@ -4185,11 +4239,6 @@
"LOAD DATA target table <tableIdentWithDB> is not partitioned, but a partition spec was provided."
]
},
"_LEGACY_ERROR_TEMP_1265" : {
"message" : [
"LOAD DATA input path does not exist: <path>."
]
},
"_LEGACY_ERROR_TEMP_1266" : {
"message" : [
"Operation not allowed: TRUNCATE TABLE on external tables: <tableIdentWithDB>."
@@ -4245,11 +4294,6 @@
"The logical plan that represents the view is not analyzed."
]
},
"_LEGACY_ERROR_TEMP_1277" : {
"message" : [
"The number of columns produced by the SELECT clause (num: `<analyzedPlanLength>`) does not match the number of column names specified by CREATE VIEW (num: `<userSpecifiedColumnsLength>`)."
]
},
"_LEGACY_ERROR_TEMP_1278" : {
"message" : [
"<name> is not a view."
@@ -5688,26 +5732,6 @@
"Number of dynamic partitions created is <numWrittenParts>, which is more than <maxDynamicPartitions>. To solve this try to set <maxDynamicPartitionsKey> to at least <numWrittenParts>."
]
},
"_LEGACY_ERROR_TEMP_2325" : {
"message" : [
"Cannot update <table> field <fieldName> type: update a map by updating <fieldName>.key or <fieldName>.value."
]
},
"_LEGACY_ERROR_TEMP_2326" : {
"message" : [
"Cannot update <table> field <fieldName> type: update the element by updating <fieldName>.element."
]
},
"_LEGACY_ERROR_TEMP_2327" : {
"message" : [
"Cannot update <table> field <fieldName> type: update a UserDefinedType[<udtSql>] by updating its fields."
]
},
"_LEGACY_ERROR_TEMP_2328" : {
"message" : [
"Cannot update <table> field <fieldName> to interval type."
]
},
"_LEGACY_ERROR_TEMP_2330" : {
"message" : [
"Cannot change nullable column to non-nullable: <fieldName>."
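As a hedged illustration of how the new CREATE_VIEW_COLUMN_ARITY_MISMATCH entry above surfaces to users, the sketch below follows the checkError pattern used later in this commit's AvroSuite changes. The suite name, SQL statement, and parameter formatting are assumptions for illustration, not part of this commit.

```scala
import org.apache.spark.sql.{AnalysisException, QueryTest}
import org.apache.spark.sql.test.SharedSparkSession

// Hypothetical suite, not part of this commit: a view declaring three columns
// over a query that produces only two should hit the NOT_ENOUGH_DATA_COLUMNS
// subclass of the new error class.
class CreateViewArityErrorSketch extends QueryTest with SharedSparkSession {
  test("CREATE VIEW with too few data columns reports the new error class") {
    checkError(
      exception = intercept[AnalysisException] {
        sql("CREATE VIEW v (a, b, c) AS SELECT 1 AS a, 2 AS b")
      },
      errorClass = "CREATE_VIEW_COLUMN_ARITY_MISMATCH.NOT_ENOUGH_DATA_COLUMNS",
      parameters = Map(
        // Parameter values assume Spark's usual backtick-quoted identifier
        // formatting; the exact strings may differ.
        "viewName" -> "`spark_catalog`.`default`.`v`",
        "viewColumns" -> "`a`, `b`, `c`",
        "dataColumns" -> "`a`, `b`"))
  }
}
```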
@@ -21,7 +21,7 @@ import java.io._
import java.net.URL
import java.nio.file.{Files, Paths, StandardCopyOption}
import java.sql.{Date, Timestamp}
import java.util.{Locale, UUID}
import java.util.UUID

import scala.collection.JavaConverters._

@@ -1585,20 +1585,24 @@ abstract class AvroSuite
withSQLConf(SQLConf.LEGACY_INTERVAL_ENABLED.key -> "true") {
withTempDir { dir =>
val tempDir = new File(dir, "files").getCanonicalPath
var msg = intercept[AnalysisException] {
sql("select interval 1 days").write.format("avro").mode("overwrite").save(tempDir)
}.getMessage
assert(msg.contains("Cannot save interval data type into external storage.") ||
msg.contains("Column `INTERVAL '1' DAY` has a data type of interval day, " +
"which is not supported by Avro."))

msg = intercept[AnalysisException] {
spark.udf.register("testType", () => new IntervalData())
sql("select testType()").write.format("avro").mode("overwrite").save(tempDir)
}.getMessage
assert(msg.toLowerCase(Locale.ROOT)
.contains("column `testtype()` has a data type of interval, " +
"which is not supported by avro."))
checkError(
exception = intercept[AnalysisException] {
sql("select interval 1 days").write.format("avro").mode("overwrite").save(tempDir)
},
errorClass = "_LEGACY_ERROR_TEMP_1136",
parameters = Map.empty
)
checkError(
exception = intercept[AnalysisException] {
spark.udf.register("testType", () => new IntervalData())
sql("select testType()").write.format("avro").mode("overwrite").save(tempDir)
},
errorClass = "UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE",
parameters = Map(
"columnName" -> "`testType()`",
"columnType" -> "\"INTERVAL\"",
"format" -> "Avro")
)
}
}
}
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
import scala.reflect.runtime.universe.TypeTag

import com.google.common.cache.{CacheBuilder, CacheLoader}
import io.grpc.ClientInterceptor
import org.apache.arrow.memory.RootAllocator

import org.apache.spark.annotation.{DeveloperApi, Experimental}
@@ -629,7 +630,7 @@
* Create a new [[SparkSession]] based on the connect client [[Configuration]].
*/
private[sql] def create(configuration: Configuration): SparkSession = {
new SparkSession(new SparkConnectClient(configuration), cleaner, planIdGenerator)
new SparkSession(configuration.toSparkConnectClient, cleaner, planIdGenerator)
}

/**
@@ -656,6 +657,18 @@
this
}

/**
* Add an interceptor [[ClientInterceptor]] to be used during channel creation.
*
* Note that interceptors added last are executed first by gRPC.
*
* @since 3.5.0
*/
def interceptor(interceptor: ClientInterceptor): Builder = {
builder.interceptor(interceptor)
this
}

private[sql] def client(client: SparkConnectClient): Builder = {
this.client = client
this
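Below is a minimal usage sketch of the new interceptor hook on the Spark Connect builder. It assumes the builder's existing remote() and build() methods and the default Connect port, none of which are part of this hunk; the pass-through interceptor and object name are illustrative only.

```scala
import io.grpc.{CallOptions, Channel, ClientCall, ClientInterceptor, MethodDescriptor}

import org.apache.spark.sql.SparkSession

object InterceptorUsageSketch {
  // Pass-through interceptor: forwards every Spark Connect RPC unchanged.
  // A real interceptor could attach auth headers, metrics, or logging here.
  private val passThrough = new ClientInterceptor {
    override def interceptCall[ReqT, RespT](
        method: MethodDescriptor[ReqT, RespT],
        callOptions: CallOptions,
        next: Channel): ClientCall[ReqT, RespT] =
      next.newCall(method, callOptions)
  }

  def main(args: Array[String]): Unit = {
    // The interceptor is registered on the builder and applied when the
    // underlying gRPC channel is created by SparkConnectClient.
    val spark = SparkSession
      .builder()
      .remote("sc://localhost:15002")
      .interceptor(passThrough)
      .build()
    spark.range(3).collect().foreach(println)
    spark.close()
  }
}
```

Registering interceptors at build time, rather than on an existing session, is consistent with the doc comment above: the channel is created once, with every interceptor known up front. The SparkConnectClient changes that follow show where that channel creation happens.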
@@ -35,9 +35,6 @@ private[sql] class SparkConnectClient(
private[sql] val configuration: SparkConnectClient.Configuration,
private val channel: ManagedChannel) {

def this(configuration: SparkConnectClient.Configuration) =
this(configuration, configuration.createChannel())

private val userContext: UserContext = configuration.userContext

private[this] val bstub = new CustomSparkConnectBlockingStub(channel, configuration.retryPolicy)
@@ -198,7 +195,7 @@
bstub.interrupt(request)
}

def copy(): SparkConnectClient = new SparkConnectClient(configuration)
def copy(): SparkConnectClient = configuration.toSparkConnectClient

/**
* Add a single artifact to the client session.
@@ -463,7 +460,18 @@
this
}

def build(): SparkConnectClient = new SparkConnectClient(_configuration)
/**
* Add an interceptor to be used during channel creation.
*
* Note that interceptors added last are executed first by gRPC.
*/
def interceptor(interceptor: ClientInterceptor): Builder = {
val interceptors = _configuration.interceptors ++ List(interceptor)
_configuration = _configuration.copy(interceptors = interceptors)
this
}

def build(): SparkConnectClient = _configuration.toSparkConnectClient
}

/**
@@ -478,7 +486,8 @@
isSslEnabled: Option[Boolean] = None,
metadata: Map[String, String] = Map.empty,
userAgent: String = DEFAULT_USER_AGENT,
retryPolicy: GrpcRetryHandler.RetryPolicy = GrpcRetryHandler.RetryPolicy()) {
retryPolicy: GrpcRetryHandler.RetryPolicy = GrpcRetryHandler.RetryPolicy(),
interceptors: List[ClientInterceptor] = List.empty) {

def userContext: proto.UserContext = {
val builder = proto.UserContext.newBuilder()
@@ -509,12 +518,18 @@

def createChannel(): ManagedChannel = {
val channelBuilder = Grpc.newChannelBuilderForAddress(host, port, credentials)

if (metadata.nonEmpty) {
channelBuilder.intercept(new MetadataHeaderClientInterceptor(metadata))
}

interceptors.foreach(channelBuilder.intercept(_))

channelBuilder.maxInboundMessageSize(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE)
channelBuilder.build()
}

def toSparkConnectClient: SparkConnectClient = new SparkConnectClient(this, createChannel())
}

/**
@@ -540,10 +555,6 @@
}
})
}

override def thisUsesUnstableApi(): Unit = {
// Marks this API is not stable. Left empty on purpose.
}
}

/**
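The doc comments in this file state that interceptors added last are executed first by gRPC. Below is a small standalone grpc-java sketch, not Spark code, of what Configuration.createChannel effectively does when it folds the interceptor list into the channel builder; the host, port, and interceptor names are placeholders.

```scala
import io.grpc.{CallOptions, Channel, ClientCall, ClientInterceptor, ManagedChannelBuilder, MethodDescriptor}

object ChannelInterceptorOrderingSketch {
  // Interceptor that prints a tag for each outgoing call before forwarding it.
  private def named(name: String): ClientInterceptor = new ClientInterceptor {
    override def interceptCall[ReqT, RespT](
        method: MethodDescriptor[ReqT, RespT],
        callOptions: CallOptions,
        next: Channel): ClientCall[ReqT, RespT] = {
      println(s"[$name] ${method.getFullMethodName}")
      next.newCall(method, callOptions)
    }
  }

  def main(args: Array[String]): Unit = {
    // Mirrors createChannel above: interceptors are registered one by one on
    // the channel builder. Per the doc comment, "second" (added last) should
    // wrap each RPC issued through this channel before "first" does.
    val builder = ManagedChannelBuilder.forAddress("localhost", 15002)
    builder.usePlaintext()
    builder.intercept(named("first"))
    builder.intercept(named("second"))
    val channel = builder.build()
    channel.shutdownNow()
  }
}
```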
