Merge pull request #1517 from apache/master
GulajavaMinistudio authored Jul 5, 2023
2 parents 1166ae6 + 4b77aad commit cfaa36e
Showing 27 changed files with 269 additions and 115 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_java21.yml
@@ -42,7 +42,7 @@ jobs:
jobs: >-
{
"build": "true",
"pyspark": "true",
"pyspark": "false",
"sparkr": "true",
"tpcds-1g": "true",
"docker-integration-tests": "true"
@@ -328,6 +328,10 @@ public MergedBlockMeta getMergedBlockMeta(
int size = (int) indexFile.length();
// First entry is the zero offset
int numChunks = (size / Long.BYTES) - 1;
if (numChunks <= 0) {
throw new RuntimeException(String.format(
"Merged shuffle index file %s is empty", indexFile.getPath()));
}
File metaFile = appShuffleInfo.getMergedShuffleMetaFile(shuffleId, shuffleMergeId, reduceId);
if (!metaFile.exists()) {
throw new RuntimeException(String.format("Merged shuffle meta file %s not found",
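The hunk above makes getMergedBlockMeta reject an empty merged shuffle index file up front instead of returning a MergedBlockMeta with zero chunks. As a rough Scala restatement of that guard (illustrative only, not code from this commit): an index file that holds just the initial zero offset is 8 bytes long, so numChunks comes out to 0 and the request now fails immediately.

import java.io.File

// Illustrative sketch, not from the commit: derive the chunk count from the
// index file size and fail fast when no chunk offsets are present.
def mergedChunkCount(indexFile: File): Int = {
  val size = indexFile.length().toInt
  // The first entry is the zero offset, so subtract one from the offset count.
  val numChunks = (size / java.lang.Long.BYTES) - 1
  if (numChunks <= 0) {
    throw new RuntimeException(
      s"Merged shuffle index file ${indexFile.getPath} is empty")
  }
  numChunks
}
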
@@ -281,20 +281,24 @@ public void testDuplicateBlocksAreIgnoredWhenPrevStreamIsInProgress() throws IOE
verifyMetrics(4, 0, 0, 0, 0, 0, 4);
}

@Test
@Test(expected = RuntimeException.class)
public void testFailureAfterData() throws IOException {
StreamCallbackWithID stream =
pushResolver.receiveBlockDataAsStream(
new PushBlockStream(TEST_APP, NO_ATTEMPT_ID, 0, 0, 0, 0, 0));
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
assertEquals("num-chunks", 0, blockMeta.getNumChunks());
verifyMetrics(4, 0, 0, 0, 0, 0, 4);
try {
pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
} catch (RuntimeException e) {
assertTrue(e.getMessage().contains("is empty"));
verifyMetrics(4, 0, 0, 0, 0, 0, 4);
throw e;
}
}

@Test
@Test(expected = RuntimeException.class)
public void testFailureAfterMultipleDataBlocks() throws IOException {
StreamCallbackWithID stream =
pushResolver.receiveBlockDataAsStream(
@@ -304,9 +308,13 @@ public void testFailureAfterMultipleDataBlocks() throws IOException {
stream.onData(stream.getID(), ByteBuffer.wrap(new byte[4]));
stream.onFailure(stream.getID(), new RuntimeException("Forced Failure"));
pushResolver.finalizeShuffleMerge(new FinalizeShuffleMerge(TEST_APP, NO_ATTEMPT_ID, 0, 0));
MergedBlockMeta blockMeta = pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
assertEquals("num-chunks", 0, blockMeta.getNumChunks());
verifyMetrics(9, 0, 0, 0, 0, 0, 9);
try {
pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0);
} catch (RuntimeException e) {
assertTrue(e.getMessage().contains("is empty"));
verifyMetrics(9, 0, 0, 0, 0, 0, 9);
throw e;
}
}

@Test
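The two tests above now declare the expected failure via JUnit's @Test(expected = RuntimeException.class) and re-throw from the catch block after checking the message and metrics. For comparison only — this is ScalaTest, a different framework from the Java suite above, and not part of the commit — the same "expect, then inspect" shape can be written as:

// Illustrative ScalaTest idiom (not from the commit): capture the expected
// exception, then assert on its message before any further checks.
val e = intercept[RuntimeException] {
  pushResolver.getMergedBlockMeta(TEST_APP, 0, 0, 0)
}
assert(e.getMessage.contains("is empty"))
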
57 changes: 32 additions & 25 deletions common/utils/src/main/resources/error/error-classes.json
@@ -558,6 +558,11 @@
"The data type of the column (<columnIndex>) do not have the same type: <leftType> (<leftParamIndex>) <> <rightType> (<rightParamIndex>)."
]
},
"TYPE_CHECK_FAILURE_WITH_HINT" : {
"message" : [
"<msg><hint>."
]
},
"UNEXPECTED_CLASS_TYPE" : {
"message" : [
"class <className> not found."
@@ -1331,6 +1336,33 @@
"Numeric literal <rawStrippedQualifier> is outside the valid range for <typeName> with minimum value of <minValue> and maximum value of <maxValue>. Please adjust the value accordingly."
]
},
"INVALID_OBSERVED_METRICS" : {
"message" : [
"Invalid observed metrics."
],
"subClass" : {
"MISSING_NAME" : {
"message" : [
"The observed metrics should be named: <operator>."
]
},
"NESTED_AGGREGATES_UNSUPPORTED" : {
"message" : [
"Nested aggregates are not allowed in observed metrics, but found: <expr>."
]
},
"NON_AGGREGATE_FUNC_ARG_IS_NON_DETERMINISTIC" : {
"message" : [
"Non-deterministic expression <expr> can only be used as an argument to an aggregate function."
]
},
"WINDOW_EXPRESSIONS_UNSUPPORTED" : {
"message" : [
"Window expressions are not allowed in observed metrics, but found: <expr>."
]
}
}
},
"INVALID_OPTIONS" : {
"message" : [
"Invalid options:"
@@ -5614,31 +5646,6 @@
"The input <valueType> '<input>' does not match the given number format: '<format>'."
]
},
"_LEGACY_ERROR_TEMP_2315" : {
"message" : [
"cannot resolve '<sqlExpr>' due to data type mismatch: <msg><hint>."
]
},
"_LEGACY_ERROR_TEMP_2316" : {
"message" : [
"observed metrics should be named: <operator>."
]
},
"_LEGACY_ERROR_TEMP_2317" : {
"message" : [
"window expressions are not allowed in observed metrics, but found: <sqlExpr>."
]
},
"_LEGACY_ERROR_TEMP_2318" : {
"message" : [
"non-deterministic expression <sqlExpr> can only be used as an argument to an aggregate function."
]
},
"_LEGACY_ERROR_TEMP_2319" : {
"message" : [
"nested aggregates are not allowed in observed metrics, but found: <sqlExpr>."
]
},
"_LEGACY_ERROR_TEMP_2320" : {
"message" : [
"distinct aggregates are not allowed in observed metrics, but found: <sqlExpr>."
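The INVALID_OBSERVED_METRICS error classes added above replace the _LEGACY_ERROR_TEMP_2316 through _LEGACY_ERROR_TEMP_2319 templates removed later in this file. As a rough illustration of when they apply (assuming a running SparkSession named spark; none of this code is in the commit), Dataset.observe requires a metric name and aggregate-only expressions:

import org.apache.spark.sql.functions._

// Valid observed metrics: named, and built from aggregate expressions.
val df = spark.range(10)
val observed = df.observe("stats", count(lit(1)).as("rows"), max(col("id")).as("max_id"))
observed.collect()  // the metrics are published once an action runs

// Each of the following is rejected during analysis; with this change the
// failures map to INVALID_OBSERVED_METRICS subclasses instead of legacy templates:
// df.observe("bad", max(count(lit(1))))           // nested aggregate
// df.observe("bad", rand() + count(lit(1)))       // non-deterministic outside an aggregate
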
@@ -81,14 +81,14 @@ private[sql] class AvroOptions(

/**
* Top level record name in write result, which is required in Avro spec.
* See https://avro.apache.org/docs/1.11.1/specification/#schema-record .
* See https://avro.apache.org/docs/1.11.2/specification/#schema-record .
* Default value is "topLevelRecord"
*/
val recordName: String = parameters.getOrElse(RECORD_NAME, "topLevelRecord")

/**
* Record namespace in write result. Default value is "".
* See Avro spec for details: https://avro.apache.org/docs/1.11.1/specification/#schema-record .
* See Avro spec for details: https://avro.apache.org/docs/1.11.2/specification/#schema-record .
*/
val recordNamespace: String = parameters.getOrElse(RECORD_NAMESPACE, "")

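The two options documented above (recordName and recordNamespace) are supplied at write time. A minimal usage sketch, assuming the spark-avro module is on the classpath and df is any DataFrame — neither comes from this commit:

// recordName defaults to "topLevelRecord" and recordNamespace to ""; the
// output path here is a placeholder.
df.write
  .format("avro")
  .option("recordName", "topLevelRecord")
  .option("recordNamespace", "com.example.events")
  .save("/tmp/avro-out")
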
File renamed without changes.
Binary file not shown.
@@ -17,12 +17,16 @@
package org.apache.spark.sql.application

import java.io.{PipedInputStream, PipedOutputStream}
import java.nio.file.Paths
import java.util.concurrent.{Executors, Semaphore, TimeUnit}

import scala.util.Properties

import org.apache.commons.io.output.ByteArrayOutputStream
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.sql.connect.client.util.RemoteSparkSession
import org.apache.spark.sql.connect.client.util.{IntegrationTestUtils, RemoteSparkSession}

class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach {

@@ -35,33 +39,41 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach {
private var ammoniteIn: PipedInputStream = _
private val semaphore: Semaphore = new Semaphore(0)

private val scalaVersion = Properties.versionNumberString
.split("\\.")
.take(2)
.mkString(".")

private def getCleanString(out: ByteArrayOutputStream): String = {
// Remove ANSI colour codes
// Regex taken from https://stackoverflow.com/a/25189932
out.toString("UTF-8").replaceAll("\u001B\\[[\\d;]*[^\\d;]", "")
}

override def beforeAll(): Unit = {
super.beforeAll()
ammoniteOut = new ByteArrayOutputStream()
testSuiteOut = new PipedOutputStream()
// Connect the `testSuiteOut` and `ammoniteIn` pipes
ammoniteIn = new PipedInputStream(testSuiteOut)
errorStream = new ByteArrayOutputStream()

val args = Array("--port", serverPort.toString)
val task = new Runnable {
override def run(): Unit = {
ConnectRepl.doMain(
args = args,
semaphore = Some(semaphore),
inputStream = ammoniteIn,
outputStream = ammoniteOut,
errorStream = errorStream)
// TODO(SPARK-44121) Remove this check condition
if (SystemUtils.isJavaVersionAtMost(JavaVersion.JAVA_17)) {
super.beforeAll()
ammoniteOut = new ByteArrayOutputStream()
testSuiteOut = new PipedOutputStream()
// Connect the `testSuiteOut` and `ammoniteIn` pipes
ammoniteIn = new PipedInputStream(testSuiteOut)
errorStream = new ByteArrayOutputStream()

val args = Array("--port", serverPort.toString)
val task = new Runnable {
override def run(): Unit = {
ConnectRepl.doMain(
args = args,
semaphore = Some(semaphore),
inputStream = ammoniteIn,
outputStream = ammoniteOut,
errorStream = errorStream)
}
}
}

executorService.submit(task)
executorService.submit(task)
}
}

override def afterAll(): Unit = {
@@ -96,7 +108,10 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach {

def assertContains(message: String, output: String): Unit = {
val isContain = output.contains(message)
assert(isContain, "Ammonite output did not contain '" + message + "':\n" + output)
assert(
isContain,
"Ammonite output did not contain '" + message + "':\n" + output +
s"\nError Output: ${getCleanString(errorStream)}")
}

test("Simple query") {
@@ -151,4 +166,33 @@ class ReplE2ESuite extends RemoteSparkSession with BeforeAndAfterEach {
assertContains("Array[java.lang.Long] = Array(0L, 2L, 4L, 6L, 8L)", output)
}

test("Client-side JAR") {
// scalastyle:off classforname line.size.limit
val sparkHome = IntegrationTestUtils.sparkHome
val testJar = Paths
.get(
s"$sparkHome/connector/connect/client/jvm/src/test/resources/TestHelloV2_$scalaVersion.jar")
.toFile

assert(testJar.exists(), "Missing TestHelloV2 jar!")
val input = s"""
|import java.nio.file.Paths
|def classLoadingTest(x: Int): Int = {
| val classloader =
| Option(Thread.currentThread().getContextClassLoader).getOrElse(getClass.getClassLoader)
| val cls = Class.forName("com.example.Hello$$", true, classloader)
| val module = cls.getField("MODULE$$").get(null)
| cls.getMethod("test").invoke(module).asInstanceOf[Int]
|}
|val classLoaderUdf = udf(classLoadingTest _)
|
|val jarPath = Paths.get("${testJar.toString}").toUri
|spark.addArtifact(jarPath)
|
|spark.range(5).select(classLoaderUdf(col("id"))).as[Int].collect()
""".stripMargin
val output = runCommandsInShell(input)
assertContains("Array[Int] = Array(2, 2, 2, 2, 2)", output)
// scalastyle:on classforname line.size.limit
}
}
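The new "Client-side JAR" test above drives SparkSession.addArtifact from the Spark Connect Scala client: the jar is shipped to the server and a UDF resolves one of its classes through the context class loader. A condensed sketch of the same pattern outside the REPL follows; the jar path and class name are placeholders rather than resources from this commit.

import java.nio.file.Paths
import org.apache.spark.sql.functions._

// Ship a client-side jar to the Spark Connect server so UDFs can use it.
val jarUri = Paths.get("/path/to/MyUdfLib.jar").toUri
spark.addArtifact(jarUri)

// Inside the UDF, load a class that only exists in the shipped jar.
val classLoadingUdf = udf((x: Long) => {
  val loader = Option(Thread.currentThread().getContextClassLoader)
    .getOrElse(getClass.getClassLoader)
  Class.forName("com.example.MyHelper", true, loader)
  x * 2
})

spark.range(5).select(classLoadingUdf(col("id"))).collect()
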
@@ -133,7 +133,7 @@ class SparkConnectArtifactManager(sessionHolder: SessionHolder) extends Logging
Files.move(serverLocalStagingPath, target)
if (remoteRelativePath.startsWith(s"jars${File.separator}")) {
jarsList.add(target)
jarsURI.add(artifactURI + "/" + target.toString)
jarsURI.add(artifactURI + "/" + remoteRelativePath.toString)
} else if (remoteRelativePath.startsWith(s"pyfiles${File.separator}")) {
sessionHolder.session.sparkContext.addFile(target.toString)
val stringRemotePath = remoteRelativePath.toString
@@ -188,7 +188,7 @@ class SparkConnectArtifactManager(sessionHolder: SessionHolder) extends Logging
blockManager.removeCache(sessionHolder.userId, sessionHolder.sessionId)

// Clean up artifacts folder
FileUtils.deleteDirectory(artifactRootPath.toFile)
FileUtils.deleteDirectory(artifactPath.toFile)
}

private[connect] def uploadArtifactToFs(
@@ -274,6 +274,24 @@ class ArtifactManagerSuite extends SharedSparkSession with ResourceHelper {
assert(result.forall(_.getString(0).contains("Ahri")))
}
}

test("SPARK-44300: Cleaning up resources only deletes session-specific resources") {
val copyDir = Utils.createTempDir().toPath
FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
val stagingPath = copyDir.resolve("Hello.class")
val remotePath = Paths.get("classes/Hello.class")

val sessionHolder = SparkConnectService.getOrCreateIsolatedSession("c1", "session")
sessionHolder.addArtifact(remotePath, stagingPath, None)

val sessionDirectory =
SparkConnectArtifactManager.getArtifactDirectoryAndUriForSession(sessionHolder)._1.toFile
assert(sessionDirectory.exists())

sessionHolder.artifactManager.cleanUpResources()
assert(!sessionDirectory.exists())
assert(SparkConnectArtifactManager.artifactRootPath.toFile.exists())
}
}

class ArtifactUriSuite extends SparkFunSuite with LocalSparkContext {
@@ -147,7 +147,7 @@ function totalDurationColor(totalGCTime, totalDuration) {
}

var sumOptionalColumns = [3, 4];
var execOptionalColumns = [5, 6, 7, 8, 9, 10, 13, 14, 25];
var execOptionalColumns = [5, 6, 7, 8, 9, 10, 13, 14, 26];
var execDataTable;
var sumDataTable;

@@ -572,15 +572,7 @@ $(document).ready(function () {
],
"order": [[0, "asc"]],
"columnDefs": [
{"visible": false, "targets": 5},
{"visible": false, "targets": 6},
{"visible": false, "targets": 7},
{"visible": false, "targets": 8},
{"visible": false, "targets": 9},
{"visible": false, "targets": 10},
{"visible": false, "targets": 13},
{"visible": false, "targets": 14},
{"visible": false, "targets": 26}
{"visible": false, "targets": execOptionalColumns}
],
"deferRender": true
};
@@ -712,10 +704,8 @@ $(document).ready(function () {
"searching": false,
"info": false,
"columnDefs": [
{"visible": false, "targets": 3},
{"visible": false, "targets": 4}
{"visible": false, "targets": sumOptionalColumns}
]

};

sumDataTable = $(sumSelector).DataTable(sumConf);
Binary file added core/src/test/resources/TestHelloV2_2.12.jar
Binary file not shown.
Binary file added core/src/test/resources/TestHelloV2_2.13.jar
Binary file not shown.
File renamed without changes.
Binary file added core/src/test/resources/TestHelloV3_2.13.jar
Binary file not shown.
(Remaining changed files not shown.)
