
Make delta-sharing-client compatible with Spark 4.0 (#582)
charlenelyu-db authored Oct 2, 2024
1 parent dd42330 commit faf6912
Showing 2 changed files with 28 additions and 6 deletions.
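For context on the change pattern: the client previously constructed filter Columns directly with new Column(expr). The new DeltaSharingScanUtils.toColumn wrapper (second file below) exists because, as its comment notes, the Column object is in private[sql] scope, so direct construction no longer compiles against Spark 4.0. A minimal sketch of the before/after call sites, with an illustrative Literal(true) filter that is not part of this diff:

import org.apache.spark.sql.DeltaSharingScanUtils
import org.apache.spark.sql.catalyst.expressions.Literal

// Spark 3.x style, assuming a public Column constructor (no longer available):
//   val filter = new Column(Literal(true))

// Spark 4.0-compatible style used by this commit: delegate to a helper that
// lives inside package org.apache.spark.sql and can reach private[sql] members.
val filter = DeltaSharingScanUtils.toColumn(Literal(true))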
25 changes: 20 additions & 5 deletions client/src/main/scala/io/delta/sharing/spark/RemoteDeltaLog.scala
@@ -24,21 +24,34 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.SparkException
import org.apache.spark.delta.sharing.{CachedTableManager, TableRefreshResult}
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{Column, Encoder, SparkSession}
+import org.apache.spark.sql.{DeltaSharingScanUtils, Encoder, SparkSession}
import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Cast, Expression, Literal, SubqueryExpression}
+import org.apache.spark.sql.catalyst.expressions.{
+  And,
+  Attribute,
+  Cast,
+  Expression,
+  Literal,
+  SubqueryExpression
+}
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation}
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types.{DataType, StructField, StructType}

import io.delta.sharing.client.{DeltaSharingClient, DeltaSharingRestClient}
-import io.delta.sharing.client.model.{AddFile, CDFColumnInfo, DeltaTableMetadata, Metadata, Protocol, Table => DeltaSharingTable}
+import io.delta.sharing.client.model.{
+  AddFile,
+  CDFColumnInfo,
+  DeltaTableMetadata,
+  Metadata,
+  Protocol,
+  Table => DeltaSharingTable
+}
import io.delta.sharing.client.util.ConfUtils
import io.delta.sharing.spark.perf.DeltaSharingLimitPushDown


/**
* Used to query the current state of the transaction logs of a remote shared Delta table.
*
@@ -298,7 +311,9 @@ class RemoteSnapshot(
tableFiles.files.toDS()
}

-val columnFilter = new Column(rewrittenFilters.reduceLeftOption(And).getOrElse(Literal(true)))
+val columnFilter = DeltaSharingScanUtils.toColumn(
+  rewrittenFilters.reduceLeftOption(And).getOrElse(Literal(true))
+)
remoteFiles.filter(columnFilter).as[AddFile].collect()
}
}
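Only the wrapping into a Column changed above; the filter fold itself is untouched. For readers unfamiliar with the idiom, a small sketch of what reduceLeftOption(And).getOrElse(Literal(true)) produces, using hypothetical filters that are not part of this diff:

import org.apache.spark.sql.catalyst.expressions.{
  And, AttributeReference, EqualTo, Expression, Literal
}
import org.apache.spark.sql.types.IntegerType

val x = AttributeReference("x", IntegerType)()
val filters: Seq[Expression] = Seq(EqualTo(x, Literal(1)), EqualTo(x, Literal(2)))

// Non-empty input folds left into nested Ands: And(x = 1, x = 2).
val combined = filters.reduceLeftOption(And).getOrElse(Literal(true))

// Empty input falls back to a trivially true literal, i.e. no row filtering.
val unfiltered = Seq.empty[Expression].reduceLeftOption(And).getOrElse(Literal(true))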
9 changes: 8 additions & 1 deletion client/src/main/scala/org/apache/spark/sql/DeltaSharingScanUtils.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql

import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.types.StructType

@@ -27,9 +28,15 @@ object DeltaSharingScanUtils {
Dataset.ofRows(spark, plan)
}

-// A wraper to expose sqlContext.internalCreateDataFrame
+// A wrapper to expose sqlContext.internalCreateDataFrame
def internalCreateDataFrame(spark: SparkSession, schema: StructType): DataFrame = {
spark.sqlContext.internalCreateDataFrame(
spark.sparkContext.emptyRDD[InternalRow], schema, isStreaming = true)
}

+// A wrapper to expose Column.apply(expr: Expression) function.
+// This is needed because the Column object is in private[sql] scope.
+def toColumn(expr: Expression): Column = {
+  Column(expr)
+}
}
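The helper's trick is purely one of packaging: because DeltaSharingScanUtils is declared inside package org.apache.spark.sql, it can call private[sql] members and re-expose them through public methods. A self-contained sketch of the pattern, with a hypothetical Gadget class standing in for the Spark internals:

// Hypothetical stand-in for a Spark class whose constructor is package-private.
package org.apache.spark.sql {
  class Gadget private[sql] (val payload: String)

  // Declared in the same package, so the private[sql] constructor is visible.
  object GadgetUtils {
    def toGadget(payload: String): Gadget = new Gadget(payload)
  }
}

package io.delta.sharing.spark {
  object Caller {
    // new org.apache.spark.sql.Gadget("x") would not compile here;
    // the public wrapper is the only way to construct one.
    val g = org.apache.spark.sql.GadgetUtils.toGadget("x")
  }
}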
