[SPARK-44535][CONNECT][SQL] Move required Streaming API to sql/api
### What changes were proposed in this pull request?
This PR moves a number of streaming classes to the sql/api project.

### Why are the changes needed?
This is needed to disconnect the Spark Connect Scala Client from catalyst.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing Tests.

Closes apache#42140 from hvanhovell/SPARK-44535.

Authored-by: Herman van Hovell <herman@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
hvanhovell committed Jul 26, 2023
1 parent fddd0c2 commit 03d0ecc
Showing 13 changed files with 63 additions and 24 deletions.
@@ -38,7 +38,6 @@ import org.apache.spark.sql.execution.streaming.OneTimeTrigger
 import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger
 import org.apache.spark.sql.types.NullType
 import org.apache.spark.util.SparkSerDeUtils
-import org.apache.spark.util.Utils
 
 /**
  * Interface used to write a streaming `Dataset` to external storage systems (e.g. file systems,
@@ -240,7 +239,7 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) extends Logging {
    */
   @Evolving
   def foreachBatch(function: (Dataset[T], Long) => Unit): DataStreamWriter[T] = {
-    val serializedFn = Utils.serialize(function)
+    val serializedFn = SparkSerDeUtils.serialize(function)
     sinkBuilder.getForeachBatchBuilder.getScalaFunctionBuilder
       .setPayload(ByteString.copyFrom(serializedFn))
       .setOutputType(DataTypeProtoConverter.toConnectProtoType(NullType)) // Unused.

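For orientation, here is a minimal, hypothetical sketch of the `foreachBatch` API whose serialization path changes above. The endpoint address, the `rate` source, and all names are illustrative assumptions, not part of this commit.

```scala
import org.apache.spark.sql.{Dataset, Row, SparkSession}

// Connect Scala client session (the endpoint address is a placeholder).
val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate()

// Any streaming source would do; "rate" just generates rows for the example.
val stream = spark.readStream.format("rate").load()

// The (Dataset[T], Long) => Unit lambda is what the client serializes
// (with SparkSerDeUtils after this change) and ships to the server.
val query = stream.writeStream
  .foreachBatch { (batch: Dataset[Row], batchId: Long) =>
    println(s"batch $batchId contains ${batch.count()} rows")
  }
  .start()

query.awaitTermination(5000) // let the example run briefly
query.stop()
```
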
@@ -25,7 +25,6 @@ import org.json4s.JsonDSL.{jobject2assoc, pair2Assoc}
 import org.json4s.jackson.JsonMethods.{compact, render}
 
 import org.apache.spark.annotation.Evolving
-import org.apache.spark.scheduler.SparkListenerEvent
 
 /**
  * Interface for listening to events related to [[StreamingQuery StreamingQueries]].
@@ -116,7 +115,7 @@ object StreamingQueryListener extends Serializable {
    * @since 3.5.0
    */
   @Evolving
-  trait Event extends SparkListenerEvent
+  trait Event
 
   /**
    * Event representing the start of a query

@@ -31,7 +31,7 @@ import org.apache.spark.connect.proto.StreamingQueryManagerCommandResult
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connect.common.{InvalidPlanInput, StreamingListenerPacket}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.SparkSerDeUtils
 
 /**
  * A class to manage all the [[StreamingQuery]] active in a `SparkSession`.
@@ -155,7 +155,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
     cacheListenerById(id, listener)
     executeManagerCmd(
       _.getAddListenerBuilder
-        .setListenerPayload(ByteString.copyFrom(Utils
+        .setListenerPayload(ByteString.copyFrom(SparkSerDeUtils
           .serialize(StreamingListenerPacket(id, listener)))))
   }
@@ -168,7 +168,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) extends Lo
     val id = getIdByListener(listener)
     executeManagerCmd(
       _.getRemoveListenerBuilder
-        .setListenerPayload(ByteString.copyFrom(Utils
+        .setListenerPayload(ByteString.copyFrom(SparkSerDeUtils
          .serialize(StreamingListenerPacket(id, listener)))))
     removeCachedListener(id)
   }

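As a usage sketch (not part of the commit), this is roughly how a client-side listener flows through the addListener/removeListener calls shown above; the listener body and endpoint are invented for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

// A made-up listener. After this commit the events it receives extend the
// client's own Event trait instead of SparkListenerEvent.
class LoggingListener extends StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit =
    println(s"started ${event.id}")
  override def onQueryProgress(event: QueryProgressEvent): Unit =
    println(s"batch ${event.progress.batchId} done")
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
    println(s"terminated ${event.id}")
}

val spark = SparkSession.builder().remote("sc://localhost:15002").getOrCreate()
val listener = new LoggingListener

// addListener wraps (id, listener) in a StreamingListenerPacket, serializes it with
// SparkSerDeUtils, and ships it to the server, as in the diff above.
spark.streams.addListener(listener)
// ... run streaming queries ...
spark.streams.removeListener(listener)
```
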
@@ -280,6 +280,24 @@ object CheckConnectJvmClientCompatibility {
         "org.apache.spark.sql.streaming.PythonStreamingQueryListener"),
       ProblemFilters.exclude[MissingClassProblem](
         "org.apache.spark.sql.streaming.PythonStreamingQueryListenerWrapper"),
+      ProblemFilters.exclude[MissingTypesProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryListener$Event"),
+      ProblemFilters.exclude[MissingTypesProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryListener$QueryIdleEvent"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryListener#QueryIdleEvent.logEvent"),
+      ProblemFilters.exclude[MissingTypesProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryListener$QueryProgressEvent"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryListener#QueryProgressEvent.logEvent"),
+      ProblemFilters.exclude[MissingTypesProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryListener$QueryStartedEvent"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryListener#QueryStartedEvent.logEvent"),
+      ProblemFilters.exclude[MissingTypesProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryListener$QueryTerminatedEvent"),
+      ProblemFilters.exclude[DirectMissingMethodProblem](
+        "org.apache.spark.sql.streaming.StreamingQueryListener#QueryTerminatedEvent.logEvent"),
 
       // SQLImplicits
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.rddToDatasetHolder"),

4 changes: 2 additions & 2 deletions dev/checkstyle-suppressions.xml
@@ -53,9 +53,9 @@
     <suppress checks="MethodName"
               files="src/main/java/org/apache/hive/service/auth/PasswdAuthenticationProvider.java"/>
     <suppress checks="MethodName"
-              files="sql/catalyst/src/main/java/org/apache/spark/sql/streaming/OutputMode.java"/>
+              files="sql/api/src/main/java/org/apache/spark/sql/streaming/OutputMode.java"/>
     <suppress checks="MethodName"
-              files="sql/catalyst/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java"/>
+              files="sql/api/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java"/>
     <suppress checks="MethodName"
               files="sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java"/>
     <suppress checks="MethodName"

5 changes: 4 additions & 1 deletion project/MimaExcludes.scala
@@ -64,7 +64,10 @@ object MimaExcludes {
     // [SPARK-43952][CORE][CONNECT][SQL] Add SparkContext APIs for query cancellation by tag
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.JobData.this"),
     // [SPARK-44205][SQL] Extract Catalyst Code from DecimalType
-    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.types.DecimalType.unapply")
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.types.DecimalType.unapply"),
+    // [SPARK-44535][CONNECT][SQL] Move required Streaming API to sql/api
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.GroupStateTimeout"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.OutputMode")
   )
 
   // Defulat exclude rules

@@ -17,6 +17,7 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.StructType
 
 /**
  * A row implementation that uses an array of objects as the underlying storage. Note that, while
@@ -37,3 +38,12 @@ class GenericRow(protected[sql] val values: Array[Any]) extends Row {
 
   override def copy(): GenericRow = this
 }
+
+class GenericRowWithSchema(values: Array[Any], override val schema: StructType)
+  extends GenericRow(values) {
+
+  /** No-arg constructor for serialization. */
+  protected def this() = this(null, null)
+
+  override def fieldIndex(name: String): Int = schema.fieldIndex(name)
+}

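A small illustrative sketch (not from the commit) of the `GenericRowWithSchema` class that now lives next to `GenericRow` in sql/api; the schema and values are made up.

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("name", StringType),
  StructField("age", IntegerType)))

// Carrying the schema is what enables by-name access on a plain Row.
val row: Row = new GenericRowWithSchema(Array("Ada", 36), schema)

assert(row.fieldIndex("age") == 1)   // delegates to schema.fieldIndex
assert(row.getAs[Int]("age") == 36)  // getAs resolves the name via fieldIndex
```
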
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.streaming.GroupStateTimeout

/** Types of timeouts used in FlatMapGroupsWithState */
case object NoTimeout extends GroupStateTimeout
case object ProcessingTimeTimeout extends GroupStateTimeout
case object EventTimeTimeout extends GroupStateTimeout
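For context, a hypothetical snippet showing how these internal case objects relate to the public `GroupStateTimeout` Java class (also moved to sql/api). User code normally touches only the factory methods; the direct imports here exist solely to make the mapping visible.

```scala
import org.apache.spark.sql.catalyst.plans.logical.{EventTimeTimeout, NoTimeout, ProcessingTimeTimeout}
import org.apache.spark.sql.streaming.GroupStateTimeout

// The Java-facing factories hand back the catalyst case objects defined above.
val noTimeout: GroupStateTimeout = GroupStateTimeout.NoTimeout()
val procTimeout: GroupStateTimeout = GroupStateTimeout.ProcessingTimeTimeout()
val eventTimeout: GroupStateTimeout = GroupStateTimeout.EventTimeTimeout()

assert(noTimeout == NoTimeout)
assert(procTimeout == ProcessingTimeTimeout)
assert(eventTimeout == EventTimeTimeout)
```
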
@@ -157,15 +157,6 @@ trait BaseGenericInternalRow extends InternalRow {
   }
 }
 
-class GenericRowWithSchema(values: Array[Any], override val schema: StructType)
-  extends GenericRow(values) {
-
-  /** No-arg constructor for serialization. */
-  protected def this() = this(null, null)
-
-  override def fieldIndex(name: String): Int = schema.fieldIndex(name)
-}
-
 /**
  * An internal row implementation that uses an array of objects as the underlying storage.
  * Note that, while the array is not copied, and thus could technically be mutated after creation,

@@ -451,11 +451,6 @@ case class MapGroups(
 /** Internal class representing State */
 trait LogicalGroupState[S]
 
-/** Types of timeouts used in FlatMapGroupsWithState */
-case object NoTimeout extends GroupStateTimeout
-case object ProcessingTimeTimeout extends GroupStateTimeout
-case object EventTimeTimeout extends GroupStateTimeout
-
 /** Factory for constructing new `MapGroupsWithState` nodes. */
 object FlatMapGroupsWithState {
   def apply[K: Encoder, V: Encoder, S: Encoder, U: Encoder](
