Skip to content

Commit

Permalink
[Feature] Flint query scheduler part1 - integrate job scheduler plugin (
Browse files Browse the repository at this point in the history
opensearch-project#2834)

* [Feature] Flint query scheduler part1 - integrate job scheduler plugin

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Add comments

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Add unit test

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Remove test rest API

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Fix doc test

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Add more tests

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Fix IT

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Fix IT with security

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Improve test coverage

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Fix integTest cluster

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Fix UT

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Update UT

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Fix bwc test

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Resolve comments

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Fix bwc test

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* clean up doc test

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Resolve comments

Signed-off-by: Louis Chu <clingzhi@amazon.com>

* Fix UT

Signed-off-by: Louis Chu <clingzhi@amazon.com>

---------

Signed-off-by: Louis Chu <clingzhi@amazon.com>
(cherry picked from commit 3daf64f)
  • Loading branch information
noCharger committed Aug 1, 2024
1 parent 3030ad5 commit b02895c
Show file tree
Hide file tree
Showing 27 changed files with 1,443 additions and 52 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@ gen
.worktrees
http-client.env.json
/doctest/sql-cli/
/doctest/opensearch-job-scheduler/
.factorypath
5 changes: 4 additions & 1 deletion async-query-core/src/main/antlr/FlintSparkSqlExtensions.g4
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,10 @@ indexManagementStatement
;

showFlintIndexStatement
: SHOW FLINT (INDEX | INDEXES) IN catalogDb=multipartIdentifier
: SHOW FLINT (INDEX | INDEXES)
IN catalogDb=multipartIdentifier #showFlintIndex
| SHOW FLINT (INDEX | INDEXES) EXTENDED
IN catalogDb=multipartIdentifier #showFlintIndexExtended
;

indexJobManagementStatement
Expand Down
1 change: 1 addition & 0 deletions async-query-core/src/main/antlr/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ DESC: 'DESC';
DESCRIBE: 'DESCRIBE';
DROP: 'DROP';
EXISTS: 'EXISTS';
EXTENDED: 'EXTENDED';
FALSE: 'FALSE';
FLINT: 'FLINT';
IF: 'IF';
Expand Down
1 change: 1 addition & 0 deletions async-query-core/src/main/antlr/SqlBaseLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ NANOSECOND: 'NANOSECOND';
NANOSECONDS: 'NANOSECONDS';
NATURAL: 'NATURAL';
NO: 'NO';
NONE: 'NONE';
NOT: 'NOT';
NULL: 'NULL';
NULLS: 'NULLS';
Expand Down
74 changes: 52 additions & 22 deletions async-query-core/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ singleCompoundStatement
;

beginEndCompoundBlock
: BEGIN compoundBody END
: beginLabel? BEGIN compoundBody END endLabel?
;

compoundBody
Expand All @@ -61,11 +61,26 @@ compoundBody

compoundStatement
: statement
| setStatementWithOptionalVarKeyword
| beginEndCompoundBlock
;

setStatementWithOptionalVarKeyword
: SET variable? assignmentList #setVariableWithOptionalKeyword
| SET variable? LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ
LEFT_PAREN query RIGHT_PAREN #setVariableWithOptionalKeyword
;

singleStatement
: statement SEMICOLON* EOF
: (statement|setResetStatement) SEMICOLON* EOF
;

beginLabel
: multipartIdentifier COLON
;

endLabel
: multipartIdentifier
;

singleExpression
Expand Down Expand Up @@ -114,7 +129,6 @@ statement
| SHOW namespaces ((FROM | IN) multipartIdentifier)?
(LIKE? pattern=stringLit)? #showNamespaces
| createTableHeader (LEFT_PAREN colDefinitionList RIGHT_PAREN)? tableProvider?
| createTableHeader (LEFT_PAREN createOrReplaceTableColTypeList RIGHT_PAREN)? tableProvider?
createTableClauses
(AS? query)? #createTable
| CREATE TABLE (IF errorCapturingNot EXISTS)? target=tableIdentifier
Expand Down Expand Up @@ -175,6 +189,8 @@ statement
| ALTER TABLE identifierReference
(partitionSpec)? SET locationSpec #setTableLocation
| ALTER TABLE identifierReference RECOVER PARTITIONS #recoverPartitions
| ALTER TABLE identifierReference
(clusterBySpec | CLUSTER BY NONE) #alterClusterBy
| DROP TABLE (IF EXISTS)? identifierReference PURGE? #dropTable
| DROP VIEW (IF EXISTS)? identifierReference #dropView
| CREATE (OR REPLACE)? (GLOBAL? TEMPORARY)?
Expand All @@ -194,16 +210,16 @@ statement
identifierReference AS className=stringLit
(USING resource (COMMA resource)*)? #createFunction
| CREATE (OR REPLACE)? TEMPORARY? FUNCTION (IF errorCapturingNot EXISTS)?
identifierReference LEFT_PAREN parameters=colDefinitionList? RIGHT_PAREN
(RETURNS (dataType | TABLE LEFT_PAREN returnParams=colTypeList RIGHT_PAREN))?
routineCharacteristics
RETURN (query | expression) #createUserDefinedFunction
identifierReference LEFT_PAREN parameters=colDefinitionList? RIGHT_PAREN
(RETURNS (dataType | TABLE LEFT_PAREN returnParams=colTypeList RIGHT_PAREN))?
routineCharacteristics
RETURN (query | expression) #createUserDefinedFunction
| DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference #dropFunction
| DECLARE (OR REPLACE)? VARIABLE?
| DECLARE (OR REPLACE)? variable?
identifierReference dataType? variableDefaultExpression? #createVariable
| DROP TEMPORARY VARIABLE (IF EXISTS)? identifierReference #dropVariable
| DROP TEMPORARY variable (IF EXISTS)? identifierReference #dropVariable
| EXPLAIN (LOGICAL | FORMATTED | EXTENDED | CODEGEN | COST)?
statement #explain
(statement|setResetStatement) #explain
| SHOW TABLES ((FROM | IN) identifierReference)?
(LIKE? pattern=stringLit)? #showTables
| SHOW TABLE EXTENDED ((FROM | IN) ns=identifierReference)?
Expand Down Expand Up @@ -242,26 +258,29 @@ statement
| (MSCK)? REPAIR TABLE identifierReference
(option=(ADD|DROP|SYNC) PARTITIONS)? #repairTable
| op=(ADD | LIST) identifier .*? #manageResource
| SET COLLATION collationName=identifier #setCollation
| SET ROLE .*? #failNativeCommand
| CREATE INDEX (IF errorCapturingNot EXISTS)? identifier ON TABLE?
identifierReference (USING indexType=identifier)?
LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN
(OPTIONS options=propertyList)? #createIndex
| DROP INDEX (IF EXISTS)? identifier ON TABLE? identifierReference #dropIndex
| unsupportedHiveNativeCommands .*? #failNativeCommand
;

setResetStatement
: SET COLLATION collationName=identifier #setCollation
| SET ROLE .*? #failSetRole
| SET TIME ZONE interval #setTimeZone
| SET TIME ZONE timezone #setTimeZone
| SET TIME ZONE .*? #setTimeZone
| SET (VARIABLE | VAR) assignmentList #setVariable
| SET (VARIABLE | VAR) LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ
LEFT_PAREN query RIGHT_PAREN #setVariable
| SET variable assignmentList #setVariable
| SET variable LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ
LEFT_PAREN query RIGHT_PAREN #setVariable
| SET configKey EQ configValue #setQuotedConfiguration
| SET configKey (EQ .*?)? #setConfiguration
| SET .*? EQ configValue #setQuotedConfiguration
| SET .*? #setConfiguration
| RESET configKey #resetQuotedConfiguration
| RESET .*? #resetConfiguration
| CREATE INDEX (IF errorCapturingNot EXISTS)? identifier ON TABLE?
identifierReference (USING indexType=identifier)?
LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN
(OPTIONS options=propertyList)? #createIndex
| DROP INDEX (IF EXISTS)? identifier ON TABLE? identifierReference #dropIndex
| unsupportedHiveNativeCommands .*? #failNativeCommand
;

executeImmediate
Expand Down Expand Up @@ -419,6 +438,11 @@ namespaces
| SCHEMAS
;

variable
: VARIABLE
| VAR
;

describeFuncName
: identifierReference
| stringLit
Expand Down Expand Up @@ -854,13 +878,17 @@ identifierComment

relationPrimary
: identifierReference temporalClause?
sample? tableAlias #tableName
optionsClause? sample? tableAlias #tableName
| LEFT_PAREN query RIGHT_PAREN sample? tableAlias #aliasedQuery
| LEFT_PAREN relation RIGHT_PAREN sample? tableAlias #aliasedRelation
| inlineTable #inlineTableDefault2
| functionTable #tableValuedFunction
;

optionsClause
: WITH options=propertyList
;

inlineTable
: VALUES expression (COMMA expression)* tableAlias
;
Expand Down Expand Up @@ -1573,6 +1601,7 @@ ansiNonReserved
| NANOSECOND
| NANOSECONDS
| NO
| NONE
| NULLS
| NUMERIC
| OF
Expand Down Expand Up @@ -1921,6 +1950,7 @@ nonReserved
| NANOSECOND
| NANOSECONDS
| NO
| NONE
| NOT
| NULL
| NULLS
Expand Down
3 changes: 3 additions & 0 deletions async-query/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ repositories {


dependencies {
implementation "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"

api project(':core')
api project(':async-query-core')
implementation project(':protocol')
Expand Down Expand Up @@ -97,6 +99,7 @@ jacocoTestCoverageVerification {
// ignore because XContext IOException
'org.opensearch.sql.spark.execution.statestore.StateStore',
'org.opensearch.sql.spark.rest.*',
'org.opensearch.sql.spark.scheduler.OpenSearchRefreshIndexJobRequestParser',
'org.opensearch.sql.spark.transport.model.*'
]
limit {
Expand Down
Loading

0 comments on commit b02895c

Please sign in to comment.