Skip to content

Commit

Permalink
Eliminate dependency from async-query-core to legacy (#2786)
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoyuki Morita <moritato@amazon.com>
  • Loading branch information
ykmr1224 committed Jun 28, 2024
1 parent 883cc7e commit 00f82f5
Show file tree
Hide file tree
Showing 11 changed files with 72 additions and 66 deletions.
1 change: 0 additions & 1 deletion async-query-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ dependencies {
implementation project(':core')
implementation project(':spark') // TODO: dependency to spark should be eliminated
implementation project(':datasources') // TODO: dependency to datasources should be eliminated
implementation project(':legacy') // TODO: dependency to legacy should be eliminated
implementation 'org.json:json:20231013'
implementation 'com.google.code.gson:gson:2.8.9'

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,8 @@

package org.opensearch.sql.spark.rest.model;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

import java.io.IOException;
import lombok.Data;
import org.apache.commons.lang3.Validate;
import org.opensearch.core.xcontent.XContentParser;

@Data
public class CreateAsyncQueryRequest {
Expand All @@ -32,35 +28,4 @@ public CreateAsyncQueryRequest(String query, String datasource, LangType lang, S
this.lang = Validate.notNull(lang, "lang can't be null");
this.sessionId = sessionId;
}

// NOTE(review): removed in this commit — parsing moved to
// org.opensearch.sql.spark.transport.format.CreateAsyncQueryRequestConverter so that
// async-query-core no longer depends on XContent/transport classes.
/**
 * Builds a CreateAsyncQueryRequest from a JSON request body.
 *
 * <p>Recognized fields: "query", "lang", "datasource", "sessionId"; any other
 * field name is rejected.
 *
 * @param parser XContent parser positioned before the body's START_OBJECT token
 * @return the parsed request (constructor validation rejects null query/datasource/lang)
 * @throws IllegalArgumentException wrapping any parse failure or unknown field
 */
public static CreateAsyncQueryRequest fromXContentParser(XContentParser parser)
throws IOException {
String query = null;
LangType lang = null;
String datasource = null;
String sessionId = null;
try {
// Body must be a single JSON object.
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
String fieldName = parser.currentName();
parser.nextToken(); // advance from field name to its value
if (fieldName.equals("query")) {
query = parser.textOrNull();
} else if (fieldName.equals("lang")) {
String langString = parser.textOrNull();
lang = LangType.fromString(langString);
} else if (fieldName.equals("datasource")) {
datasource = parser.textOrNull();
} else if (fieldName.equals("sessionId")) {
sessionId = parser.textOrNull();
} else {
// Strict parsing: unknown fields are an error, surfaced via the catch below.
throw new IllegalArgumentException("Unknown field: " + fieldName);
}
}
return new CreateAsyncQueryRequest(query, datasource, lang, sessionId);
} catch (Exception e) {
// All failures (bad token, unknown field, constructor validation) are
// normalized into one client-facing IllegalArgumentException.
throw new IllegalArgumentException(
String.format("Error while parsing the request body: %s", e.getMessage()));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@

package org.opensearch.sql.spark.client;

import static java.util.Collections.emptyList;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -32,37 +30,22 @@
import java.util.List;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.sql.common.setting.Settings;
import org.opensearch.sql.legacy.esdomain.LocalClusterState;
import org.opensearch.sql.legacy.metrics.Metrics;
import org.opensearch.sql.opensearch.setting.OpenSearchSettings;
import org.opensearch.sql.spark.asyncquery.model.SparkSubmitParameters;
import org.opensearch.sql.spark.metrics.MetricsService;

@ExtendWith(MockitoExtension.class)
public class EmrServerlessClientImplTest {
@Mock private AWSEMRServerless emrServerless;
@Mock private OpenSearchSettings settings;
@Mock private MetricsService metricsService;

@Captor private ArgumentCaptor<StartJobRunRequest> startJobRunRequestArgumentCaptor;

// NOTE(review): removed in this commit — this setup existed only to satisfy the
// legacy Metrics/LocalClusterState singletons, superseded by the injected
// MetricsService mock; kept here as diff context.
@BeforeEach
public void setUp() {
// Stub settings so LocalClusterState can read metric rolling configuration.
doReturn(emptyList()).when(settings).getSettings();
when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_INTERVAL)).thenReturn(3600L);
when(settings.getSettingValue(Settings.Key.METRICS_ROLLING_WINDOW)).thenReturn(600L);
// Global (static) legacy state — shared across tests in the same JVM.
LocalClusterState.state().setPluginSettings(settings);
Metrics.getInstance().registerDefaultMetrics();
}

@Test
void testStartJobRun() {
StartJobRunResult response = new StartJobRunResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.sql.spark.transport.TransportCancelAsyncQueryRequestAction;
import org.opensearch.sql.spark.transport.TransportCreateAsyncQueryRequestAction;
import org.opensearch.sql.spark.transport.TransportGetAsyncQueryResultAction;
import org.opensearch.sql.spark.transport.format.CreateAsyncQueryRequestConverter;
import org.opensearch.sql.spark.transport.model.CancelAsyncQueryActionRequest;
import org.opensearch.sql.spark.transport.model.CancelAsyncQueryActionResponse;
import org.opensearch.sql.spark.transport.model.CreateAsyncQueryActionRequest;
Expand Down Expand Up @@ -119,7 +120,7 @@ private RestChannelConsumer executePostRequest(RestRequest restRequest, NodeClie
try {
MetricUtils.incrementNumericalMetric(MetricName.ASYNC_QUERY_CREATE_API_REQUEST_COUNT);
CreateAsyncQueryRequest submitJobRequest =
CreateAsyncQueryRequest.fromXContentParser(restRequest.contentParser());
CreateAsyncQueryRequestConverter.fromXContentParser(restRequest.contentParser());
Scheduler.schedule(
nodeClient,
() ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService;
import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorServiceImpl;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryResult;
import org.opensearch.sql.spark.transport.format.AsyncQueryResultResponseFormatter;
import org.opensearch.sql.spark.transport.model.AsyncQueryResult;
import org.opensearch.sql.spark.transport.model.GetAsyncQueryResultActionRequest;
import org.opensearch.sql.spark.transport.model.GetAsyncQueryResultActionResponse;
import org.opensearch.tasks.Task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
import org.opensearch.core.common.Strings;
import org.opensearch.sql.protocol.response.QueryResult;
import org.opensearch.sql.protocol.response.format.JsonResponseFormatter;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryResult;
import org.opensearch.sql.spark.transport.model.AsyncQueryResult;

/**
* JSON response format with schema header and data rows. For example,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.sql.spark.transport.format;

import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken;

import lombok.experimental.UtilityClass;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
import org.opensearch.sql.spark.rest.model.LangType;

/** Converts an XContent REST request body into a {@link CreateAsyncQueryRequest}. */
@UtilityClass
public class CreateAsyncQueryRequestConverter {

  /**
   * Parses the async-query creation request body.
   *
   * <p>Recognized fields: {@code query}, {@code lang}, {@code datasource}, {@code sessionId}; any
   * other field name is rejected.
   *
   * @param parser XContent parser positioned before the body's START_OBJECT token
   * @return the parsed request (constructor validation rejects null query/datasource/lang)
   * @throws IllegalArgumentException wrapping any parse failure, unknown field, or validation error
   */
  public static CreateAsyncQueryRequest fromXContentParser(XContentParser parser) {
    String query = null;
    LangType lang = null;
    String datasource = null;
    String sessionId = null;
    try {
      // Body must be a single JSON object.
      ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
      while (parser.nextToken() != XContentParser.Token.END_OBJECT) {
        String fieldName = parser.currentName();
        parser.nextToken(); // advance from field name to its value
        if (fieldName.equals("query")) {
          query = parser.textOrNull();
        } else if (fieldName.equals("lang")) {
          String langString = parser.textOrNull();
          lang = LangType.fromString(langString);
        } else if (fieldName.equals("datasource")) {
          datasource = parser.textOrNull();
        } else if (fieldName.equals("sessionId")) {
          sessionId = parser.textOrNull();
        } else {
          // Strict parsing: unknown fields are an error, surfaced via the catch below.
          throw new IllegalArgumentException("Unknown field: " + fieldName);
        }
      }
      return new CreateAsyncQueryRequest(query, datasource, lang, sessionId);
    } catch (Exception e) {
      // Preserve the original exception as the cause so the full stack trace remains
      // available in logs; the message text is unchanged (callers/tests assert on it).
      throw new IllegalArgumentException(
          String.format("Error while parsing the request body: %s", e.getMessage()), e);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
package org.opensearch.sql.spark.asyncquery.model;
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.transport.model;

import java.util.Collection;
import lombok.Getter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.opensearch.sql.protocol.response.format.ResponseFormatter;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryExecutionResponse;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryRequestContext;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryResult;
import org.opensearch.sql.spark.asyncquery.model.MockFlintSparkJob;
import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext;
import org.opensearch.sql.spark.client.EMRServerlessClientFactory;
Expand All @@ -38,6 +37,7 @@
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryResponse;
import org.opensearch.sql.spark.rest.model.LangType;
import org.opensearch.sql.spark.transport.format.AsyncQueryResultResponseFormatter;
import org.opensearch.sql.spark.transport.model.AsyncQueryResult;

public class AsyncQueryGetResultSpecTest extends AsyncQueryExecutorServiceSpec {
AsyncQueryRequestContext asyncQueryRequestContext = new NullAsyncQueryRequestContext();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.transport.format;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand All @@ -11,7 +16,7 @@
import java.util.Arrays;
import org.junit.jupiter.api.Test;
import org.opensearch.sql.executor.ExecutionEngine;
import org.opensearch.sql.spark.asyncquery.model.AsyncQueryResult;
import org.opensearch.sql.spark.transport.model.AsyncQueryResult;

public class AsyncQueryResultResponseFormatterTest {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.spark.rest.model;
package org.opensearch.sql.spark.transport.format;

import java.io.IOException;
import org.junit.jupiter.api.Assertions;
Expand All @@ -12,8 +12,10 @@
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest;
import org.opensearch.sql.spark.rest.model.LangType;

public class CreateAsyncQueryRequestTest {
public class CreateAsyncQueryRequestConverterTest {

@Test
public void fromXContent() throws IOException {
Expand All @@ -24,7 +26,7 @@ public void fromXContent() throws IOException {
+ " \"query\": \"select 1\"\n"
+ "}";
CreateAsyncQueryRequest queryRequest =
CreateAsyncQueryRequest.fromXContentParser(xContentParser(request));
CreateAsyncQueryRequestConverter.fromXContentParser(xContentParser(request));
Assertions.assertEquals("my_glue", queryRequest.getDatasource());
Assertions.assertEquals(LangType.SQL, queryRequest.getLang());
Assertions.assertEquals("select 1", queryRequest.getQuery());
Expand All @@ -48,7 +50,7 @@ public void fromXContentWithDuplicateFields() throws IOException {
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class,
() -> CreateAsyncQueryRequest.fromXContentParser(xContentParser(request)));
() -> CreateAsyncQueryRequestConverter.fromXContentParser(xContentParser(request)));
Assertions.assertTrue(
illegalArgumentException
.getMessage()
Expand All @@ -67,7 +69,7 @@ public void fromXContentWithUnknownField() throws IOException {
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class,
() -> CreateAsyncQueryRequest.fromXContentParser(xContentParser(request)));
() -> CreateAsyncQueryRequestConverter.fromXContentParser(xContentParser(request)));
Assertions.assertEquals(
"Error while parsing the request body: Unknown field: random",
illegalArgumentException.getMessage());
Expand All @@ -81,7 +83,7 @@ public void fromXContentWithWrongDatatype() throws IOException {
IllegalArgumentException illegalArgumentException =
Assertions.assertThrows(
IllegalArgumentException.class,
() -> CreateAsyncQueryRequest.fromXContentParser(xContentParser(request)));
() -> CreateAsyncQueryRequestConverter.fromXContentParser(xContentParser(request)));
Assertions.assertEquals(
"Error while parsing the request body: Can't get text on a START_ARRAY at 1:16",
illegalArgumentException.getMessage());
Expand All @@ -97,7 +99,7 @@ public void fromXContentWithSessionId() throws IOException {
+ " \"sessionId\": \"00fdjevgkf12s00q\"\n"
+ "}";
CreateAsyncQueryRequest queryRequest =
CreateAsyncQueryRequest.fromXContentParser(xContentParser(request));
CreateAsyncQueryRequestConverter.fromXContentParser(xContentParser(request));
Assertions.assertEquals("00fdjevgkf12s00q", queryRequest.getSessionId());
}

Expand Down

0 comments on commit 00f82f5

Please sign in to comment.