Revert change and add context
Signed-off-by: Louis Chu <clingzhi@amazon.com>
noCharger committed Sep 13, 2024
1 parent ab4a148 commit 500f607
Showing 7 changed files with 76 additions and 75 deletions.
async-query-core/src/main/antlr/SqlBaseParser.g4 (8 additions, 0 deletions)
@@ -64,6 +64,7 @@ compoundStatement
     | setStatementWithOptionalVarKeyword
     | beginEndCompoundBlock
     | ifElseStatement
+    | caseStatement
     | whileStatement
     | repeatStatement
     | leaveStatement
@@ -98,6 +99,13 @@ iterateStatement
     : ITERATE multipartIdentifier
     ;
 
+caseStatement
+    : CASE (WHEN conditions+=booleanExpression THEN conditionalBodies+=compoundBody)+
+        (ELSE elseBody=compoundBody)? END CASE #searchedCaseStatement
+    | CASE caseVariable=expression (WHEN conditionExpressions+=expression THEN conditionalBodies+=compoundBody)+
+        (ELSE elseBody=compoundBody)? END CASE #simpleCaseStatement
+    ;
+
 singleStatement
     : (statement|setResetStatement) SEMICOLON* EOF
     ;
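For reference, the first alternative (#searchedCaseStatement) matches a CASE block whose WHEN branches carry boolean conditions, while the second (#simpleCaseStatement) compares a single case expression against each WHEN expression. The Java snippet below sketches SQL-scripting text these rules are intended to accept; the script strings, and the statements inside the branches, are illustrative assumptions, not part of this commit.

// Illustrative only: example scripts for the two new parser alternatives.
public class CaseStatementExamples {

  // Searched CASE: each WHEN carries a booleanExpression (#searchedCaseStatement).
  static final String SEARCHED_CASE =
      "BEGIN\n"
          + "  CASE\n"
          + "    WHEN 1 = 1 THEN SELECT 'one';\n"
          + "    ELSE SELECT 'other';\n"
          + "  END CASE;\n"
          + "END";

  // Simple CASE: a case variable is compared against each WHEN expression (#simpleCaseStatement).
  static final String SIMPLE_CASE =
      "BEGIN\n"
          + "  CASE current_date()\n"
          + "    WHEN DATE '1970-01-01' THEN SELECT 'epoch';\n"
          + "    ELSE SELECT 'recent';\n"
          + "  END CASE;\n"
          + "END";

  public static void main(String[] args) {
    System.out.println(SEARCHED_CASE);
    System.out.println(SIMPLE_CASE);
  }
}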
@@ -17,7 +17,6 @@
 import org.opensearch.sql.spark.flint.FlintIndexStateModel;
 import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
 import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;
-import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;
 
 /**
  * Index Operation for Altering the flint index. Only handles alter operation when
@@ -63,13 +62,8 @@ void runOp(
     this.flintIndexMetadataService.updateIndexToManualRefresh(
         flintIndexMetadata.getOpensearchIndexName(), flintIndexOptions, asyncQueryRequestContext);
     if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
-      AsyncQuerySchedulerRequest request =
-          AsyncQuerySchedulerRequest.builder()
-              .accountId(flintIndexStateModel.getAccountId())
-              .dataSource(flintIndexStateModel.getDatasourceName())
-              .jobId(flintIndexMetadata.getOpensearchIndexName())
-              .build();
-      asyncQueryScheduler.unscheduleJob(request);
+      asyncQueryScheduler.unscheduleJob(
+          flintIndexMetadata.getOpensearchIndexName(), asyncQueryRequestContext);
     } else {
       cancelStreamingJob(flintIndexStateModel);
     }
@@ -15,7 +15,6 @@
 import org.opensearch.sql.spark.flint.FlintIndexStateModel;
 import org.opensearch.sql.spark.flint.FlintIndexStateModelService;
 import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;
-import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;
 
 /** Operation to drop Flint index */
 public class FlintIndexOpDrop extends FlintIndexOp {
@@ -55,13 +54,8 @@ void runOp(
         "Performing drop index operation for index: {}",
         flintIndexMetadata.getOpensearchIndexName());
     if (flintIndexMetadata.getFlintIndexOptions().isExternalScheduler()) {
-      AsyncQuerySchedulerRequest request =
-          AsyncQuerySchedulerRequest.builder()
-              .accountId(flintIndexStateModel.getAccountId())
-              .dataSource(flintIndexStateModel.getDatasourceName())
-              .jobId(flintIndexMetadata.getOpensearchIndexName())
-              .build();
-      asyncQueryScheduler.unscheduleJob(request);
+      asyncQueryScheduler.unscheduleJob(
+          flintIndexMetadata.getOpensearchIndexName(), asyncQueryRequestContext);
     } else {
       cancelStreamingJob(flintIndexStateModel);
     }
@@ -1,5 +1,11 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
 package org.opensearch.sql.spark.scheduler;
 
+import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
 import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;
 
 /** Scheduler interface for scheduling asynchronous query jobs. */
@@ -13,10 +19,13 @@ public interface AsyncQueryScheduler {
   * task
   *
   * @param asyncQuerySchedulerRequest The request containing job configuration details
+  * @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService
   * @throws IllegalArgumentException if a job with the same name already exists
   * @throws RuntimeException if there's an error during job creation
   */
-  void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);
+  void scheduleJob(
+      AsyncQuerySchedulerRequest asyncQuerySchedulerRequest,
+      AsyncQueryRequestContext asyncQueryRequestContext);
 
   /**
    * Updates an existing job with new parameters. This method modifies the configuration of an
@@ -26,10 +35,13 @@ public interface AsyncQueryScheduler {
   * scheduled job - Updating resource allocations for a job
   *
   * @param asyncQuerySchedulerRequest The request containing updated job configuration
+  * @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService
   * @throws IllegalArgumentException if the job to be updated doesn't exist
   * @throws RuntimeException if there's an error during the update process
   */
-  void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);
+  void updateJob(
+      AsyncQuerySchedulerRequest asyncQuerySchedulerRequest,
+      AsyncQueryRequestContext asyncQueryRequestContext);
 
   /**
    * Unschedules a job by marking it as disabled and updating its last update time. This method is
@@ -40,12 +52,12 @@ public interface AsyncQueryScheduler {
   * Temporarily disabling a job during maintenance or high-load periods - Allowing for easy
   * re-enabling of the job in the future
   *
-  * @param asyncQuerySchedulerRequest The request containing the job configuration to unschedule.
-  *     At minimum, it must include the jobId.
+  * @param jobId The unique identifier of the job to unschedule
+  * @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService
   * @throws IllegalArgumentException if the job to be unscheduled doesn't exist
   * @throws RuntimeException if there's an error during the unschedule process
   */
-  void unscheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);
+  void unscheduleJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext);
 
   /**
    * Removes a job completely from the scheduler. This method permanently deletes the job and all
@@ -54,10 +66,10 @@ public interface AsyncQueryScheduler {
   * <p>Use cases: - Cleaning up jobs that are no longer needed - Removing obsolete or erroneously
   * created jobs - Freeing up resources by deleting unused job configurations
   *
-  * @param asyncQuerySchedulerRequest The request containing the job configuration to unschedule.
-  *     At minimum, it must include the jobId.
+  * @param jobId The unique identifier of the job to remove
+  * @param asyncQueryRequestContext The request context passed to AsyncQueryExecutorService
   * @throws IllegalArgumentException if the job to be removed doesn't exist
   * @throws RuntimeException if there's an error during the remove process
   */
-  void removeJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest);
+  void removeJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext);
 }
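Taken together, the revised contract passes the AsyncQueryRequestContext through every call, and unscheduleJob/removeJob now operate on a bare job id rather than a full request object. The sketch below shows how a caller might exercise the new signatures; how the request and context objects are obtained is outside this commit, so they are simply passed in as parameters here, and this is not code from the change itself.

import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;
import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;

public class SchedulerUsageSketch {

  // A minimal caller-side walkthrough of the updated interface; illustrative only.
  void runLifecycle(
      AsyncQueryScheduler scheduler,
      AsyncQuerySchedulerRequest request,
      AsyncQueryRequestContext requestContext) {
    // Scheduling and updating still take the full request object, now paired with the context.
    scheduler.scheduleJob(request, requestContext);
    scheduler.updateJob(request, requestContext);

    // Unscheduling (a soft disable) and removal key off the job id alone, plus the context.
    scheduler.unscheduleJob(request.getJobId(), requestContext);
    scheduler.removeJob(request.getJobId(), requestContext);
  }
}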
@@ -11,7 +11,6 @@
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.argThat;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
@@ -231,9 +230,7 @@ public void createDropIndexQueryWithScheduler() {
     verifyCreateIndexDMLResultCalled();
     verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH);
 
-    verify(asyncQueryScheduler)
-        .unscheduleJob(
-            argThat(request -> indexName.equals(request.getJobId()) && !request.isEnabled()));
+    verify(asyncQueryScheduler).unscheduleJob(indexName, asyncQueryRequestContext);
   }
 
   @Test
@@ -321,9 +318,7 @@ public void createAlterIndexQueryWithScheduler() {
     FlintIndexOptions flintIndexOptions = flintIndexOptionsArgumentCaptor.getValue();
     assertFalse(flintIndexOptions.autoRefresh());
 
-    verify(asyncQueryScheduler)
-        .unscheduleJob(
-            argThat(request -> indexName.equals(request.getJobId()) && !request.isEnabled()));
+    verify(asyncQueryScheduler).unscheduleJob(indexName, asyncQueryRequestContext);
     verifyCreateIndexDMLResultCalled();
     verifyStoreJobMetadataCalled(DML_QUERY_JOB_ID, QueryState.SUCCESS, JobType.BATCH);
   }
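The same verification style would apply to other call sites that now pass the request context. The fragment below is a hypothetical example, not part of this commit, of how a scheduleJob assertion could read with the two-argument signature, assuming test fixtures analogous to the asyncQueryScheduler mock and asyncQueryRequestContext used above.

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.verify;

import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.scheduler.AsyncQueryScheduler;
import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;

class ScheduleJobVerificationSketch {

  // Called after the code under test has run against a mocked AsyncQueryScheduler.
  void verifyScheduleCall(
      AsyncQueryScheduler asyncQueryScheduler, AsyncQueryRequestContext asyncQueryRequestContext) {
    verify(asyncQueryScheduler)
        .scheduleJob(any(AsyncQuerySchedulerRequest.class), eq(asyncQueryRequestContext));
  }
}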
@@ -36,6 +36,7 @@
 import org.opensearch.index.engine.DocumentMissingException;
 import org.opensearch.index.engine.VersionConflictEngineException;
 import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
+import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
 import org.opensearch.sql.spark.scheduler.job.ScheduledAsyncQueryJobRunner;
 import org.opensearch.sql.spark.scheduler.model.AsyncQuerySchedulerRequest;
 import org.opensearch.sql.spark.scheduler.model.ScheduledAsyncQueryJobRequest;
@@ -56,7 +57,9 @@ public class OpenSearchAsyncQueryScheduler implements AsyncQueryScheduler {
 
   @Override
   /** Schedules a new job by indexing it into the job index. */
-  public void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
+  public void scheduleJob(
+      AsyncQuerySchedulerRequest asyncQuerySchedulerRequest,
+      AsyncQueryRequestContext asyncQueryRequestContext) {
     ScheduledAsyncQueryJobRequest request =
         ScheduledAsyncQueryJobRequest.fromAsyncQuerySchedulerRequest(asyncQuerySchedulerRequest);
     if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) {
@@ -88,15 +91,18 @@ public void scheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
 
   /** Unschedules a job by marking it as disabled and updating its last update time. */
   @Override
-  public void unscheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
-    String jobId = asyncQuerySchedulerRequest.getJobId();
+  public void unscheduleJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext) {
     if (Strings.isNullOrEmpty(jobId)) {
       throw new IllegalArgumentException("JobId cannot be null or empty");
     }
     try {
-      asyncQuerySchedulerRequest.setEnabled(false);
-      asyncQuerySchedulerRequest.setLastUpdateTime(Instant.now());
-      updateJob(asyncQuerySchedulerRequest);
+      AsyncQuerySchedulerRequest request =
+          ScheduledAsyncQueryJobRequest.builder()
+              .jobId(jobId)
+              .enabled(false)
+              .lastUpdateTime(Instant.now())
+              .build();
+      updateJob(request, asyncQueryRequestContext);
       LOG.info("Unscheduled job for jobId: {}", jobId);
     } catch (IllegalStateException | DocumentMissingException e) {
       LOG.error("Failed to unschedule job: {}", jobId, e);
@@ -106,7 +112,9 @@ public void unscheduleJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest)
   /** Updates an existing job with new parameters. */
   @Override
   @SneakyThrows
-  public void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
+  public void updateJob(
+      AsyncQuerySchedulerRequest asyncQuerySchedulerRequest,
+      AsyncQueryRequestContext asyncQueryRequestContext) {
     ScheduledAsyncQueryJobRequest request =
         ScheduledAsyncQueryJobRequest.fromAsyncQuerySchedulerRequest(asyncQuerySchedulerRequest);
     assertIndexExists();
@@ -135,9 +143,8 @@ public void updateJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
 
   /** Removes a job by deleting its document from the index. */
   @Override
-  public void removeJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
+  public void removeJob(String jobId, AsyncQueryRequestContext asyncQueryRequestContext) {
     assertIndexExists();
-    String jobId = asyncQuerySchedulerRequest.getJobId();
     if (Strings.isNullOrEmpty(jobId)) {
       throw new IllegalArgumentException("JobId cannot be null or empty");
     }