Skip to content

Commit

Permalink
Fix tests
Browse files — browse the repository at this point in the history
  • Loading branch information
boneanxs committed Oct 27, 2023
1 parent f7c1981 commit f462cf7
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,73 +19,77 @@

package org.apache.spark.sql.hudi.command.index

import org.apache.hudi.HoodieSparkUtils
import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase

/**
 * SQL-level tests for secondary (lucene) index DDL: CREATE INDEX, SHOW INDEXES
 * and DROP INDEX, exercised against both COW and MOR table types.
 *
 * The whole body is guarded by `HoodieSparkUtils.gteqSpark3_2` because the
 * index DDL commands are only wired into the Spark 3.2+ resolver
 * (HoodieSpark32PlusResolveReferences); on older Spark versions the test is a
 * no-op rather than a failure.
 */
class TestSecondaryIndex extends HoodieSparkSqlTestBase {

  test("Test Create/Show/Drop Secondary Index") {
    if (HoodieSparkUtils.gteqSpark3_2) {
      withTempDir { tmp =>
        Seq("cow", "mor").foreach { tableType =>
          val tableName = generateTableName
          val basePath = s"${tmp.getCanonicalPath}/$tableName"
          spark.sql(
            s"""
               |create table $tableName (
               |  id int,
               |  name string,
               |  price double,
               |  ts long
               |) using hudi
               | options (
               |  primaryKey ='id',
               |  type = '$tableType',
               |  preCombineField = 'ts'
               | )
               | partitioned by(ts)
               | location '$basePath'
       """.stripMargin)
          spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
          spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)")
          spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)")
          // Freshly created table has no secondary indexes yet.
          checkAnswer(s"show indexes from default.$tableName")()

          checkAnswer(s"create index idx_name on $tableName using lucene (name) options(block_size=1024)")()
          checkAnswer(s"create index idx_price on $tableName using lucene (price options(order='desc')) options(block_size=512)")()

          // Create an index with multiple columns
          checkException(s"create index idx_id_ts on $tableName using lucene (id, ts)")("Lucene index only support single column")

          // Create an index with the occupied name
          checkException(s"create index idx_price on $tableName using lucene (price)")(
            "Secondary index already exists: idx_price"
          )

          // Create indexes repeatedly on columns(index name is different, but the index type and involved column is same)
          checkException(s"create index idx_price_1 on $tableName using lucene (price)")(
            "Secondary index already exists: idx_price_1"
          )

          spark.sql(s"show indexes from $tableName").show()
          // SHOW INDEXES rows: (name, column, type, column options JSON, index options JSON).
          checkAnswer(s"show indexes from $tableName")(
            Seq("idx_name", "name", "lucene", "", "{\"block_size\":\"1024\"}"),
            Seq("idx_price", "price", "lucene", "{\"price\":{\"order\":\"desc\"}}", "{\"block_size\":\"512\"}")
          )

          checkAnswer(s"drop index idx_name on $tableName")()
          // Dropping the same index twice must fail with a clear error.
          checkException(s"drop index idx_name on $tableName")("Secondary index not exists: idx_name")

          spark.sql(s"show indexes from $tableName").show()
          checkAnswer(s"show indexes from $tableName")(
            Seq("idx_price", "price", "lucene", "{\"price\":{\"order\":\"desc\"}}", "{\"block_size\":\"512\"}")
          )

          checkAnswer(s"drop index idx_price on $tableName")()
          checkAnswer(s"show indexes from $tableName")()

          checkException(s"drop index idx_price on $tableName")("Secondary index not exists: idx_price")

          // Unknown column: the Spark 3.2+ resolver reports "Missing field <name>"
          // (message includes the table schema, hence contain-match, not equality).
          checkExceptionContain(s"create index idx_price_1 on $tableName using lucene (field_not_exist)")(
            "Missing field field_not_exist"
          )
        }
      }
    }
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ case class HoodieSpark32PlusResolveReferences(spark: SparkSession) extends Rule[

private def missingFieldError(fieldName: Seq[String], table: LogicalPlan, context: Origin): Throwable = {
throw new AnalysisException(
s"Missing field $fieldName with schema:\n" +
s"Missing field ${fieldName.mkString(".")} with schema:\n" +
table.schema.treeString,
context.line,
context.startPosition)
Expand Down

0 comments on commit f462cf7

Please sign in to comment.