Skip to content

Commit

Permalink
get ColumnWithDefaultExprUtils tp x-compile
Browse files Browse the repository at this point in the history
  • Loading branch information
scottsand-db committed Mar 22, 2024
1 parent 1727d98 commit 35a73df
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.expressions.EqualNullSafe
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns._
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, StreamExecution}
import org.apache.spark.sql.execution.streaming.{IncrementalExecution, IncrementalExecutionShim, StreamExecution}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{MetadataBuilder, StructField, StructType}

Expand Down Expand Up @@ -210,18 +210,10 @@ object ColumnWithDefaultExprUtils extends DeltaLogging {
df: DataFrame,
cols: Column*): DataFrame = {
val newMicroBatch = df.select(cols: _*)
val newIncrementalExecution = new IncrementalExecution(
val newIncrementalExecution = IncrementalExecutionShim.newInstance(
newMicroBatch.sparkSession,
newMicroBatch.queryExecution.logical,
incrementalExecution.outputMode,
incrementalExecution.checkpointLocation,
incrementalExecution.queryId,
incrementalExecution.runId,
incrementalExecution.currentBatchId,
incrementalExecution.prevOffsetSeqMetadata,
incrementalExecution.offsetSeqMetadata,
incrementalExecution.watermarkPropagator
)
incrementalExecution)
newIncrementalExecution.executedPlan // Force the lazy generation of execution plan


Expand Down
38 changes: 38 additions & 0 deletions spark/src/shims/spark-3.5/IncrementalExecutionShim.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object IncrementalExecutionShim {
def newInstance(
sparkSession: SparkSession,
logicalPlan: LogicalPlan,
incrementalExecution: IncrementalExecution): IncrementalExecution = new IncrementalExecution(
sparkSession,
logicalPlan,
incrementalExecution.outputMode,
incrementalExecution.checkpointLocation,
incrementalExecution.queryId,
incrementalExecution.runId,
incrementalExecution.currentBatchId,
incrementalExecution.prevOffsetSeqMetadata,
incrementalExecution.offsetSeqMetadata,
incrementalExecution.watermarkPropagator
)
}
40 changes: 40 additions & 0 deletions spark/src/shims/spark-4.0/IncrementalExecutionShim.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright (2024) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

object IncrementalExecutionShim {
def newInstance(
sparkSession: SparkSession,
logicalPlan: LogicalPlan,
incrementalExecution: IncrementalExecution): IncrementalExecution = new IncrementalExecution(
sparkSession,
logicalPlan,
incrementalExecution.outputMode,
incrementalExecution.checkpointLocation,
incrementalExecution.queryId,
incrementalExecution.runId,
incrementalExecution.currentBatchId,
incrementalExecution.prevOffsetSeqMetadata,
incrementalExecution.offsetSeqMetadata,
incrementalExecution.watermarkPropagator,
incrementalExecution.isFirstBatch // Spark 4.0 API
)

}

0 comments on commit 35a73df

Please sign in to comment.