Skip to content

Commit

Permalink
Fix handler for existing query
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoyuki Morita <moritato@amazon.com>
  • Loading branch information
ykmr1224 committed Sep 4, 2024
1 parent c7ce0ac commit d5e13b4
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public String cancelJob(
@Override
public DispatchQueryResponse submit(
DispatchQueryRequest dispatchQueryRequest, DispatchQueryContext context) {
leaseManager.borrow(new LeaseRequest(JobType.BATCH, dispatchQueryRequest.getDatasource()));
leaseManager.borrow(new LeaseRequest(JobType.REFRESH, dispatchQueryRequest.getDatasource()));

DispatchQueryResponse resp = super.submit(dispatchQueryRequest, context);
DataSourceMetadata dataSourceMetadata = context.getDataSourceMetadata();
Expand All @@ -84,7 +84,7 @@ public DispatchQueryResponse submit(
.resultIndex(resp.getResultIndex())
.sessionId(resp.getSessionId())
.datasourceName(dataSourceMetadata.getName())
.jobType(JobType.BATCH)
.jobType(JobType.REFRESH)
.indexName(context.getIndexQueryDetails().openSearchIndexName())
.status(QueryState.WAITING)
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ private AsyncQueryHandler getAsyncQueryHandlerForExistingQuery(
return queryHandlerFactory.getInteractiveQueryHandler();
} else if (IndexDMLHandler.isIndexDMLQuery(asyncQueryJobMetadata.getJobId())) {
return queryHandlerFactory.getIndexDMLHandler();
} else if (asyncQueryJobMetadata.getJobType() == JobType.BATCH) {
} else if (asyncQueryJobMetadata.getJobType() == JobType.REFRESH) {
return queryHandlerFactory.getRefreshQueryHandler(asyncQueryJobMetadata.getAccountId());
} else if (asyncQueryJobMetadata.getJobType() == JobType.STREAMING) {
return queryHandlerFactory.getStreamingQueryHandler(asyncQueryJobMetadata.getAccountId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
public enum JobType {
INTERACTIVE("interactive"),
STREAMING("streaming"),
REFRESH("refresh"),
BATCH("batch");

private String text;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,7 @@ void testDispatchCreateManualRefreshIndexQuery() {
verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
assertEquals(expected, startJobRequestArgumentCaptor.getValue());
assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
assertEquals(JobType.BATCH, dispatchQueryResponse.getJobType());
verifyNoInteractions(flintIndexMetadataService);
}

Expand Down Expand Up @@ -757,6 +758,7 @@ void testRefreshIndexQuery() {
verify(emrServerlessClient, times(1)).startJobRun(startJobRequestArgumentCaptor.capture());
assertEquals(expected, startJobRequestArgumentCaptor.getValue());
assertEquals(EMR_JOB_ID, dispatchQueryResponse.getJobId());
assertEquals(JobType.REFRESH, dispatchQueryResponse.getJobType());
verifyNoInteractions(flintIndexMetadataService);
}

Expand Down Expand Up @@ -932,12 +934,7 @@ void testDispatchWithUnSupportedDataSourceType() {

@Test
void testCancelJob() {
when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient);
when(emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID, false))
.thenReturn(
new CancelJobRunResult()
.withJobRunId(EMR_JOB_ID)
.withApplicationId(EMRS_APPLICATION_ID));
givenCancelJobRunSucceed();

String queryId =
sparkQueryDispatcher.cancelJob(asyncQueryJobMetadata(), asyncQueryRequestContext);
Expand Down Expand Up @@ -997,6 +994,25 @@ void testCancelQueryWithInvalidStatementId() {

@Test
void testCancelQueryWithNoSessionId() {
givenCancelJobRunSucceed();

String queryId =
sparkQueryDispatcher.cancelJob(asyncQueryJobMetadata(), asyncQueryRequestContext);

Assertions.assertEquals(QUERY_ID, queryId);
}

@Test
void testCancelBatchJob() {
givenCancelJobRunSucceed();

String queryId =
sparkQueryDispatcher.cancelJob(asyncQueryJobMetadata(JobType.BATCH), asyncQueryRequestContext);

Assertions.assertEquals(QUERY_ID, queryId);
}

private void givenCancelJobRunSucceed() {
when(emrServerlessClientFactory.getClient(any())).thenReturn(emrServerlessClient);
when(emrServerlessClient.cancelJobRun(EMRS_APPLICATION_ID, EMR_JOB_ID, false))
.thenReturn(
Expand Down Expand Up @@ -1273,11 +1289,16 @@ private DispatchQueryRequest dispatchQueryRequestWithSessionId(String query, Str
}

private AsyncQueryJobMetadata asyncQueryJobMetadata() {
return asyncQueryJobMetadata(JobType.INTERACTIVE);
}

private AsyncQueryJobMetadata asyncQueryJobMetadata(JobType jobType) {
return AsyncQueryJobMetadata.builder()
.queryId(QUERY_ID)
.applicationId(EMRS_APPLICATION_ID)
.jobId(EMR_JOB_ID)
.datasourceName(MY_GLUE)
.jobType(jobType)
.build();
}

Expand Down

0 comments on commit d5e13b4

Please sign in to comment.