[Backport 2.x] Handle EMR Exceptions in FlintCancelJob Operation #2589

Merged (1 commit, Mar 25, 2024)
12 changes: 6 additions & 6 deletions docs/user/admin/settings.rst
@@ -420,7 +420,7 @@ SQL query::


plugins.query.executionengine.spark.session_inactivity_timeout_millis
-===============================
+=====================================================================

Description
-----------
@@ -456,7 +456,7 @@ SQL query::


plugins.query.executionengine.spark.auto_index_management.enabled
-===============================
+=================================================================

Description
-----------
@@ -492,7 +492,7 @@ SQL query::


plugins.query.executionengine.spark.session.index.ttl
-===============================
+=====================================================

Description
-----------
@@ -529,7 +529,7 @@ SQL query::


plugins.query.executionengine.spark.result.index.ttl
-===============================
+====================================================

Description
-----------
@@ -565,7 +565,7 @@ SQL query::
}

plugins.query.executionengine.async_query.enabled
-===============================
+=================================================

Description
-----------
@@ -596,7 +596,7 @@ Request::
}

plugins.query.executionengine.spark.streamingjobs.housekeeper.interval
-===============================
+======================================================================

Description
-----------
@@ -11,7 +11,10 @@
import static org.opensearch.sql.datasources.utils.XContentParserUtils.DESCRIPTION_FIELD;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.NAME_FIELD;
import static org.opensearch.sql.datasources.utils.XContentParserUtils.STATUS_FIELD;
+import static org.opensearch.sql.legacy.TestUtils.createIndexByRestClient;
import static org.opensearch.sql.legacy.TestUtils.getResponseBody;
+import static org.opensearch.sql.legacy.TestUtils.isIndexExist;
+import static org.opensearch.sql.legacy.TestUtils.loadDataByRestClient;

import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
@@ -37,11 +40,6 @@

public class DataSourceAPIsIT extends PPLIntegTestCase {

-@Override
-protected void init() throws Exception {
-loadIndex(Index.DATASOURCES);
-}

@After
public void cleanUp() throws IOException {
wipeAllClusterSettings();
@@ -397,6 +395,16 @@ public void patchDataSourceAPITest() {
@SneakyThrows
@Test
public void testOldDataSourceModelLoadingThroughGetDataSourcesAPI() {
+Index index = Index.DATASOURCES;
+String indexName = index.getName();
+String mapping = index.getMapping();
+String dataSet = index.getDataSet();
+if (!isIndexExist(client(), indexName)) {
+createIndexByRestClient(client(), indexName, mapping);
+}
+loadDataByRestClient(client(), indexName, dataSet);
+// waiting for loaded indices.
+Thread.sleep(1000);
// get datasource to validate the creation.
Request getRequest = getFetchDataSourceRequest(null);
Response getResponse = client().performRequest(getRequest);
@@ -41,5 +41,6 @@ public interface EMRServerlessClient {
* @param jobId jobId.
* @return {@link CancelJobRunResult}
*/
-CancelJobRunResult cancelJobRun(String applicationId, String jobId);
+CancelJobRunResult cancelJobRun(
+String applicationId, String jobId, boolean allowExceptionPropagation);
}
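To make the new boolean concrete, here is a brief caller-side sketch (illustration only, not code from this PR; the class and method names are hypothetical, and it assumes the interface lives in org.opensearch.sql.spark.client alongside its implementation). Call sites that need to inspect the raw EMR exception, such as the Flint cancel operation below, pass true; the other call sites in this change pass false and keep the legacy behavior of a logged failure wrapped in a generic RuntimeException.

```java
// Illustrative sketch of the two calling conventions enabled by the new flag.
import com.amazonaws.services.emrserverless.model.ValidationException;
import org.opensearch.sql.spark.client.EMRServerlessClient;

class CancelJobRunUsageSketch {
  // Legacy behavior: failures are logged inside the client and resurface as a
  // generic "Internal Server Error." RuntimeException.
  static void cancelQuietly(EMRServerlessClient client, String applicationId, String jobId) {
    client.cancelJobRun(applicationId, jobId, false);
  }

  // New behavior used by the Flint cancel path: the original EMR exception
  // propagates, so the caller can recognize a job that is already terminal.
  static void cancelOrIgnoreTerminal(
      EMRServerlessClient client, String applicationId, String jobId) {
    try {
      client.cancelJobRun(applicationId, jobId, true);
    } catch (ValidationException e) {
      if (e.getMessage().contains("Job run is not in a cancellable state")) {
        return; // nothing to cancel; the job already finished, failed, or was cancelled
      }
      throw e;
    }
  }
}
```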
@@ -32,7 +32,7 @@ public class EmrServerlessClientImpl implements EMRServerlessClient {

private static final int MAX_JOB_NAME_LENGTH = 255;

-private static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error.";
+public static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error.";

public EmrServerlessClientImpl(AWSEMRServerless emrServerless) {
this.emrServerless = emrServerless;
@@ -98,7 +98,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
}

@Override
-public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
+public CancelJobRunResult cancelJobRun(
+String applicationId, String jobId, boolean allowExceptionPropagation) {
CancelJobRunRequest cancelJobRunRequest =
new CancelJobRunRequest().withJobRunId(jobId).withApplicationId(applicationId);
CancelJobRunResult cancelJobRunResult =
@@ -108,10 +109,14 @@ public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
try {
return emrServerless.cancelJobRun(cancelJobRunRequest);
} catch (Throwable t) {
logger.error("Error while making cancel job request to emr:", t);
MetricUtils.incrementNumericalMetric(
MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT);
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
if (allowExceptionPropagation) {
throw t;
} else {
logger.error("Error while making cancel job request to emr:", t);
MetricUtils.incrementNumericalMetric(
MetricName.EMR_CANCEL_JOB_REQUEST_FAILURE_COUNT);
throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}
}
});
logger.info(String.format("Job : %s cancelled", cancelJobRunResult.getJobRunId()));
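A minimal unit-test sketch of the propagation branch (not part of this PR; the test class and method names are hypothetical, and it assumes JUnit 5 and Mockito are on the test classpath). With allowExceptionPropagation set to true, the client rethrows before logging or incrementing the failure metric, so the raw ValidationException reaches the caller.

```java
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.amazonaws.services.emrserverless.AWSEMRServerless;
import com.amazonaws.services.emrserverless.model.CancelJobRunRequest;
import com.amazonaws.services.emrserverless.model.ValidationException;
import org.junit.jupiter.api.Test;
import org.opensearch.sql.spark.client.EmrServerlessClientImpl;

class CancelJobRunPropagationSketchTest {
  @Test
  void validationExceptionPropagatesWhenRequested() {
    AWSEMRServerless emr = mock(AWSEMRServerless.class);
    when(emr.cancelJobRun(any(CancelJobRunRequest.class)))
        .thenThrow(new ValidationException("Job run is not in a cancellable state"));

    EmrServerlessClientImpl client = new EmrServerlessClientImpl(emr);

    // With propagation enabled the EMR exception is rethrown as-is instead of
    // being wrapped into the generic "Internal Server Error." RuntimeException.
    assertThrows(
        ValidationException.class, () -> client.cancelJobRun("application-id", "job-id", true));
  }
}
```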
@@ -58,7 +58,7 @@ protected JSONObject getResponseFromExecutor(AsyncQueryJobMetadata asyncQueryJob
@Override
public String cancelJob(AsyncQueryJobMetadata asyncQueryJobMetadata) {
emrServerlessClient.cancelJobRun(
-asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId());
+asyncQueryJobMetadata.getApplicationId(), asyncQueryJobMetadata.getJobId(), false);
return asyncQueryJobMetadata.getQueryId().getId();
}

@@ -80,7 +80,7 @@ public void close() {
if (model.isEmpty()) {
throw new IllegalStateException("session does not exist. " + sessionModel.getSessionId());
} else {
-serverlessClient.cancelJobRun(sessionModel.getApplicationId(), sessionModel.getJobId());
+serverlessClient.cancelJobRun(
+sessionModel.getApplicationId(), sessionModel.getJobId(), false);
}
}

@@ -5,10 +5,12 @@

package org.opensearch.sql.spark.flint.operation;

+import static org.opensearch.sql.spark.client.EmrServerlessClientImpl.GENERIC_INTERNAL_SERVER_ERROR_MESSAGE;
import static org.opensearch.sql.spark.execution.statestore.StateStore.deleteFlintIndexState;
import static org.opensearch.sql.spark.execution.statestore.StateStore.getFlintIndexState;
import static org.opensearch.sql.spark.execution.statestore.StateStore.updateFlintIndexState;

+import com.amazonaws.services.emrserverless.model.ValidationException;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
@@ -145,11 +147,18 @@ public void cancelStreamingJob(
String jobId = flintIndexStateModel.getJobId();
try {
emrServerlessClient.cancelJobRun(
-flintIndexStateModel.getApplicationId(), flintIndexStateModel.getJobId());
-} catch (IllegalArgumentException e) {
-// handle job does not exist case.
+flintIndexStateModel.getApplicationId(), flintIndexStateModel.getJobId(), true);
+} catch (ValidationException e) {
+// Exception when the job is not in cancellable state and already in terminal state.
+if (e.getMessage().contains("Job run is not in a cancellable state")) {
+LOG.error(e);
+return;
+} else {
+throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
+}
+} catch (Exception e) {
LOG.error(e);
-return;
+throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
}

// pull job state until timeout or cancelled.
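Restating the decision the new catch blocks above encode, as a standalone helper (illustration only; the names are hypothetical and the real logic stays inside cancelStreamingJob): a ValidationException whose message says the job run is not in a cancellable state means the job already reached a terminal state, so the cancel is treated as a no-op, while any other failure is wrapped in the shared generic error message.

```java
// Illustration of the message-based guard used above; not code from this PR.
import com.amazonaws.services.emrserverless.model.ValidationException;

final class CancelDecisionSketch {
  static final String GENERIC_INTERNAL_SERVER_ERROR_MESSAGE = "Internal Server Error.";

  /** Returns normally when the job is already terminal; otherwise wraps the failure. */
  static void handleCancelFailure(Exception e) {
    if (e instanceof ValidationException
        && e.getMessage().contains("Job run is not in a cancellable state")) {
      return; // the job already completed, failed, or was cancelled
    }
    throw new RuntimeException(GENERIC_INTERNAL_SERVER_ERROR_MESSAGE);
  }
}
```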
@@ -243,7 +243,8 @@ public GetJobRunResult getJobRunResult(String applicationId, String jobId) {
}

@Override
-public CancelJobRunResult cancelJobRun(String applicationId, String jobId) {
+public CancelJobRunResult cancelJobRun(
+String applicationId, String jobId, boolean allowExceptionPropagation) {
cancelJobRunCalled++;
return new CancelJobRunResult().withJobRunId(jobId);
}