[HUDI-6963] Fix class conflict of CreateIndex from Spark3.3
boneanxs committed Oct 26, 2023
1 parent 8bf44c0 commit f7c1981
Showing 26 changed files with 853 additions and 326 deletions.
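In short: Spark 3.3 ships its own CreateIndex logical plan, which clashed with the identically named plan nodes Hudi's SQL extension defined for CREATE/DROP/SHOW/REFRESH INDEX. This commit drops those statements from Hudi's common extension grammar (such text now falls through the passThrough rule) and has the shared analysis code match index plans through version-specific adapter hooks (unapplyCreateIndex and friends) instead of referencing any plan class directly.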
@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.JoinType
import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan}
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.internal.SQLConf

trait HoodieCatalystPlansUtils {
@@ -79,6 +78,25 @@ trait HoodieCatalystPlansUtils {
*/
def unapplyMergeIntoTable(plan: LogicalPlan): Option[(LogicalPlan, LogicalPlan, Expression)]

/**
* Decomposes a CreateIndex logical plan into its arguments, accommodating API differences across Spark versions.
*/
def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, String])]

/**
* Decomposes a DropIndex logical plan into its arguments, accommodating API differences across Spark versions.
*/
def unapplyDropIndex(plan: LogicalPlan): Option[(LogicalPlan, String, Boolean)]

/**
* Decomposes a ShowIndexes logical plan into its arguments, accommodating API differences across Spark versions.
*/
def unapplyShowIndexes(plan: LogicalPlan): Option[(LogicalPlan, Seq[Attribute])]

/**
* Decomposes a RefreshIndex logical plan into its arguments, accommodating API differences across Spark versions.
*/
def unapplyRefreshIndex(plan: LogicalPlan): Option[(LogicalPlan, String)]

/**
* Spark requires file formats to append the partition path fields to the end of the schema.
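For context, each Spark-version adapter can implement these hooks by pattern-matching on that version's own plan node, keeping this trait free of any direct reference to it. A minimal sketch of a Spark 3.3-style implementation follows; the CreateIndex node's exact shape is an assumption based on Spark 3.3's DataSource V2 index API, and the real Hudi Spark33 adapter may differ:

// A minimal sketch, not the actual Hudi adapter: assumes Spark 3.3's
// org.apache.spark.sql.catalyst.plans.logical.CreateIndex node, whose
// columns field pairs a FieldName (name parts as Seq[String]) with options.
import org.apache.spark.sql.catalyst.plans.logical.{CreateIndex, LogicalPlan}

object Spark33PlanUtilsSketch {
  def unapplyCreateIndex(plan: LogicalPlan): Option[(LogicalPlan, String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, String])] =
    plan match {
      case CreateIndex(table, indexName, indexType, ignoreIfExists, columns, properties) =>
        // Flatten each FieldName to its raw name parts so callers stay version-agnostic.
        Some((table, indexName, indexType, ignoreIfExists,
          columns.map { case (field, opts) => (field.name, opts) }, properties))
      case _ => None
    }
}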
@@ -48,13 +48,6 @@
statement
: compactionStatement #compactionCommand
| CALL multipartIdentifier callArgumentList? #call
| CREATE INDEX (IF NOT EXISTS)? identifier ON TABLE?
tableIdentifier (USING indexType=identifier)?
LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN
(OPTIONS indexOptions=propertyList)? #createIndex
| DROP INDEX (IF EXISTS)? identifier ON TABLE? tableIdentifier #dropIndex
| SHOW INDEXES (FROM | IN) TABLE? tableIdentifier #showIndexes
| REFRESH INDEX identifier ON TABLE? tableIdentifier #refreshIndex
| .*? #passThrough
;

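The deleted alternatives used to match statements like the ones below (shown through the SparkSession SQL entry point; the index name, table name, index type, and options are purely illustrative). With the rules gone, such text hits the `.*? #passThrough` alternative of this common grammar and is handled elsewhere, by Spark's own parser or a version-specific Hudi extension:

// Illustrative only: idx_ts, hudi_tbl, lucene, and block_size are made-up names.
spark.sql("CREATE INDEX IF NOT EXISTS idx_ts ON TABLE hudi_tbl USING lucene (ts) OPTIONS (block_size = '1024')")
spark.sql("DROP INDEX IF EXISTS idx_ts ON TABLE hudi_tbl")
spark.sql("SHOW INDEXES FROM TABLE hudi_tbl")
spark.sql("REFRESH INDEX idx_ts ON TABLE hudi_tbl")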
Expand Down Expand Up @@ -110,14 +103,6 @@
| MINUS? BIGDECIMAL_LITERAL #bigDecimalLiteral
;

multipartIdentifierPropertyList
: multipartIdentifierProperty (COMMA multipartIdentifierProperty)*
;

multipartIdentifierProperty
: multipartIdentifier (OPTIONS options=propertyList)?
;

multipartIdentifier
: parts+=identifier ('.' parts+=identifier)*
;
@@ -135,51 +120,13 @@
nonReserved
: CALL
| COMPACTION
| CREATE
| DROP
| EXISTS
| FROM
| IN
| INDEX
| INDEXES
| IF
| LIMIT
| NOT
| ON
| OPTIONS
| REFRESH
| RUN
| SCHEDULE
| SHOW
| TABLE
| USING
;

propertyList
: LEFT_PAREN property (COMMA property)* RIGHT_PAREN
;

property
: key=propertyKey (EQ? value=propertyValue)?
;

propertyKey
: identifier (DOT identifier)*
| STRING
;

propertyValue
: INTEGER_VALUE
| DECIMAL_VALUE
| booleanValue
| STRING
;

LEFT_PAREN: '(';
RIGHT_PAREN: ')';
COMMA: ',';
DOT: '.';

ALL: 'ALL';
AT: 'AT';
CALL: 'CALL';
@@ -195,21 +142,6 @@
FALSE: 'FALSE';
INTERVAL: 'INTERVAL';
TO: 'TO';
CREATE: 'CREATE';
INDEX: 'INDEX';
INDEXES: 'INDEXES';
IF: 'IF';
NOT: 'NOT';
EXISTS: 'EXISTS';
TABLE: 'TABLE';
USING: 'USING';
OPTIONS: 'OPTIONS';
DROP: 'DROP';
FROM: 'FROM';
IN: 'IN';
REFRESH: 'REFRESH';

EQ: '=' | '==';

PLUS: '+';
MINUS: '-';
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.{isMetaField, removeMetaFields}
import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.{MatchCreateTableLike, MatchInsertIntoStatement, MatchMergeIntoTable, ResolvesToHudiTable, sparkAdapter}
import org.apache.spark.sql.hudi.analysis.HoodieAnalysis.{MatchCreateIndex, MatchCreateTableLike, MatchDropIndex, MatchInsertIntoStatement, MatchMergeIntoTable, MatchRefreshIndex, MatchShowIndexes, ResolvesToHudiTable, sparkAdapter}
import org.apache.spark.sql.hudi.command._
import org.apache.spark.sql.hudi.command.procedures.{HoodieProcedures, Procedure, ProcedureArgs}
import org.apache.spark.sql.{AnalysisException, SparkSession}
@@ -354,6 +354,26 @@ object HoodieAnalysis extends SparkAdapterSupport {
sparkAdapter.getCatalystPlanUtils.unapplyCreateTableLikeCommand(plan)
}

private[sql] object MatchCreateIndex {
def unapply(plan: LogicalPlan): Option[(LogicalPlan, String, String, Boolean, Seq[(Seq[String], Map[String, String])], Map[String, String])] =
sparkAdapter.getCatalystPlanUtils.unapplyCreateIndex(plan)
}

private[sql] object MatchDropIndex {
def unapply(plan: LogicalPlan): Option[(LogicalPlan, String, Boolean)] =
sparkAdapter.getCatalystPlanUtils.unapplyDropIndex(plan)
}

private[sql] object MatchShowIndexes {
def unapply(plan: LogicalPlan): Option[(LogicalPlan, Seq[Attribute])] =
sparkAdapter.getCatalystPlanUtils.unapplyShowIndexes(plan)
}

private[sql] object MatchRefreshIndex {
def unapply(plan: LogicalPlan): Option[(LogicalPlan, String)] =
sparkAdapter.getCatalystPlanUtils.unapplyRefreshIndex(plan)
}

private[sql] def failAnalysis(msg: String): Nothing = {
throw new AnalysisException(msg)
}
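These Match* objects are deliberately thin: each forwards to the version-specific sparkAdapter.getCatalystPlanUtils implementation, so this shared module can pattern-match index plans without naming Spark 3.3's CreateIndex (or Hudi's legacy equivalent) at compile time, which is precisely what removes the class conflict.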
@@ -442,21 +462,20 @@ case class ResolveImplementations() extends Rule[LogicalPlan] {
}

// Convert to CreateIndexCommand
case ci @ CreateIndex(plan @ ResolvesToHudiTable(table), indexName, indexType, ignoreIfExists, columns, options, output) =>
// TODO need to resolve columns
CreateIndexCommand(table, indexName, indexType, ignoreIfExists, columns, options, output)
case ci @ MatchCreateIndex(plan @ ResolvesToHudiTable(table), indexName, indexType, ignoreIfExists, columns, options) if ci.resolved =>
CreateIndexCommand(table, indexName, indexType, ignoreIfExists, columns, options)

// Convert to DropIndexCommand
case di @ DropIndex(plan @ ResolvesToHudiTable(table), indexName, ignoreIfNotExists, output) if di.resolved =>
DropIndexCommand(table, indexName, ignoreIfNotExists, output)
case di @ MatchDropIndex(plan @ ResolvesToHudiTable(table), indexName, ignoreIfNotExists) if di.resolved =>
DropIndexCommand(table, indexName, ignoreIfNotExists)

// Convert to ShowIndexesCommand
case si @ ShowIndexes(plan @ ResolvesToHudiTable(table), output) if si.resolved =>
case si @ MatchShowIndexes(plan @ ResolvesToHudiTable(table), output) if si.resolved =>
ShowIndexesCommand(table, output)

// Convert to RefreshIndexCommand
case ri @ RefreshIndex(plan @ ResolvesToHudiTable(table), indexName, output) if ri.resolved =>
RefreshIndexCommand(table, indexName, output)
case ri @ MatchRefreshIndex(plan @ ResolvesToHudiTable(table), indexName) if ri.resolved =>
RefreshIndexCommand(table, indexName)

case _ => plan
}
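Beyond the switch to the Match* extractors, note the small behavioral tweak: the CreateIndex branch previously matched unconditionally and carried a `// TODO need to resolve columns`, whereas all four branches now guard on `.resolved`, so the commands are only built once the plan's column references have been resolved. The commands also no longer take an `output: Seq[Attribute]` parameter, except ShowIndexesCommand, which still produces rows.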
@@ -19,10 +19,9 @@

package org.apache.spark.sql.hudi.command

import com.fasterxml.jackson.annotation.{JsonAutoDetect, PropertyAccessor}
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import org.apache.hudi.HoodieConversionUtils.toScalaOption
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.JsonUtils
import org.apache.hudi.secondary.index.SecondaryIndexManager
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.catalog.CatalogTable
@@ -32,23 +31,21 @@ import org.apache.spark.sql.hudi.HoodieSqlCommonUtils.getTableLocation
import org.apache.spark.sql.{Row, SparkSession}

import java.util

import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, mapAsJavaMapConverter}

case class CreateIndexCommand(table: CatalogTable,
indexName: String,
indexType: String,
ignoreIfExists: Boolean,
columns: Seq[(Attribute, Map[String, String])],
options: Map[String, String],
override val output: Seq[Attribute]) extends IndexBaseCommand {
columns: Seq[(Seq[String], Map[String, String])],
options: Map[String, String]) extends IndexBaseCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val tableId = table.identifier
val metaClient = createHoodieTableMetaClient(tableId, sparkSession)
val columnsMap: java.util.LinkedHashMap[String, java.util.Map[String, String]] =
new util.LinkedHashMap[String, java.util.Map[String, String]]()
columns.map(c => columnsMap.put(c._1.name, c._2.asJava))
columns.map(c => columnsMap.put(c._1.mkString("."), c._2.asJava))

SecondaryIndexManager.getInstance().create(
metaClient, indexName, indexType, ignoreIfExists, columnsMap, options.asJava)
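Since columns now arrive as multi-part name parts (Seq[String]) rather than resolved Attributes, nested fields flatten into dotted keys of the insertion-ordered map. A small self-contained illustration with hypothetical column names:

// Hypothetical column input mirroring the new shape: name parts plus per-column options.
import java.util
import scala.collection.JavaConverters.mapAsJavaMapConverter

val columns: Seq[(Seq[String], Map[String, String])] =
  Seq((Seq("ts"), Map.empty[String, String]), (Seq("address", "city"), Map("ngram" -> "2")))

val columnsMap = new util.LinkedHashMap[String, java.util.Map[String, String]]()
columns.foreach { case (parts, opts) => columnsMap.put(parts.mkString("."), opts.asJava) }
// Keys, in declaration order: "ts", "address.city"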
@@ -65,8 +62,7 @@ case class CreateIndexCommand(table: CatalogTable,

case class DropIndexCommand(table: CatalogTable,
indexName: String,
ignoreIfNotExists: Boolean,
override val output: Seq[Attribute]) extends IndexBaseCommand {
ignoreIfNotExists: Boolean) extends IndexBaseCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val tableId = table.identifier
@@ -90,7 +86,7 @@ case class ShowIndexesCommand(table: CatalogTable,
val metaClient = createHoodieTableMetaClient(table.identifier, sparkSession)
val secondaryIndexes = SecondaryIndexManager.getInstance().show(metaClient)

val mapper = getObjectMapper
val mapper = JsonUtils.getObjectMapper
toScalaOption(secondaryIndexes).map(x =>
x.asScala.map(i => {
val colOptions =
@@ -100,18 +96,10 @@
i.getIndexType.name().toLowerCase, colOptions, options)
}).toSeq).getOrElse(Seq.empty[Row])
}

protected def getObjectMapper: ObjectMapper = {
val mapper = new ObjectMapper
mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES)
mapper.setVisibility(PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY)
mapper
}
}
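The Jackson change is a small dedup: the removed getObjectMapper helper configured a fresh ObjectMapper per command (tolerating unknown properties and enabling field visibility), and ShowIndexesCommand now reuses the shared, pre-configured mapper from hudi-common's JsonUtils instead.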

case class RefreshIndexCommand(table: CatalogTable,
indexName: String,
override val output: Seq[Attribute]) extends IndexBaseCommand {
indexName: String) extends IndexBaseCommand {

override def run(sparkSession: SparkSession): Seq[Row] = {
val metaClient = createHoodieTableMetaClient(table.identifier, sparkSession)