diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
index fe61fe3db8786..8ec711b2757f5 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/v2/avro/AvroTable.scala
@@ -37,7 +37,7 @@ case class AvroTable(
     fallbackFileFormat: Class[_ <: FileFormat])
   extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
   override def newScanBuilder(options: CaseInsensitiveStringMap): AvroScanBuilder =
-    new AvroScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+    AvroScanBuilder(sparkSession, fileIndex, schema, dataSchema, mergedOptions(options))
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
     AvroUtils.inferSchema(sparkSession, options.asScala.toMap, files)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
index f56f9436d9437..4eee731e0b2d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileTable.scala
@@ -146,6 +146,19 @@ abstract class FileTable(
     val entry = options.get(DataSource.GLOB_PATHS_KEY)
     Option(entry).map(_ == "true").getOrElse(true)
   }
+
+  /**
+   * Merge the options of FileTable and the table operation while respecting the
+   * keys of the table operation.
+   *
+   * @param options The options of the table operation.
+   * @return The merged options, with the table operation's options taking precedence.
+   */
+  protected def mergedOptions(options: CaseInsensitiveStringMap): CaseInsensitiveStringMap = {
+    val finalOptions = this.options.asCaseSensitiveMap().asScala ++
+      options.asCaseSensitiveMap().asScala
+    new CaseInsensitiveStringMap(finalOptions.asJava)
+  }
 }
 
 object FileTable {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
index 8b4fd3af6ded7..4c201ca66cf6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/csv/CSVTable.scala
@@ -38,7 +38,7 @@ case class CSVTable(
     fallbackFileFormat: Class[_ <: FileFormat])
   extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
   override def newScanBuilder(options: CaseInsensitiveStringMap): CSVScanBuilder =
-    CSVScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+    CSVScanBuilder(sparkSession, fileIndex, schema, dataSchema, mergedOptions(options))
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
     val parsedOptions = new CSVOptions(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
index c567e87e7d767..54244c4d95e77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
@@ -38,7 +38,7 @@ case class JsonTable(
     fallbackFileFormat: Class[_ <: FileFormat])
   extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
   override def newScanBuilder(options: CaseInsensitiveStringMap): JsonScanBuilder =
-    new JsonScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+    JsonScanBuilder(sparkSession, fileIndex, schema, dataSchema, mergedOptions(options))
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
     val parsedOptions = new JSONOptionsInRead(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
index ca4b83b3c58f1..1037370967c87 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/orc/OrcTable.scala
@@ -38,7 +38,7 @@ case class OrcTable(
   extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): OrcScanBuilder =
-    new OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+    OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, mergedOptions(options))
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
     OrcUtils.inferSchema(sparkSession, files, options.asScala.toMap)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
index e593ad7d0c0cd..8463a05569c05 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetTable.scala
@@ -38,7 +38,7 @@ case class ParquetTable(
   extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): ParquetScanBuilder =
-    new ParquetScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+    ParquetScanBuilder(sparkSession, fileIndex, schema, dataSchema, mergedOptions(options))
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
     ParquetUtils.inferSchema(sparkSession, options.asScala.toMap, files)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
index 046bdcb69846e..87ae34532f88a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/text/TextTable.scala
@@ -34,7 +34,7 @@ case class TextTable(
     fallbackFileFormat: Class[_ <: FileFormat])
   extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
   override def newScanBuilder(options: CaseInsensitiveStringMap): TextScanBuilder =
-    TextScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+    TextScanBuilder(sparkSession, fileIndex, schema, dataSchema, mergedOptions(options))
 
   override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
     Some(StructType(Array(StructField("value", StringType))))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
index 4160516deece5..0316f09e42ce3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/FileTableSuite.scala
@@ -23,8 +23,15 @@ import org.apache.hadoop.fs.FileStatus
 import org.apache.spark.sql.{QueryTest, SparkSession}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.FileFormat
 import org.apache.spark.sql.execution.datasources.text.TextFileFormat
+import org.apache.spark.sql.execution.datasources.v2.csv.CSVScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.json.JsonScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.text.TextScanBuilder
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -53,6 +60,8 @@ class DummyFileTable(
 
 class FileTableSuite extends QueryTest with SharedSparkSession {
 
+  private val allFileBasedDataSources = Seq("orc", "parquet", "csv", "json", "text")
+
   test("Data type validation should check data schema only") {
     withTempPath { dir =>
       val df = spark.createDataFrame(Seq(("a", 1), ("b", 2))).toDF("v", "p")
@@ -85,4 +94,38 @@ class FileTableSuite extends QueryTest with SharedSparkSession {
       assert(table.dataSchema == expectedDataSchema)
     }
   }
+
+  allFileBasedDataSources.foreach { format =>
+    test(s"SPARK-49519: Merge options of table and relation when constructing FileScanBuilder" +
+      s" - $format") {
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+        val userSpecifiedSchema = StructType(Seq(StructField("c1", StringType)))
+
+        DataSource.lookupDataSourceV2(format, spark.sessionState.conf) match {
+          case Some(provider) =>
+            val dsOptions = new CaseInsensitiveStringMap(
+              Map("k1" -> "v1", "k2" -> "ds_v2").asJava)
+            val table = provider.getTable(
+              userSpecifiedSchema,
+              Array.empty,
+              dsOptions.asCaseSensitiveMap())
+            val tableOptions = new CaseInsensitiveStringMap(
+              Map("k2" -> "table_v2", "k3" -> "v3").asJava)
+            val mergedOptions = table.asInstanceOf[FileTable].newScanBuilder(tableOptions) match {
+              case csv: CSVScanBuilder => csv.options
+              case json: JsonScanBuilder => json.options
+              case orc: OrcScanBuilder => orc.options
+              case parquet: ParquetScanBuilder => parquet.options
+              case text: TextScanBuilder => text.options
+            }
+            assert(mergedOptions.size() == 3)
+            assert("v1".equals(mergedOptions.get("k1")))
+            assert("table_v2".equals(mergedOptions.get("k2")))
+            assert("v3".equals(mergedOptions.get("k3")))
+          case _ =>
+            throw new IllegalArgumentException(s"Failed to get table provider for $format")
+        }
+      }
+    }
+  }
 }
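Note on the merge semantics: a minimal standalone sketch of what FileTable.mergedOptions does, runnable outside the patch given spark-catalyst on the classpath. The object name MergedOptionsExample and the merge helper are illustrative and not part of this change; only CaseInsensitiveStringMap and its asCaseSensitiveMap() come from Spark. Scala's `++` on maps keeps the right-hand value for a duplicate key, which is why the scan-time (table operation) options win.

import scala.jdk.CollectionConverters._

import org.apache.spark.sql.util.CaseInsensitiveStringMap

object MergedOptionsExample {
  // Same shape as FileTable.mergedOptions: the right-hand operand of `++`
  // wins on duplicate keys, so scan-time options override table options.
  def merge(
      tableOptions: CaseInsensitiveStringMap,
      scanOptions: CaseInsensitiveStringMap): CaseInsensitiveStringMap = {
    val merged = tableOptions.asCaseSensitiveMap().asScala ++
      scanOptions.asCaseSensitiveMap().asScala
    new CaseInsensitiveStringMap(merged.asJava)
  }

  def main(args: Array[String]): Unit = {
    // Mirrors the key/value pairs used in the new FileTableSuite test.
    val dsOptions = new CaseInsensitiveStringMap(
      Map("k1" -> "v1", "k2" -> "ds_v2").asJava)
    val tableOptions = new CaseInsensitiveStringMap(
      Map("k2" -> "table_v2", "k3" -> "v3").asJava)

    val merged = merge(dsOptions, tableOptions)
    assert(merged.size() == 3)
    assert(merged.get("k1") == "v1")        // present only in the data source options
    assert(merged.get("k2") == "table_v2")  // table operation overrides the data source
    assert(merged.get("k3") == "v3")        // present only in the operation options
  }
}

Keeping the merge inside FileTable (rather than in each ScanBuilder) means every format-specific newScanBuilder stays a one-line change, as the hunks above show.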