Add Flint drop index SQL support (#1730)
* Add ANTLR grammar file

Signed-off-by: Chen Dai <daichen@amazon.com>

* Add Flint command builder and IT

Signed-off-by: Chen Dai <daichen@amazon.com>

* Refactor flint parser logic

Signed-off-by: Chen Dai <daichen@amazon.com>

* Polish comments for PR review

Signed-off-by: Chen Dai <daichen@amazon.com>

* Fix compile error in IT

Signed-off-by: Chen Dai <daichen@amazon.com>

---------

Signed-off-by: Chen Dai <daichen@amazon.com>
dai-chen authored Jun 21, 2023
1 parent 65583d6 commit ae3389d
Showing 9 changed files with 488 additions and 2 deletions.
8 changes: 7 additions & 1 deletion flint/build.sbt
@@ -56,7 +56,7 @@ lazy val flintCore = (project in file("flint-core"))

lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
.dependsOn(flintCore)
- .enablePlugins(AssemblyPlugin)
+ .enablePlugins(AssemblyPlugin, Antlr4Plugin)
.settings(
commonSettings,
name := "flint-spark-integration",
@@ -70,6 +70,12 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))
"org.scalatestplus" %% "mockito-4-6" % "3.2.15.0" % "test",
"com.github.sbt" % "junit-interface" % "0.13.3" % "test"),
libraryDependencies ++= deps(sparkVersion),
+ // ANTLR settings
+ Antlr4 / antlr4Version := "4.8",
+ Antlr4 / antlr4PackageName := Some("org.opensearch.flint.spark.sql"),
+ Antlr4 / antlr4GenListener := true,
+ Antlr4 / antlr4GenVisitor := true,
+ // Assembly settings
assemblyPackageScala / assembleArtifact := false,
assembly / assemblyOption ~= {
_.withIncludeScala(false)
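The Antlr4 / antlr4* keys above are provided by the sbt-antlr4 plugin (the Antlr4Plugin enabled above). The plugin registration itself is not part of this diff; it would typically live in project/plugins.sbt along these lines (the version shown is an assumption, not taken from the repo):

  // project/plugins.sbt — a sketch; the exact version used by the repo is not shown in this diff
  addSbtPlugin("com.simplytyped" % "sbt-antlr4" % "0.8.3")

With antlr4GenVisitor := true, the build generates the FlintSparkSqlExtensionsBaseVisitor extended by the AST builder further down.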
27 changes: 27 additions & 0 deletions flint/flint-spark-integration/src/main/antlr4/FlintSparkSqlExtensions.g4
@@ -0,0 +1,27 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

grammar FlintSparkSqlExtensions;

import SparkSqlBase;


// Flint SQL Syntax Extension

singleStatement
: statement SEMICOLON* EOF
;

statement
: skippingIndexStatement
;

skippingIndexStatement
: dropSkippingIndexStatement
;

dropSkippingIndexStatement
: DROP SKIPPING INDEX ON tableName=multipartIdentifier
;
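For reference, the only statement this extension grammar accepts so far is the drop form below; the table names are hypothetical examples, and multipartIdentifier also admits dotted qualifiers:

  DROP SKIPPING INDEX ON alb_logs;
  DROP SKIPPING INDEX ON myglue.default.alb_logs;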
155 changes: 155 additions & 0 deletions flint/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -0,0 +1,155 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

/*
* This file contains code from the Apache Spark project (original license below).
* It contains modifications, which are licensed as above:
*/

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

grammar SparkSqlBase;

// Copied from Spark 3.3.1 SqlBaseParser.g4 and SqlBaseLexer.g4

@members {
/**
* When true, the parser should throw a ParseException for an unclosed bracketed comment.
*/
public boolean has_unclosed_bracketed_comment = false;
/**
* Verify whether the current token is a valid decimal token (which contains a dot).
* Returns true if the character that follows the token is not a digit or letter or underscore.
*
* For example:
* For char stream "2.3", "2." is not a valid decimal token, because it is followed by digit '3'.
* For char stream "2.3_", "2.3" is not a valid decimal token, because it is followed by '_'.
* For char stream "2.3W", "2.3" is not a valid decimal token, because it is followed by 'W'.
* For char stream "12.0D 34.E2+0.12 " 12.0D is a valid decimal token because it is followed
* by a space. 34.E2 is a valid decimal token because it is followed by symbol '+'
* which is not a digit or letter or underscore.
*/
public boolean isValidDecimal() {
int nextChar = _input.LA(1);
if (nextChar >= 'A' && nextChar <= 'Z' || nextChar >= '0' && nextChar <= '9' ||
nextChar == '_') {
return false;
} else {
return true;
}
}

/**
* This method will be called when we see '/*' and try to match it as a bracketed comment.
* If the next character is '+', it should be parsed as a hint later, and we cannot match
* it as a bracketed comment.
*
* Returns true if the next character is '+'.
*/
public boolean isHint() {
int nextChar = _input.LA(1);
if (nextChar == '+') {
return true;
} else {
return false;
}
}

/**
* This method is called when the character stream ends while we are still trying to
* match a bracketed comment.
* If it is called, the entire remaining character stream matched an unclosed
* bracketed comment, so we set the flag and fail later.
*/
public void markUnclosedComment() {
has_unclosed_bracketed_comment = true;
}
}


multipartIdentifier
: parts+=identifier (DOT parts+=identifier)*
;

identifier
: IDENTIFIER #unquotedIdentifier
| quotedIdentifier #quotedIdentifierAlternative
| nonReserved #unquotedIdentifier
;

quotedIdentifier
: BACKQUOTED_IDENTIFIER
;

nonReserved
: DROP | SKIPPING | INDEX
;


// Flint lexical tokens

SKIPPING : 'SKIPPING';


// Spark lexical tokens

SEMICOLON: ';';

DOT: '.';
DROP: 'DROP';
INDEX: 'INDEX';
MINUS: '-';
ON: 'ON';

IDENTIFIER
: (LETTER | DIGIT | '_')+
;

BACKQUOTED_IDENTIFIER
: '`' ( ~'`' | '``' )* '`'
;

fragment DIGIT
: [0-9]
;

fragment LETTER
: [A-Z]
;

SIMPLE_COMMENT
: '--' ('\\\n' | ~[\r\n])* '\r'? '\n'? -> channel(HIDDEN)
;

BRACKETED_COMMENT
: '/*' {!isHint()}? ( BRACKETED_COMMENT | . )*? ('*/' | {markUnclosedComment();} EOF) -> channel(HIDDEN)
;

WS
: [ \r\n\t]+ -> channel(HIDDEN)
;

// Catch-all for anything we can't recognize.
// We use this to be able to ignore and recover all the text
// when splitting statements with DelimiterLexer
UNRECOGNIZED
: .
;
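The {!isHint()}? guard on BRACKETED_COMMENT mirrors the Spark lexer these rules were copied from: a '/*' followed by '+' must remain visible to the parser as a hint instead of being hidden as a comment. A hedged illustration of the two inputs the predicate separates (Spark SQL, not the Flint statement this grammar adds):

  SELECT /*+ BROADCAST(t) */ * FROM t  -- '/*' followed by '+': kept for hint parsing
  SELECT /* plain note */ * FROM t     -- matched as BRACKETED_COMMENT, sent to the HIDDEN channel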
5 changes: 4 additions & 1 deletion flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkExtensions.scala
@@ -5,7 +5,7 @@

package org.opensearch.flint.spark

- import org.opensearch.flint.spark.skipping.ApplyFlintSparkSkippingIndex
+ import org.opensearch.flint.spark.sql.FlintSparkSqlParser

import org.apache.spark.sql.SparkSessionExtensions

@@ -15,6 +15,9 @@ import org.apache.spark.sql.SparkSessionExtensions
class FlintSparkExtensions extends (SparkSessionExtensions => Unit) {

override def apply(extensions: SparkSessionExtensions): Unit = {
+ extensions.injectParser { (spark, parser) =>
+   new FlintSparkSqlParser(parser)
+ }
extensions.injectOptimizerRule { spark =>
new FlintSparkOptimizer(spark)
}
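With injectParser in place, the extension is activated the standard Spark way via spark.sql.extensions. A minimal usage sketch (session setup and table name are illustrative, not part of this diff):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .config("spark.sql.extensions", "org.opensearch.flint.spark.FlintSparkExtensions")
    .getOrCreate()

  // Handled by the injected Flint parser; ordinary SQL presumably falls through
  // to the wrapped delegate parser
  spark.sql("DROP SKIPPING INDEX ON alb_logs")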
30 changes: 30 additions & 0 deletions flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlAstBuilder.scala
@@ -0,0 +1,30 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.sql

import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
import org.opensearch.flint.spark.sql.FlintSparkSqlExtensionsParser.DropSkippingIndexStatementContext

import org.apache.spark.sql.catalyst.plans.logical.Command

/**
* Flint Spark AST builder that builds Spark command for Flint index statement.
*/
class FlintSparkSqlAstBuilder extends FlintSparkSqlExtensionsBaseVisitor[Command] {

override def visitDropSkippingIndexStatement(
ctx: DropSkippingIndexStatementContext): Command = {
FlintSparkSqlCommand { flint =>
val tableName = ctx.tableName.getText // TODO: handle schema name
val indexName = getSkippingIndexName(tableName)
flint.deleteIndex(indexName)
Seq.empty
}
}

override def aggregateResult(aggregate: Command, nextResult: Command): Command =
if (nextResult != null) nextResult else aggregate
}
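The builder consumes parse trees from the ANTLR-generated lexer/parser pair, whose names follow ANTLR's convention for grammar FlintSparkSqlExtensions (the actual wiring lives in FlintSparkSqlParser, which is not expanded in this view). A standalone sketch:

  import org.antlr.v4.runtime.{CharStreams, CommonTokenStream}

  val lexer = new FlintSparkSqlExtensionsLexer(
    CharStreams.fromString("DROP SKIPPING INDEX ON alb_logs"))
  val parser = new FlintSparkSqlExtensionsParser(new CommonTokenStream(lexer))
  // visit() dispatches to visitDropSkippingIndexStatement above and yields a Command
  val command = new FlintSparkSqlAstBuilder().visit(parser.singleStatement())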
25 changes: 25 additions & 0 deletions flint/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/sql/FlintSparkSqlCommand.scala
@@ -0,0 +1,25 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark.sql

import org.opensearch.flint.spark.FlintSpark

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.execution.command.LeafRunnableCommand

/**
* Flint Spark SQL DDL command.
*
* Note that the Flint SQL layer is currently thin, with all core logic in FlintSpark. A
* separate command may be created for each Flint SQL statement in the future as needed.
*
* @param block
* code block that triggers Flint core API
*/
case class FlintSparkSqlCommand(block: FlintSpark => Seq[Row]) extends LeafRunnableCommand {

override def run(sparkSession: SparkSession): Seq[Row] = block(new FlintSpark(sparkSession))
}
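Because this is a LeafRunnableCommand, the wrapped block executes eagerly when Spark runs the containing plan, and the returned rows become the statement's result. A minimal construction sketch (the index name is hypothetical):

  val dropIndex = FlintSparkSqlCommand { flint =>
    flint.deleteIndex("flint_alb_logs_skipping_index") // hypothetical index name
    Seq.empty
  }
  // Spark invokes dropIndex.run(sparkSession) when executing the plan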
