Skip to content

Commit

Permalink
Add unit test
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <clingzhi@amazon.com>
  • Loading branch information
noCharger committed Jul 17, 2024
1 parent 3d70c7d commit d7e0c95
Show file tree
Hide file tree
Showing 6 changed files with 497 additions and 2 deletions.
2 changes: 1 addition & 1 deletion async-query-core/src/main/antlr/SqlBaseParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ singleCompoundStatement
;

beginEndCompoundBlock
: BEGIN compoundBody END
: beginLabel? BEGIN compoundBody END endLabel?
;

compoundBody
Expand Down
1 change: 1 addition & 0 deletions async-query/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ repositories {

dependencies {
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"
testCompileOnly "org.opensearch:opensearch-job-scheduler-spi:${opensearch_build}"

api project(':core')
api project(':async-query-core')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ public void removeJob(String jobId) {
}

/** Creates the async query scheduler index with specified mappings and settings. */
private void createAsyncQuerySchedulerIndex() {
void createAsyncQuerySchedulerIndex() {
try {
InputStream mappingFileStream =
OpenSearchAsyncQueryScheduler.class
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.scheduler;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.time.Instant;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opensearch.action.DocWriteResponse;
import org.opensearch.action.admin.indices.create.CreateIndexRequest;
import org.opensearch.action.admin.indices.create.CreateIndexResponse;
import org.opensearch.action.delete.DeleteRequest;
import org.opensearch.action.delete.DeleteResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.action.ActionFuture;
import org.opensearch.index.engine.VersionConflictEngineException;
import org.opensearch.sql.spark.scheduler.model.OpenSearchRefreshIndexJobRequest;

public class OpenSearchAsyncQuerySchedulerTest {

@Mock private Client client;

@Mock private ClusterService clusterService;

@Mock private ActionFuture<IndexResponse> indexResponseActionFuture;

@Mock private ActionFuture<UpdateResponse> updateResponseActionFuture;

@Mock private ActionFuture<DeleteResponse> deleteResponseActionFuture;

@Mock private ActionFuture<CreateIndexResponse> createIndexResponseActionFuture;

private OpenSearchAsyncQueryScheduler scheduler;

@BeforeEach
public void setup() {
MockitoAnnotations.initMocks(this);
scheduler = new OpenSearchAsyncQueryScheduler();
scheduler.loadJobResource(client, clusterService, null);
}

@Test
public void testScheduleJob() throws IOException {
OpenSearchRefreshIndexJobRequest request =
OpenSearchRefreshIndexJobRequest.builder()
.jobName("testJob")
.lastUpdateTime(Instant.now())
.build();

IndexResponse indexResponse = mock(IndexResponse.class);
when(indexResponseActionFuture.actionGet()).thenReturn(indexResponse);
when(indexResponse.getResult()).thenReturn(DocWriteResponse.Result.CREATED);

when(client.index(any(IndexRequest.class))).thenReturn(indexResponseActionFuture);

scheduler.scheduleJob(request);

ArgumentCaptor<IndexRequest> captor = ArgumentCaptor.forClass(IndexRequest.class);
verify(client).index(captor.capture());

IndexRequest capturedRequest = captor.getValue();
assertEquals(request.getName(), capturedRequest.id());
assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy());
}

@Test
public void testScheduleJobWithExistingJob() throws IOException {
OpenSearchRefreshIndexJobRequest request =
OpenSearchRefreshIndexJobRequest.builder()
.jobName("testJob")
.lastUpdateTime(Instant.now())
.build();

when(client.index(any(IndexRequest.class))).thenThrow(VersionConflictEngineException.class);

IllegalArgumentException exception =
assertThrows(
IllegalArgumentException.class,
() -> {
scheduler.scheduleJob(request);
});

assertEquals("A job already exists with name: testJob", exception.getMessage());
}

@Test
public void testUnscheduleJob() throws IOException {
String jobId = "testJob";

when(clusterService
.state()
.routingTable()
.hasIndex(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME))
.thenReturn(true);

UpdateResponse updateResponse = mock(UpdateResponse.class);
when(updateResponseActionFuture.actionGet()).thenReturn(updateResponse);
when(updateResponse.getResult()).thenReturn(DocWriteResponse.Result.UPDATED);

when(client.update(any(UpdateRequest.class))).thenReturn(updateResponseActionFuture);

scheduler.unscheduleJob(jobId);

ArgumentCaptor<UpdateRequest> captor = ArgumentCaptor.forClass(UpdateRequest.class);
verify(client).update(captor.capture());

UpdateRequest capturedRequest = captor.getValue();
assertEquals(jobId, capturedRequest.id());
assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy());
}

@Test
public void testUpdateJob() throws IOException {
OpenSearchRefreshIndexJobRequest request =
OpenSearchRefreshIndexJobRequest.builder()
.jobName("testJob")
.lastUpdateTime(Instant.now())
.build();

when(clusterService
.state()
.routingTable()
.hasIndex(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME))
.thenReturn(true);

UpdateResponse updateResponse = mock(UpdateResponse.class);
when(updateResponseActionFuture.actionGet()).thenReturn(updateResponse);
when(updateResponse.getResult()).thenReturn(DocWriteResponse.Result.UPDATED);

when(client.update(any(UpdateRequest.class))).thenReturn(updateResponseActionFuture);

scheduler.updateJob(request);

ArgumentCaptor<UpdateRequest> captor = ArgumentCaptor.forClass(UpdateRequest.class);
verify(client).update(captor.capture());

UpdateRequest capturedRequest = captor.getValue();
assertEquals(request.getName(), capturedRequest.id());
assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy());
}

@Test
public void testRemoveJob() {
String jobId = "testJob";

when(clusterService
.state()
.routingTable()
.hasIndex(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME))
.thenReturn(true);

DeleteResponse deleteResponse = mock(DeleteResponse.class);
when(deleteResponseActionFuture.actionGet()).thenReturn(deleteResponse);
when(deleteResponse.getResult()).thenReturn(DocWriteResponse.Result.DELETED);

when(client.delete(any(DeleteRequest.class))).thenReturn(deleteResponseActionFuture);

scheduler.removeJob(jobId);

ArgumentCaptor<DeleteRequest> captor = ArgumentCaptor.forClass(DeleteRequest.class);
verify(client).delete(captor.capture());

DeleteRequest capturedRequest = captor.getValue();
assertEquals(jobId, capturedRequest.id());
assertEquals(WriteRequest.RefreshPolicy.IMMEDIATE, capturedRequest.getRefreshPolicy());
}

@Test
public void testCreateAsyncQuerySchedulerIndex() throws IOException {
when(clusterService
.state()
.routingTable()
.hasIndex(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME))
.thenReturn(false);

CreateIndexResponse createIndexResponse = mock(CreateIndexResponse.class);
when(createIndexResponseActionFuture.actionGet()).thenReturn(createIndexResponse);
when(createIndexResponse.isAcknowledged()).thenReturn(true);

when(client.admin().indices().create(any(CreateIndexRequest.class)))
.thenReturn(createIndexResponseActionFuture);

scheduler.createAsyncQuerySchedulerIndex();

ArgumentCaptor<CreateIndexRequest> captor = ArgumentCaptor.forClass(CreateIndexRequest.class);
verify(client.admin().indices()).create(captor.capture());

CreateIndexRequest capturedRequest = captor.getValue();
assertEquals(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME, capturedRequest.index());
}

@Test
public void testCreateAsyncQuerySchedulerIndexFailure() throws IOException {
when(clusterService
.state()
.routingTable()
.hasIndex(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME))
.thenReturn(false);

when(client.admin().indices().create(any(CreateIndexRequest.class)))
.thenThrow(new RuntimeException("Error creating index"));

RuntimeException exception =
assertThrows(
RuntimeException.class,
() -> {
scheduler.createAsyncQuerySchedulerIndex();
});

assertEquals(
"Internal server error while creating .async-query-scheduler index: Error creating index",
exception.getMessage());
}
}
Loading

0 comments on commit d7e0c95

Please sign in to comment.