Skip to content

Commit

Permalink
[SPARK-34055][SQL][3.1] Refresh cache in ALTER TABLE .. ADD PARTITION
Browse files Browse the repository at this point in the history
### What changes were proposed in this pull request?
Invoke `refreshTable()` from `CatalogImpl` in the v1 `ALTER TABLE .. ADD PARTITION` command so that the table's cache is refreshed.

### Why are the changes needed?
This fixes the issue illustrated by the following example:
```sql
spark-sql> create table tbl (col int, part int) using parquet partitioned by (part);
spark-sql> insert into tbl partition (part=0) select 0;
spark-sql> cache table tbl;
spark-sql> select * from tbl;
0	0
spark-sql> show table extended like 'tbl' partition(part=0);
default	tbl	false	Partition Values: [part=0]
Location: file:/Users/maximgekk/proj/add-partition-refresh-cache-2/spark-warehouse/tbl/part=0
...
```
Create new partition by copying the existing one:
```
$ cp -r /Users/maximgekk/proj/add-partition-refresh-cache-2/spark-warehouse/tbl/part=0 /Users/maximgekk/proj/add-partition-refresh-cache-2/spark-warehouse/tbl/part=1
```
```sql
spark-sql> alter table tbl add partition (part=1) location '/Users/maximgekk/proj/add-partition-refresh-cache-2/spark-warehouse/tbl/part=1';
spark-sql> select * from tbl;
0	0
```

The last query must also return the row `0	1`, since the partition `part=1` containing it has been added by `ALTER TABLE .. ADD PARTITION`.

### Does this PR introduce _any_ user-facing change?
Yes. After the changes for the example above:
```sql
...
spark-sql> alter table tbl add partition (part=1) location '/Users/maximgekk/proj/add-partition-refresh-cache-2/spark-warehouse/tbl/part=1';
spark-sql> select * from tbl;
0	0
0	1
```

### How was this patch tested?
By running the affected test suite:
```
$ build/sbt -Phive-2.3 -Phive-thriftserver "test:testOnly *CachedTableSuite"
```

Closes #31115 from MaxGekk/add-partition-refresh-cache-2-3.1.

Authored-by: Max Gekk <max.gekk@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
  • Loading branch information
MaxGekk authored and HyukjinKwon committed Jan 11, 2021
1 parent 6ef4dd4 commit e0edce7
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ case class AlterTableAddPartitionCommand(
catalog.createPartitions(table.identifier, batch, ignoreIfExists = ifNotExists)
}

sparkSession.catalog.refreshTable(table.identifier.quotedString)
if (table.stats.nonEmpty) {
if (sparkSession.sessionState.conf.autoSizeUpdateEnabled) {
val addedSize = CommandUtils.calculateMultipleLocationSizes(sparkSession, table.identifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import java.nio.file.{Files, Paths}
import scala.collection.mutable.HashSet
import scala.concurrent.duration._

import org.apache.commons.io.FileUtils

import org.apache.spark.CleanerListener
import org.apache.spark.executor.DataReadMethod._
import org.apache.spark.executor.DataReadMethod.DataReadMethod
Expand Down Expand Up @@ -1335,4 +1337,31 @@ class CachedTableSuite extends QueryTest with SQLTestUtils
QueryTest.checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 2), Row(1, 1)))
}
}

test("SPARK-34055: refresh cache in partition adding") {
  withTable("t") {
    // Build a partitioned parquet table with a single partition (part=0) and cache it.
    sql("CREATE TABLE t (id int, part int) USING parquet PARTITIONED BY (part)")
    sql("INSERT INTO t PARTITION (part=0) SELECT 0")
    assert(!spark.catalog.isCached("t"))
    sql("CACHE TABLE t")
    assert(spark.catalog.isCached("t"))
    checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0)))

    // Read the "Location:" line of partition part=0 from SHOW TABLE EXTENDED
    // and strip the prefix to obtain a plain filesystem path.
    val extendedInfo = sql("SHOW TABLE EXTENDED LIKE 't' PARTITION (part = 0)")
    val infoText = extendedInfo.select("information").first().getString(0)
    val locationLine = infoText.split("\\r?\\n").filter(_.startsWith("Location:")).head
    val srcDir = locationLine.replace("Location: file:", "")

    // Create the new partition (part = 1) directly in the filesystem by copying
    // the directory of part=0, bypassing the catalog.
    val dstDir = srcDir.replace("part=0", "part=1")
    FileUtils.copyDirectory(new File(srcDir), new File(dstDir))

    // Registering the copied directory as a partition must keep the table cached
    // and make the new partition's row visible to subsequent queries.
    sql(s"ALTER TABLE t ADD PARTITION (part=1) LOCATION '$dstDir'")
    assert(spark.catalog.isCached("t"))
    checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0), Row(0, 1)))
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ package org.apache.spark.sql.hive

import java.io.File

import org.apache.commons.io.FileUtils

import org.apache.spark.sql.{AnalysisException, Dataset, QueryTest, Row, SaveMode}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
Expand Down Expand Up @@ -469,4 +471,31 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with TestHiveSingleto
QueryTest.checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 2), Row(1, 1)))
}
}

test("SPARK-34055: refresh cache in partition adding") {
  withTable("t") {
    // Build a partitioned Hive table with a single partition (part=0) and cache it.
    sql("CREATE TABLE t (id int, part int) USING hive PARTITIONED BY (part)")
    sql("INSERT INTO t PARTITION (part=0) SELECT 0")
    assert(!spark.catalog.isCached("t"))
    sql("CACHE TABLE t")
    assert(spark.catalog.isCached("t"))
    checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0)))

    // Read the "Location:" line of partition part=0 from SHOW TABLE EXTENDED
    // and strip the prefix to obtain a plain filesystem path.
    val extendedInfo = sql("SHOW TABLE EXTENDED LIKE 't' PARTITION (part = 0)")
    val infoText = extendedInfo.select("information").first().getString(0)
    val locationLine = infoText.split("\\r?\\n").filter(_.startsWith("Location:")).head
    val srcDir = locationLine.replace("Location: file:", "")

    // Create the new partition (part = 1) directly in the filesystem by copying
    // the directory of part=0, bypassing the catalog.
    val dstDir = srcDir.replace("part=0", "part=1")
    FileUtils.copyDirectory(new File(srcDir), new File(dstDir))

    // Registering the copied directory as a partition must keep the table cached
    // and make the new partition's row visible to subsequent queries.
    sql(s"ALTER TABLE t ADD PARTITION (part=1) LOCATION '$dstDir'")
    assert(spark.catalog.isCached("t"))
    checkAnswer(sql("SELECT * FROM t"), Seq(Row(0, 0), Row(0, 1)))
  }
}
}

0 comments on commit e0edce7

Please sign in to comment.