diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index 268a65b81ff68..57651684070f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -239,6 +239,14 @@ class JDBCOptions(
       .get(JDBC_PREFER_TIMESTAMP_NTZ)
       .map(_.toBoolean)
       .getOrElse(SQLConf.get.timestampType == TimestampNTZType)
+
+  override def hashCode: Int = this.parameters.hashCode()
+
+  override def equals(other: Any): Boolean = other match {
+    case otherOption: JDBCOptions =>
+      otherOption.parameters.equals(this.parameters)
+    case _ => false
+  }
 }
 
 class JdbcOptionsInWrite(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
index 6b85911dca773..eed64b873c451 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
@@ -512,4 +513,18 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession {
       assert(t.schema === replaced)
     }
   }
+
+  test("SPARK-45449: Cache Invalidation Issue with JDBC Table") {
+    withTable("h2.test.cache_t") {
+      withConnection { conn =>
+        conn.prepareStatement(
+          """CREATE TABLE "test"."cache_t" (id decimal(25) PRIMARY KEY NOT NULL,
+            |name TEXT(32) NOT NULL)""".stripMargin).executeUpdate()
+      }
+      sql("INSERT OVERWRITE h2.test.cache_t SELECT 1 AS id, 'a' AS name")
+      sql("CACHE TABLE t1 SELECT id, name FROM h2.test.cache_t")
+      val plan = sql("select * from t1").queryExecution.sparkPlan
+      assert(plan.isInstanceOf[InMemoryTableScanExec])
+    }
+  }
 }
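
Note on the change (not part of the patch): before this fix, JDBCOptions inherited reference equality from AnyRef, so two option instances built from identical parameter maps were never equal, and a cached JDBC relation keyed on one instance could not be matched by a lookup built from the other. The overrides above delegate both equals and hashCode to the underlying parameters map, making the two consistent. A minimal sketch of the resulting contract, assuming Spark's internal JDBCOptions(Map[String, String]) constructor and an H2 driver on the classpath (both available in Spark's own sql/core test scope):

import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions

// Two logically identical option sets, built independently. The URL and
// table name here are illustrative placeholders.
val params = Map(
  "url" -> "jdbc:h2:mem:testdb",
  "dbtable" -> "test.cache_t")
val a = new JDBCOptions(params)
val b = new JDBCOptions(params)

// With the overrides applied, structural equality holds and hashCode is
// consistent with equals, so cache lookups keyed on either instance match.
assert(a == b)
assert(a.hashCode == b.hashCode)

The new test exercises this end to end: after CACHE TABLE, a scan of t1 should be served from the in-memory cache, which the assertion checks by expecting an InMemoryTableScanExec node in the physical plan.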