[SPARK-49519][SQL] Merge options of table and relation when constructing FileScanBuilder

### What changes were proposed in this pull request?
Merge the options of both `DataSourceV2Relation` and `FileTable` when constructing `FileScanBuilder`.

### Why are the changes needed?
Currently, the subclasses of `FileTable` use only the relation's options when constructing a `FileScanBuilder`, so the contents of `FileTable.options` are silently dropped. A `TableCatalog` may set `dsOptions` into the `FileTable.options` of the table it returns from `TableCatalog.loadTable`; if only the relation options are used, the catalog has no way to pass those table-level options through to the `FileScan`.

Merging the two sets of options, with the relation's keys taking precedence, solves this.
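
For concreteness, a hypothetical read path (illustrative path and option name; assumes an active `SparkSession` named `spark`): per-read options travel with the `DataSourceV2Relation` into `newScanBuilder`, while options baked into the table, for example by a custom `TableCatalog.loadTable`, live in `FileTable.options`.

```scala
// Relation-side options: supplied per read via DataFrameReader and carried
// by DataSourceV2Relation into FileTable.newScanBuilder.
// Table-side options: already stored in FileTable.options, e.g. set by a
// custom TableCatalog when it builds the table in loadTable.
// After this change both sets reach the FileScan; on a key conflict the
// relation-side value wins.
val df = spark.read
  .format("csv")
  .option("header", "true") // relation-side; overrides a table-side "header"
  .load("/tmp/events")      // illustrative path
```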

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
A new unit test in `FileTableSuite` covering all built-in file-based data sources (orc, parquet, csv, json, text); see the changes to that suite at the end of this diff.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#47996 from liujiayi771/csv-options.

Lead-authored-by: joey.ljy <joey.ljy@alibaba-inc.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
2 people authored and himadripal committed Oct 19, 2024
1 parent 789be90 commit f08ed50
Showing 8 changed files with 62 additions and 6 deletions.
AvroTable.scala
@@ -37,7 +37,7 @@ case class AvroTable(
fallbackFileFormat: Class[_ <: FileFormat])
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
override def newScanBuilder(options: CaseInsensitiveStringMap): AvroScanBuilder =
-    new AvroScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+    AvroScanBuilder(sparkSession, fileIndex, schema, dataSchema, mergedOptions(options))

override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
AvroUtils.inferSchema(sparkSession, options.asScala.toMap, files)
FileTable.scala
@@ -146,6 +146,19 @@ abstract class FileTable(
val entry = options.get(DataSource.GLOB_PATHS_KEY)
Option(entry).map(_ == "true").getOrElse(true)
}

+  /**
+   * Merge the options of FileTable and the table operation while respecting the
+   * keys of the table operation.
+   *
+   * @param options The options of the table operation.
+   * @return The merged options, with the table operation's keys taking precedence.
+   */
+  protected def mergedOptions(options: CaseInsensitiveStringMap): CaseInsensitiveStringMap = {
+    val finalOptions = this.options.asCaseSensitiveMap().asScala ++
+      options.asCaseSensitiveMap().asScala
+    new CaseInsensitiveStringMap(finalOptions.asJava)
+  }
}

object FileTable {
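
A standalone check of the helper's semantics (a minimal sketch, not part of the commit; the values mirror the new test at the bottom of this diff, and Spark's sql artifacts are assumed on the classpath). The right-hand side of `++` wins, so the table operation's keys override those stored on the `FileTable`:

```scala
import scala.jdk.CollectionConverters._

import org.apache.spark.sql.util.CaseInsensitiveStringMap

object MergedOptionsDemo extends App {
  // Options baked into the table (e.g. by TableCatalog.loadTable).
  val tableSide = new CaseInsensitiveStringMap(
    Map("k1" -> "v1", "k2" -> "ds_v2").asJava)
  // Options supplied by the table operation (the relation).
  val operationSide = new CaseInsensitiveStringMap(
    Map("k2" -> "table_v2", "k3" -> "v3").asJava)

  // The same expression as FileTable.mergedOptions above.
  val merged = new CaseInsensitiveStringMap(
    (tableSide.asCaseSensitiveMap().asScala ++
      operationSide.asCaseSensitiveMap().asScala).asJava)

  assert(merged.get("k1") == "v1")
  assert(merged.get("k2") == "table_v2") // the operation side wins conflicts
  assert(merged.get("k3") == "v3")
}
```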
CSVTable.scala
@@ -38,7 +38,7 @@ case class CSVTable(
fallbackFileFormat: Class[_ <: FileFormat])
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
override def newScanBuilder(options: CaseInsensitiveStringMap): CSVScanBuilder =
-    CSVScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+    CSVScanBuilder(sparkSession, fileIndex, schema, dataSchema, mergedOptions(options))

override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
val parsedOptions = new CSVOptions(
JsonTable.scala
@@ -38,7 +38,7 @@ case class JsonTable(
fallbackFileFormat: Class[_ <: FileFormat])
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
override def newScanBuilder(options: CaseInsensitiveStringMap): JsonScanBuilder =
-    new JsonScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+    JsonScanBuilder(sparkSession, fileIndex, schema, dataSchema, mergedOptions(options))

override def inferSchema(files: Seq[FileStatus]): Option[StructType] = {
val parsedOptions = new JSONOptionsInRead(
OrcTable.scala
@@ -38,7 +38,7 @@ case class OrcTable(
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {

override def newScanBuilder(options: CaseInsensitiveStringMap): OrcScanBuilder =
-    new OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+    OrcScanBuilder(sparkSession, fileIndex, schema, dataSchema, mergedOptions(options))

override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
OrcUtils.inferSchema(sparkSession, files, options.asScala.toMap)
ParquetTable.scala
@@ -38,7 +38,7 @@ case class ParquetTable(
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {

override def newScanBuilder(options: CaseInsensitiveStringMap): ParquetScanBuilder =
-    new ParquetScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+    ParquetScanBuilder(sparkSession, fileIndex, schema, dataSchema, mergedOptions(options))

override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
ParquetUtils.inferSchema(sparkSession, options.asScala.toMap, files)
TextTable.scala
@@ -34,7 +34,7 @@ case class TextTable(
fallbackFileFormat: Class[_ <: FileFormat])
extends FileTable(sparkSession, options, paths, userSpecifiedSchema) {
override def newScanBuilder(options: CaseInsensitiveStringMap): TextScanBuilder =
-    TextScanBuilder(sparkSession, fileIndex, schema, dataSchema, options)
+    TextScanBuilder(sparkSession, fileIndex, schema, dataSchema, mergedOptions(options))

override def inferSchema(files: Seq[FileStatus]): Option[StructType] =
Some(StructType(Array(StructField("value", StringType))))
FileTableSuite.scala
@@ -23,8 +23,15 @@ import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.{QueryTest, SparkSession}
import org.apache.spark.sql.connector.read.ScanBuilder
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
+import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.execution.datasources.text.TextFileFormat
+import org.apache.spark.sql.execution.datasources.v2.csv.CSVScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.json.JsonScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScanBuilder
+import org.apache.spark.sql.execution.datasources.v2.text.TextScanBuilder
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -53,6 +60,8 @@ class DummyFileTable(

class FileTableSuite extends QueryTest with SharedSparkSession {

+  private val allFileBasedDataSources = Seq("orc", "parquet", "csv", "json", "text")

test("Data type validation should check data schema only") {
withTempPath { dir =>
val df = spark.createDataFrame(Seq(("a", 1), ("b", 2))).toDF("v", "p")
@@ -85,4 +94,38 @@ class FileTableSuite extends QueryTest with SharedSparkSession {
assert(table.dataSchema == expectedDataSchema)
}
}

+  allFileBasedDataSources.foreach { format =>
+    test(s"SPARK-49519: Merge options of table and relation when constructing FileScanBuilder" +
+      s" - $format") {
+      withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+        val userSpecifiedSchema = StructType(Seq(StructField("c1", StringType)))
+
+        DataSource.lookupDataSourceV2(format, spark.sessionState.conf) match {
+          case Some(provider) =>
+            val dsOptions = new CaseInsensitiveStringMap(
+              Map("k1" -> "v1", "k2" -> "ds_v2").asJava)
+            val table = provider.getTable(
+              userSpecifiedSchema,
+              Array.empty,
+              dsOptions.asCaseSensitiveMap())
+            val tableOptions = new CaseInsensitiveStringMap(
+              Map("k2" -> "table_v2", "k3" -> "v3").asJava)
+            val mergedOptions = table.asInstanceOf[FileTable].newScanBuilder(tableOptions) match {
+              case csv: CSVScanBuilder => csv.options
+              case json: JsonScanBuilder => json.options
+              case orc: OrcScanBuilder => orc.options
+              case parquet: ParquetScanBuilder => parquet.options
+              case text: TextScanBuilder => text.options
+            }
+            assert(mergedOptions.size() == 3)
+            assert("v1".equals(mergedOptions.get("k1")))
+            assert("table_v2".equals(mergedOptions.get("k2")))
+            assert("v3".equals(mergedOptions.get("k3")))
+          case _ =>
+            throw new IllegalArgumentException(s"Failed to get table provider for $format")
+        }
+      }
+    }
+  }
}
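
With a standard Spark checkout, the new cases can be run on their own with something like `build/sbt "sql/testOnly org.apache.spark.sql.execution.datasources.v2.FileTableSuite"` (module and package assumed from the imports above). For each of the five formats, the test asserts that the merged map contains `k1` from the table options, `k3` from the relation options, and the relation's value for the conflicting key `k2`.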
