Skip to content

Commit

Permalink
Add unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <clingzhi@amazon.com>
  • Loading branch information
noCharger committed Jul 17, 2024
1 parent 3d70c7d commit 8db87b0
Show file tree
Hide file tree
Showing 7 changed files with 456 additions and 37 deletions.
2 changes: 1 addition & 1 deletion async-query-core/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ singleCompoundStatement
;

beginEndCompoundBlock
: BEGIN compoundBody END
: beginLabel? BEGIN compoundBody END endLabel?
;

compoundBody
Expand Down
2 changes: 1 addition & 1 deletion async-query/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ repositories {


dependencies {
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"
implementation "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"

api project(':core')
api project(':async-query-core')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,11 @@
import org.opensearch.common.xcontent.json.JsonXContent;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.core.xcontent.XContentParserUtils;
import org.opensearch.index.engine.DocumentMissingException;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
import org.opensearch.jobscheduler.spi.schedule.ScheduleParser;
import org.opensearch.sql.spark.scheduler.exceptions.AsyncQuerySchedulerException;
import org.opensearch.sql.spark.scheduler.job.OpenSearchRefreshIndexJob;
import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest;
import org.opensearch.threadpool.ThreadPool;
Expand Down Expand Up @@ -82,13 +82,13 @@ public void scheduleJob(OpenSearchRefreshIndexJobRequest request) {
} catch (VersionConflictEngineException exception) {
throw new IllegalArgumentException("A job already exists with name: " + request.getName());
} catch (Exception e) {
throw new AsyncQuerySchedulerException(e);
throw new RuntimeException(e);
}

if (indexResponse.getResult().equals(DocWriteResponse.Result.CREATED)) {
LOG.debug("Job : {} successfully created", request.getName());
} else {
throw new AsyncQuerySchedulerException(
throw new RuntimeException(
"Schedule job failed with result : " + indexResponse.getResult().getLowercase());
}
}
Expand All @@ -115,15 +115,21 @@ public void updateJob(OpenSearchRefreshIndexJobRequest request) throws IOExcepti
UpdateRequest updateRequest = new UpdateRequest(SCHEDULER_INDEX_NAME, request.getName());
updateRequest.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE);
updateRequest.doc(request.toXContent(JsonXContent.contentBuilder(), EMPTY_PARAMS));

ActionFuture<UpdateResponse> updateResponseActionFuture = client.update(updateRequest);
UpdateResponse updateResponse = updateResponseActionFuture.actionGet();
UpdateResponse updateResponse;
try {
ActionFuture<UpdateResponse> updateResponseActionFuture = client.update(updateRequest);
updateResponse = updateResponseActionFuture.actionGet();
} catch (DocumentMissingException exception) {
throw new IllegalArgumentException("Job with name: " + request.getName() + " doesn't exist");
} catch (Exception e) {
throw new RuntimeException(e);
}

if (updateResponse.getResult().equals(DocWriteResponse.Result.UPDATED)
|| updateResponse.getResult().equals(DocWriteResponse.Result.NOOP)) {
LOG.debug("Job : {} successfully updated", request.getName());
} else {
throw new AsyncQuerySchedulerException(
throw new RuntimeException(
"Update job failed with result : " + updateResponse.getResult().getLowercase());
}
}
Expand All @@ -141,15 +147,15 @@ public void removeJob(String jobId) {
if (deleteResponse.getResult().equals(DocWriteResponse.Result.DELETED)) {
LOG.debug("Job : {} successfully deleted", jobId);
} else if (deleteResponse.getResult().equals(DocWriteResponse.Result.NOT_FOUND)) {
throw new AsyncQuerySchedulerException("Job : " + jobId + " doesn't exist");
throw new IllegalArgumentException("Job : " + jobId + " doesn't exist");
} else {
throw new AsyncQuerySchedulerException(
throw new RuntimeException(
"Remove job failed with result : " + deleteResponse.getResult().getLowercase());
}
}

/** Creates the async query scheduler index with specified mappings and settings. */
private void createAsyncQuerySchedulerIndex() {
void createAsyncQuerySchedulerIndex() {
try {
InputStream mappingFileStream =
OpenSearchAsyncQueryScheduler.class
Expand All @@ -171,11 +177,11 @@ private void createAsyncQuerySchedulerIndex() {
if (createIndexResponse.isAcknowledged()) {
LOG.debug("Index: {} creation Acknowledged", SCHEDULER_INDEX_NAME);
} else {
throw new AsyncQuerySchedulerException("Index creation is not acknowledged.");
throw new RuntimeException("Index creation is not acknowledged.");
}
} catch (Throwable e) {
LOG.error("Error creating index: {}", SCHEDULER_INDEX_NAME, e);
throw new AsyncQuerySchedulerException(
throw new RuntimeException(
"Internal server error while creating "
+ SCHEDULER_INDEX_NAME
+ " index: "
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,238 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.scheduler;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.opensearch.sql.spark.scheduler.OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME;

import java.io.IOException;
import java.time.Instant;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Answers;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest;
import org.opensearch.threadpool.ThreadPool;

public class OpenSearchAsyncQuerySchedulerTest {

private static final String TEST_SCHEDULER_INDEX_NAME = "testQS";

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private Client client;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ClusterService clusterService;

@Mock(answer = Answers.RETURNS_DEEP_STUBS)
private ThreadPool threadPool;

@Mock private ActionFuture<IndexResponse> indexResponseActionFuture;

@Mock private ActionFuture<UpdateResponse> updateResponseActionFuture;

@Mock private ActionFuture<DeleteResponse> deleteResponseActionFuture;

@Mock private ActionFuture<CreateIndexResponse> createIndexResponseActionFuture;

@Mock private IndexResponse indexResponse;

@Mock private UpdateResponse updateResponse;

private OpenSearchAsyncQueryScheduler scheduler;

@BeforeEach
public void setup() {
MockitoAnnotations.openMocks(this);
scheduler = new OpenSearchAsyncQueryScheduler();
scheduler.loadJobResource(client, clusterService, threadPool);
}

@Test
public void testScheduleJob() {
when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME))
.thenReturn(Boolean.FALSE);
when(client.admin().indices().create(any(CreateIndexRequest.class)))
.thenReturn(createIndexResponseActionFuture);
when(createIndexResponseActionFuture.actionGet())
.thenReturn(new CreateIndexResponse(true, true, TEST_SCHEDULER_INDEX_NAME));
when(client.index(any(IndexRequest.class))).thenReturn(indexResponseActionFuture);
when(indexResponseActionFuture.actionGet()).thenReturn(indexResponse);
when(indexResponse.getResult()).thenReturn(DocWriteResponse.Result.CREATED);

OpenSearchRefreshIndexJobRequest request =
OpenSearchRefreshIndexJobRequest.builder()
.jobName("testJob")
.lastUpdateTime(Instant.now())
.build();

scheduler.scheduleJob(request);

// Verify index created
verify(client.admin().indices(), Mockito.times(1)).create(ArgumentMatchers.any());

// Verify doc indexed
ArgumentCaptor<IndexRequest> captor = ArgumentCaptor.forClass(IndexRequest.class);
verify(client, Mockito.times(1)).index(captor.capture());
IndexRequest capturedRequest = captor.getValue();
assertEquals(request.getName(), capturedRequest.id());
assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy());
}

@Test
public void testScheduleJobWithExistingJob() {
when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME))
.thenReturn(Boolean.TRUE);

OpenSearchRefreshIndexJobRequest request =
OpenSearchRefreshIndexJobRequest.builder()
.jobName("testJob")
.lastUpdateTime(Instant.now())
.build();

when(client.index(any(IndexRequest.class))).thenThrow(VersionConflictEngineException.class);

IllegalArgumentException exception =
assertThrows(
IllegalArgumentException.class,
() -> {
scheduler.scheduleJob(request);
});

verify(client, Mockito.times(1)).index(ArgumentCaptor.forClass(IndexRequest.class).capture());
assertEquals("A job already exists with name: testJob", exception.getMessage());
}

@Test
public void testUnscheduleJob() throws IOException {
String jobId = "testJob";

when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true);

when(updateResponseActionFuture.actionGet()).thenReturn(updateResponse);
when(updateResponse.getResult()).thenReturn(DocWriteResponse.Result.UPDATED);

when(client.update(any(UpdateRequest.class))).thenReturn(updateResponseActionFuture);

scheduler.unscheduleJob(jobId);

ArgumentCaptor<UpdateRequest> captor = ArgumentCaptor.forClass(UpdateRequest.class);
verify(client).update(captor.capture());

UpdateRequest capturedRequest = captor.getValue();
assertEquals(jobId, capturedRequest.id());
assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy());
}

@Test
public void testUpdateJob() throws IOException {
OpenSearchRefreshIndexJobRequest request =
OpenSearchRefreshIndexJobRequest.builder()
.jobName("testJob")
.lastUpdateTime(Instant.now())
.build();

when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true);

when(updateResponseActionFuture.actionGet()).thenReturn(updateResponse);
when(updateResponse.getResult()).thenReturn(DocWriteResponse.Result.UPDATED);

when(client.update(any(UpdateRequest.class))).thenReturn(updateResponseActionFuture);

scheduler.updateJob(request);

ArgumentCaptor<UpdateRequest> captor = ArgumentCaptor.forClass(UpdateRequest.class);
verify(client).update(captor.capture());

UpdateRequest capturedRequest = captor.getValue();
assertEquals(request.getName(), capturedRequest.id());
assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy());
}

