diff --git a/.github/workflows/build_java21.yml b/.github/workflows/build_java21.yml index 1e7c998027d7e..760d47ccb7928 100644 --- a/.github/workflows/build_java21.yml +++ b/.github/workflows/build_java21.yml @@ -42,7 +42,7 @@ jobs: jobs: >- { "build": "true", - "pyspark": "true", + "pyspark": "false", "sparkr": "true", "tpcds-1g": "true", "docker-integration-tests": "true" diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java index 7f0862fcef435..b95a8700109f1 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/RemoteBlockPushResolver.java @@ -328,6 +328,10 @@ public MergedBlockMeta getMergedBlockMeta( int size = (int) indexFile.length(); // First entry is the zero offset int numChunks = (size / Long.BYTES) - 1; + if (numChunks <= 0) { + throw new RuntimeException(String.format( + "Merged shuffle index file %s is empty", indexFile.getPath())); + } File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId); if (!metaFile.exists()) { throw new RuntimeException(String.format("Merged shuffle meta file %s not found", diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java index 2526a94f42940..0847121b0ccb0 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/RemoteBlockPushResolverSuite.java @@ -281,7 +281,7 @@ public void testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOE verifyMetrics(4, 0, 0, 0, 0, 0, 4); } - @Test + @Test(expected = RuntimeException.class) public void testFailureAfterData() throws IOException { StreamCallbackWithID stream = pushResolver.receiveBlockDataAsStream( @@ -289,12 +289,16 @@ public void testFailureAfterData() throws IOException { stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4])); stream.onFailure(stream.getID(), new RuntimeException("Forced Failure")); pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0)); - MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0); - assertEquals("num-chunks", 0, blockMeta.getNumChunks()); - verifyMetrics(4, 0, 0, 0, 0, 0, 4); + try { + pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0); + } catch (RuntimeException e) { + assertTrue(e.getMessage().contains("is empty")); + verifyMetrics(4, 0, 0, 0, 0, 0, 4); + throw e; + } } - @Test + @Test(expected = RuntimeException.class) public void testFailureAfterMultipleDataBlocks() throws IOException { StreamCallbackWithID stream = pushResolver.receiveBlockDataAsStream( @@ -304,9 +308,13 @@ public void testFailureAfterMultipleDataBlocks() throws IOException { stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4])); stream.onFailure(stream.getID(), new RuntimeException("Forced Failure")); pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0)); - MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0); - assertEquals("num-chunks", 0, blockMeta.getNumChunks()); - verifyMetrics(9, 0, 0, 0, 0, 0, 9); + try { + 
pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0); + } catch (RuntimeException e) { + assertTrue(e.getMessage().contains("is empty")); + verifyMetrics(9, 0, 0, 0, 0, 0, 9); + throw e; + } } @Test diff --git a/common/utils/src/main/resources/error/error-classes.json b/common/utils/src/main/resources/error/error-classes.json index 6a72fc5449eed..3126fb9519b16 100644 --- a/common/utils/src/main/resources/error/error-classes.json +++ b/common/utils/src/main/resources/error/error-classes.json @@ -558,6 +558,11 @@ "The data type of the column () do not have the same type: () <> ()." ] }, + "TYPE_CHECK_FAILURE_WITH_HINT" : { + "message" : [ + "." + ] + }, "UNEXPECTED_CLASS_TYPE" : { "message" : [ "class not found." @@ -1331,6 +1336,33 @@ "Numeric literal is outside the valid range for with minimum value of and maximum value of . Please adjust the value accordingly." ] }, + "INVALID_OBSERVED_METRICS" : { + "message" : [ + "Invalid observed metrics." + ], + "subClass" : { + "MISSING_NAME" : { + "message" : [ + "The observed metrics should be named: ." + ] + }, + "NESTED_AGGREGATES_UNSUPPORTED" : { + "message" : [ + "Nested aggregates are not allowed in observed metrics, but found: ." + ] + }, + "NON_AGGREGATE_FUNC_ARG_IS_NON_DETERMINISTIC" : { + "message" : [ + "Non-deterministic expression can only be used as an argument to an aggregate function." + ] + }, + "WINDOW_EXPRESSIONS_UNSUPPORTED" : { + "message" : [ + "Window expressions are not allowed in observed metrics, but found: ." + ] + } + } + }, "INVALID_OPTIONS" : { "message" : [ "Invalid options:" @@ -5614,31 +5646,6 @@ "The input '' does not match the given number format: ''." ] }, - "_LEGACY_ERROR_TEMP_2315" : { - "message" : [ - "cannot resolve '' due to data type mismatch: ." - ] - }, - "_LEGACY_ERROR_TEMP_2316" : { - "message" : [ - "observed metrics should be named: ." - ] - }, - "_LEGACY_ERROR_TEMP_2317" : { - "message" : [ - "window expressions are not allowed in observed metrics, but found: ." - ] - }, - "_LEGACY_ERROR_TEMP_2318" : { - "message" : [ - "non-deterministic expression can only be used as an argument to an aggregate function." - ] - }, - "_LEGACY_ERROR_TEMP_2319" : { - "message" : [ - "nested aggregates are not allowed in observed metrics, but found: ." - ] - }, "_LEGACY_ERROR_TEMP_2320" : { "message" : [ "distinct aggregates are not allowed in observed metrics, but found: ." diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala index c8057ca58798e..edaaa8835cc01 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroOptions.scala @@ -81,14 +81,14 @@ private[sql] class AvroOptions( /** * Top level record name in write result, which is required in Avro spec. - * See https://avro.apache.org/docs/1.11.1/specification/#schema-record . + * See https://avro.apache.org/docs/1.11.2/specification/#schema-record . * Default value is "topLevelRecord" */ val recordName: String = parameters.getOrElse(RECORD_NAME, "topLevelRecord") /** * Record namespace in write result. Default value is "". - * See Avro spec for details: https://avro.apache.org/docs/1.11.1/specification/#schema-record . + * See Avro spec for details: https://avro.apache.org/docs/1.11.2/specification/#schema-record . 
*/ val recordNamespace: String = parameters.getOrElse(RECORD_NAMESPACE, "") diff --git a/core/src/test/resources/TestHelloV2.jar b/connector/connect/client/jvm/src/test/resources/TestHelloV2_2.12.jar similarity index 100% rename from core/src/test/resources/TestHelloV2.jar rename to connector/connect/client/jvm/src/test/resources/TestHelloV2_2.12.jar diff --git a/connector/connect/client/jvm/src/test/resources/TestHelloV2_2.13.jar b/connector/connect/client/jvm/src/test/resources/TestHelloV2_2.13.jar new file mode 100644 index 0000000000000..6dee8fcd9c957 Binary files /dev/null and b/connector/connect/client/jvm/src/test/resources/TestHelloV2_2.13.jar differ diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala index 61959234c8790..676ad6b090ed8 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/application/ReplE2ESuite.scala @@ -17,12 +17,16 @@ package org.apache.spark.sql.application import java.io.{PipedInputStream, PipedOutputStream} +import java.nio.file.Paths import java.util.concurrent.{Executors, Semaphore, TimeUnit} +import scala.util.Properties + import org.apache.commons.io.output.ByteArrayOutputStream +import org.apache.commons.lang3.{JavaVersion, SystemUtils} import org.scalatest.BeforeAndAfterEach -import org.apache.spark.sql.connect.client.util.RemoteSparkSession +import org.apache.spark.sql.connect.client.util.{IntegrationTestUtils, RemoteSparkSession} class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach { @@ -35,6 +39,11 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach { private var ammoniteIn: PipedInputStream = _ private val semaphore: Semaphore = new Semaphore(0) + private val scalaVersion = Properties.versionNumberString + .split("\\.") + .take(2) + .mkString(".") + private def getCleanString(out: ByteArrayOutputStream): String = { // Remove ANSI colour codes // Regex taken from https://stackoverflow.com/a/25189932 @@ -42,26 +51,29 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach { } override def beforeAll(): Unit = { - super.beforeAll() - ammoniteOut = new ByteArrayOutputStream() - testSuiteOut = new PipedOutputStream() - // Connect the `testSuiteOut` and `ammoniteIn` pipes - ammoniteIn = new PipedInputStream(testSuiteOut) - errorStream = new ByteArrayOutputStream() - - val args = Array("--port", serverPort.toString) - val task = new Runnable { - override def run(): Unit = { - ConnectRepl.doMain( - args = args, - semaphore = Some(semaphore), - inputStream = ammoniteIn, - outputStream = ammoniteOut, - errorStream = errorStream) + // TODO(SPARK-44121) Remove this check condition + if (SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17)) { + super.beforeAll() + ammoniteOut = new ByteArrayOutputStream() + testSuiteOut = new PipedOutputStream() + // Connect the `testSuiteOut` and `ammoniteIn` pipes + ammoniteIn = new PipedInputStream(testSuiteOut) + errorStream = new ByteArrayOutputStream() + + val args = Array("--port", serverPort.toString) + val task = new Runnable { + override def run(): Unit = { + ConnectRepl.doMain( + args = args, + semaphore = Some(semaphore), + inputStream = ammoniteIn, + outputStream = ammoniteOut, + errorStream = errorStream) + } } - } - executorService.submit(task) + executorService.submit(task) + } } 
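Aside (not part of the patch): the guard added to beforeAll() above relies on Apache Commons Lang's runtime Java-version check to keep Ammonite-based REPL tests off JVMs newer than 17 until SPARK-44121 is resolved. A minimal, self-contained Scala sketch of the same pattern, using a hypothetical helper object, looks like this:

import org.apache.commons.lang3.{JavaVersion, SystemUtils}

// Hypothetical helper illustrating the guard used in beforeAll() above:
// Ammonite-based REPL tests are skipped on JVMs newer than 17 until
// SPARK-44121 removes the restriction.
object JavaVersionGuard {
  // True on Java 17 or older, i.e. when the Ammonite REPL can be started.
  def ammoniteSupported: Boolean =
    SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17)

  // Runs the given setup only on supported JVMs, logging a skip otherwise.
  def runIfSupported(setup: => Unit): Unit =
    if (ammoniteSupported) setup
    else println(s"Skipping REPL setup on Java ${SystemUtils.JAVA_SPECIFICATION_VERSION}")
}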
override def afterAll(): Unit = { @@ -96,7 +108,10 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach { def assertContains(message: String, output: String): Unit = { val isContain = output.contains(message) - assert(isContain, "Ammonite output did not contain '" + message + "':\n" + output) + assert( + isContain, + "Ammonite output did not contain '" + message + "':\n" + output + + s"\nError Output: ${getCleanString(errorStream)}") } test("Simple query") { @@ -151,4 +166,33 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach { assertContains("Array[java.lang.Long] = Array(0L, 2L, 4L, 6L, 8L)", output) } + test("Client-side JAR") { + // scalastyle:off classforname line.size.limit + val sparkHome = IntegrationTestUtils.sparkHome + val testJar = Paths + .get( + s"$sparkHome/connector/connect/client/jvm/src/test/resources/TestHelloV2_$scalaVersion.jar") + .toFile + + assert(testJar.exists(), "Missing TestHelloV2 jar!") + val input = s""" + |import java.nio.file.Paths + |def classLoadingTest(x: Int): Int = { + | val classloader = + | Option(Thread.currentThread().getContextClassLoader).getOrElse(getClass.getClassLoader) + | val cls = Class.forName("com.example.Hello$$", true, classloader) + | val module = cls.getField("MODULE$$").get(null) + | cls.getMethod("test").invoke(module).asInstanceOf[Int] + |} + |val classLoaderUdf = udf(classLoadingTest _) + | + |val jarPath = Paths.get("${testJar.toString}").toUri + |spark.addArtifact(jarPath) + | + |spark.range(5).select(classLoaderUdf(col("id"))).as[Int].collect() + """.stripMargin + val output = runCommandsInShell(input) + assertContains("Array[Int] = Array(2, 2, 2, 2, 2)", output) + // scalastyle:on classforname line.size.limit + } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala index 0a91c6b955023..449ba011c2196 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala @@ -133,7 +133,7 @@ class SparkConnectArtifactManager(sessionHolder: SessionHolder) extends Logging Files.move(serverLocalStagingPath, target) if (remoteRelativePath.startsWith(s"jars${File.separator}")) { jarsList.add(target) - jarsURI.add(artifactURI + "/" + target.toString) + jarsURI.add(artifactURI + "/" + remoteRelativePath.toString) } else if (remoteRelativePath.startsWith(s"pyfiles${File.separator}")) { sessionHolder.session.sparkContext.addFile(target.toString) val stringRemotePath = remoteRelativePath.toString @@ -188,7 +188,7 @@ class SparkConnectArtifactManager(sessionHolder: SessionHolder) extends Logging blockManager.removeCache(sessionHolder.userId, sessionHolder.sessionId) // Clean up artifacts folder - FileUtils.deleteDirectory(artifactRootPath.toFile) + FileUtils.deleteDirectory(artifactPath.toFile) } private[connect] def uploadArtifactToFs( diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/ArtifactManagerSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/ArtifactManagerSuite.scala index 612bf096b22bd..345e458cd2f04 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/ArtifactManagerSuite.scala +++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/ArtifactManagerSuite.scala @@ -274,6 +274,24 @@ class ArtifactManagerSuite extends SharedSparkSession with ResourceHelper { assert(result.forall(_.getString(0).contains("Ahri"))) } } + + test("SPARK-44300: Cleaning up resources only deletes session-specific resources") { + val copyDir = Utils.createTempDir().toPath + FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile) + val stagingPath = copyDir.resolve("Hello.class") + val remotePath = Paths.get("classes/Hello.class") + + val sessionHolder = SparkConnectService.getOrCreateIsolatedSession("c1", "session") + sessionHolder.addArtifact(remotePath, stagingPath, None) + + val sessionDirectory = + SparkConnectArtifactManager.getArtifactDirectoryAndUriForSession(sessionHolder)._1.toFile + assert(sessionDirectory.exists()) + + sessionHolder.artifactManager.cleanUpResources() + assert(!sessionDirectory.exists()) + assert(SparkConnectArtifactManager.artifactRootPath.toFile.exists()) + } } class ArtifactUriSuite extends SparkFunSuite with LocalSparkContext { diff --git a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js index b52ece87ba125..1ea0adf5467be 100644 --- a/core/src/main/resources/org/apache/spark/ui/static/executorspage.js +++ b/core/src/main/resources/org/apache/spark/ui/static/executorspage.js @@ -147,7 +147,7 @@ function totalDurationColor(totalGCTime, totalDuration) { } var sumOptionalColumns = [3, 4]; -var execOptionalColumns = [5, 6, 7, 8, 9, 10, 13, 14, 25]; +var execOptionalColumns = [5, 6, 7, 8, 9, 10, 13, 14, 26]; var execDataTable; var sumDataTable; @@ -572,15 +572,7 @@ $(document).ready(function () { ], "order": [[0, "asc"]], "columnDefs": [ - {"visible": false, "targets": 5}, - {"visible": false, "targets": 6}, - {"visible": false, "targets": 7}, - {"visible": false, "targets": 8}, - {"visible": false, "targets": 9}, - {"visible": false, "targets": 10}, - {"visible": false, "targets": 13}, - {"visible": false, "targets": 14}, - {"visible": false, "targets": 26} + {"visible": false, "targets": execOptionalColumns} ], "deferRender": true }; @@ -712,10 +704,8 @@ $(document).ready(function () { "searching": false, "info": false, "columnDefs": [ - {"visible": false, "targets": 3}, - {"visible": false, "targets": 4} + {"visible": false, "targets": sumOptionalColumns} ] - }; sumDataTable = $(sumSelector).DataTable(sumConf); diff --git a/core/src/test/resources/TestHelloV2_2.12.jar b/core/src/test/resources/TestHelloV2_2.12.jar new file mode 100644 index 0000000000000..d89cf6543a200 Binary files /dev/null and b/core/src/test/resources/TestHelloV2_2.12.jar differ diff --git a/core/src/test/resources/TestHelloV2_2.13.jar b/core/src/test/resources/TestHelloV2_2.13.jar new file mode 100644 index 0000000000000..6dee8fcd9c957 Binary files /dev/null and b/core/src/test/resources/TestHelloV2_2.13.jar differ diff --git a/core/src/test/resources/TestHelloV3.jar b/core/src/test/resources/TestHelloV3_2.12.jar similarity index 100% rename from core/src/test/resources/TestHelloV3.jar rename to core/src/test/resources/TestHelloV3_2.12.jar diff --git a/core/src/test/resources/TestHelloV3_2.13.jar b/core/src/test/resources/TestHelloV3_2.13.jar new file mode 100644 index 0000000000000..0c292e7d81ad7 Binary files /dev/null and b/core/src/test/resources/TestHelloV3_2.13.jar differ diff --git a/core/src/test/scala/org/apache/spark/executor/ClassLoaderIsolationSuite.scala 
b/core/src/test/scala/org/apache/spark/executor/ClassLoaderIsolationSuite.scala index 33c1baccd7298..72ee0e96fd014 100644 --- a/core/src/test/scala/org/apache/spark/executor/ClassLoaderIsolationSuite.scala +++ b/core/src/test/scala/org/apache/spark/executor/ClassLoaderIsolationSuite.scala @@ -17,21 +17,31 @@ package org.apache.spark.executor +import scala.util.Properties + import org.apache.spark.{JobArtifactSet, LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.util.Utils class ClassLoaderIsolationSuite extends SparkFunSuite with LocalSparkContext { + + private val scalaVersion = Properties.versionNumberString + .split("\\.") + .take(2) + .mkString(".") + val jar1 = Thread.currentThread().getContextClassLoader.getResource("TestUDTF.jar").toString // package com.example // object Hello { def test(): Int = 2 } // case class Hello(x: Int, y: Int) - val jar2 = Thread.currentThread().getContextClassLoader.getResource("TestHelloV2.jar").toString + val jar2 = Thread.currentThread().getContextClassLoader + .getResource(s"TestHelloV2_$scalaVersion.jar").toString // package com.example // object Hello { def test(): Int = 3 } // case class Hello(x: String) - val jar3 = Thread.currentThread().getContextClassLoader.getResource("TestHelloV3.jar").toString + val jar3 = Thread.currentThread().getContextClassLoader + .getResource(s"TestHelloV3_$scalaVersion.jar").toString test("Executor classloader isolation with JobArtifactSet") { sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")) diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3 index 3ac03fa64727f..1b91686ed4db0 100644 --- a/dev/deps/spark-deps-hadoop-3-hive-2.3 +++ b/dev/deps/spark-deps-hadoop-3-hive-2.3 @@ -21,9 +21,9 @@ arrow-memory-core/12.0.1//arrow-memory-core-12.0.1.jar arrow-memory-netty/12.0.1//arrow-memory-netty-12.0.1.jar arrow-vector/12.0.1//arrow-vector-12.0.1.jar audience-annotations/0.5.0//audience-annotations-0.5.0.jar -avro-ipc/1.11.1//avro-ipc-1.11.1.jar -avro-mapred/1.11.1//avro-mapred-1.11.1.jar -avro/1.11.1//avro-1.11.1.jar +avro-ipc/1.11.2//avro-ipc-1.11.2.jar +avro-mapred/1.11.2//avro-mapred-1.11.2.jar +avro/1.11.2//avro-1.11.2.jar aws-java-sdk-bundle/1.12.367//aws-java-sdk-bundle-1.12.367.jar azure-data-lake-store-sdk/2.3.9//azure-data-lake-store-sdk-2.3.9.jar azure-keyvault-core/1.0.0//azure-keyvault-core-1.0.0.jar @@ -177,11 +177,11 @@ log4j-slf4j2-impl/2.20.0//log4j-slf4j2-impl-2.20.0.jar logging-interceptor/3.12.12//logging-interceptor-3.12.12.jar lz4-java/1.8.0//lz4-java-1.8.0.jar mesos/1.4.3/shaded-protobuf/mesos-1.4.3-shaded-protobuf.jar -metrics-core/4.2.18//metrics-core-4.2.18.jar -metrics-graphite/4.2.18//metrics-graphite-4.2.18.jar -metrics-jmx/4.2.18//metrics-jmx-4.2.18.jar -metrics-json/4.2.18//metrics-json-4.2.18.jar -metrics-jvm/4.2.18//metrics-jvm-4.2.18.jar +metrics-core/4.2.19//metrics-core-4.2.19.jar +metrics-graphite/4.2.19//metrics-graphite-4.2.19.jar +metrics-jmx/4.2.19//metrics-jmx-4.2.19.jar +metrics-json/4.2.19//metrics-json-4.2.19.jar +metrics-jvm/4.2.19//metrics-jvm-4.2.19.jar minlog/1.3.0//minlog-1.3.0.jar netty-all/4.1.93.Final//netty-all-4.1.93.Final.jar netty-buffer/4.1.93.Final//netty-buffer-4.1.93.Final.jar @@ -227,7 +227,7 @@ rocksdbjni/8.3.2//rocksdbjni-8.3.2.jar scala-collection-compat_2.12/2.7.0//scala-collection-compat_2.12-2.7.0.jar scala-compiler/2.12.18//scala-compiler-2.12.18.jar scala-library/2.12.18//scala-library-2.12.18.jar 
-scala-parser-combinators_2.12/2.2.0//scala-parser-combinators_2.12-2.2.0.jar +scala-parser-combinators_2.12/2.3.0//scala-parser-combinators_2.12-2.3.0.jar scala-reflect/2.12.18//scala-reflect-2.12.18.jar scala-xml_2.12/2.1.0//scala-xml_2.12-2.1.0.jar shims/0.9.45//shims-0.9.45.jar diff --git a/docs/sql-data-sources-avro.md b/docs/sql-data-sources-avro.md index 977886a6f34e2..b01174b918245 100644 --- a/docs/sql-data-sources-avro.md +++ b/docs/sql-data-sources-avro.md @@ -417,7 +417,7 @@ applications. Read the [Advanced Dependency Management](https://spark.apache Submission Guide for more details. ## Supported types for Avro -> Spark SQL conversion -Currently Spark supports reading all [primitive types](https://avro.apache.org/docs/1.11.1/specification/#primitive-types) and [complex types](https://avro.apache.org/docs/1.11.1/specification/#complex-types) under records of Avro. +Currently Spark supports reading all [primitive types](https://avro.apache.org/docs/1.11.2/specification/#primitive-types) and [complex types](https://avro.apache.org/docs/1.11.2/specification/#complex-types) under records of Avro. @@ -481,7 +481,7 @@ In addition to the types listed above, it supports reading `union` types. The fo 3. `union(something, null)`, where something is any supported Avro type. This will be mapped to the same Spark SQL type as that of something, with nullable set to true. All other union types are considered complex. They will be mapped to StructType where field names are member0, member1, etc., in accordance with members of the union. This is consistent with the behavior when converting between Avro and Parquet. -It also supports reading the following Avro [logical types](https://avro.apache.org/docs/1.11.1/specification/#logical-types): +It also supports reading the following Avro [logical types](https://avro.apache.org/docs/1.11.2/specification/#logical-types):
 [table: Avro type → Spark SQL type]
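Aside (not part of the patch): to make the documented mapping concrete, here is a small Scala sketch that round-trips a DataFrame through the avro data source, using the recordName/recordNamespace options whose spec links were bumped to 1.11.2 above. The session setup and output path are illustrative assumptions, and the spark-avro module is assumed to be on the classpath.

import org.apache.spark.sql.SparkSession

// Illustrative only: assumes spark-avro is available and /tmp/avro-demo is writable.
object AvroRoundTripExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("avro-demo").getOrCreate()
    import spark.implicits._

    // Long and String columns map to Avro "long" and "string" per the mapping above.
    val df = Seq((1L, "a"), (2L, "b")).toDF("id", "name")

    df.write
      .format("avro")
      .option("recordName", "topLevelRecord")   // default, shown for clarity
      .option("recordNamespace", "com.example") // default is ""
      .mode("overwrite")
      .save("/tmp/avro-demo")

    spark.read.format("avro").load("/tmp/avro-demo").printSchema()
    spark.stop()
  }
}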
diff --git a/docs/sql-error-conditions-datatype-mismatch-error-class.md b/docs/sql-error-conditions-datatype-mismatch-error-class.md
index 7d203432562d3..6ec4f07895029 100644
--- a/docs/sql-error-conditions-datatype-mismatch-error-class.md
+++ b/docs/sql-error-conditions-datatype-mismatch-error-class.md
@@ -207,6 +207,10 @@ The lower bound of a window frame must be `` to the upper bound.
 
 The data type of the column (``) do not have the same type: `` (``) <> `` (``).
 
+## TYPE_CHECK_FAILURE_WITH_HINT
+
+`<msg>``<hint>`.
+
 ## UNEXPECTED_CLASS_TYPE
 
 class `<className>` not found.
diff --git a/docs/sql-error-conditions-invalid-observed-metrics-error-class.md b/docs/sql-error-conditions-invalid-observed-metrics-error-class.md
new file mode 100644
index 0000000000000..10c144e06fa79
--- /dev/null
+++ b/docs/sql-error-conditions-invalid-observed-metrics-error-class.md
@@ -0,0 +1,42 @@
+---
+layout: global
+title: INVALID_OBSERVED_METRICS error class
+displayTitle: INVALID_OBSERVED_METRICS error class
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+Invalid observed metrics.
+
+This error class has the following derived error classes:
+
+## MISSING_NAME
+
+The observed metrics should be named: `<operator>`.
+
+## NESTED_AGGREGATES_UNSUPPORTED
+
+Nested aggregates are not allowed in observed metrics, but found: `<expr>`.
+
+## NON_AGGREGATE_FUNC_ARG_IS_NON_DETERMINISTIC
+
+Non-deterministic expression `<expr>` can only be used as an argument to an aggregate function.
+
+## WINDOW_EXPRESSIONS_UNSUPPORTED
+
+Window expressions are not allowed in observed metrics, but found: `<expr>`.
+
+
diff --git a/pom.xml b/pom.xml
index deccc904dd910..bc14cdd584e0e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -152,9 +152,9 @@ If you changes codahale.metrics.version, you also need to change the link to metrics.dropwizard.io in docs/monitoring.md.
--> - 4.2.18 + 4.2.19 - 1.11.1 + 1.11.2 1.12.0 1.11.655 @@ -1102,7 +1102,7 @@ org.scala-lang.modules scala-parser-combinators_${scala.binary.version} - 2.2.0 + 2.3.0 jline diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 7f9da32224f70..8f2f5d7878794 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -1110,7 +1110,7 @@ object DependencyOverrides { dependencyOverrides += "com.google.guava" % "guava" % guavaVersion, dependencyOverrides += "xerces" % "xercesImpl" % "2.12.2", dependencyOverrides += "jline" % "jline" % "2.14.6", - dependencyOverrides += "org.apache.avro" % "avro" % "1.11.1") + dependencyOverrides += "org.apache.avro" % "avro" % "1.11.2") } /** diff --git a/python/pyspark/pandas/tests/test_series_string.py b/python/pyspark/pandas/tests/test_series_string.py index 956567bc5a4ed..3c2bd58da1a28 100644 --- a/python/pyspark/pandas/tests/test_series_string.py +++ b/python/pyspark/pandas/tests/test_series_string.py @@ -246,6 +246,10 @@ def test_string_repeat(self): with self.assertRaises(TypeError): self.check_func(lambda x: x.str.repeat(repeats=[0, 1, 2, 3, 4, 5, 6, 7, 8, 9])) + @unittest.skipIf( + LooseVersion(pd.__version__) >= LooseVersion("2.0.0"), + "TODO(SPARK-43476): Enable SeriesStringTests.test_string_replace for pandas 2.0.0.", + ) def test_string_replace(self): self.check_func(lambda x: x.str.replace("a.", "xx", regex=True)) self.check_func(lambda x: x.str.replace("a.", "xx", regex=False)) @@ -255,11 +259,10 @@ def test_string_replace(self): def repl(m): return m.group(0)[::-1] - regex_pat = re.compile(r"[a-z]+") - self.check_func(lambda x: x.str.replace(regex_pat, repl, regex=True)) + self.check_func(lambda x: x.str.replace(r"[a-z]+", repl)) # compiled regex with flags regex_pat = re.compile(r"WHITESPACE", flags=re.IGNORECASE) - self.check_func(lambda x: x.str.replace(regex_pat, "---", regex=True)) + self.check_func(lambda x: x.str.replace(regex_pat, "---")) def test_string_rfind(self): self.check_func(lambda x: x.str.rfind("a")) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 046139f5952d3..99eb2a48bb21c 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -1873,11 +1873,11 @@ def sample( # type: ignore[misc] Examples -------- >>> df = spark.range(10) - >>> df.sample(0.5, 3).count() + >>> df.sample(0.5, 3).count() # doctest: +SKIP 7 - >>> df.sample(fraction=0.5, seed=3).count() + >>> df.sample(fraction=0.5, seed=3).count() # doctest: +SKIP 7 - >>> df.sample(withReplacement=True, fraction=0.5, seed=3).count() + >>> df.sample(withReplacement=True, fraction=0.5, seed=3).count() # doctest: +SKIP 1 >>> df.sample(1.0).count() 10 diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 11387fde37e52..5507fa28bc4e2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -277,13 +277,13 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB e.dataTypeMismatch(e, checkRes) case TypeCheckResult.TypeCheckFailure(message) => e.setTagValue(DATA_TYPE_MISMATCH_ERROR, true) - extraHintForAnsiTypeCoercionExpression(operator) + val extraHint = extraHintForAnsiTypeCoercionExpression(operator) e.failAnalysis( - errorClass = "_LEGACY_ERROR_TEMP_2315", + errorClass = 
"TYPE_CHECK_FAILURE_WITH_HINT", messageParameters = Map( - "sqlExpr" -> e.sql, + "expr" -> toSQLExpr(e), "msg" -> message, - "hint" -> extraHintForAnsiTypeCoercionExpression(operator))) + "hint" -> extraHint)) case checkRes: TypeCheckResult.InvalidFormat => e.setTagValue(INVALID_FORMAT_ERROR, true) e.invalidFormat(checkRes) @@ -486,8 +486,8 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB case CollectMetrics(name, metrics, _) => if (name == null || name.isEmpty) { operator.failAnalysis( - errorClass = "_LEGACY_ERROR_TEMP_2316", - messageParameters = Map("operator" -> operator.toString)) + errorClass = "INVALID_OBSERVED_METRICS.MISSING_NAME", + messageParameters = Map("operator" -> planToString(operator))) } // Check if an expression is a valid metric. A metric must meet the following criteria: // - Is not a window function; @@ -498,11 +498,17 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB def checkMetric(s: Expression, e: Expression, seenAggregate: Boolean = false): Unit = { e match { case _: WindowExpression => - e.failAnalysis("_LEGACY_ERROR_TEMP_2317", Map("sqlExpr" -> s.sql)) + e.failAnalysis( + "INVALID_OBSERVED_METRICS.WINDOW_EXPRESSIONS_UNSUPPORTED", + Map("expr" -> toSQLExpr(s))) case _ if !e.deterministic && !seenAggregate => - e.failAnalysis("_LEGACY_ERROR_TEMP_2318", Map("sqlExpr" -> s.sql)) + e.failAnalysis( + "INVALID_OBSERVED_METRICS.NON_AGGREGATE_FUNC_ARG_IS_NON_DETERMINISTIC", + Map("expr" -> toSQLExpr(s))) case a: AggregateExpression if seenAggregate => - e.failAnalysis("_LEGACY_ERROR_TEMP_2319", Map("sqlExpr" -> s.sql)) + e.failAnalysis( + "INVALID_OBSERVED_METRICS.NESTED_AGGREGATES_UNSUPPORTED", + Map("expr" -> toSQLExpr(s))) case a: AggregateExpression if a.isDistinct => e.failAnalysis("_LEGACY_ERROR_TEMP_2320", Map("sqlExpr" -> s.sql)) case a: AggregateExpression if a.filter.isDefined => diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala index 55005d87cdc03..61cb3e81c98a6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisSuite.scala @@ -770,8 +770,12 @@ class AnalysisSuite extends AnalysisTest with Matchers { // Bad name assert(!CollectMetrics("", sum :: Nil, testRelation).resolved) - assertAnalysisError(CollectMetrics("", sum :: Nil, testRelation), - "observed metrics should be named" :: Nil) + assertAnalysisErrorClass( + CollectMetrics("", sum :: Nil, testRelation), + expectedErrorClass = "INVALID_OBSERVED_METRICS.MISSING_NAME", + expectedMessageParameters = Map( + "operator" -> "'CollectMetrics , [sum(a#x) AS sum#xL]\n+- LocalRelation , [a#x]\n") + ) // No columns assert(!CollectMetrics("evt", Nil, testRelation).resolved) @@ -786,9 +790,11 @@ class AnalysisSuite extends AnalysisTest with Matchers { "Attribute", "can only be used as an argument to an aggregate function") // Unwrapped non-deterministic expression - checkAnalysisError( - Rand(10).as("rnd") :: Nil, - "non-deterministic expression", "can only be used as an argument to an aggregate function") + assertAnalysisErrorClass( + CollectMetrics("event", Rand(10).as("rnd") :: Nil, testRelation), + expectedErrorClass = "INVALID_OBSERVED_METRICS.NON_AGGREGATE_FUNC_ARG_IS_NON_DETERMINISTIC", + expectedMessageParameters = Map("expr" -> "\"rand(10) AS rnd\"") + ) // Distinct aggregate 
checkAnalysisError( @@ -796,18 +802,30 @@ class AnalysisSuite extends AnalysisTest with Matchers { "distinct aggregates are not allowed in observed metrics, but found") // Nested aggregate - checkAnalysisError( - Sum(Sum(a).toAggregateExpression()).toAggregateExpression().as("sum") :: Nil, - "nested aggregates are not allowed in observed metrics, but found") + assertAnalysisErrorClass( + CollectMetrics( + "event", + Sum(Sum(a).toAggregateExpression()).toAggregateExpression().as("sum") :: Nil, + testRelation), + expectedErrorClass = "INVALID_OBSERVED_METRICS.NESTED_AGGREGATES_UNSUPPORTED", + expectedMessageParameters = Map("expr" -> "\"sum(sum(a)) AS sum\"") + ) // Windowed aggregate val windowExpr = WindowExpression( RowNumber(), WindowSpecDefinition(Nil, a.asc :: Nil, SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow))) - checkAnalysisError( - windowExpr.as("rn") :: Nil, - "window expressions are not allowed in observed metrics, but found") + assertAnalysisErrorClass( + CollectMetrics("event", windowExpr.as("rn") :: Nil, testRelation), + expectedErrorClass = "INVALID_OBSERVED_METRICS.WINDOW_EXPRESSIONS_UNSUPPORTED", + expectedMessageParameters = Map( + "expr" -> + """ + |"row_number() OVER (ORDER BY a ASC NULLS FIRST ROWS + | BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS rn" + |""".stripMargin.replace("\n", "")) + ) } test("check CollectMetrics duplicates") { diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala index 6f799bbe7d3f5..bd1b5b557896b 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientSuite.scala @@ -900,7 +900,7 @@ class HiveClientSuite(version: String, allVersions: Seq[String]) test("Decimal support of Avro Hive serde") { val tableName = "tab1" // TODO: add the other logical types. For details, see the link: - // https://avro.apache.org/docs/1.11.1/specification/#logical-types + // https://avro.apache.org/docs/1.11.2/specification/#logical-types val avroSchema = """{ | "name": "test_record",
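Closing aside (not part of the patch): the analyzer checks migrated above surface through Dataset.observe. A hedged, self-contained Scala sketch, with made-up metric names and a local session, showing one metric that resolves and one that now fails with INVALID_OBSERVED_METRICS.NON_AGGREGATE_FUNC_ARG_IS_NON_DETERMINISTIC:

import scala.util.Try
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Illustrative only: session setup, metric names and expressions are assumptions.
object ObservedMetricsExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("observe-demo").getOrCreate()
    val df = spark.range(10)

    // Valid: every leaf expression is wrapped in an aggregate, so the metric resolves.
    df.observe("stats", count(lit(1)).as("rows"), sum(col("id")).as("id_sum")).collect()

    // Invalid: rand(10) outside an aggregate is non-deterministic; with the change
    // above the analysis error reports the
    // INVALID_OBSERVED_METRICS.NON_AGGREGATE_FUNC_ARG_IS_NON_DETERMINISTIC class
    // instead of the removed _LEGACY_ERROR_TEMP_2318 template.
    val attempt = Try(df.observe("bad", rand(10).as("rnd")).collect())
    println(s"non-deterministic metric accepted? ${attempt.isSuccess}")

    spark.stop()
  }
}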