From c6a7cff77c49708cf135bd2bbee2cdb0db201578 Mon Sep 17 00:00:00 2001 From: huberylee Date: Fri, 17 Jun 2022 18:33:58 +0800 Subject: [PATCH] [HUDI-4165] Support Create/Drop/Show/Refresh Index Syntax for Spark SQL (#5761) * Support Create/Drop/Show/Refresh Index Syntax for Spark SQL --- .../apache/hudi/common/index/HoodieIndex.java | 119 +++++++++++++++ .../hudi/common/index/HoodieIndexType.java | 54 +++++++ .../hudi/spark/sql/parser/HoodieSqlCommon.g4 | 76 +++++++++- .../sql/catalyst/plans/logical/Index.scala | 111 ++++++++++++++ .../sql/hudi/analysis/HoodieAnalysis.scala | 22 +++ .../sql/hudi/command/IndexCommands.scala | 101 +++++++++++++ .../parser/HoodieSqlCommonAstBuilder.scala | 143 +++++++++++++++++- .../hudi/command/index/TestIndexSyntax.scala | 85 +++++++++++ 8 files changed, 709 insertions(+), 2 deletions(-) create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndex.java create mode 100644 hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndexType.java create mode 100644 hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala create mode 100644 hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala create mode 100644 hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala diff --git a/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndex.java b/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndex.java new file mode 100644 index 000000000000..6dabb1a41f8c --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndex.java @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.index; + +import java.util.Arrays; +import java.util.Map; + +public class HoodieIndex { + private String indexName; + private String[] colNames; + private HoodieIndexType indexType; + private Map> colOptions; + private Map options; + + public HoodieIndex() { + } + + public HoodieIndex( + String indexName, + String[] colNames, + HoodieIndexType indexType, + Map> colOptions, + Map options) { + this.indexName = indexName; + this.colNames = colNames; + this.indexType = indexType; + this.colOptions = colOptions; + this.options = options; + } + + public String getIndexName() { + return indexName; + } + + public String[] getColNames() { + return colNames; + } + + public HoodieIndexType getIndexType() { + return indexType; + } + + public Map> getColOptions() { + return colOptions; + } + + public Map getOptions() { + return options; + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public String toString() { + return "HoodieIndex{" + + "indexName='" + indexName + '\'' + + ", colNames='" + Arrays.toString(colNames) + '\'' + + ", indexType=" + indexType + + ", colOptions=" + colOptions + + ", options=" + options + + '}'; + } + + public static class Builder { + private String indexName; + private String[] colNames; + private HoodieIndexType indexType; + private Map> colOptions; + private Map options; + + public Builder setIndexName(String indexName) { + this.indexName = indexName; + return this; + } + + public Builder setColNames(String[] colNames) { + this.colNames = colNames; + return this; + } + + public Builder setIndexType(String indexType) { + this.indexType = HoodieIndexType.of(indexType); + return this; + } + + public Builder setColOptions(Map> colOptions) { + this.colOptions = colOptions; + return this; + } + + public Builder setOptions(Map options) { + this.options = options; + return this; + } + + public HoodieIndex build() { + return new HoodieIndex(indexName, colNames, indexType, colOptions, options); + } + } +} diff --git a/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndexType.java b/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndexType.java new file mode 100644 index 000000000000..03618a767906 --- /dev/null +++ b/hudi-common/src/main/java/org/apache/hudi/common/index/HoodieIndexType.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.common.index; + +import org.apache.hudi.exception.HoodieIndexException; + +import java.util.Arrays; + +public enum HoodieIndexType { + LUCENE((byte) 1); + + private final byte type; + + HoodieIndexType(byte type) { + this.type = type; + } + + public byte getValue() { + return type; + } + + public static HoodieIndexType of(byte indexType) { + return Arrays.stream(HoodieIndexType.values()) + .filter(t -> t.type == indexType) + .findAny() + .orElseThrow(() -> + new HoodieIndexException("Unknown hoodie index type:" + indexType)); + } + + public static HoodieIndexType of(String indexType) { + return Arrays.stream(HoodieIndexType.values()) + .filter(t -> t.name().equals(indexType.toUpperCase())) + .findAny() + .orElseThrow(() -> + new HoodieIndexException("Unknown hoodie index type:" + indexType)); + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4 b/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4 index 0cde14a4e4a0..65e17bfb4620 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4 +++ b/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4 @@ -48,6 +48,13 @@ statement : compactionStatement #compactionCommand | CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')' #call + | CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE? + tableIdentifier (USING indexType=identifier)? + LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN + (OPTIONS indexOptions=propertyList)? #createIndex + | DROP INDEX (IF EXISTS)? identifier ON TABLE? tableIdentifier #dropIndex + | SHOW INDEXES (FROM | IN) TABLE? tableIdentifier #showIndexes + | REFRESH INDEX identifier ON TABLE? tableIdentifier #refreshIndex | .*? #passThrough ; @@ -99,6 +106,14 @@ | MINUS? BIGDECIMAL_LITERAL #bigDecimalLiteral ; + multipartIdentifierPropertyList + : multipartIdentifierProperty (COMMA multipartIdentifierProperty)* + ; + + multipartIdentifierProperty + : multipartIdentifier (OPTIONS options=propertyList)? + ; + multipartIdentifier : parts+=identifier ('.' parts+=identifier)* ; @@ -114,9 +129,53 @@ ; nonReserved - : CALL | COMPACTION | RUN | SCHEDULE | ON | SHOW | LIMIT + : CALL + | COMPACTION + | CREATE + | DROP + | EXISTS + | FROM + | IN + | INDEX + | INDEXES + | IF + | LIMIT + | NOT + | ON + | OPTIONS + | REFRESH + | RUN + | SCHEDULE + | SHOW + | TABLE + | USING ; + propertyList + : LEFT_PAREN property (COMMA property)* RIGHT_PAREN + ; + + property + : key=propertyKey (EQ? value=propertyValue)? + ; + + propertyKey + : identifier (DOT identifier)* + | STRING + ; + + propertyValue + : INTEGER_VALUE + | DECIMAL_VALUE + | booleanValue + | STRING + ; + + LEFT_PAREN: '('; + RIGHT_PAREN: ')'; + COMMA: ','; + DOT: '.'; + ALL: 'ALL'; AT: 'AT'; CALL: 'CALL'; @@ -132,6 +191,21 @@ FALSE: 'FALSE'; INTERVAL: 'INTERVAL'; TO: 'TO'; + CREATE: 'CREATE'; + INDEX: 'INDEX'; + INDEXES: 'INDEXES'; + IF: 'IF'; + NOT: 'NOT'; + EXISTS: 'EXISTS'; + TABLE: 'TABLE'; + USING: 'USING'; + OPTIONS: 'OPTIONS'; + DROP: 'DROP'; + FROM: 'FROM'; + IN: 'IN'; + REFRESH: 'REFRESH'; + + EQ: '=' | '=='; PLUS: '+'; MINUS: '-'; diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala new file mode 100644 index 000000000000..12ee2e805834 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.catalyst.plans.logical + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.types.StringType + +/** + * The logical plan of the CREATE INDEX command. + */ +case class CreateIndex( + table: LogicalPlan, + indexName: String, + indexType: String, + ignoreIfExists: Boolean, + columns: Seq[(Attribute, Map[String, String])], + properties: Map[String, String], + override val output: Seq[Attribute] = CreateIndex.getOutputAttrs) extends Command { + + override def children: Seq[LogicalPlan] = Seq(table) + + override lazy val resolved: Boolean = table.resolved && columns.forall(_._1.resolved) + + def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): CreateIndex = { + copy(table = newChild.head) + } +} + +object CreateIndex { + def getOutputAttrs: Seq[Attribute] = Seq.empty +} + +/** + * The logical plan of the DROP INDEX command. + */ +case class DropIndex( + table: LogicalPlan, + indexName: String, + ignoreIfNotExists: Boolean, + override val output: Seq[Attribute] = DropIndex.getOutputAttrs) extends Command { + + override def children: Seq[LogicalPlan] = Seq(table) + + def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): DropIndex = { + copy(table = newChild.head) + } +} + +object DropIndex { + def getOutputAttrs: Seq[Attribute] = Seq.empty +} + +/** + * The logical plan of the SHOW INDEXES command. + */ +case class ShowIndexes( + table: LogicalPlan, + override val output: Seq[Attribute] = ShowIndexes.getOutputAttrs) extends Command { + + override def children: Seq[LogicalPlan] = Seq(table) + + def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): ShowIndexes = { + copy(table = newChild.head) + } +} + +object ShowIndexes { + def getOutputAttrs: Seq[Attribute] = Seq( + AttributeReference("index_name", StringType, nullable = false)(), + AttributeReference("col_name", StringType, nullable = false)(), + AttributeReference("index_type", StringType, nullable = false)(), + AttributeReference("col_options", StringType, nullable = true)(), + AttributeReference("options", StringType, nullable = true)() + ) +} + +/** + * The logical plan of the REFRESH INDEX command. + */ +case class RefreshIndex( + table: LogicalPlan, + indexName: String, + override val output: Seq[Attribute] = RefreshIndex.getOutputAttrs) extends Command { + + override def children: Seq[LogicalPlan] = Seq(table) + + def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): RefreshIndex = { + copy(table = newChild.head) + } +} + +object RefreshIndex { + def getOutputAttrs: Seq[Attribute] = Seq.empty +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index 97e453ff7e93..a44abc72cf53 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -171,6 +171,28 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan] } else { c } + + // Convert to CreateIndexCommand + case CreateIndex(table, indexName, indexType, ignoreIfExists, columns, properties, output) + if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) => + CreateIndexCommand( + getTableIdentifier(table), indexName, indexType, ignoreIfExists, columns, properties, output) + + // Convert to DropIndexCommand + case DropIndex(table, indexName, ignoreIfNotExists, output) + if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) => + DropIndexCommand(getTableIdentifier(table), indexName, ignoreIfNotExists, output) + + // Convert to ShowIndexesCommand + case ShowIndexes(table, output) + if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) => + ShowIndexesCommand(getTableIdentifier(table), output) + + // Covert to RefreshCommand + case RefreshIndex(table, indexName, output) + if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) => + RefreshIndexCommand(getTableIdentifier(table), indexName, output) + case _ => plan } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala new file mode 100644 index 000000000000..5d73af31a949 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi.command + +import org.apache.hudi.common.index.HoodieIndex +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.{Row, SparkSession} + +case class CreateIndexCommand( + tableId: TableIdentifier, + indexName: String, + indexType: String, + ignoreIfExists: Boolean, + columns: Seq[(Attribute, Map[String, String])], + properties: Map[String, String], + override val output: Seq[Attribute]) extends IndexBaseCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + // The implementation for different index type + Seq.empty + } +} + +case class DropIndexCommand( + tableId: TableIdentifier, + indexName: String, + ignoreIfNotExists: Boolean, + override val output: Seq[Attribute]) extends IndexBaseCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + // The implementation for different index type + Seq.empty + } +} + +case class ShowIndexesCommand( + tableId: TableIdentifier, + override val output: Seq[Attribute]) extends IndexBaseCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + // The implementation for different index type + Seq.empty + } +} + +case class RefreshIndexCommand( + tableId: TableIdentifier, + indexName: String, + override val output: Seq[Attribute]) extends IndexBaseCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + // The implementation for different index type + Seq.empty + } +} + +abstract class IndexBaseCommand extends HoodieLeafRunnableCommand with Logging { + + /** + * Check hoodie index exists. In a hoodie table, hoodie index name + * must be unique, so the index name will be checked firstly, + * + * @param secondaryIndexes Current hoodie indexes + * @param indexName The index name to be checked + * @param colNames The column names to be checked + * @return true if the index exists + */ + def indexExists( + secondaryIndexes: Option[Array[HoodieIndex]], + indexName: String, + indexType: Option[String] = None, + colNames: Option[Array[String]] = None): Boolean = { + secondaryIndexes.exists(i => { + i.exists(_.getIndexName.equals(indexName)) || + // Index type and column name need to be checked if present + indexType.exists(t => + colNames.exists(c => + i.exists(index => + index.getIndexType.name().equalsIgnoreCase(t) && index.getColNames.sameElements(c)))) + }) + } +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala index 771798dd225e..d0e5ed613385 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala @@ -25,11 +25,12 @@ import org.apache.hudi.spark.sql.parser.HoodieSqlCommonParser._ import org.apache.spark.internal.Logging import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.TableIdentifier -import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation} import org.apache.spark.sql.catalyst.expressions.{Expression, Literal} import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface, ParserUtils} import org.apache.spark.sql.catalyst.plans.logical._ +import java.util.Locale import scala.collection.JavaConverters._ class HoodieSqlCommonAstBuilder(session: SparkSession, delegate: ParserInterface) @@ -147,4 +148,144 @@ class HoodieSqlCommonAstBuilder(session: SparkSession, delegate: ParserInterface private def typedVisit[T](ctx: ParseTree): T = { ctx.accept(this).asInstanceOf[T] } + + /** + * Create an index, returning a [[CreateIndex]] logical plan. + * For example: + * {{{ + * CREATE INDEX index_name ON [TABLE] table_name [USING index_type] (column_index_property_list) + * [OPTIONS indexPropertyList] + * column_index_property_list: column_name [OPTIONS(indexPropertyList)] [ , . . . ] + * indexPropertyList: index_property_name [= index_property_value] [ , . . . ] + * }}} + */ + override def visitCreateIndex(ctx: CreateIndexContext): LogicalPlan = withOrigin(ctx) { + val (indexName, indexType) = if (ctx.identifier.size() == 1) { + (ctx.identifier(0).getText, "") + } else { + (ctx.identifier(0).getText, ctx.identifier(1).getText) + } + + val columns = ctx.columns.multipartIdentifierProperty.asScala + .map(_.multipartIdentifier).map(typedVisit[Seq[String]]).toSeq + val columnsProperties = ctx.columns.multipartIdentifierProperty.asScala + .map(x => (Option(x.options).map(visitPropertyKeyValues).getOrElse(Map.empty))).toSeq + val options = Option(ctx.indexOptions).map(visitPropertyKeyValues).getOrElse(Map.empty) + + CreateIndex( + visitTableIdentifier(ctx.tableIdentifier()), + indexName, + indexType, + ctx.EXISTS != null, + columns.map(UnresolvedAttribute(_)).zip(columnsProperties), + options) + } + + /** + * Drop an index, returning a [[DropIndex]] logical plan. + * For example: + * {{{ + * DROP INDEX [IF EXISTS] index_name ON [TABLE] table_name + * }}} + */ + override def visitDropIndex(ctx: DropIndexContext): LogicalPlan = withOrigin(ctx) { + val indexName = ctx.identifier.getText + DropIndex( + visitTableIdentifier(ctx.tableIdentifier()), + indexName, + ctx.EXISTS != null) + } + + /** + * Show indexes, returning a [[ShowIndexes]] logical plan. + * For example: + * {{{ + * SHOW INDEXES (FROM | IN) [TABLE] table_name + * }}} + */ + override def visitShowIndexes(ctx: ShowIndexesContext): LogicalPlan = withOrigin(ctx) { + ShowIndexes(visitTableIdentifier(ctx.tableIdentifier())) + } + + /** + * Refresh index, returning a [[RefreshIndex]] logical plan + * For example: + * {{{ + * REFRESH INDEX index_name ON [TABLE] table_name + * }}} + */ + override def visitRefreshIndex(ctx: RefreshIndexContext): LogicalPlan = withOrigin(ctx) { + RefreshIndex(visitTableIdentifier(ctx.tableIdentifier()), ctx.identifier.getText) + } + + /** + * Convert a property list into a key-value map. + * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]]. + */ + override def visitPropertyList( + ctx: PropertyListContext): Map[String, String] = withOrigin(ctx) { + val properties = ctx.property.asScala.map { property => + val key = visitPropertyKey(property.key) + val value = visitPropertyValue(property.value) + key -> value + } + // Check for duplicate property names. + checkDuplicateKeys(properties.toSeq, ctx) + properties.toMap + } + + /** + * Parse a key-value map from a [[PropertyListContext]], assuming all values are specified. + */ + def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = { + val props = visitPropertyList(ctx) + val badKeys = props.collect { case (key, null) => key } + if (badKeys.nonEmpty) { + operationNotAllowed( + s"Values must be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx) + } + props + } + + /** + * Parse a list of keys from a [[PropertyListContext]], assuming no values are specified. + */ + def visitPropertyKeys(ctx: PropertyListContext): Seq[String] = { + val props = visitPropertyList(ctx) + val badKeys = props.filter { case (_, v) => v != null }.keys + if (badKeys.nonEmpty) { + operationNotAllowed( + s"Values should not be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx) + } + props.keys.toSeq + } + + /** + * A property key can either be String or a collection of dot separated elements. This + * function extracts the property key based on whether its a string literal or a property + * identifier. + */ + override def visitPropertyKey(key: PropertyKeyContext): String = { + if (key.STRING != null) { + string(key.STRING) + } else { + key.getText + } + } + + /** + * A property value can be String, Integer, Boolean or Decimal. This function extracts + * the property value based on whether its a string, integer, boolean or decimal literal. + */ + override def visitPropertyValue(value: PropertyValueContext): String = { + if (value == null) { + null + } else if (value.STRING != null) { + string(value.STRING) + } else if (value.booleanValue != null) { + value.getText.toLowerCase(Locale.ROOT) + } else { + value.getText + } + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala new file mode 100644 index 000000000000..3536ae9e0ac7 --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestIndexSyntax.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.hudi.command.index + +import org.apache.spark.sql.catalyst.analysis.Analyzer +import org.apache.spark.sql.catalyst.parser.ParserInterface +import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase +import org.apache.spark.sql.hudi.command.{CreateIndexCommand, DropIndexCommand, ShowIndexesCommand} + +class TestIndexSyntax extends HoodieSparkSqlTestBase { + + test("Test Create/Drop/Show/Refresh Index") { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | options ( + | primaryKey ='id', + | type = '$tableType', + | preCombineField = 'ts' + | ) + | partitioned by(ts) + | location '$basePath' + """.stripMargin) + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") + + val sqlParser: ParserInterface = spark.sessionState.sqlParser + val analyzer: Analyzer = spark.sessionState.analyzer + + var logicalPlan = sqlParser.parsePlan(s"show indexes from default.$tableName") + var resolvedLogicalPlan = analyzer.execute(logicalPlan) + assertResult(s"`default`.`$tableName`")(resolvedLogicalPlan.asInstanceOf[ShowIndexesCommand].tableId.toString()) + + logicalPlan = sqlParser.parsePlan(s"create index idx_name on $tableName using lucene (name) options(block_size=1024)") + resolvedLogicalPlan = analyzer.execute(logicalPlan) + assertResult(s"`default`.`$tableName`")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].tableId.toString()) + assertResult("idx_name")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName) + assertResult("lucene")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType) + assertResult(false)(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].ignoreIfExists) + assertResult(Map("block_size" -> "1024"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].properties) + + logicalPlan = sqlParser.parsePlan(s"create index if not exists idx_price on $tableName using lucene (price options(order='desc')) options(block_size=512)") + resolvedLogicalPlan = analyzer.execute(logicalPlan) + assertResult(s"`default`.`$tableName`")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].tableId.toString()) + assertResult("idx_price")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexName) + assertResult("lucene")(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].indexType) + assertResult(Map("order" -> "desc"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].columns.head._2) + assertResult(Map("block_size" -> "512"))(resolvedLogicalPlan.asInstanceOf[CreateIndexCommand].properties) + + logicalPlan = sqlParser.parsePlan(s"drop index if exists idx_name on $tableName") + resolvedLogicalPlan = analyzer.execute(logicalPlan) + assertResult(s"`default`.`$tableName`")(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].tableId.toString()) + assertResult("idx_name")(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].indexName) + assertResult(true)(resolvedLogicalPlan.asInstanceOf[DropIndexCommand].ignoreIfNotExists) + } + } + } +}