diff --git a/hudi-common/src/main/java/org/apache/hudi/common/index/SecondaryIndex.java b/hudi-common/src/main/java/org/apache/hudi/common/index/SecondaryIndex.java
new file mode 100644
index 0000000000000..3e3dae596968b
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/index/SecondaryIndex.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.index;
+
+import java.util.Arrays;
+import java.util.Map;
+
+public class SecondaryIndex {
+  private String indexName;
+  private String[] colNames;
+  private SecondaryIndexType indexType;
+  private Map<String, Map<String, String>> colOptions;
+  private Map<String, String> options;
+
+  public SecondaryIndex() {
+  }
+
+  public SecondaryIndex(
+      String indexName,
+      String[] colNames,
+      SecondaryIndexType indexType,
+      Map<String, Map<String, String>> colOptions,
+      Map<String, String> options) {
+    this.indexName = indexName;
+    this.colNames = colNames;
+    this.indexType = indexType;
+    this.colOptions = colOptions;
+    this.options = options;
+  }
+
+  public String getIndexName() {
+    return indexName;
+  }
+
+  public String[] getColNames() {
+    return colNames;
+  }
+
+  public SecondaryIndexType getIndexType() {
+    return indexType;
+  }
+
+  public Map<String, Map<String, String>> getColOptions() {
+    return colOptions;
+  }
+
+  public Map<String, String> getOptions() {
+    return options;
+  }
+
+  public static Builder builder() {
+    return new Builder();
+  }
+
+  @Override
+  public String toString() {
+    return "SecondaryIndex{"
+        + "indexName='" + indexName + '\''
+        + ", colNames='" + Arrays.toString(colNames) + '\''
+        + ", indexType=" + indexType
+        + ", colOptions=" + colOptions
+        + ", options=" + options
+        + '}';
+  }
+
+  public static class Builder {
+    private String indexName;
+    private String[] colNames;
+    private SecondaryIndexType indexType;
+    private Map<String, Map<String, String>> colOptions;
+    private Map<String, String> options;
+
+    public Builder setIndexName(String indexName) {
+      this.indexName = indexName;
+      return this;
+    }
+
+    public Builder setColNames(String[] colNames) {
+      this.colNames = colNames;
+      return this;
+    }
+
+    public Builder setIndexType(String indexType) {
+      this.indexType = SecondaryIndexType.of(indexType);
+      return this;
+    }
+
+    public Builder setColOptions(Map<String, Map<String, String>> colOptions) {
+      this.colOptions = colOptions;
+      return this;
+    }
+
+    public Builder setOptions(Map<String, String> options) {
+      this.options = options;
+      return this;
+    }
+
+    public SecondaryIndex build() {
+      return new SecondaryIndex(indexName, colNames, indexType, colOptions, options);
+    }
+  }
+}
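A minimal sketch of how this builder is meant to be used (not part of the patch; the values mirror the `TestSecondaryIndex` suite at the end of this diff):

```scala
import org.apache.hudi.common.index.SecondaryIndex
import scala.collection.JavaConverters._

// Illustrative only: the metadata that CreateIndexCommand (further down)
// builds before serializing it into the table config.
val colOptions: java.util.Map[String, java.util.Map[String, String]] =
  Map("price" -> Map("order" -> "desc").asJava).asJava

val index = SecondaryIndex.builder()
  .setIndexName("idx_price")
  .setColNames(Array("price"))
  .setIndexType("lucene") // resolved through SecondaryIndexType.of(String)
  .setColOptions(colOptions)
  .setOptions(Map("block_size" -> "512").asJava)
  .build()
```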
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/index/SecondaryIndexType.java b/hudi-common/src/main/java/org/apache/hudi/common/index/SecondaryIndexType.java
new file mode 100644
index 0000000000000..d6282c1ccc799
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/index/SecondaryIndexType.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.index;
+
+import org.apache.hudi.exception.HoodieSecondaryIndexException;
+
+import java.util.Arrays;
+
+public enum SecondaryIndexType {
+  LUCENE((byte) 1);
+
+  private final byte type;
+
+  SecondaryIndexType(byte type) {
+    this.type = type;
+  }
+
+  public byte getValue() {
+    return type;
+  }
+
+  public static SecondaryIndexType of(byte indexType) {
+    return Arrays.stream(SecondaryIndexType.values())
+        .filter(t -> t.type == indexType)
+        .findAny()
+        .orElseThrow(() ->
+            new HoodieSecondaryIndexException("Unknown secondary index type: " + indexType)
+        );
+  }
+
+  public static SecondaryIndexType of(String indexType) {
+    return Arrays.stream(SecondaryIndexType.values())
+        .filter(t -> t.name().equals(indexType.toUpperCase()))
+        .findAny()
+        .orElseThrow(() ->
+            new HoodieSecondaryIndexException("Unknown secondary index type: " + indexType)
+        );
+  }
+}
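The two `of` overloads behave as a small registry lookup; a sketch of the expected behavior (not in the patch):

```scala
import org.apache.hudi.common.index.SecondaryIndexType

// Name lookup is case-insensitive (the input is upper-cased first);
// unknown names or codes throw HoodieSecondaryIndexException.
val byName = SecondaryIndexType.of("lucene")
val byCode = SecondaryIndexType.of(1.toByte)
assert(byName == byCode) // both resolve to LUCENE
```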
" + "These partitions are ready for use by the readers"); + public static final ConfigProperty SECONDARY_INDEXES = ConfigProperty + .key("hoodie.table.secondary.indexes") + .noDefaultValue() + .withDocumentation("The secondary indexes' info"); + private static final String TABLE_CHECKSUM_FORMAT = "%s.%s"; // . public HoodieTableConfig(FileSystem fs, String metaPath, String payloadClassName) { @@ -498,6 +509,19 @@ public Option getPartitionFields() { return Option.empty(); } + public Option getSecondaryIndexes() { + if (contains(SECONDARY_INDEXES)) { + String indexesStr = getString(SECONDARY_INDEXES); + try { + return Option.ofNullable(fromJsonString(indexesStr, SecondaryIndex[].class)); + } catch (Exception e) { + throw new HoodieSecondaryIndexException("Fail to parse secondary indexes", e); + } + } + + return Option.empty(); + } + /** * @returns the partition field prop. */ @@ -640,6 +664,21 @@ public Map propsMap() { .collect(Collectors.toMap(e -> String.valueOf(e.getKey()), e -> String.valueOf(e.getValue()))); } + private static T fromJsonString(String jsonStr, Class clazz) throws Exception { + if (jsonStr == null || jsonStr.isEmpty()) { + return clazz.newInstance(); + } + + return getObjectMapper().readValue(jsonStr, clazz); + } + + private static ObjectMapper getObjectMapper() { + ObjectMapper mapper = new ObjectMapper(); + mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); + return mapper; + } + /** * @deprecated Use {@link #BASE_FILE_FORMAT} and its methods. */ diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java index 9945eb0650feb..a2c6af0e55490 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java @@ -44,14 +44,14 @@ import org.apache.hudi.common.util.ValidationUtils; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.exception.TableNotFoundException; +import org.apache.hudi.hadoop.CachingPath; +import org.apache.hudi.hadoop.SerializablePath; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; -import org.apache.hudi.hadoop.CachingPath; -import org.apache.hudi.hadoop.SerializablePath; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -741,6 +741,7 @@ public static class PropertyBuilder { private Boolean shouldDropPartitionColumns; private String metadataPartitions; private String inflightMetadataPartitions; + private String secondaryIndexFields; /** * Persist the configs that is written at the first time, and should not be changed. 
@@ -875,6 +876,11 @@ public PropertyBuilder setInflightMetadataPartitions(String partitions) {
       return this;
     }
 
+    public PropertyBuilder setSecondaryIndexFields(String secondaryIndexFields) {
+      this.secondaryIndexFields = secondaryIndexFields;
+      return this;
+    }
+
     public PropertyBuilder set(String key, Object value) {
       if (HoodieTableConfig.PERSISTED_CONFIG_LIST.contains(key)) {
         this.others.put(key, value);
@@ -883,7 +889,7 @@ public PropertyBuilder set(String key, Object value) {
     }
 
     public PropertyBuilder set(Map<String, Object> props) {
-      for (String key: HoodieTableConfig.PERSISTED_CONFIG_LIST) {
+      for (String key : HoodieTableConfig.PERSISTED_CONFIG_LIST) {
         Object value = props.get(key);
         if (value != null) {
           set(key, value);
@@ -982,6 +988,9 @@ public PropertyBuilder fromProperties(Properties properties) {
       if (hoodieConfig.contains(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT)) {
         setInflightMetadataPartitions(hoodieConfig.getString(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT));
       }
+      if (hoodieConfig.contains(HoodieTableConfig.SECONDARY_INDEXES)) {
+        setSecondaryIndexFields(hoodieConfig.getString(HoodieTableConfig.SECONDARY_INDEXES));
+      }
       return this;
     }
 
@@ -1072,6 +1081,9 @@ public Properties build() {
       if (null != inflightMetadataPartitions) {
         tableConfig.setValue(HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT, inflightMetadataPartitions);
       }
+      if (null != secondaryIndexFields) {
+        tableConfig.setValue(HoodieTableConfig.SECONDARY_INDEXES, secondaryIndexFields);
+      }
 
       return tableConfig.getProps();
     }
diff --git a/hudi-common/src/main/java/org/apache/hudi/exception/HoodieSecondaryIndexException.java b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieSecondaryIndexException.java
new file mode 100644
index 0000000000000..361416c9090a5
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/exception/HoodieSecondaryIndexException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.exception;
+
+public class HoodieSecondaryIndexException extends HoodieException {
+  public HoodieSecondaryIndexException(String message) {
+    super(message);
+  }
+
+  public HoodieSecondaryIndexException(String message, Throwable t) {
+    super(message, t);
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4 b/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4
index 0cde14a4e4a0e..65e17bfb46209 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4
+++ b/hudi-spark-datasource/hudi-spark/src/main/antlr4/org/apache/hudi/spark/sql/parser/HoodieSqlCommon.g4
@@ -48,6 +48,13 @@ statement
     : compactionStatement                                              #compactionCommand
     | CALL multipartIdentifier '(' (callArgument (',' callArgument)*)? ')'  #call
+    | CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE?
+        tableIdentifier (USING indexType=identifier)?
+        LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN
+        (OPTIONS indexOptions=propertyList)?                           #createIndex
+    | DROP INDEX (IF EXISTS)? identifier ON TABLE? tableIdentifier     #dropIndex
+    | SHOW INDEXES (FROM | IN) TABLE? tableIdentifier                  #showIndexes
+    | REFRESH INDEX identifier ON TABLE? tableIdentifier               #refreshIndex
     | .*?                                                              #passThrough
     ;
@@ -99,6 +106,14 @@
     | MINUS? BIGDECIMAL_LITERAL            #bigDecimalLiteral
     ;
 
+ multipartIdentifierPropertyList
+    : multipartIdentifierProperty (COMMA multipartIdentifierProperty)*
+    ;
+
+ multipartIdentifierProperty
+    : multipartIdentifier (OPTIONS options=propertyList)?
+    ;
+
 multipartIdentifier
     : parts+=identifier ('.' parts+=identifier)*
     ;
@@ -114,9 +129,53 @@
     ;
 
 nonReserved
-    : CALL | COMPACTION | RUN | SCHEDULE | ON | SHOW | LIMIT
+    : CALL
+    | COMPACTION
+    | CREATE
+    | DROP
+    | EXISTS
+    | FROM
+    | IN
+    | INDEX
+    | INDEXES
+    | IF
+    | LIMIT
+    | NOT
+    | ON
+    | OPTIONS
+    | REFRESH
+    | RUN
+    | SCHEDULE
+    | SHOW
+    | TABLE
+    | USING
     ;
 
+ propertyList
+    : LEFT_PAREN property (COMMA property)* RIGHT_PAREN
+    ;
+
+ property
+    : key=propertyKey (EQ? value=propertyValue)?
+    ;
+
+ propertyKey
+    : identifier (DOT identifier)*
+    | STRING
+    ;
+
+ propertyValue
+    : INTEGER_VALUE
+    | DECIMAL_VALUE
+    | booleanValue
+    | STRING
+    ;
+
+ LEFT_PAREN: '(';
+ RIGHT_PAREN: ')';
+ COMMA: ',';
+ DOT: '.';
+
 ALL: 'ALL';
 AT: 'AT';
 CALL: 'CALL';
@@ -132,6 +191,21 @@
 FALSE: 'FALSE';
 INTERVAL: 'INTERVAL';
 TO: 'TO';
+ CREATE: 'CREATE';
+ INDEX: 'INDEX';
+ INDEXES: 'INDEXES';
+ IF: 'IF';
+ NOT: 'NOT';
+ EXISTS: 'EXISTS';
+ TABLE: 'TABLE';
+ USING: 'USING';
+ OPTIONS: 'OPTIONS';
+ DROP: 'DROP';
+ FROM: 'FROM';
+ IN: 'IN';
+ REFRESH: 'REFRESH';
+
+ EQ: '=' | '==';
 
 PLUS: '+';
 MINUS: '-';
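The new grammar rules accept statements like the following (these mirror what `TestSecondaryIndex` at the end of this diff runs through `spark.sql`; `h0` is a placeholder table name):

```scala
// CREATE with an optional index type, per-column OPTIONS and index-level OPTIONS
spark.sql("create index if not exists idx_price on h0 using lucene " +
  "(price options(order='desc')) options(block_size=512)")
spark.sql("show indexes from h0")   // FROM and IN are interchangeable
spark.sql("refresh index idx_price on h0")
spark.sql("drop index if exists idx_price on h0")
```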
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala
new file mode 100644
index 0000000000000..12ee2e8058343
--- /dev/null
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Index.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
+import org.apache.spark.sql.types.StringType
+
+/**
+ * The logical plan of the CREATE INDEX command.
+ */
+case class CreateIndex(
+    table: LogicalPlan,
+    indexName: String,
+    indexType: String,
+    ignoreIfExists: Boolean,
+    columns: Seq[(Attribute, Map[String, String])],
+    properties: Map[String, String],
+    override val output: Seq[Attribute] = CreateIndex.getOutputAttrs) extends Command {
+
+  override def children: Seq[LogicalPlan] = Seq(table)
+
+  override lazy val resolved: Boolean = table.resolved && columns.forall(_._1.resolved)
+
+  def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): CreateIndex = {
+    copy(table = newChild.head)
+  }
+}
+
+object CreateIndex {
+  def getOutputAttrs: Seq[Attribute] = Seq.empty
+}
+
+/**
+ * The logical plan of the DROP INDEX command.
+ */
+case class DropIndex(
+    table: LogicalPlan,
+    indexName: String,
+    ignoreIfNotExists: Boolean,
+    override val output: Seq[Attribute] = DropIndex.getOutputAttrs) extends Command {
+
+  override def children: Seq[LogicalPlan] = Seq(table)
+
+  def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): DropIndex = {
+    copy(table = newChild.head)
+  }
+}
+
+object DropIndex {
+  def getOutputAttrs: Seq[Attribute] = Seq.empty
+}
+
+/**
+ * The logical plan of the SHOW INDEXES command.
+ */
+case class ShowIndexes(
+    table: LogicalPlan,
+    override val output: Seq[Attribute] = ShowIndexes.getOutputAttrs) extends Command {
+
+  override def children: Seq[LogicalPlan] = Seq(table)
+
+  def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): ShowIndexes = {
+    copy(table = newChild.head)
+  }
+}
+
+object ShowIndexes {
+  def getOutputAttrs: Seq[Attribute] = Seq(
+    AttributeReference("index_name", StringType, nullable = false)(),
+    AttributeReference("col_name", StringType, nullable = false)(),
+    AttributeReference("index_type", StringType, nullable = false)(),
+    AttributeReference("col_options", StringType, nullable = true)(),
+    AttributeReference("options", StringType, nullable = true)()
+  )
+}
+
+/**
+ * The logical plan of the REFRESH INDEX command.
+ */ +case class RefreshIndex( + table: LogicalPlan, + indexName: String, + override val output: Seq[Attribute] = RefreshIndex.getOutputAttrs) extends Command { + + override def children: Seq[LogicalPlan] = Seq(table) + + def withNewChildrenInternal(newChild: IndexedSeq[LogicalPlan]): RefreshIndex = { + copy(table = newChild.head) + } +} + +object RefreshIndex { + def getOutputAttrs: Seq[Attribute] = Seq.empty +} diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala index dcacbef3a26fa..1d815d4118ddd 100644 --- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/analysis/HoodieAnalysis.scala @@ -147,6 +147,27 @@ case class HoodieAnalysis(sparkSession: SparkSession) extends Rule[LogicalPlan] } else { c } + + // Convert to CreateIndexCommand + case CreateIndex(table, indexName, indexType, ignoreIfExists, columns, properties, output) + if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) => + CreateIndexCommand( + getTableIdentifier(table), indexName, indexType, ignoreIfExists, columns, properties, output) + + // Convert to DropIndexCommand + case DropIndex(table, indexName, ignoreIfNotExists, output) + if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) => + DropIndexCommand(getTableIdentifier(table), indexName, ignoreIfNotExists, output) + + // Convert to ShowIndexesCommand + case ShowIndexes(table, output) + if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) => + ShowIndexesCommand(getTableIdentifier(table), output) + + case RefreshIndex(table, indexName, output) + if table.resolved && sparkAdapter.isHoodieTable(table, sparkSession) => + RefreshIndexCommand(getTableIdentifier(table), indexName, output) + case _ => plan } } diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala new file mode 100644 index 0000000000000..5b1ec059fdcaa --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/IndexCommands.scala @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.hudi.command + +import com.fasterxml.jackson.annotation.{JsonAutoDetect, PropertyAccessor} +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} +import org.apache.hadoop.fs.Path +import org.apache.hudi.common.index.SecondaryIndex +import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient} +import org.apache.hudi.exception.HoodieSecondaryIndexException +import org.apache.spark.internal.Logging +import org.apache.spark.sql.catalyst.TableIdentifier +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation +import org.apache.spark.sql.{Row, SparkSession} + +import java.util +import java.util.Properties +import scala.collection.JavaConversions +import scala.collection.JavaConverters.mapAsJavaMapConverter + +case class CreateIndexCommand( + tableId: TableIdentifier, + indexName: String, + indexType: String, + ignoreIfExists: Boolean, + columns: Seq[(Attribute, Map[String, String])], + properties: Map[String, String], + override val output: Seq[Attribute]) extends IndexBaseCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + val metaClient = createHoodieTableMetaClient(tableId, sparkSession) + val secondaryIndexes = toScalaOption(metaClient.getTableConfig.getSecondaryIndexes) + require(columns.size == 1, s"Only support single column secondary index now") + + val colNames = columns.map(_._1.name).iterator.toArray + if (indexExists(secondaryIndexes, indexName, Some(indexType), Some(colNames))) { + if (ignoreIfExists) { + Seq.empty + } else { + throw new HoodieSecondaryIndexException("Secondary index already exists: " + indexName) + } + } else { + val colOptions: java.util.Map[String, java.util.Map[String, String]] = + new util.HashMap[String, java.util.Map[String, String]]() + columns.map(c => if (c._2.nonEmpty) colOptions.put(c._1.name, c._2.asJava)) + val newSecondaryIndex = SecondaryIndex.builder() + .setIndexName(indexName) + .setColNames(colNames) + .setIndexType(indexType) + .setColOptions(colOptions) + .setOptions(properties.asJava) + .build() + + val newSecondaryIndexes = secondaryIndexes.map(_ :+ newSecondaryIndex).getOrElse(Array(newSecondaryIndex)) + val updatedProps = new Properties() + updatedProps.put(HoodieTableConfig.SECONDARY_INDEXES.key(), + getObjectMapper.writeValueAsString(newSecondaryIndexes.sortBy(_.getIndexName))) + HoodieTableConfig.update(metaClient.getFs, new Path(metaClient.getMetaPath), updatedProps) + logInfo(s"Success to create secondary index: $newSecondaryIndex") + + Seq.empty + } + } +} + +case class DropIndexCommand( + tableId: TableIdentifier, + indexName: String, + ignoreIfNotExists: Boolean, + override val output: Seq[Attribute]) extends IndexBaseCommand { + + override def run(sparkSession: SparkSession): Seq[Row] = { + val metaClient = createHoodieTableMetaClient(tableId, sparkSession) + val secondaryIndexes = toScalaOption(metaClient.getTableConfig.getSecondaryIndexes) + if (!indexExists(secondaryIndexes, indexName)) { + if (ignoreIfNotExists) { + Seq.empty + } else { + throw new HoodieSecondaryIndexException("Secondary index not exists: " + indexName) + } + } else { + val secondaryIndexesToKeep = + secondaryIndexes.map(_.filter(!_.getIndexName.equals(indexName))).filter(!_.isEmpty) + if (secondaryIndexesToKeep.nonEmpty) { + val updatedProps = new Properties() + updatedProps.put(HoodieTableConfig.SECONDARY_INDEXES.key(), + 
+          getObjectMapper.writeValueAsString(secondaryIndexesToKeep.get.sortBy(_.getIndexName)))
+        HoodieTableConfig.update(metaClient.getFs, new Path(metaClient.getMetaPath), updatedProps)
+      } else {
+        HoodieTableConfig.delete(metaClient.getFs, new Path(metaClient.getMetaPath),
+          JavaConversions.setAsJavaSet(Set(HoodieTableConfig.SECONDARY_INDEXES.key())))
+      }
+
+      // TODO: delete index files safely (index files may still be in use by running queries)
+      logInfo(s"Successfully dropped secondary index: $indexName")
+
+      Seq.empty
+    }
+  }
+}
+
+case class ShowIndexesCommand(
+    tableId: TableIdentifier,
+    override val output: Seq[Attribute]) extends IndexBaseCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val metaClient = createHoodieTableMetaClient(tableId, sparkSession)
+    val secondaryIndexes = metaClient.getTableConfig.getSecondaryIndexes
+
+    val mapper = getObjectMapper
+    toScalaOption(secondaryIndexes).map(_.map(i => {
+      val colOptions = if (i.getColOptions.isEmpty) "" else mapper.writeValueAsString(i.getColOptions)
+      val options = if (i.getOptions.isEmpty) "" else mapper.writeValueAsString(i.getOptions)
+      Row(i.getIndexName, i.getColNames.mkString(","), i.getIndexType.name().toLowerCase, colOptions, options)
+    }).toSeq).getOrElse(Seq.empty[Row])
+  }
+}
+
+case class RefreshIndexCommand(
+    tableId: TableIdentifier,
+    indexName: String,
+    override val output: Seq[Attribute]) extends IndexBaseCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    // TODO: refresh index is not implemented yet
+    Seq.empty
+  }
+}
+
+abstract class IndexBaseCommand extends HoodieLeafRunnableCommand with Logging {
+
+  /**
+   * Create a hoodie table meta client for the given table identifier and
+   * spark session.
+   *
+   * @param tableId      The table identifier
+   * @param sparkSession The spark session
+   * @return The hoodie table meta client
+   */
+  def createHoodieTableMetaClient(
+      tableId: TableIdentifier,
+      sparkSession: SparkSession): HoodieTableMetaClient = {
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(tableId)
+    val basePath = getTableLocation(catalogTable, sparkSession)
+
+    HoodieTableMetaClient.builder()
+      .setConf(sparkSession.sqlContext.sparkContext.hadoopConfiguration)
+      .setBasePath(basePath)
+      .build()
+  }
+
+  /**
+   * Check whether a secondary index exists.
+   * In a hoodie table, secondary index
+   * names must be unique, so the index name is checked first; an index also
+   * counts as existing when one with the same type and columns is present.
+   *
+   * @param secondaryIndexes Current secondary indexes
+   * @param indexName        The index name to be checked
+   * @param indexType        The index type to be checked, if present
+   * @param colNames         The column names to be checked, if present
+   * @return true if the index exists
+   */
+  def indexExists(
+      secondaryIndexes: Option[Array[SecondaryIndex]],
+      indexName: String,
+      indexType: Option[String] = None,
+      colNames: Option[Array[String]] = None): Boolean = {
+    secondaryIndexes.exists(i => {
+      i.exists(_.getIndexName.equals(indexName)) ||
+        // Index type and column names need to be checked if present
+        indexType.exists(t =>
+          colNames.exists(c =>
+            i.exists(index =>
+              index.getIndexType.name().equalsIgnoreCase(t) && index.getColNames.sameElements(c))))
+    })
+  }
+
+  protected def toScalaOption[T](opt: org.apache.hudi.common.util.Option[T]): Option[T] =
+    if (opt.isPresent) Some(opt.get) else None
+
+  protected def getObjectMapper: ObjectMapper = {
+    val mapper = new ObjectMapper
+    mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
+    mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
+    mapper
+  }
+}
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala
index 771798dd225e9..d0e5ed6133856 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/parser/HoodieSqlCommonAstBuilder.scala
@@ -25,11 +25,12 @@ import org.apache.hudi.spark.sql.parser.HoodieSqlCommonParser._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
 import org.apache.spark.sql.catalyst.parser.{ParseException, ParserInterface, ParserUtils}
 import org.apache.spark.sql.catalyst.plans.logical._
 
+import java.util.Locale
 import scala.collection.JavaConverters._
 
 class HoodieSqlCommonAstBuilder(session: SparkSession, delegate: ParserInterface)
@@ -147,4 +148,144 @@ class HoodieSqlCommonAstBuilder(session: SparkSession, delegate: ParserInterface
   private def typedVisit[T](ctx: ParseTree): T = {
     ctx.accept(this).asInstanceOf[T]
   }
+
+  /**
+   * Create an index, returning a [[CreateIndex]] logical plan.
+   * For example:
+   * {{{
+   *   CREATE INDEX index_name ON [TABLE] table_name [USING index_type] (column_index_property_list)
+   *     [OPTIONS indexPropertyList]
+   *   column_index_property_list: column_name [OPTIONS(indexPropertyList)] [ , ... ]
+   *   indexPropertyList: index_property_name [= index_property_value] [ , ... ]
+   * }}}
+   */
+  override def visitCreateIndex(ctx: CreateIndexContext): LogicalPlan = withOrigin(ctx) {
+    val (indexName, indexType) = if (ctx.identifier.size() == 1) {
+      (ctx.identifier(0).getText, "")
+    } else {
+      (ctx.identifier(0).getText, ctx.identifier(1).getText)
+    }
+
+    val columns = ctx.columns.multipartIdentifierProperty.asScala
+      .map(_.multipartIdentifier).map(typedVisit[Seq[String]]).toSeq
+    val columnsProperties = ctx.columns.multipartIdentifierProperty.asScala
+      .map(x => Option(x.options).map(visitPropertyKeyValues).getOrElse(Map.empty)).toSeq
+    val options = Option(ctx.indexOptions).map(visitPropertyKeyValues).getOrElse(Map.empty)
+
+    CreateIndex(
+      visitTableIdentifier(ctx.tableIdentifier()),
+      indexName,
+      indexType,
+      ctx.EXISTS != null,
+      columns.map(UnresolvedAttribute(_)).zip(columnsProperties),
+      options)
+  }
+
+  /**
+   * Drop an index, returning a [[DropIndex]] logical plan.
+   * For example:
+   * {{{
+   *   DROP INDEX [IF EXISTS] index_name ON [TABLE] table_name
+   * }}}
+   */
+  override def visitDropIndex(ctx: DropIndexContext): LogicalPlan = withOrigin(ctx) {
+    val indexName = ctx.identifier.getText
+    DropIndex(
+      visitTableIdentifier(ctx.tableIdentifier()),
+      indexName,
+      ctx.EXISTS != null)
+  }
+
+  /**
+   * Show indexes, returning a [[ShowIndexes]] logical plan.
+   * For example:
+   * {{{
+   *   SHOW INDEXES (FROM | IN) [TABLE] table_name
+   * }}}
+   */
+  override def visitShowIndexes(ctx: ShowIndexesContext): LogicalPlan = withOrigin(ctx) {
+    ShowIndexes(visitTableIdentifier(ctx.tableIdentifier()))
+  }
+
+  /**
+   * Refresh an index, returning a [[RefreshIndex]] logical plan.
+   * For example:
+   * {{{
+   *   REFRESH INDEX index_name ON [TABLE] table_name
+   * }}}
+   */
+  override def visitRefreshIndex(ctx: RefreshIndexContext): LogicalPlan = withOrigin(ctx) {
+    RefreshIndex(visitTableIdentifier(ctx.tableIdentifier()), ctx.identifier.getText)
+  }
+
+  /**
+   * Convert a property list into a key-value map.
+   * This should be called through [[visitPropertyKeyValues]] or [[visitPropertyKeys]].
+   */
+  override def visitPropertyList(
+      ctx: PropertyListContext): Map[String, String] = withOrigin(ctx) {
+    val properties = ctx.property.asScala.map { property =>
+      val key = visitPropertyKey(property.key)
+      val value = visitPropertyValue(property.value)
+      key -> value
+    }
+    // Check for duplicate property names.
+    checkDuplicateKeys(properties.toSeq, ctx)
+    properties.toMap
+  }
+
+  /**
+   * Parse a key-value map from a [[PropertyListContext]], assuming all values are specified.
+   */
+  def visitPropertyKeyValues(ctx: PropertyListContext): Map[String, String] = {
+    val props = visitPropertyList(ctx)
+    val badKeys = props.collect { case (key, null) => key }
+    if (badKeys.nonEmpty) {
+      operationNotAllowed(
+        s"Values must be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx)
+    }
+    props
+  }
+
+  /**
+   * Parse a list of keys from a [[PropertyListContext]], assuming no values are specified.
+   */
+  def visitPropertyKeys(ctx: PropertyListContext): Seq[String] = {
+    val props = visitPropertyList(ctx)
+    val badKeys = props.filter { case (_, v) => v != null }.keys
+    if (badKeys.nonEmpty) {
+      operationNotAllowed(
+        s"Values should not be specified for key(s): ${badKeys.mkString("[", ",", "]")}", ctx)
+    }
+    props.keys.toSeq
+  }
+
+  /**
+   * A property key can either be a string literal or a collection of dot-separated
+   * elements. This function extracts the property key based on whether it is a
+   * string literal or a property identifier.
+ */ + override def visitPropertyKey(key: PropertyKeyContext): String = { + if (key.STRING != null) { + string(key.STRING) + } else { + key.getText + } + } + + /** + * A property value can be String, Integer, Boolean or Decimal. This function extracts + * the property value based on whether its a string, integer, boolean or decimal literal. + */ + override def visitPropertyValue(value: PropertyValueContext): String = { + if (value == null) { + null + } else if (value.STRING != null) { + string(value.STRING) + } else if (value.booleanValue != null) { + value.getText.toLowerCase(Locale.ROOT) + } else { + value.getText + } + } } diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala new file mode 100644 index 0000000000000..af688b0f3281a --- /dev/null +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/command/index/TestSecondaryIndex.scala @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.spark.sql.hudi.command.index + +import org.apache.spark.sql.hudi.TestHoodieSqlBase + +class TestSecondaryIndex extends TestHoodieSqlBase { + test("Test Create/Show/Drop Secondary Index") { + withTempDir { tmp => + Seq("cow", "mor").foreach { tableType => + val tableName = generateTableName + val basePath = s"${tmp.getCanonicalPath}/$tableName" + spark.sql( + s""" + |create table $tableName ( + | id int, + | name string, + | price double, + | ts long + |) using hudi + | options ( + | primaryKey ='id', + | type = '$tableType', + | preCombineField = 'ts' + | ) + | partitioned by(ts) + | location '$basePath' + """.stripMargin) + spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)") + spark.sql(s"insert into $tableName values(2, 'a2', 10, 1001)") + spark.sql(s"insert into $tableName values(3, 'a3', 10, 1002)") + checkAnswer(s"show indexes from default.$tableName")() + + checkAnswer(s"create index idx_name on $tableName using lucene (name) options(block_size=1024)")() + checkAnswer(s"create index idx_price on $tableName using lucene (price options(order='desc')) options(block_size=512)")() + + // Create an index with multiple columns + checkException(s"create index idx_id_ts on $tableName using lucene (id, ts)")( + "requirement failed: Only support single column secondary index now" + ) + + // Create an index with the occupied name + checkException(s"create index idx_price on $tableName using lucene (price)")( + "Secondary index already exists: idx_price" + ) + + // Create indexes repeatedly on columns(index name is different, but the index type and involved column is same) + checkException(s"create index idx_price_1 on $tableName using lucene (price)")( + "Secondary index already exists: idx_price_1" + ) + + spark.sql(s"show indexes from $tableName").show() + checkAnswer(s"show indexes from $tableName")( + Seq("idx_name", "name", "lucene", "", "{\"block_size\":\"1024\"}"), + Seq("idx_price", "price", "lucene", "{\"price\":{\"order\":\"desc\"}}", "{\"block_size\":\"512\"}") + ) + + checkAnswer(s"drop index idx_name on $tableName")() + checkException(s"drop index idx_name on $tableName")("Secondary index not exists: idx_name") + + spark.sql(s"show indexes from $tableName").show() + checkAnswer(s"show indexes from $tableName")( + Seq("idx_price", "price", "lucene", "{\"price\":{\"order\":\"desc\"}}", "{\"block_size\":\"512\"}") + ) + + checkAnswer(s"drop index idx_price on $tableName")() + checkAnswer(s"show indexes from $tableName")() + + checkException(s"drop index idx_price on $tableName")("Secondary index not exists: idx_price") + } + } + } +}