Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature] Flint query scheduler part1 - integrate job scheduler plugin #2834

Merged
merged 18 commits into from
Jul 31, 2024
Merged
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@ gen
.worktrees
http-client.env.json
/doctest/sql-cli/
/doctest/opensearch-job-scheduler/
.factorypath
17 changes: 11 additions & 6 deletions async-query-core/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ compoundStatement
;

setStatementWithOptionalVarKeyword
: SET (VARIABLE | VAR)? assignmentList #setVariableWithOptionalKeyword
| SET (VARIABLE | VAR)? LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ
: SET variable? assignmentList #setVariableWithOptionalKeyword
| SET variable? LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ
LEFT_PAREN query RIGHT_PAREN #setVariableWithOptionalKeyword
;

Expand Down Expand Up @@ -215,9 +215,9 @@ statement
routineCharacteristics
RETURN (query | expression) #createUserDefinedFunction
| DROP TEMPORARY? FUNCTION (IF EXISTS)? identifierReference #dropFunction
| DECLARE (OR REPLACE)? VARIABLE?
| DECLARE (OR REPLACE)? variable?
identifierReference dataType? variableDefaultExpression? #createVariable
| DROP TEMPORARY VARIABLE (IF EXISTS)? identifierReference #dropVariable
| DROP TEMPORARY variable (IF EXISTS)? identifierReference #dropVariable
| EXPLAIN (LOGICAL | FORMATTED | EXTENDED | CODEGEN | COST)?
(statement|setResetStatement) #explain
| SHOW TABLES ((FROM | IN) identifierReference)?
Expand Down Expand Up @@ -272,8 +272,8 @@ setResetStatement
| SET TIME ZONE interval #setTimeZone
| SET TIME ZONE timezone #setTimeZone
| SET TIME ZONE .*? #setTimeZone
| SET (VARIABLE | VAR) assignmentList #setVariable
| SET (VARIABLE | VAR) LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ
| SET variable assignmentList #setVariable
| SET variable LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ
LEFT_PAREN query RIGHT_PAREN #setVariable
| SET configKey EQ configValue #setQuotedConfiguration
| SET configKey (EQ .*?)? #setConfiguration
Expand Down Expand Up @@ -438,6 +438,11 @@ namespaces
| SCHEMAS
;

variable
: VARIABLE
| VAR
;

