Skip to content

Commit

Permalink
Extend scheduler interface for Multitenancy
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <clingzhi@amazon.com>
  • Loading branch information
noCharger committed Sep 12, 2024
1 parent 4303a2a commit 7496a83
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 31 deletions.
5 changes: 5 additions & 0 deletions async-query-core/src/main/antlr/SqlBaseLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ CLUSTERED: 'CLUSTERED';
CODEGEN: 'CODEGEN';
COLLATE: 'COLLATE';
COLLATION: 'COLLATION';
COLLATIONS: 'COLLATIONS';
COLLECTION: 'COLLECTION';
COLUMN: 'COLUMN';
COLUMNS: 'COLUMNS';
Expand Down Expand Up @@ -276,13 +277,15 @@ INTO: 'INTO';
INVOKER: 'INVOKER';
IS: 'IS';
ITEMS: 'ITEMS';
ITERATE: 'ITERATE';
JOIN: 'JOIN';
KEYS: 'KEYS';
LANGUAGE: 'LANGUAGE';
LAST: 'LAST';
LATERAL: 'LATERAL';
LAZY: 'LAZY';
LEADING: 'LEADING';
LEAVE: 'LEAVE';
LEFT: 'LEFT';
LIKE: 'LIKE';
ILIKE: 'ILIKE';
Expand Down Expand Up @@ -362,6 +365,7 @@ REFERENCES: 'REFERENCES';
REFRESH: 'REFRESH';
RENAME: 'RENAME';
REPAIR: 'REPAIR';
REPEAT: 'REPEAT';
REPEATABLE: 'REPEATABLE';
REPLACE: 'REPLACE';
RESET: 'RESET';
Expand Down Expand Up @@ -451,6 +455,7 @@ UNKNOWN: 'UNKNOWN';
UNLOCK: 'UNLOCK';
UNPIVOT: 'UNPIVOT';
UNSET: 'UNSET';
UNTIL: 'UNTIL';
UPDATE: 'UPDATE';
USE: 'USE';
USER: 'USER';
Expand Down
25 changes: 25 additions & 0 deletions async-query-core/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ compoundStatement
| beginEndCompoundBlock
| ifElseStatement
| whileStatement
| repeatStatement
| leaveStatement
| iterateStatement
;

setStatementWithOptionalVarKeyword
Expand All @@ -83,6 +86,18 @@ ifElseStatement
(ELSE elseBody=compoundBody)? END IF
;

repeatStatement
: beginLabel? REPEAT compoundBody UNTIL booleanExpression END REPEAT endLabel?
;

leaveStatement
: LEAVE multipartIdentifier
;

iterateStatement
: ITERATE multipartIdentifier
;

singleStatement
: (statement|setResetStatement) SEMICOLON* EOF
;
Expand Down Expand Up @@ -245,6 +260,7 @@ statement
| SHOW PARTITIONS identifierReference partitionSpec? #showPartitions
| SHOW identifier? FUNCTIONS ((FROM | IN) ns=identifierReference)?
(LIKE? (legacy=multipartIdentifier | pattern=stringLit))? #showFunctions
| SHOW COLLATIONS (LIKE? pattern=stringLit)? #showCollations
| SHOW CREATE TABLE identifierReference (AS SERDE)? #showCreateTable
| SHOW CURRENT namespace #showCurrentNamespace
| SHOW CATALOGS (LIKE? pattern=stringLit)? #showCatalogs
Expand Down Expand Up @@ -1578,10 +1594,12 @@ ansiNonReserved
| INTERVAL
| INVOKER
| ITEMS
| ITERATE
| KEYS
| LANGUAGE
| LAST
| LAZY
| LEAVE
| LIKE
| ILIKE
| LIMIT
Expand Down Expand Up @@ -1648,6 +1666,7 @@ ansiNonReserved
| REFRESH
| RENAME
| REPAIR
| REPEAT
| REPEATABLE
| REPLACE
| RESET
Expand Down Expand Up @@ -1723,6 +1742,7 @@ ansiNonReserved
| UNLOCK
| UNPIVOT
| UNSET
| UNTIL
| UPDATE
| USE
| VALUES
Expand Down Expand Up @@ -1818,6 +1838,7 @@ nonReserved
| CODEGEN
| COLLATE
| COLLATION
| COLLATIONS
| COLLECTION
| COLUMN
| COLUMNS
Expand Down Expand Up @@ -1927,11 +1948,13 @@ nonReserved
| INVOKER
| IS
| ITEMS
| ITERATE
| KEYS
| LANGUAGE
| LAST
| LAZY
| LEADING
| LEAVE
| LIKE
| LONG
| ILIKE
Expand Down Expand Up @@ -2009,6 +2032,7 @@ nonReserved
| REFRESH
| RENAME
| REPAIR
| REPEAT
| REPEATABLE
| REPLACE
| RESET
Expand Down Expand Up @@ -2093,6 +2117,7 @@ nonReserved
| UNLOCK
| UNPIVOT
| UNSET
| UNTIL
| UPDATE
| USE
| USER
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;
import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;

