From 7e84771b726004d6e29a781a4ca657b14e3feb67 Mon Sep 17 00:00:00 2001 From: liangyongyuan Date: Sat, 7 Oct 2023 11:48:07 +0800 Subject: [PATCH 1/6] [SPARK-45449][SQL] Cache Invalidation Issue with JDBC Table --- .../datasources/jdbc/JDBCOptions.scala | 9 +++++++++ .../v2/jdbc/JDBCTableCatalogSuite.scala | 17 +++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 268a65b81ff68..07a5f80f5440b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -46,6 +46,15 @@ class JDBCOptions( JDBCOptions.JDBC_TABLE_NAME -> table))) } + override def hashCode: Int = this.parameters.hashCode() + + /** Returns true if the members of this AttributeSet and other are the same. */ + override def equals(other: Any): Boolean = other match { + case otherOption: JDBCOptions => + otherOption.parameters.equals(this.parameters) + case _ => false + } + /** * Returns a property with all options. */ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala index 6b85911dca773..5340a39b201d6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala @@ -26,6 +26,7 @@ import org.apache.spark.sql.{AnalysisException, QueryTest, Row} import org.apache.spark.sql.catalyst.analysis.{NoSuchNamespaceException, TableAlreadyExistsException} import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.catalyst.util.CharVarcharUtils +import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types._ @@ -512,4 +513,20 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { assert(t.schema === replaced) } } + + test("SPARK-45449: Cache Invalidation Issue with JDBC Table") { + withConnection { conn => + conn.prepareStatement( + """CREATE TABLE "test"."cache_t" (id decimal(25) PRIMARY KEY NOT NULL, + |name TEXT(32) NOT NULL)""".stripMargin).executeUpdate() + } + val ss = spark.cloneSession() + // the upper bound exceeds the maximum value of long + ss.sql("insert overwrite h2.test.cache_t select 1 as id, 'a' as name") + + sql("cache table ct1 select id, name from h2.test.cache_t") + val plan = sql("select * from ct1").queryExecution.sparkPlan + assert(plan.isInstanceOf[InMemoryTableScanExec]) + } + } From 2332c7d6375f0e876680b187808c43bef3e2bb02 Mon Sep 17 00:00:00 2001 From: liangyongyuan Date: Sat, 7 Oct 2023 11:56:35 +0800 Subject: [PATCH 2/6] remove comments --- .../spark/sql/execution/datasources/jdbc/JDBCOptions.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index 07a5f80f5440b..f0c2eeec79c67 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -48,7 +48,6 @@ class JDBCOptions( override def hashCode: Int = this.parameters.hashCode() - /** Returns true if the members of this AttributeSet and other are the same. */ override def equals(other: Any): Boolean = other match { case otherOption: JDBCOptions => otherOption.parameters.equals(this.parameters) From 92c047ff9e9f7a4a1b4582ef7cff33623edebb84 Mon Sep 17 00:00:00 2001 From: liangyongyuan Date: Sat, 7 Oct 2023 14:06:47 +0800 Subject: [PATCH 3/6] move function to tail --- .../execution/datasources/jdbc/JDBCOptions.scala | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala index f0c2eeec79c67..57651684070f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala @@ -46,14 +46,6 @@ class JDBCOptions( JDBCOptions.JDBC_TABLE_NAME -> table))) } - override def hashCode: Int = this.parameters.hashCode() - - override def equals(other: Any): Boolean = other match { - case otherOption: JDBCOptions => - otherOption.parameters.equals(this.parameters) - case _ => false - } - /** * Returns a property with all options. */ @@ -247,6 +239,14 @@ class JDBCOptions( .get(JDBC_PREFER_TIMESTAMP_NTZ) .map(_.toBoolean) .getOrElse(SQLConf.get.timestampType == TimestampNTZType) + + override def hashCode: Int = this.parameters.hashCode() + + override def equals(other: Any): Boolean = other match { + case otherOption: JDBCOptions => + otherOption.parameters.equals(this.parameters) + case _ => false + } } class JdbcOptionsInWrite( From b25978e56caadbee3a53562e432bf7b0e66741d9 Mon Sep 17 00:00:00 2001 From: liangyongyuan Date: Sat, 7 Oct 2023 14:09:37 +0800 Subject: [PATCH 4/6] remove comments --- .../execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala index 5340a39b201d6..d18f8ac8de079 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala @@ -521,7 +521,6 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { |name TEXT(32) NOT NULL)""".stripMargin).executeUpdate() } val ss = spark.cloneSession() - // the upper bound exceeds the maximum value of long ss.sql("insert overwrite h2.test.cache_t select 1 as id, 'a' as name") sql("cache table ct1 select id, name from h2.test.cache_t") From 5d0a24284ae58748eff5eff86905038452520385 Mon Sep 17 00:00:00 2001 From: liangyongyuan Date: Sat, 7 Oct 2023 15:19:15 +0800 Subject: [PATCH 5/6] Remove session clone, uncache table and drop table at finally --- .../v2/jdbc/JDBCTableCatalogSuite.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala index d18f8ac8de079..4402d1fd575a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala @@ -515,17 +515,17 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { } test("SPARK-45449: Cache Invalidation Issue with JDBC Table") { - withConnection { conn => - conn.prepareStatement( - """CREATE TABLE "test"."cache_t" (id decimal(25) PRIMARY KEY NOT NULL, - |name TEXT(32) NOT NULL)""".stripMargin).executeUpdate() + withTable("h2.test.cache_t") { + withConnection { conn => + conn.prepareStatement( + """CREATE TABLE "test"."cache_t" (id decimal(25) PRIMARY KEY NOT NULL, + |name TEXT(32) NOT NULL)""".stripMargin).executeUpdate() + } + sql("INSERT OVERWRITE h2.test.cache_t SELECT 1 AS id, 'a' AS name") + sql("CACHE TABLE t1 SELECT id, name FROM h2.test.cache_t") + val plan = sql("select * from t1").queryExecution.sparkPlan + assert(plan.isInstanceOf[InMemoryTableScanExec]) + sql("UNCACHE TABLE IF EXISTS t1") } - val ss = spark.cloneSession() - ss.sql("insert overwrite h2.test.cache_t select 1 as id, 'a' as name") - - sql("cache table ct1 select id, name from h2.test.cache_t") - val plan = sql("select * from ct1").queryExecution.sparkPlan - assert(plan.isInstanceOf[InMemoryTableScanExec]) } - } From 84a6bb5c19e5479b27225ee8f6165d514d713cef Mon Sep 17 00:00:00 2001 From: liangyongyuan Date: Mon, 9 Oct 2023 15:19:40 +0800 Subject: [PATCH 6/6] remove unuse cache --- .../execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala index 4402d1fd575a5..eed64b873c451 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalogSuite.scala @@ -525,7 +525,6 @@ class JDBCTableCatalogSuite extends QueryTest with SharedSparkSession { sql("CACHE TABLE t1 SELECT id, name FROM h2.test.cache_t") val plan = sql("select * from t1").queryExecution.sparkPlan assert(plan.isInstanceOf[InMemoryTableScanExec]) - sql("UNCACHE TABLE IF EXISTS t1") } } }