@Test
public void testRemoveJob() {
String jobId = "testJob";

when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(true);

DeleteResponse deleteResponse = mock(DeleteResponse.class);
when(deleteResponseActionFuture.actionGet()).thenReturn(deleteResponse);
when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.DELETED);

when(client.delete(any(DeleteRequest.class))).thenReturn(deleteResponseActionFuture);

scheduler.removeJob(jobId);

ArgumentCaptor<DeleteRequest> captor = ArgumentCaptor.forClass(DeleteRequest.class);
verify(client).delete(captor.capture());

DeleteRequest capturedRequest = captor.getValue();
assertEquals(jobId, capturedRequest.id());
assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy());
}

@Test
public void testCreateAsyncQuerySchedulerIndex() {
when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(false);

CreateIndexResponse createIndexResponse = mock(CreateIndexResponse.class);
when(createIndexResponseActionFuture.actionGet()).thenReturn(createIndexResponse);
when(createIndexResponse.isAcknowledged()).thenReturn(true);

when(client.admin().indices().create(any(CreateIndexRequest.class)))
.thenReturn(createIndexResponseActionFuture);

scheduler.createAsyncQuerySchedulerIndex();

ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
verify(client.admin().indices()).create(captor.capture());

CreateIndexRequest capturedRequest = captor.getValue();
assertEquals(SCHEDULER_INDEX_NAME, capturedRequest.index());
}

@Test
public void testCreateAsyncQuerySchedulerIndexFailure() throws IOException {
when(clusterService.state().routingTable().hasIndex(SCHEDULER_INDEX_NAME)).thenReturn(false);

when(client.admin().indices().create(any(CreateIndexRequest.class)))
.thenThrow(new RuntimeException("Error creating index"));

RuntimeException exception =
assertThrows(
RuntimeException.class,
() -> {
scheduler.createAsyncQuerySchedulerIndex();
});

assertEquals(
"Internal server error while creating .async-query-scheduler index: Error creating index",
exception.getMessage());
}
}
Loading

0 comments on commit 8db87b0

Please sign in to comment.