From 63d434d7d63351541905614f228d0c90f550df84 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 1 Nov 2023 02:49:30 +0800 Subject: [PATCH 1/6] init --- .../apache/spark/util/ArrayImplicits.scala | 36 +++++++++++++++++++ .../org/apache/spark/sql/SparkSession.scala | 4 +-- .../client/GrpcExceptionConverter.scala | 4 +-- .../connect/planner/SparkConnectPlanner.scala | 27 +++++++------- .../spark/sql/connect/utils/ErrorUtils.scala | 32 ++++++++--------- .../python/GaussianMixtureModelWrapper.scala | 4 +-- .../mllib/api/python/LDAModelWrapper.scala | 8 ++--- 7 files changed, 76 insertions(+), 39 deletions(-) create mode 100644 common/utils/src/main/scala/org/apache/spark/util/ArrayImplicits.scala diff --git a/common/utils/src/main/scala/org/apache/spark/util/ArrayImplicits.scala b/common/utils/src/main/scala/org/apache/spark/util/ArrayImplicits.scala new file mode 100644 index 0000000000000..0b4572f89f68b --- /dev/null +++ b/common/utils/src/main/scala/org/apache/spark/util/ArrayImplicits.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import scala.collection.immutable + +/** + * Implicit methods related to Scala Array. + */ +private[spark] object ArrayImplicits { + + implicit class SparkArrayOps[T](xs: Array[T]) { + + /** + * Wraps an Array[T] as an immutable.ArraySeq[T]. + */ + def toImmutableArraySeq: immutable.ArraySeq[T] = + if (xs eq null) null + else immutable.ArraySeq.unsafeWrapArray(xs) + } +} diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index 969ac017ecb1d..0293241e7a516 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -21,7 +21,6 @@ import java.net.URI import java.util.concurrent.TimeUnit._ import java.util.concurrent.atomic.{AtomicLong, AtomicReference} -import scala.collection.immutable import scala.jdk.CollectionConverters._ import scala.reflect.runtime.universe.TypeTag @@ -45,6 +44,7 @@ import org.apache.spark.sql.internal.{CatalogImpl, SqlApiConf} import org.apache.spark.sql.streaming.DataStreamReader import org.apache.spark.sql.streaming.StreamingQueryManager import org.apache.spark.sql.types.StructType +import org.apache.spark.util.ArrayImplicits._ /** * The entry point to programming Spark with the Dataset and DataFrame API. 
@@ -248,7 +248,7 @@ class SparkSession private[sql] ( proto.SqlCommand .newBuilder() .setSql(sqlText) - .addAllPosArguments(immutable.ArraySeq.unsafeWrapArray(args.map(lit(_).expr)).asJava))) + .addAllPosArguments(args.map(lit(_).expr).toImmutableArraySeq.asJava))) val plan = proto.Plan.newBuilder().setCommand(cmd) // .toBuffer forces that the iterator is consumed and closed val responseSeq = client.execute(plan.build()).toBuffer.toSeq diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala index b2782442f4a53..b370f38e870d1 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.connect.client import java.time.DateTimeException -import scala.collection.immutable import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag @@ -37,6 +36,7 @@ import org.apache.spark.sql.catalyst.analysis.{NamespaceAlreadyExistsException, import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.trees.Origin import org.apache.spark.sql.streaming.StreamingQueryException +import org.apache.spark.util.ArrayImplicits._ /** * GrpcExceptionConverter handles the conversion of StatusRuntimeExceptions into Spark exceptions. @@ -372,7 +372,7 @@ private[client] object GrpcExceptionConverter { FetchErrorDetailsResponse.Error .newBuilder() .setMessage(message) - .addAllErrorTypeHierarchy(immutable.ArraySeq.unsafeWrapArray(classes).asJava) + .addAllErrorTypeHierarchy(classes.toImmutableArraySeq.asJava) .build())) } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index ec57909ad144e..018e293795e9d 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -17,7 +17,6 @@ package org.apache.spark.sql.connect.planner -import scala.collection.immutable import scala.collection.mutable import scala.jdk.CollectionConverters._ import scala.util.Try @@ -80,6 +79,7 @@ import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode, StreamingQ import org.apache.spark.sql.types._ import org.apache.spark.sql.util.CaseInsensitiveStringMap import org.apache.spark.storage.CacheId +import org.apache.spark.util.ArrayImplicits._ import org.apache.spark.util.Utils final case class InvalidCommandInput( @@ -3184,9 +3184,9 @@ class SparkConnectPlanner( case StreamingQueryManagerCommand.CommandCase.ACTIVE => val active_queries = session.streams.active respBuilder.getActiveBuilder.addAllActiveQueries( - immutable.ArraySeq - .unsafeWrapArray(active_queries - .map(query => buildStreamingQueryInstance(query))) + active_queries + .map(query => buildStreamingQueryInstance(query)) + .toImmutableArraySeq .asJava) case StreamingQueryManagerCommand.CommandCase.GET_QUERY => @@ -3265,15 +3265,16 @@ class SparkConnectPlanner( .setGetResourcesCommandResult( proto.GetResourcesCommandResult .newBuilder() - .putAllResources(session.sparkContext.resources.view - .mapValues(resource => - 
proto.ResourceInformation - .newBuilder() - .setName(resource.name) - .addAllAddresses(immutable.ArraySeq.unsafeWrapArray(resource.addresses).asJava) - .build()) - .toMap - .asJava) + .putAllResources( + session.sparkContext.resources.view + .mapValues(resource => + proto.ResourceInformation + .newBuilder() + .setName(resource.name) + .addAllAddresses(resource.addresses.toImmutableArraySeq.asJava) + .build()) + .toMap + .asJava) .build()) .build()) } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala index 741fa97f17878..d3b240987ae74 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql.connect.utils import java.util.UUID import scala.annotation.tailrec -import scala.collection.immutable import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ @@ -43,6 +42,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.connect.config.Connect import org.apache.spark.sql.connect.service.{ExecuteEventsManager, SessionHolder, SparkConnectService} import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.ArrayImplicits._ private[connect] object ErrorUtils extends Logging { @@ -91,21 +91,21 @@ private[connect] object ErrorUtils extends Logging { if (serverStackTraceEnabled) { builder.addAllStackTrace( - immutable.ArraySeq - .unsafeWrapArray(currentError.getStackTrace - .map { stackTraceElement => - val stackTraceBuilder = FetchErrorDetailsResponse.StackTraceElement - .newBuilder() - .setDeclaringClass(stackTraceElement.getClassName) - .setMethodName(stackTraceElement.getMethodName) - .setLineNumber(stackTraceElement.getLineNumber) - - if (stackTraceElement.getFileName != null) { - stackTraceBuilder.setFileName(stackTraceElement.getFileName) - } - - stackTraceBuilder.build() - }) + currentError.getStackTrace + .map { stackTraceElement => + val stackTraceBuilder = FetchErrorDetailsResponse.StackTraceElement + .newBuilder() + .setDeclaringClass(stackTraceElement.getClassName) + .setMethodName(stackTraceElement.getMethodName) + .setLineNumber(stackTraceElement.getLineNumber) + + if (stackTraceElement.getFileName != null) { + stackTraceBuilder.setFileName(stackTraceElement.getFileName) + } + + stackTraceBuilder.build() + } + .toImmutableArraySeq .asJava) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala index 1eed97a8d4f65..2f3f396730be2 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/GaussianMixtureModelWrapper.scala @@ -17,12 +17,12 @@ package org.apache.spark.mllib.api.python -import scala.collection.immutable import scala.jdk.CollectionConverters._ import org.apache.spark.SparkContext import org.apache.spark.mllib.clustering.GaussianMixtureModel import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.util.ArrayImplicits._ /** * Wrapper around GaussianMixtureModel to provide helper methods in Python @@ -38,7 +38,7 @@ private[python] class GaussianMixtureModelWrapper(model: GaussianMixtureModel) { 
val modelGaussians = model.gaussians.map { gaussian => Array[Any](gaussian.mu, gaussian.sigma) } - SerDe.dumps(immutable.ArraySeq.unsafeWrapArray(modelGaussians).asJava) + SerDe.dumps(modelGaussians.toImmutableArraySeq.asJava) } def predictSoft(point: Vector): Vector = { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/LDAModelWrapper.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/LDAModelWrapper.scala index b919b0a8c3f2e..6a6c6cf6bcfb3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/LDAModelWrapper.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/LDAModelWrapper.scala @@ -16,12 +16,12 @@ */ package org.apache.spark.mllib.api.python -import scala.collection.immutable import scala.jdk.CollectionConverters._ import org.apache.spark.SparkContext import org.apache.spark.mllib.clustering.LDAModel import org.apache.spark.mllib.linalg.Matrix +import org.apache.spark.util.ArrayImplicits._ /** * Wrapper around LDAModel to provide helper methods in Python @@ -36,11 +36,11 @@ private[python] class LDAModelWrapper(model: LDAModel) { def describeTopics(maxTermsPerTopic: Int): Array[Byte] = { val topics = model.describeTopics(maxTermsPerTopic).map { case (terms, termWeights) => - val jTerms = immutable.ArraySeq.unsafeWrapArray(terms).asJava - val jTermWeights = immutable.ArraySeq.unsafeWrapArray(termWeights).asJava + val jTerms = terms.toImmutableArraySeq.asJava + val jTermWeights = termWeights.toImmutableArraySeq.asJava Array[Any](jTerms, jTermWeights) } - SerDe.dumps(immutable.ArraySeq.unsafeWrapArray(topics).asJava) + SerDe.dumps(topics.toImmutableArraySeq.asJava) } def save(sc: SparkContext, path: String): Unit = model.save(sc, path) From 2ffcc196f34491b6a1783a1bfae0acb62424d39b Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 1 Nov 2023 03:04:47 +0800 Subject: [PATCH 2/6] comments --- .../src/main/scala/org/apache/spark/util/ArrayImplicits.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/utils/src/main/scala/org/apache/spark/util/ArrayImplicits.scala b/common/utils/src/main/scala/org/apache/spark/util/ArrayImplicits.scala index 0b4572f89f68b..08997a800c957 100644 --- a/common/utils/src/main/scala/org/apache/spark/util/ArrayImplicits.scala +++ b/common/utils/src/main/scala/org/apache/spark/util/ArrayImplicits.scala @@ -27,7 +27,7 @@ private[spark] object ArrayImplicits { implicit class SparkArrayOps[T](xs: Array[T]) { /** - * Wraps an Array[T] as an immutable.ArraySeq[T]. + * Wraps an Array[T] as an immutable.ArraySeq[T] without copying. */ def toImmutableArraySeq: immutable.ArraySeq[T] = if (xs eq null) null From 891be89f3f839cd8ed7c13d5fdc55f8e3ccd7b7f Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 1 Nov 2023 15:02:40 +0800 Subject: [PATCH 3/6] add tests --- .../spark/util/ArrayImplicitsSuite.scala | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) create mode 100644 common/utils/src/test/scala/org/apache/spark/util/ArrayImplicitsSuite.scala diff --git a/common/utils/src/test/scala/org/apache/spark/util/ArrayImplicitsSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/ArrayImplicitsSuite.scala new file mode 100644 index 0000000000000..1dad68827265b --- /dev/null +++ b/common/utils/src/test/scala/org/apache/spark/util/ArrayImplicitsSuite.scala @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util + +import scala.collection.immutable + +import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite + +import org.apache.spark.util.ArrayImplicits._ + +class ArrayImplicitsSuite extends AnyFunSuite { // scalastyle:ignore funsuite + + test("Normal Int Array") { + val data = Array(1, 2, 3) + val arraySeq = data.toImmutableArraySeq + assert(arraySeq.getClass === classOf[immutable.ArraySeq.ofInt]) + assert(arraySeq.length === 3) + assert(arraySeq.unsafeArray.sameElements(data)) + } + + test("Normal TestClass Array") { + val data = Array(TestClass(1), TestClass(2), TestClass(3)) + val arraySeq = data.toImmutableArraySeq + assert(arraySeq.getClass === classOf[immutable.ArraySeq.ofRef[TestClass]]) + assert(arraySeq.length === 3) + assert(arraySeq.unsafeArray.sameElements(data)) + } + + test("Null Array") { + val data: Array[Int] = null + val arraySeq = data.toImmutableArraySeq + assert(arraySeq == null) + } + + case class TestClass(i: Int) +} From c999433e894ca7f83822e1948156ec8aa923ebcd Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 1 Nov 2023 15:03:58 +0800 Subject: [PATCH 4/6] test name --- .../scala/org/apache/spark/util/ArrayImplicitsSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/utils/src/test/scala/org/apache/spark/util/ArrayImplicitsSuite.scala b/common/utils/src/test/scala/org/apache/spark/util/ArrayImplicitsSuite.scala index 1dad68827265b..a887bd5ca9f23 100644 --- a/common/utils/src/test/scala/org/apache/spark/util/ArrayImplicitsSuite.scala +++ b/common/utils/src/test/scala/org/apache/spark/util/ArrayImplicitsSuite.scala @@ -25,7 +25,7 @@ import org.apache.spark.util.ArrayImplicits._ class ArrayImplicitsSuite extends AnyFunSuite { // scalastyle:ignore funsuite - test("Normal Int Array") { + test("Int Array") { val data = Array(1, 2, 3) val arraySeq = data.toImmutableArraySeq assert(arraySeq.getClass === classOf[immutable.ArraySeq.ofInt]) @@ -33,7 +33,7 @@ class ArrayImplicitsSuite extends AnyFunSuite { // scalastyle:ignore funsuite assert(arraySeq.unsafeArray.sameElements(data)) } - test("Normal TestClass Array") { + test("TestClass Array") { val data = Array(TestClass(1), TestClass(2), TestClass(3)) val arraySeq = data.toImmutableArraySeq assert(arraySeq.getClass === classOf[immutable.ArraySeq.ofRef[TestClass]]) From 30bbe73bae28657b1ca69d928d73edd85053a40a Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 1 Nov 2023 15:23:15 +0800 Subject: [PATCH 5/6] move to core --- .../scala/org/apache/spark/util/ArrayImplicitsSuite.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) rename {common/utils => core}/src/test/scala/org/apache/spark/util/ArrayImplicitsSuite.scala (91%) diff --git a/common/utils/src/test/scala/org/apache/spark/util/ArrayImplicitsSuite.scala 
b/core/src/test/scala/org/apache/spark/util/ArrayImplicitsSuite.scala similarity index 91% rename from common/utils/src/test/scala/org/apache/spark/util/ArrayImplicitsSuite.scala rename to core/src/test/scala/org/apache/spark/util/ArrayImplicitsSuite.scala index a887bd5ca9f23..2c9b78c42ba61 100644 --- a/common/utils/src/test/scala/org/apache/spark/util/ArrayImplicitsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ArrayImplicitsSuite.scala @@ -19,11 +19,10 @@ package org.apache.spark.util import scala.collection.immutable -import org.scalatest.funsuite.AnyFunSuite // scalastyle:ignore funsuite - +import org.apache.spark.SparkFunSuite import org.apache.spark.util.ArrayImplicits._ -class ArrayImplicitsSuite extends AnyFunSuite { // scalastyle:ignore funsuite +class ArrayImplicitsSuite extends SparkFunSuite { test("Int Array") { val data = Array(1, 2, 3) From 0ee7fea6025565fdfe99dd3ed230a3482b5774e2 Mon Sep 17 00:00:00 2001 From: yangjie01 Date: Wed, 1 Nov 2023 15:35:11 +0800 Subject: [PATCH 6/6] format --- .../test/scala/org/apache/spark/util/ArrayImplicitsSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/util/ArrayImplicitsSuite.scala b/core/src/test/scala/org/apache/spark/util/ArrayImplicitsSuite.scala index 2c9b78c42ba61..135af550c4b39 100644 --- a/core/src/test/scala/org/apache/spark/util/ArrayImplicitsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ArrayImplicitsSuite.scala @@ -10,7 +10,7 @@ * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,  either express or implied. + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */
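
A note for readers of this series: the minimal standalone sketch below is not part of any patch — the Demo object and sample values are illustrative assumptions. It shows the call-site pattern the series establishes: import ArrayImplicits._ once, then replace explicit immutable.ArraySeq.unsafeWrapArray(arr) calls with arr.toImmutableArraySeq.

import scala.collection.immutable

// Standalone copy of the extension from PATCH 1/6 (there it lives in
// org.apache.spark.util and is private[spark]): wraps an Array[T] as an
// immutable.ArraySeq[T] without copying. unsafeWrapArray shares the
// underlying array, so the caller must not mutate it after wrapping.
object ArrayImplicits {
  implicit class SparkArrayOps[T](xs: Array[T]) {
    def toImmutableArraySeq: immutable.ArraySeq[T] =
      if (xs eq null) null
      else immutable.ArraySeq.unsafeWrapArray(xs)
  }
}

// Illustrative usage only (this Demo object is hypothetical, not in the
// patch): the old and new call-site styles yield equal wrappers.
object Demo extends App {
  import ArrayImplicits._

  val xs = Array(1, 2, 3)
  val oldStyle = immutable.ArraySeq.unsafeWrapArray(xs) // style removed by this series
  val newStyle = xs.toImmutableArraySeq                 // style introduced by this series
  println(oldStyle == newStyle) // true: both are zero-copy wrappers of xs
}

The zero-copy wrap is the point of the helper: converting an array with toIndexedSeq or toSeq would copy it, while unsafeWrapArray only shares it. That is safe at the call sites touched here because each wrapped array is immediately handed off (e.g. via .asJava) and not mutated afterwards.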