describeFuncName
: identifierReference
| stringLit
Expand Down
3 changes: 3 additions & 0 deletions async-query/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ repositories {


dependencies {
implementation "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"

api project(':core')
api project(':async-query-core')
implementation project(':protocol')
Expand Down Expand Up @@ -97,6 +99,7 @@ jacocoTestCoverageVerification {
// ignore because XContext IOException
'org.opensearch.sql.spark.execution.statestore.StateStore',
'org.opensearch.sql.spark.rest.*',
'org.opensearch.sql.spark.scheduler.OpenSearchRefreshIndexJobRequestParser',
'org.opensearch.sql.spark.transport.model.*'
]
limit {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.scheduler;

import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;

import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.action.DocWriteRequest;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.sql.spark.scheduler.job.OpenSearchRefreshIndexJob;
import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest;
import org.opensearch.threadpool.ThreadPool;

/** Scheduler class for managing asynchronous query jobs. */
ykmr1224 marked this conversation as resolved.
Show resolved Hide resolved
public class OpenSearchAsyncQueryScheduler {
ykmr1224 marked this conversation as resolved.
Show resolved Hide resolved
public static final String SCHEDULER_INDEX_NAME = ".async-query-scheduler";
public static final String SCHEDULER_PLUGIN_JOB_TYPE = "async-query-scheduler";
private static final String SCHEDULER_INDEX_MAPPING_FILE_NAME =
"async-query-scheduler-index-mapping.yml";
private static final String SCHEDULER_INDEX_SETTINGS_FILE_NAME =
"async-query-scheduler-index-settings.yml";
private static final Logger LOG = LogManager.getLogger();

private Client client;
private ClusterService clusterService;

/** Loads job resources, setting up required services and job runner instance. */
public void loadJobResource(Client client, ClusterService clusterService, ThreadPool threadPool) {
penghuo marked this conversation as resolved.
Show resolved Hide resolved
this.client = client;
this.clusterService = clusterService;
OpenSearchRefreshIndexJob openSearchRefreshIndexJob =
OpenSearchRefreshIndexJob.getJobRunnerInstance();
openSearchRefreshIndexJob.setClusterService(clusterService);
openSearchRefreshIndexJob.setThreadPool(threadPool);
openSearchRefreshIndexJob.setClient(client);
}

/** Schedules a new job by indexing it into the job index. */
public void scheduleJob(OpenSearchRefreshIndexJobRequest request) {
if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) {
createAsyncQuerySchedulerIndex();
}
IndexRequest indexRequest = new IndexRequest(SCHEDULER_INDEX_NAME);
indexRequest.id(request.getName());
indexRequest.opType(DocWriteRequest.OpType.CREATE);
indexRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
IndexResponse indexResponse;
try {
indexRequest.source(request.toXContent(JsonXContent.contentBuilder(), EMPTY_PARAMS));
ActionFuture<IndexResponse> indexResponseActionFuture = client.index(indexRequest);
indexResponse = indexResponseActionFuture.actionGet();
} catch (VersionConflictEngineException exception) {
throw new IllegalArgumentException("A job already exists with name: " + request.getName());
} catch (Throwable e) {
LOG.error("Failed to schedule job : {}", request.getName(), e);
throw new RuntimeException(e);
noCharger marked this conversation as resolved.
Show resolved Hide resolved
}

if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
LOG.debug("Job : {} successfully created", request.getName());
} else {
throw new RuntimeException(
"Schedule job failed with result : " + indexResponse.getResult().getLowercase());
}
}

/** Unschedules a job by marking it as disabled and updating its last update time. */
public void unscheduleJob(String jobId) throws IOException {
assertIndexExists();
OpenSearchRefreshIndexJobRequest request =
OpenSearchRefreshIndexJobRequest.builder()
.jobName(jobId)
.enabled(false)
.lastUpdateTime(Instant.now())
.build();
updateJob(request);
}

/** Updates an existing job with new parameters. */
public void updateJob(OpenSearchRefreshIndexJobRequest request) throws IOException {
noCharger marked this conversation as resolved.
Show resolved Hide resolved
assertIndexExists();
UpdateRequest updateRequest = new UpdateRequest(SCHEDULER_INDEX_NAME, request.getName());
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.doc(request.toXContent(JsonXContent.contentBuilder(), EMPTY_PARAMS));
UpdateResponse updateResponse;
try {
ActionFuture<UpdateResponse> updateResponseActionFuture = client.update(updateRequest);
updateResponse = updateResponseActionFuture.actionGet();
} catch (DocumentMissingException exception) {
throw new IllegalArgumentException("Job: " + request.getName() + " doesn't exist");
} catch (Throwable e) {
LOG.error("Failed to update job : {}", request.getName(), e);
throw new RuntimeException(e);
noCharger marked this conversation as resolved.
Show resolved Hide resolved
}

if (updateResponse.getResult().equals(DocWriteResponse.Result.UPDATED)
|| updateResponse.getResult().equals(DocWriteResponse.Result.NOOP)) {
LOG.debug("Job : {} successfully updated", request.getName());
} else {
throw new RuntimeException(
"Update job failed with result : " + updateResponse.getResult().getLowercase());
}
}

/** Removes a job by deleting its document from the index. */
public void removeJob(String jobId) {
assertIndexExists();
DeleteRequest deleteRequest = new DeleteRequest(SCHEDULER_INDEX_NAME, jobId);
deleteRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
ActionFuture<DeleteResponse> deleteResponseActionFuture = client.delete(deleteRequest);
DeleteResponse deleteResponse = deleteResponseActionFuture.actionGet();

if (deleteResponse.getResult().equals(DocWriteResponse.Result.DELETED)) {
LOG.debug("Job : {} successfully deleted", jobId);
} else if (deleteResponse.getResult().equals(DocWriteResponse.Result.NOT_FOUND)) {
throw new IllegalArgumentException("Job : " + jobId + " doesn't exist");
} else {
throw new RuntimeException(
"Remove job failed with result : " + deleteResponse.getResult().getLowercase());
}
}

/** Creates the async query scheduler index with specified mappings and settings. */
@VisibleForTesting
void createAsyncQuerySchedulerIndex() {
noCharger marked this conversation as resolved.
Show resolved Hide resolved
try {
InputStream mappingFileStream =
OpenSearchAsyncQueryScheduler.class
.getClassLoader()
.getResourceAsStream(SCHEDULER_INDEX_MAPPING_FILE_NAME);
InputStream settingsFileStream =
OpenSearchAsyncQueryScheduler.class
.getClassLoader()
.getResourceAsStream(SCHEDULER_INDEX_SETTINGS_FILE_NAME);
CreateIndexRequest createIndexRequest = new CreateIndexRequest(SCHEDULER_INDEX_NAME);
createIndexRequest.mapping(
IOUtils.toString(mappingFileStream, StandardCharsets.UTF_8), XContentType.YAML);
createIndexRequest.settings(
IOUtils.toString(settingsFileStream, StandardCharsets.UTF_8), XContentType.YAML);
ActionFuture<CreateIndexResponse> createIndexResponseActionFuture =
client.admin().indices().create(createIndexRequest);
CreateIndexResponse createIndexResponse = createIndexResponseActionFuture.actionGet();

if (createIndexResponse.isAcknowledged()) {
LOG.debug("Index: {} creation Acknowledged", SCHEDULER_INDEX_NAME);
} else {
throw new RuntimeException("Index creation is not acknowledged.");
}
} catch (Throwable e) {
LOG.error("Error creating index: {}", SCHEDULER_INDEX_NAME, e);
throw new RuntimeException(
"Internal server error while creating "
+ SCHEDULER_INDEX_NAME
+ " index: "
+ e.getMessage(),
e);
}
}

private void assertIndexExists() {
if (!this.clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)) {
throw new IllegalStateException("Job index does not exist.");
}
}

/** Returns the job runner instance for the scheduler. */
public static ScheduledJobRunner getJobRunner() {
return OpenSearchRefreshIndexJob.getJobRunnerInstance();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.scheduler;

import java.io.IOException;
import java.time.Instant;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest;

public class OpenSearchRefreshIndexJobRequestParser {

private static Instant parseInstantValue(XContentParser parser) throws IOException {
if (XContentParser.Token.VALUE_NULL.equals(parser.currentToken())) {
return null;
}
if (parser.currentToken().isValue()) {
return Instant.ofEpochMilli(parser.longValue());
}
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
return null;
}

public static ScheduledJobParser getJobParser() {
return (parser, id, jobDocVersion) -> {
OpenSearchRefreshIndexJobRequest.OpenSearchRefreshIndexJobRequestBuilder builder =
OpenSearchRefreshIndexJobRequest.builder();
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

while (!parser.nextToken().equals(XContentParser.Token.END_OBJECT)) {
String fieldName = parser.currentName();
parser.nextToken();
switch (fieldName) {
case OpenSearchRefreshIndexJobRequest.JOB_NAME_FIELD:
builder.jobName(parser.text());
break;
case OpenSearchRefreshIndexJobRequest.JOB_TYPE_FIELD:
builder.jobType(parser.text());
break;
case OpenSearchRefreshIndexJobRequest.ENABLED_FIELD:
builder.enabled(parser.booleanValue());
break;
case OpenSearchRefreshIndexJobRequest.ENABLED_TIME_FIELD:
builder.enabledTime(parseInstantValue(parser));
break;
case OpenSearchRefreshIndexJobRequest.LAST_UPDATE_TIME_FIELD:
builder.lastUpdateTime(parseInstantValue(parser));
break;
case OpenSearchRefreshIndexJobRequest.SCHEDULE_FIELD:
builder.schedule(ScheduleParser.parse(parser));
break;
case OpenSearchRefreshIndexJobRequest.LOCK_DURATION_SECONDS:
builder.lockDurationSeconds(parser.longValue());
break;
case OpenSearchRefreshIndexJobRequest.JITTER:
builder.jitter(parser.doubleValue());
break;
default:
XContentParserUtils.throwUnknownToken(parser.currentToken(), parser.getTokenLocation());
}
}
return builder.build();
};
}
}
Loading
Loading