Merge pull request #1516 from apache/master
GulajavaMinistudio authored Jul 4, 2023
2 parents (37ec005 + 7fcabef), commit 1166ae6
Showing 47 changed files with 1,322 additions and 308 deletions.
30 changes: 10 additions & 20 deletions common/utils/src/main/resources/error/error-classes.json
@@ -91,6 +91,11 @@
],
"sqlState" : "22003"
},
"CALL_ON_STREAMING_DATASET_UNSUPPORTED" : {
"message" : [
"The method <methodName> can not be called on streaming Dataset/DataFrame."
]
},
"CANNOT_CAST_DATATYPE" : {
"message" : [
"Cannot cast <sourceType> to <targetType>."
@@ -2198,6 +2203,11 @@
],
"sqlState" : "42P01"
},
"TABLE_VALUED_FUNCTION_TOO_MANY_TABLE_ARGUMENTS" : {
"message" : [
"There are too many table arguments for table-valued function. It allows one table argument, but got: <num>. If you want to allow it, please set \"spark.sql.allowMultipleTableArguments.enabled\" to \"true\""
]
},
"TASK_WRITE_FAILED" : {
"message" : [
"Task failed while writing rows to <path>."
@@ -5604,26 +5614,6 @@
"The input <valueType> '<input>' does not match the given number format: '<format>'."
]
},
"_LEGACY_ERROR_TEMP_2311" : {
"message" : [
"'writeTo' can not be called on streaming Dataset/DataFrame."
]
},
"_LEGACY_ERROR_TEMP_2312" : {
"message" : [
"'write' can not be called on streaming Dataset/DataFrame."
]
},
"_LEGACY_ERROR_TEMP_2313" : {
"message" : [
"Hint not found: <name>."
]
},
"_LEGACY_ERROR_TEMP_2314" : {
"message" : [
"cannot resolve '<sqlExpr>' due to argument data type mismatch: <msg>"
]
},
"_LEGACY_ERROR_TEMP_2315" : {
"message" : [
"cannot resolve '<sqlExpr>' due to data type mismatch: <msg><hint>."
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util

import java.io.File
import java.net.{URI, URISyntaxException}

private[spark] object SparkFileUtils {
  /**
   * Return a well-formed URI for the file described by a user input string.
   *
   * If the supplied path does not contain a scheme, or is a relative path, it will be
   * converted into an absolute path with a file:// scheme.
   */
  def resolveURI(path: String): URI = {
    try {
      val uri = new URI(path)
      if (uri.getScheme() != null) {
        return uri
      }
      // Make sure to handle the case where the path has a fragment (applies to the YARN
      // distributed cache).
      if (uri.getFragment() != null) {
        val absoluteURI = new File(uri.getPath()).getAbsoluteFile().toURI()
        return new URI(absoluteURI.getScheme(), absoluteURI.getHost(), absoluteURI.getPath(),
          uri.getFragment())
      }
    } catch {
      case e: URISyntaxException =>
        // Not a valid URI; fall through and resolve the input as a local file path below.
    }
    new File(path).getCanonicalFile().toURI()
  }
}
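
A brief usage sketch (illustrative, not part of this change): since `SparkFileUtils` is `private[spark]`, the example assumes caller code living under the `org.apache.spark` package, and the object name and paths are placeholders.

package org.apache.spark.util

import java.net.URI

object ResolveURIExample {
  def main(args: Array[String]): Unit = {
    // A path with an explicit scheme is returned unchanged.
    val remote: URI = SparkFileUtils.resolveURI("hdfs://namenode:8020/user/data.txt")

    // A bare relative path becomes an absolute file: URI under the working directory.
    val local: URI = SparkFileUtils.resolveURI("jars/app.jar")

    // A fragment (as used by the YARN distributed cache) is preserved on the absolute URI.
    val aliased: URI = SparkFileUtils.resolveURI("jars/app.jar#alias")

    println(Seq(remote, local, aliased).mkString("\n"))
  }
}
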
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.util

import java.util.concurrent.TimeoutException

import scala.concurrent.Awaitable
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

import org.apache.spark.SparkException

private[spark] object SparkThreadUtils {
  // scalastyle:off awaitresult
  /**
   * Preferred alternative to `Await.result()`.
   *
   * This method wraps and re-throws any exceptions thrown by the underlying `Await` call, ensuring
   * that this thread's stack trace appears in logs.
   *
   * In addition, it calls `Awaitable.result` directly to avoid using `ForkJoinPool`'s
   * `BlockingContext`. Code running in the user's thread may be executing in a thread of Scala's
   * ForkJoinPool. Because concurrent executions in a ForkJoinPool may observe a [[ThreadLocal]]
   * value unexpectedly, this method prevents the ForkJoinPool from running other tasks in the
   * current waiting thread. In general, this method should be used because many places in Spark
   * rely on [[ThreadLocal]]s, and it is hard to debug when they leak into other tasks.
   */
  @throws(classOf[SparkException])
  def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
    try {
      // `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
      // See SPARK-13747.
      val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
      awaitable.result(atMost)(awaitPermission)
    } catch {
      case e: SparkFatalException =>
        throw e.throwable
      // TimeoutException and RpcAbortException are thrown in the current thread, so there is
      // no need to wrap the exception.
      case NonFatal(t)
          if !t.isInstanceOf[TimeoutException] =>
        throw new SparkException("Exception thrown in awaitResult: ", t)
    }
  }
  // scalastyle:on awaitresult
}
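
A minimal sketch of how the helper is meant to be called in place of `Await.result` (illustrative, not part of this change): it again assumes caller code under the `org.apache.spark` package, and the example object and future are placeholders.

package org.apache.spark.util

import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration

object AwaitResultExample {
  def main(args: Array[String]): Unit = {
    implicit val ec: ExecutionContext = ExecutionContext.global
    val work: Future[Int] = Future { 1 + 1 }

    // Unlike a bare Await.result, a failure here is rethrown as a SparkException that
    // carries this waiting thread's stack trace, and the wait bypasses ForkJoinPool's
    // BlockingContext.
    val result: Int = SparkThreadUtils.awaitResult(work, Duration.Inf)
    println(result)
  }
}
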
@@ -38,7 +38,7 @@ import org.apache.commons.codec.digest.DigestUtils.sha256Hex
import org.apache.spark.connect.proto
import org.apache.spark.connect.proto.AddArtifactsResponse
import org.apache.spark.connect.proto.AddArtifactsResponse.ArtifactSummary
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.{SparkFileUtils, SparkThreadUtils}

/**
* The Artifact Manager is responsible for handling and transferring artifacts from the local
@@ -71,7 +71,7 @@ class ArtifactManager(
* Currently only local files with extensions .jar and .class are supported.
*/
def addArtifact(path: String): Unit = {
addArtifact(Utils.resolveURI(path))
addArtifact(SparkFileUtils.resolveURI(path))
}

private def parseArtifacts(uri: URI): Seq[Artifact] = {
@@ -201,7 +201,7 @@
writeBatch()
}
stream.onCompleted()
ThreadUtils.awaitResult(promise.future, Duration.Inf)
SparkThreadUtils.awaitResult(promise.future, Duration.Inf)
// TODO(SPARK-42658): Handle responses containing CRC failures.
}

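
For context on how these helpers are reached from user code, a hedged sketch (not part of this change) of adding a local artifact through the Connect Scala client: it assumes a reachable Connect endpoint and that the session-level `addArtifact` forwards to the `ArtifactManager.addArtifact(path)` shown above; the object name, endpoint, and jar path are placeholders.

import org.apache.spark.sql.SparkSession

object AddArtifactExample {
  def main(args: Array[String]): Unit = {
    // Connect to a Spark Connect server (placeholder endpoint).
    val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate()

    // The local path is resolved to a file: URI via resolveURI before being uploaded;
    // only .jar and .class files are supported by this code path.
    spark.addArtifact("/tmp/my-udfs.jar")
  }
}
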
15 changes: 1 addition & 14 deletions core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -307,20 +307,7 @@ private[spark] object ThreadUtils {
*/
@throws(classOf[SparkException])
def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
try {
// `awaitPermission` is not actually used anywhere so it's safe to pass in null here.
// See SPARK-13747.
val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
awaitable.result(atMost)(awaitPermission)
} catch {
case e: SparkFatalException =>
throw e.throwable
// TimeoutException and RpcAbortException is thrown in the current thread, so not need to warp
// the exception.
case NonFatal(t)
if !t.isInstanceOf[TimeoutException] =>
throw new SparkException("Exception thrown in awaitResult: ", t)
}
SparkThreadUtils.awaitResult(awaitable, atMost)
}
// scalastyle:on awaitresult

17 changes: 1 addition & 16 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2085,22 +2085,7 @@ private[spark] object Utils extends Logging with SparkClassUtils {
* converted into an absolute path with a file:// scheme.
*/
def resolveURI(path: String): URI = {
try {
val uri = new URI(path)
if (uri.getScheme() != null) {
return uri
}
// make sure to handle if the path has a fragment (applies to yarn
// distributed cache)
if (uri.getFragment() != null) {
val absoluteURI = new File(uri.getPath()).getAbsoluteFile().toURI()
return new URI(absoluteURI.getScheme(), absoluteURI.getHost(), absoluteURI.getPath(),
uri.getFragment())
}
} catch {
case e: URISyntaxException =>
}
new File(path).getCanonicalFile().toURI()
SparkFileUtils.resolveURI(path)
}

/** Resolve a comma-separated list of paths. */
Expand Down
12 changes: 12 additions & 0 deletions docs/sql-error-conditions.md
@@ -111,6 +111,12 @@ Unable to find batch `<batchMetadataFile>`.

`<value1>` `<symbol>` `<value2>` caused overflow.

### CALL_ON_STREAMING_DATASET_UNSUPPORTED

SQLSTATE: none assigned

The method `<methodName>` can not be called on streaming Dataset/DataFrame.
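
As an illustration of when this condition is raised (a hedged sketch, not part of the docs page itself): calling a batch-only method such as `write` on a streaming Dataset, assuming a local session and the built-in `rate` source; the output path is a placeholder.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val streaming = spark.readStream.format("rate").load()

// Fails with this condition; <methodName> is filled in with "write".
streaming.write.format("parquet").save("/tmp/out")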

### CANNOT_CAST_DATATYPE

[SQLSTATE: 42846](sql-error-conditions-sqlstates.html#class-42-syntax-error-or-access-rule-violation)
@@ -1568,6 +1574,12 @@ If you did not qualify the name with a schema, verify the current_schema() outpu

To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.

### TABLE_VALUED_FUNCTION_TOO_MANY_TABLE_ARGUMENTS

SQLSTATE: none assigned

There are too many table arguments for table-valued function. It allows one table argument, but got: `<num>`. If you want to allow it, please set "spark.sql.allowMultipleTableArguments.enabled" to "true"
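
If multiple table arguments really are intended, the message points at a configuration switch; a minimal sketch of enabling it (not part of the docs page itself), assuming an active `SparkSession` named `spark`.

// Allow table-valued functions to accept more than one TABLE argument,
// as suggested by the error message above.
spark.conf.set("spark.sql.allowMultipleTableArguments.enabled", "true")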

### TASK_WRITE_FAILED

SQLSTATE: none assigned
6 changes: 6 additions & 0 deletions python/docs/source/reference/pyspark.rst
@@ -55,19 +55,23 @@ Spark Context APIs
SparkContext.accumulator
SparkContext.addArchive
SparkContext.addFile
SparkContext.addJobTag
SparkContext.addPyFile
SparkContext.applicationId
SparkContext.binaryFiles
SparkContext.binaryRecords
SparkContext.broadcast
SparkContext.cancelAllJobs
SparkContext.cancelJobGroup
SparkContext.cancelJobsWithTag
SparkContext.clearJobTags
SparkContext.defaultMinPartitions
SparkContext.defaultParallelism
SparkContext.dump_profiles
SparkContext.emptyRDD
SparkContext.getCheckpointDir
SparkContext.getConf
SparkContext.getJobTags
SparkContext.getLocalProperty
SparkContext.getOrCreate
SparkContext.hadoopFile
@@ -80,9 +84,11 @@ Spark Context APIs
SparkContext.pickleFile
SparkContext.range
SparkContext.resources
SparkContext.removeJobTag
SparkContext.runJob
SparkContext.sequenceFile
SparkContext.setCheckpointDir
SparkContext.setInterruptOnCancel
SparkContext.setJobDescription
SparkContext.setJobGroup
SparkContext.setLocalProperty
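
The new entries above document the job-tag API; a hedged Scala sketch (not part of this change) of the corresponding SparkContext calls, assumed to mirror the PySpark names listed above; `sc` is an existing SparkContext and the tag name is a placeholder.

sc.addJobTag("nightly-report")         // tag jobs submitted from this thread
sc.setInterruptOnCancel(true)          // interrupt running tasks if these jobs are cancelled
val tags = sc.getJobTags()             // inspect the currently attached tags

// ... trigger actions that launch jobs ...

sc.cancelJobsWithTag("nightly-report") // cancel every active job carrying the tag
sc.removeJobTag("nightly-report")
sc.clearJobTags()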