Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set up Rest Integration Test framework #251

Merged
merged 19 commits into from
Dec 19, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,13 @@ integTest {
systemProperty "user", System.getProperty("user")
systemProperty "password", System.getProperty("password")

// Only rest case can run with remote cluster
if (System.getProperty("tests.rest.cluster") != null) {
filter {
includeTestsMatching "org.opensearch.flowframework.rest.*IT"
}
}


// doFirst delays this block until execution time
doFirst {
Expand Down Expand Up @@ -263,6 +270,25 @@ testClusters.integTest {
}
}

// Remote Integration Tests
task integTestRemote(type: RestIntegTestTask) {
testClassesDirs = sourceSets.test.output.classesDirs
classpath = sourceSets.test.runtimeClasspath

systemProperty "https", System.getProperty("https")
systemProperty "user", System.getProperty("user")
systemProperty "password", System.getProperty("password")
systemProperty 'cluster.number_of_nodes', "${_numNodes}"
systemProperty 'tests.security.manager', 'false'
joshpalis marked this conversation as resolved.
Show resolved Hide resolved

// Run tests with remote cluster only if rest case is defined
if (System.getProperty("tests.rest.cluster") != null) {
filter {
includeTestsMatching "org.opensearch.flowframework.rest.*IT"
}
}
}

// Automatically sets up the integration test cluster locally
run {
useCluster testClusters.integTest
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,336 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.flowframework;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.hc.client5.http.auth.AuthScope;
import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
import org.apache.hc.client5.http.ssl.NoopHostnameVerifier;
import org.apache.hc.core5.function.Factory;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
import org.apache.hc.core5.reactor.ssl.TlsDetails;
import org.apache.hc.core5.ssl.SSLContextBuilder;
import org.apache.hc.core5.util.Timeout;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.common.io.PathUtils;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.commons.rest.SecureRestClientBuilder;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.core.xcontent.DeprecationHandler;
import org.opensearch.core.xcontent.MediaType;
import org.opensearch.core.xcontent.NamedXContentRegistry;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.flowframework.model.Template;
import org.opensearch.test.rest.OpenSearchRestTestCase;
import org.junit.After;
import org.junit.Before;

import javax.net.ssl.SSLEngine;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import static org.opensearch.client.RestClientBuilder.DEFAULT_MAX_CONN_PER_ROUTE;
import static org.opensearch.client.RestClientBuilder.DEFAULT_MAX_CONN_TOTAL;
import static org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_ENABLED;
import static org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_KEYSTORE_FILEPATH;
import static org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_KEYSTORE_KEYPASSWORD;
import static org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_KEYSTORE_PASSWORD;
import static org.opensearch.commons.ConfigConstants.OPENSEARCH_SECURITY_SSL_HTTP_PEMCERT_FILEPATH;
import static org.opensearch.flowframework.common.CommonValue.WORKFLOW_URI;

/**
* Base rest integration test class, supports security enabled/disabled cluster
*/
public abstract class FlowFrameworkRestTestCase extends OpenSearchRestTestCase {

@Before
public void setUpSettings() throws IOException {

// Enable Flow Framework Plugin Rest APIs
Response response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"transient\":{\"plugins.flow_framework.enabled\":true}}",
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Enable ML Commons to run on non-ml nodes
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.only_run_on_ml_node\":false}}",
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

// Enable local model registration via URL
response = TestHelpers.makeRequest(
client(),
"PUT",
"_cluster/settings",
null,
"{\"persistent\":{\"plugins.ml_commons.allow_registering_model_via_url\":true}}",
ImmutableList.of(new BasicHeader(HttpHeaders.USER_AGENT, ""))
);
assertEquals(200, response.getStatusLine().getStatusCode());

}

protected boolean isHttps() {
boolean isHttps = Optional.ofNullable(System.getProperty("https")).map("true"::equalsIgnoreCase).orElse(false);
if (isHttps) {
// currently only external cluster is supported for security enabled testing
if (!Optional.ofNullable(System.getProperty("tests.rest.cluster")).isPresent()) {
throw new RuntimeException("cluster url should be provided for security enabled testing");
}
}

return isHttps;
}

@Override
protected String getProtocol() {
return isHttps() ? "https" : "http";
}

@Override
protected Settings restAdminSettings() {
return Settings.builder()
// disable the warning exception for admin client since it's only used for cleanup.
.put("strictDeprecationMode", false)
.put("http.port", 9200)
.put(OPENSEARCH_SECURITY_SSL_HTTP_ENABLED, isHttps())
.put(OPENSEARCH_SECURITY_SSL_HTTP_PEMCERT_FILEPATH, "sample.pem")
.put(OPENSEARCH_SECURITY_SSL_HTTP_KEYSTORE_FILEPATH, "test-kirk.jks")
.put(OPENSEARCH_SECURITY_SSL_HTTP_KEYSTORE_PASSWORD, "changeit")
.put(OPENSEARCH_SECURITY_SSL_HTTP_KEYSTORE_KEYPASSWORD, "changeit")
.build();
}

// Utility fn for deleting indices. Should only be used when not allowed in a regular context
// (e.g., deleting system indices)
protected static void deleteIndexWithAdminClient(String name) throws IOException {
Request request = new Request("DELETE", "/" + name);
adminClient().performRequest(request);
}

// Utility fn for checking if an index exists. Should only be used when not allowed in a regular context
// (e.g., checking existence of system indices)
protected static boolean indexExistsWithAdminClient(String indexName) throws IOException {
Request request = new Request("HEAD", "/" + indexName);
Response response = adminClient().performRequest(request);
return RestStatus.OK.getStatus() == response.getStatusLine().getStatusCode();
}

@Override
protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOException {
boolean strictDeprecationMode = settings.getAsBoolean("strictDeprecationMode", true);
RestClientBuilder builder = RestClient.builder(hosts);
if (isHttps()) {
String keystore = settings.get(OPENSEARCH_SECURITY_SSL_HTTP_KEYSTORE_FILEPATH);
if (Objects.nonNull(keystore)) {
URI uri = null;
try {
uri = this.getClass().getClassLoader().getResource("security/sample.pem").toURI();
} catch (URISyntaxException e) {
throw new RuntimeException(e);
}
Path configPath = PathUtils.get(uri).getParent().toAbsolutePath();
return new SecureRestClientBuilder(settings, configPath).build();
} else {
configureHttpsClient(builder, settings);
builder.setStrictDeprecationMode(strictDeprecationMode);
return builder.build();
}

} else {
configureClient(builder, settings);
builder.setStrictDeprecationMode(strictDeprecationMode);
return builder.build();
}

}

@SuppressWarnings("unchecked")
@After
protected void wipeAllODFEIndices() throws IOException {
owaiskazi19 marked this conversation as resolved.
Show resolved Hide resolved
Response response = adminClient().performRequest(new Request("GET", "/_cat/indices?format=json&expand_wildcards=all"));
MediaType xContentType = MediaType.fromMediaType(response.getEntity().getContentType());
try (
XContentParser parser = xContentType.xContent()
.createParser(
NamedXContentRegistry.EMPTY,
DeprecationHandler.THROW_UNSUPPORTED_OPERATION,
response.getEntity().getContent()
)
) {
XContentParser.Token token = parser.nextToken();
List<Map<String, Object>> parserList = null;
if (token == XContentParser.Token.START_ARRAY) {
parserList = parser.listOrderedMap().stream().map(obj -> (Map<String, Object>) obj).collect(Collectors.toList());
} else {
parserList = Collections.singletonList(parser.mapOrdered());
}

for (Map<String, Object> index : parserList) {
String indexName = (String) index.get("index");
if (indexName != null && !".opendistro_security".equals(indexName)) {
adminClient().performRequest(new Request("DELETE", "/" + indexName));
}
}
}
}

protected static void configureHttpsClient(RestClientBuilder builder, Settings settings) throws IOException {
Map<String, String> headers = ThreadContext.buildDefaultHeaders(settings);
Header[] defaultHeaders = new Header[headers.size()];
int i = 0;
for (Map.Entry<String, String> entry : headers.entrySet()) {
defaultHeaders[i++] = new BasicHeader(entry.getKey(), entry.getValue());
}
builder.setDefaultHeaders(defaultHeaders);
builder.setHttpClientConfigCallback(httpClientBuilder -> {
String userName = Optional.ofNullable(System.getProperty("user"))
.orElseThrow(() -> new RuntimeException("user name is missing"));
String password = Optional.ofNullable(System.getProperty("password"))
.orElseThrow(() -> new RuntimeException("password is missing"));
BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
final AuthScope anyScope = new AuthScope(null, -1);
credentialsProvider.setCredentials(anyScope, new UsernamePasswordCredentials(userName, password.toCharArray()));
try {
final TlsStrategy tlsStrategy = ClientTlsStrategyBuilder.create()
.setHostnameVerifier(NoopHostnameVerifier.INSTANCE)
.setSslContext(SSLContextBuilder.create().loadTrustMaterial(null, (chains, authType) -> true).build())
// See https://issues.apache.org/jira/browse/HTTPCLIENT-2219
.setTlsDetailsFactory(new Factory<SSLEngine, TlsDetails>() {
@Override
public TlsDetails create(final SSLEngine sslEngine) {
return new TlsDetails(sslEngine.getSession(), sslEngine.getApplicationProtocol());
}
})
.build();
final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder.create()
.setMaxConnPerRoute(DEFAULT_MAX_CONN_PER_ROUTE)
.setMaxConnTotal(DEFAULT_MAX_CONN_TOTAL)
.setTlsStrategy(tlsStrategy)
.build();
return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider).setConnectionManager(connectionManager);
} catch (Exception e) {
throw new RuntimeException(e);
}
});

final String socketTimeoutString = settings.get(CLIENT_SOCKET_TIMEOUT);
final TimeValue socketTimeout = TimeValue.parseTimeValue(
socketTimeoutString == null ? "60s" : socketTimeoutString,
CLIENT_SOCKET_TIMEOUT
);
builder.setRequestConfigCallback(conf -> {
Timeout timeout = Timeout.ofMilliseconds(Math.toIntExact(socketTimeout.getMillis()));
conf.setConnectTimeout(timeout);
conf.setResponseTimeout(timeout);
return conf;
});
if (settings.hasValue(CLIENT_PATH_PREFIX)) {
builder.setPathPrefix(settings.get(CLIENT_PATH_PREFIX));
}
}

/**
* wipeAllIndices won't work since it cannot delete security index. Use wipeAllODFEIndices instead.
*/
@Override
protected boolean preserveIndicesUponCompletion() {
return true;
}

/**
* Helper method to invoke the Create Workflow Rest Action
* @param template the template to create
* @throws Exception if the request fails
* @return a rest response
*/
protected Response createWorkflow(Template template) throws Exception {
return TestHelpers.makeRequest(client(), "POST", WORKFLOW_URI, ImmutableMap.of(), template.toJson(), null);
}

/**
* Helper method to invoke the Create Workflow Rest Action with dry run validation
* @param template the template to create
* @throws Exception if the request fails
* @return a rest response
*/
protected Response createWorkflowDryRun(Template template) throws Exception {
return TestHelpers.makeRequest(client(), "POST", WORKFLOW_URI + "?dryrun=true", ImmutableMap.of(), template.toJson(), null);
}

/**
* Helper method to invoke the Provision Workflow Rest Action
* @param workflowId the workflow ID to provision
* @throws Exception if the request fails
* @return a rest response
*/
protected Response provisionWorkflow(String workflowId) throws Exception {
return TestHelpers.makeRequest(
client(),
"POST",
String.format(Locale.ROOT, "%s/%s/%s", WORKFLOW_URI, workflowId, "_provision"),
ImmutableMap.of(),
"",
null
);
}

/**
* Helper method to invoke the Get Workflow Rest Action
* @param workflowId the workflow ID to get the status
* @throws Exception if the request fails
* @return rest response
*/
protected Response getWorkflowStatus(String workflowId) throws Exception {
return TestHelpers.makeRequest(
client(),
"GET",
String.format(Locale.ROOT, "%s/%s/%s", WORKFLOW_URI, workflowId, "_status"),
ImmutableMap.of(),
"",
null
);

}
}
Loading