/**
* Index Operation for Altering the flint index. Only handles alter operation when
Expand Down Expand Up @@ -62,7 +63,11 @@ void runOp(
this.flintIndexMetadataService.updateIndexToManualRefresh(
flintIndexMetadata.getOpensearchIndexName(), flintIndexOptions, asyncQueryRequestContext);
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
request.setAccountId(flintIndexStateModel.getAccountId());
request.setDataSource(flintIndexStateModel.getDatasourceName());
request.setJobId(flintIndexMetadata.getOpensearchIndexName());
asyncQueryScheduler.unscheduleJob(request);
} else {
cancelStreamingJob(flintIndexStateModel);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.opensearch.sql.spark.flint.FlintIndexStateModel;
import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;
import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;

/** Operation to drop Flint index */
public class FlintIndexOpDrop extends FlintIndexOp {
Expand Down Expand Up @@ -54,7 +55,11 @@ void runOp(
"Performing drop index operation for index: {}",
flintIndexMetadata.getOpensearchIndexName());
if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
asyncQueryScheduler.unscheduleJob(flintIndexMetadata.getOpensearchIndexName());
AsyncQuerySchedulerRequest request = new AsyncQuerySchedulerRequest();
request.setAccountId(flintIndexStateModel.getAccountId());
request.setDataSource(flintIndexStateModel.getDatasourceName());
request.setJobId(flintIndexMetadata.getOpensearchIndexName());
asyncQueryScheduler.unscheduleJob(request);
} else {
cancelStreamingJob(flintIndexStateModel);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@ public interface AsyncQueryScheduler {
* Temporarily disabling a job during maintenance or high-load periods - Allowing for easy
* re-enabling of the job in the future
*
* @param jobId The unique identifier of the job to unschedule
* @param asyncQuerySchedulerRequest The request containing updated job configuration
* @throws IllegalArgumentException if the job to be unscheduled doesn't exist
* @throws RuntimeException if there's an error during the unschedule process
*/
void unscheduleJob(String jobId);
void unscheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);

/**
* Removes a job completely from the scheduler. This method permanently deletes the job and all
Expand All @@ -51,7 +53,9 @@ public interface AsyncQueryScheduler {
* <p>Use cases: - Cleaning up jobs that are no longer needed - Removing obsolete or erroneously
* created jobs - Freeing up resources by deleting unused job configurations
*
* @param jobId The unique identifier of the job to remove
* @param asyncQuerySchedulerRequest The request to delete the job configuration
* @throws IllegalArgumentException if the job to be removed doesn't exist
* @throws RuntimeException if there's an error during the remove process
*/
void removeJob(String jobId);
void removeJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -230,7 +231,9 @@ public void createDropIndexQueryWithScheduler() {
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH);

verify(asyncQueryScheduler).unscheduleJob(indexName);
verify(asyncQueryScheduler)
.unscheduleJob(
argThat(request -> indexName.equals(request.getJobId()) && !request.isEnabled()));
}

@Test
Expand Down Expand Up @@ -318,8 +321,9 @@ public void createAlterIndexQueryWithScheduler() {
FlintIndexOptions flintIndexOptions = flintIndexOptionsArgumentCaptor.getValue();
assertFalse(flintIndexOptions.autoRefresh());

verify(asyncQueryScheduler).unscheduleJob(indexName);

verify(asyncQueryScheduler)
.unscheduleJob(
argThat(request -> indexName.equals(request.getJobId()) && !request.isEnabled()));
verifyCreateIndexDMLResultCalled();
verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
Expand Down Expand Up @@ -87,18 +88,18 @@ public void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {

/** Unschedules a job by marking it as disabled and updating its last update time. */
@Override
public void unscheduleJob(String jobId) {
ScheduledAsyncQueryJobRequest request =
ScheduledAsyncQueryJobRequest.builder()
.jobId(jobId)
.enabled(false)
.lastUpdateTime(Instant.now())
.build();
public void unscheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
String jobId = asyncQuerySchedulerRequest.getJobId();
if (Strings.isNullOrEmpty(jobId)) {
throw new IllegalArgumentException("JobId cannot be null or empty");
}
try {
updateJob(request);
LOG.info("Unscheduled job for jobId: {}", jobId);
asyncQuerySchedulerRequest.setEnabled(false);
asyncQuerySchedulerRequest.setLastUpdateTime(Instant.now());
updateJob(asyncQuerySchedulerRequest);
LOG.info("Unscheduled job for jobId: {}", asyncQuerySchedulerRequest);
} catch (IllegalStateException | DocumentMissingException e) {
LOG.error("Failed to unschedule job: {}", jobId, e);
LOG.error("Failed to unschedule job: {}", asyncQuerySchedulerRequest, e);
}
}

Expand Down Expand Up @@ -134,8 +135,12 @@ public void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {

/** Removes a job by deleting its document from the index. */
@Override
public void removeJob(String jobId) {
public void removeJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
assertIndexExists();
String jobId = asyncQuerySchedulerRequest.getJobId();
if (Strings.isNullOrEmpty(jobId)) {
throw new IllegalArgumentException("JobId cannot be null or empty");
}
DeleteRequest deleteRequest = new DeleteRequest(SCHEDULER_INDEX_NAME, jobId);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
ActionFuture<DeleteResponse> deleteResponseActionFuture = client.delete(deleteRequest);
Expand Down
Loading

0 comments on commit 7496a83

Please sign in to comment.