Skip to content

Commit

Permalink
Add AsyncQueryRequestContext to FlintIndexMetadataService/FlintIndexS…
Browse files Browse the repository at this point in the history
…tateModelService (#2879)

Signed-off-by: Tomoyuki Morita <moritato@amazon.com>
  • Loading branch information
ykmr1224 authored Aug 1, 2024
1 parent 82ef68e commit 14a80a9
Show file tree
Hide file tree
Showing 33 changed files with 384 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ CreateAsyncQueryResponse createAsyncQuery(
* @param queryId queryId.
* @return {@link String} cancelledQueryId.
*/
String cancelQuery(String queryId);
String cancelQuery(String queryId, AsyncQueryRequestContext asyncQueryRequestContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,11 @@ public AsyncQueryExecutionResponse getAsyncQueryResults(String queryId) {
}

@Override
public String cancelQuery(String queryId) {
public String cancelQuery(String queryId, AsyncQueryRequestContext asyncQueryRequestContext) {
Optional<AsyncQueryJobMetadata> asyncQueryJobMetadata =
asyncQueryJobMetadataStorageService.getJobMetadata(queryId);
if (asyncQueryJobMetadata.isPresent()) {
return sparkQueryDispatcher.cancelJob(asyncQueryJobMetadata.get());
return sparkQueryDispatcher.cancelJob(asyncQueryJobMetadata.get(), asyncQueryRequestContext);
}
throw new AsyncQueryNotFoundException(String.format("QueryId: %s not found", queryId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.amazonaws.services.emrserverless.model.JobRunState;
import org.json.JSONObject;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
Expand Down Expand Up @@ -54,7 +55,9 @@ protected abstract JSONObject getResponseFromResultIndex(
protected abstract JSONObject getResponseFromExecutor(
AsyncQueryJobMetadata asyncQueryJobMetadata);

public abstract String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata);
public abstract String cancelJob(
AsyncQueryJobMetadata asyncQueryJobMetadata,
AsyncQueryRequestContext asyncQueryRequestContext);

public abstract DispatchQueryResponse submit(
DispatchQueryRequest request, DispatchQueryContext context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.json.JSONObject;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.StartJobRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
Expand Down Expand Up @@ -61,7 +62,9 @@ protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJob
}

@Override
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
public String cancelJob(
AsyncQueryJobMetadata asyncQueryJobMetadata,
AsyncQueryRequestContext asyncQueryRequestContext) {
emrServerlessClient.cancelJobRun(
asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId(), false);
return asyncQueryJobMetadata.getQueryId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,11 @@ public DispatchQueryResponse submit(
long startTime = System.currentTimeMillis();
try {
IndexQueryDetails indexDetails = context.getIndexQueryDetails();
FlintIndexMetadata indexMetadata = getFlintIndexMetadata(indexDetails);
FlintIndexMetadata indexMetadata =
getFlintIndexMetadata(indexDetails, context.getAsyncQueryRequestContext());

getIndexOp(dispatchQueryRequest, indexDetails).apply(indexMetadata);
getIndexOp(dispatchQueryRequest, indexDetails)
.apply(indexMetadata, context.getAsyncQueryRequestContext());

String asyncQueryId =
storeIndexDMLResult(
Expand Down Expand Up @@ -146,9 +148,11 @@ private FlintIndexOp getIndexOp(
}
}

private FlintIndexMetadata getFlintIndexMetadata(IndexQueryDetails indexDetails) {
private FlintIndexMetadata getFlintIndexMetadata(
IndexQueryDetails indexDetails, AsyncQueryRequestContext asyncQueryRequestContext) {
Map<String, FlintIndexMetadata> indexMetadataMap =
flintIndexMetadataService.getFlintIndexMetadata(indexDetails.openSearchIndexName());
flintIndexMetadataService.getFlintIndexMetadata(
indexDetails.openSearchIndexName(), asyncQueryRequestContext);
if (!indexMetadataMap.containsKey(indexDetails.openSearchIndexName())) {
throw new IllegalStateException(
String.format(
Expand All @@ -174,7 +178,9 @@ protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJob
}

@Override
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
public String cancelJob(
AsyncQueryJobMetadata asyncQueryJobMetadata,
AsyncQueryRequestContext asyncQueryRequestContext) {
throw new IllegalArgumentException("can't cancel index DML query");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.json.JSONObject;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryResponse;
Expand Down Expand Up @@ -71,7 +72,9 @@ protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJob
}

@Override
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
public String cancelJob(
AsyncQueryJobMetadata asyncQueryJobMetadata,
AsyncQueryRequestContext asyncQueryRequestContext) {
String queryId = asyncQueryJobMetadata.getQueryId();
getStatementByQueryId(
asyncQueryJobMetadata.getSessionId(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.Map;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryRequest;
Expand Down Expand Up @@ -51,18 +52,21 @@ public RefreshQueryHandler(
}

@Override
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
public String cancelJob(
AsyncQueryJobMetadata asyncQueryJobMetadata,
AsyncQueryRequestContext asyncQueryRequestContext) {
String datasourceName = asyncQueryJobMetadata.getDatasourceName();
Map<String, FlintIndexMetadata> indexMetadataMap =
flintIndexMetadataService.getFlintIndexMetadata(asyncQueryJobMetadata.getIndexName());
flintIndexMetadataService.getFlintIndexMetadata(
asyncQueryJobMetadata.getIndexName(), asyncQueryRequestContext);
if (!indexMetadataMap.containsKey(asyncQueryJobMetadata.getIndexName())) {
throw new IllegalStateException(
String.format(
"Couldn't fetch flint index: %s details", asyncQueryJobMetadata.getIndexName()));
}
FlintIndexMetadata indexMetadata = indexMetadataMap.get(asyncQueryJobMetadata.getIndexName());
FlintIndexOp jobCancelOp = flintIndexOpFactory.getCancel(datasourceName);
jobCancelOp.apply(indexMetadata);
jobCancelOp.apply(indexMetadata, asyncQueryRequestContext);
return asyncQueryJobMetadata.getQueryId();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,11 @@ public JSONObject getQueryResponse(AsyncQueryJobMetadata asyncQueryJobMetadata)
.getQueryResponse(asyncQueryJobMetadata);
}

public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
public String cancelJob(
AsyncQueryJobMetadata asyncQueryJobMetadata,
AsyncQueryRequestContext asyncQueryRequestContext) {
return getAsyncQueryHandlerForExistingQuery(asyncQueryJobMetadata)
.cancelJob(asyncQueryJobMetadata);
.cancelJob(asyncQueryJobMetadata, asyncQueryRequestContext);
}

private AsyncQueryHandler getAsyncQueryHandlerForExistingQuery(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.Map;
import org.opensearch.sql.datasource.model.DataSourceMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryJobMetadata;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.client.EMRServerlessClient;
import org.opensearch.sql.spark.client.StartJobRequest;
import org.opensearch.sql.spark.dispatcher.model.DispatchQueryContext;
Expand Down Expand Up @@ -46,7 +47,9 @@ public StreamingQueryHandler(
}

@Override
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
public String cancelJob(
AsyncQueryJobMetadata asyncQueryJobMetadata,
AsyncQueryRequestContext asyncQueryRequestContext) {
throw new IllegalArgumentException(
"can't cancel index DML query, using ALTER auto_refresh=off statement to stop job, using"
+ " VACUUM statement to stop job and delete data");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.sql.spark.flint;

import java.util.Map;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.dispatcher.model.FlintIndexOptions;

/** Interface for FlintIndexMetadataReader */
Expand All @@ -15,16 +16,22 @@ public interface FlintIndexMetadataService {
* Retrieves a map of {@link FlintIndexMetadata} instances matching the specified index pattern.
*
* @param indexPattern indexPattern.
* @param asyncQueryRequestContext request context passed to AsyncQueryExecutorService
* @return A map of {@link FlintIndexMetadata} instances against indexName, each providing
* metadata access for a matched index. Returns an empty list if no indices match the pattern.
*/
Map<String, FlintIndexMetadata> getFlintIndexMetadata(String indexPattern);
Map<String, FlintIndexMetadata> getFlintIndexMetadata(
String indexPattern, AsyncQueryRequestContext asyncQueryRequestContext);

/**
* Performs validation and updates flint index to manual refresh.
*
* @param indexName indexName.
* @param flintIndexOptions flintIndexOptions.
* @param asyncQueryRequestContext request context passed to AsyncQueryExecutorService
*/
void updateIndexToManualRefresh(String indexName, FlintIndexOptions flintIndexOptions);
void updateIndexToManualRefresh(
String indexName,
FlintIndexOptions flintIndexOptions,
AsyncQueryRequestContext asyncQueryRequestContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,58 @@
package org.opensearch.sql.spark.flint;

import java.util.Optional;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;

/**
* Abstraction over flint index state storage. Flint index state will maintain the status of each
* flint index.
*/
public interface FlintIndexStateModelService {
FlintIndexStateModel createFlintIndexStateModel(FlintIndexStateModel flintIndexStateModel);

Optional<FlintIndexStateModel> getFlintIndexStateModel(String id, String datasourceName);
/**
* Create Flint index state record
*
* @param flintIndexStateModel the model to be saved
* @param asyncQueryRequestContext the request context passed to AsyncQueryExecutorService
* @return saved model
*/
FlintIndexStateModel createFlintIndexStateModel(
FlintIndexStateModel flintIndexStateModel, AsyncQueryRequestContext asyncQueryRequestContext);

/**
* Get Flint index state record
*
* @param id ID(latestId) of the Flint index state record
* @param datasourceName datasource name
* @param asyncQueryRequestContext the request context passed to AsyncQueryExecutorService
* @return retrieved model
*/
Optional<FlintIndexStateModel> getFlintIndexStateModel(
String id, String datasourceName, AsyncQueryRequestContext asyncQueryRequestContext);

/**
* Update Flint index state record
*
* @param flintIndexStateModel the model to be updated
* @param flintIndexState new state
* @param datasourceName Datasource name
* @param asyncQueryRequestContext the request context passed to AsyncQueryExecutorService
* @return Updated model
*/
FlintIndexStateModel updateFlintIndexState(
FlintIndexStateModel flintIndexStateModel,
FlintIndexState flintIndexState,
String datasourceName);
String datasourceName,
AsyncQueryRequestContext asyncQueryRequestContext);

boolean deleteFlintIndexStateModel(String id, String datasourceName);
/**
* Delete Flint index state record
*
* @param id ID(latestId) of the Flint index state record
* @param datasourceName datasource name
* @param asyncQueryRequestContext the request context passed to AsyncQueryExecutorService
* @return true if deleted, otherwise false
*/
boolean deleteFlintIndexStateModel(
String id, String datasourceName, AsyncQueryRequestContext asyncQueryRequestContext);
}
Loading

0 comments on commit 14a80a9

Please sign in to comment.