Skip to content

Commit

Permalink
Migrate client transports to Apache HttpClient / Core 5.x
Browse files Browse the repository at this point in the history
Signed-off-by: Owais Kazi <owaiskazi19@gmail.com>
  • Loading branch information
owaiskazi19 committed Nov 4, 2022
1 parent 39c1c3a commit f179359
Show file tree
Hide file tree
Showing 11 changed files with 104 additions and 92 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ configurations.all {
resolutionStrategy {
force "joda-time:joda-time:2.10.13"
force "commons-logging:commons-logging:1.2"
force "org.apache.httpcomponents:httpcore:4.4.15"
force "org.apache.httpcomponents:httpcore5:5.1.4"
force "commons-codec:commons-codec:1.15"

force "org.mockito:mockito-core:2.25.0"
Expand Down
12 changes: 3 additions & 9 deletions src/main/java/org/opensearch/ad/AnomalyDetectorExtension.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,11 @@
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.sdk.*;
import org.opensearch.sdk.Extension;
import org.opensearch.sdk.ExtensionRestHandler;
import org.opensearch.sdk.ExtensionSettings;
import org.opensearch.sdk.ExtensionsRunner;
import org.opensearch.sdk.SDKClient;
import org.opensearch.threadpool.ThreadPool;

import com.google.common.collect.ImmutableList;
Expand All @@ -44,13 +44,7 @@ public ExtensionSettings getExtensionSettings() {

@Override
public List<ExtensionRestHandler> getExtensionRestHandlers() {
List<ExtensionRestHandler> handler = null;
try {
handler = List.of(new RestCreateDetectorAction());
} catch (IOException e) {
e.printStackTrace();
}
return handler;
return List.of(new RestCreateDetectorAction());
}

@Override
Expand Down Expand Up @@ -107,7 +101,7 @@ private static ExtensionSettings initializeSettings() throws IOException {
return settings;
}

public OpenSearchClient getClient() throws IOException {
public OpenSearchClient getClient() {
SDKClient sdkClient = new SDKClient();
OpenSearchClient client = sdkClient.initializeClient("localhost", 9200);
return client;
Expand Down
5 changes: 3 additions & 2 deletions src/main/java/org/opensearch/ad/auth/UserIdentity.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@
import java.util.Map;
import java.util.Objects;

import org.apache.http.util.EntityUtils;
import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.opensearch.client.Response;
import org.opensearch.common.Nullable;
import org.opensearch.common.Strings;
Expand Down Expand Up @@ -110,7 +111,7 @@ public UserIdentity(
* @param response The security plugin response.
* @throws IOException If there was an error receiving the response.
*/
public UserIdentity(final Response response) throws IOException {
public UserIdentity(final Response response) throws IOException, ParseException {
this(EntityUtils.toString(response.getEntity()));
}

Expand Down
47 changes: 17 additions & 30 deletions src/main/java/org/opensearch/ad/rest/RestCreateDetectorAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import static org.opensearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.ANOMALY_DETECTORS_INDEX_MAPPING_FILE;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.rest.RestRequest.Method.*;
import static org.opensearch.rest.RestRequest.Method.POST;
import static org.opensearch.rest.RestStatus.BAD_REQUEST;
import static org.opensearch.rest.RestStatus.NOT_FOUND;
import static org.opensearch.rest.RestStatus.OK;

import java.io.*;
Expand All @@ -23,7 +25,6 @@
import org.opensearch.ad.model.DetectorInternalState;
import org.opensearch.ad.settings.EnabledSetting;
import org.opensearch.client.json.JsonpMapper;
import org.opensearch.client.json.jackson.JacksonJsonpMapper;
import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch._types.mapping.TypeMapping;
import org.opensearch.client.opensearch.core.IndexRequest;
Expand Down Expand Up @@ -51,7 +52,6 @@
import org.opensearch.search.aggregations.metrics.InternalSum;
import org.opensearch.search.aggregations.metrics.SumAggregationBuilder;

import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.common.base.Charsets;
import com.google.common.io.Resources;
import jakarta.json.stream.JsonParser;
Expand All @@ -61,8 +61,6 @@ public class RestCreateDetectorAction implements ExtensionRestHandler {
private AnomalyDetectorExtension anomalyDetectorExtension = new AnomalyDetectorExtension();
private OpenSearchClient sdkClient = anomalyDetectorExtension.getClient();

public RestCreateDetectorAction() throws IOException {}

@Override
public List<Route> routes() {
return List.of(new Route(POST, "/detectors"));
Expand Down Expand Up @@ -156,7 +154,7 @@ private NamedXContentRegistry.Entry registerAggregation(SearchPlugin.Aggregation

private CreateIndexRequest initAnomalyDetectorIndex() throws FileNotFoundException {
JsonpMapper mapper = sdkClient._transport().jsonpMapper();
((JacksonJsonpMapper) mapper).objectMapper().registerModule(new JavaTimeModule());
// ((JacksonJsonpMapper) mapper).objectMapper().registerModule(new JavaTimeModule());
JsonParser parser = null;
try {
parser = mapper
Expand Down Expand Up @@ -186,33 +184,24 @@ public ExtensionRestResponse handleRequest(ExtensionRestRequest request) {
}
Method method = request.method();

NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(getNamedXWriteables());
XContentParser parser = null;
try {
parser = request.contentParser(xContentRegistry);
} catch (Exception e) {
e.printStackTrace();
if (!Method.POST.equals(method)) {
return new ExtensionRestResponse(
request,
NOT_FOUND,
"Extension REST action improperly configured to handle " + request.toString()
);
}

AnomalyDetector detector = null;
NamedXContentRegistry xContentRegistry = new NamedXContentRegistry(getNamedXWriteables());
XContentParser parser;
AnomalyDetector detector;
XContentBuilder builder = null;
CreateIndexRequest createIndexRequest;
try {
parser = request.contentParser(xContentRegistry);
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);
detector = AnomalyDetector.parse(parser);
} catch (Exception e) {
logger.info("Exception", e);
e.printStackTrace();
}

CreateIndexRequest createIndexRequest = null;
try {
createIndexRequest = initAnomalyDetectorIndex();
} catch (FileNotFoundException e) {
logger.info("File Not Found", e);
e.printStackTrace();
}

XContentBuilder builder = null;
try {
CreateIndexResponse createIndexResponse = sdkClient.indices().create(createIndexRequest);
if (createIndexResponse.acknowledged()) {
IndexResponse indexResponse = indexAnomalyDetector(detector);
Expand All @@ -230,10 +219,8 @@ public ExtensionRestResponse handleRequest(ExtensionRestRequest request) {
e.printStackTrace();
}
}

} catch (Exception e) {
logger.info("Exception", e);
e.printStackTrace();
return new ExtensionRestResponse(request, BAD_REQUEST, builder);
}
return new ExtensionRestResponse(request, OK, builder);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@
import java.util.Locale;
import java.util.Map;

import org.apache.http.HttpHeaders;
import org.apache.http.message.BasicHeader;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.message.BasicHeader;
import org.opensearch.ad.model.ADTask;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorExecutionInput;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
import java.util.Set;
import java.util.function.ToDoubleFunction;

import org.apache.http.HttpHeaders;
import org.apache.http.message.BasicHeader;
import org.apache.http.util.EntityUtils;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.ParseException;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.http.message.BasicHeader;
import org.junit.Before;
import org.opensearch.ad.mock.model.MockSimpleLog;
import org.opensearch.ad.model.ADTaskProfile;
Expand Down Expand Up @@ -62,7 +63,7 @@ public ToXContentObject[] getHistoricalAnomalyDetector(String detectorId, boolea
return getAnomalyDetector(detectorId, header, false, returnTask, client);
}

public ADTaskProfile getADTaskProfile(String detectorId) throws IOException {
public ADTaskProfile getADTaskProfile(String detectorId) throws IOException, ParseException {
Response profileResponse = TestHelpers
.makeRequest(
client(),
Expand Down Expand Up @@ -99,7 +100,8 @@ public Response ingestSimpleMockLog(
ToDoubleFunction<Integer> valueFunc,
int ipSize,
int categorySize
) throws IOException {
) throws IOException,
ParseException {
TestHelpers
.makeRequest(
client(),
Expand Down Expand Up @@ -150,7 +152,7 @@ public Response ingestSimpleMockLog(
return bulkResponse;
}

public ADTaskProfile parseADTaskProfile(Response profileResponse) throws IOException {
public ADTaskProfile parseADTaskProfile(Response profileResponse) throws IOException, ParseException {
String profileResult = EntityUtils.toString(profileResponse.getEntity());
XContentParser parser = TestHelpers.parser(profileResult);
ADTaskProfile adTaskProfile = null;
Expand All @@ -166,7 +168,8 @@ public ADTaskProfile parseADTaskProfile(Response profileResponse) throws IOExcep
return adTaskProfile;
}

protected void ingestTestDataForHistoricalAnalysis(String indexName, int detectionIntervalInMinutes) throws IOException {
protected void ingestTestDataForHistoricalAnalysis(String indexName, int detectionIntervalInMinutes) throws IOException,
ParseException {
ingestSimpleMockLog(indexName, 10, 3000, detectionIntervalInMinutes, (i) -> {
if (i % 500 == 0) {
return randomDoubleBetween(100, 1000, true);
Expand Down
62 changes: 45 additions & 17 deletions src/test/java/org/opensearch/ad/ODFERestTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,23 @@
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.conn.ssl.NoopHostnameVerifier;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.message.BasicHeader;
import org.apache.http.ssl.SSLContextBuilder;
import javax.net.ssl.SSLEngine;

import org.apache.hc.client5.http.auth.AuthScope;
import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
import org.apache.hc.client5.http.impl.auth.BasicCredentialsProvider;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManagerBuilder;
import org.apache.hc.client5.http.ssl.ClientTlsStrategyBuilder;
import org.apache.hc.client5.http.ssl.NoopHostnameVerifier;
import org.apache.hc.core5.function.Factory;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.hc.core5.http.nio.ssl.TlsStrategy;
import org.apache.hc.core5.reactor.ssl.TlsDetails;
import org.apache.hc.core5.ssl.SSLContextBuilder;
import org.apache.hc.core5.util.Timeout;
import org.junit.After;
import org.opensearch.client.Request;
import org.opensearch.client.Response;
Expand Down Expand Up @@ -134,7 +142,7 @@ protected RestClient buildClient(Settings settings, HttpHost[] hosts) throws IOE
@After
protected void wipeAllODFEIndices() throws IOException {
Response response = adminClient().performRequest(new Request("GET", "/_cat/indices?format=json&expand_wildcards=all"));
XContentType xContentType = XContentType.fromMediaType(response.getEntity().getContentType().getValue());
XContentType xContentType = XContentType.fromMediaType(response.getEntity().getContentType());
try (
XContentParser parser = xContentType
.xContent()
Expand Down Expand Up @@ -176,14 +184,33 @@ protected static void configureHttpsClient(RestClientBuilder builder, Settings s
String password = Optional
.ofNullable(System.getProperty("password"))
.orElseThrow(() -> new RuntimeException("password is missing"));
CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(userName, password));
BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider
.setCredentials(
new AuthScope(new HttpHost("localhost", 9200)),
new UsernamePasswordCredentials(userName, password.toCharArray())
);
try {
return httpClientBuilder
.setDefaultCredentialsProvider(credentialsProvider)
final TlsStrategy tlsStrategy = ClientTlsStrategyBuilder
.create()
.setSslContext(SSLContextBuilder.create().loadTrustMaterial(null, (chains, authType) -> true).build())
// disable the certificate since our testing cluster just uses the default security configuration
.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
.setSSLContext(SSLContextBuilder.create().loadTrustMaterial(null, (chains, authType) -> true).build());
.setHostnameVerifier(NoopHostnameVerifier.INSTANCE)
// See please https://issues.apache.org/jira/browse/HTTPCLIENT-2219
.setTlsDetailsFactory(new Factory<SSLEngine, TlsDetails>() {
@Override
public TlsDetails create(final SSLEngine sslEngine) {
return new TlsDetails(sslEngine.getSession(), sslEngine.getApplicationProtocol());
}
})
.build();

final PoolingAsyncClientConnectionManager connectionManager = PoolingAsyncClientConnectionManagerBuilder
.create()
.setTlsStrategy(tlsStrategy)
.build();

return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider).setConnectionManager(connectionManager);
} catch (Exception e) {
throw new RuntimeException(e);
}
Expand All @@ -192,7 +219,8 @@ protected static void configureHttpsClient(RestClientBuilder builder, Settings s
final String socketTimeoutString = settings.get(CLIENT_SOCKET_TIMEOUT);
final TimeValue socketTimeout = TimeValue
.parseTimeValue(socketTimeoutString == null ? "60s" : socketTimeoutString, CLIENT_SOCKET_TIMEOUT);
builder.setRequestConfigCallback(conf -> conf.setSocketTimeout(Math.toIntExact(socketTimeout.getMillis())));
builder
.setRequestConfigCallback(conf -> conf.setResponseTimeout(Timeout.ofMilliseconds(Math.toIntExact(socketTimeout.getMillis()))));
if (settings.hasValue(CLIENT_PATH_PREFIX)) {
builder.setPathPrefix(settings.get(CLIENT_PATH_PREFIX));
}
Expand Down
16 changes: 7 additions & 9 deletions src/test/java/org/opensearch/ad/TestHelpers.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

package org.opensearch.ad;

import static org.apache.http.entity.ContentType.APPLICATION_JSON;
import static org.opensearch.cluster.node.DiscoveryNodeRole.BUILT_IN_ROLES;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;
Expand All @@ -38,11 +37,10 @@
import java.util.function.Consumer;
import java.util.stream.IntStream;

import org.apache.http.Header;
import org.apache.http.HttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.hc.core5.http.ContentType;
import org.apache.hc.core5.http.Header;
import org.apache.hc.core5.http.HttpEntity;
import org.apache.hc.core5.http.io.entity.StringEntity;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.util.Strings;
Expand Down Expand Up @@ -169,7 +167,7 @@ public static Response makeRequest(
String jsonEntity,
List<Header> headers
) throws IOException {
HttpEntity httpEntity = Strings.isBlank(jsonEntity) ? null : new NStringEntity(jsonEntity, ContentType.APPLICATION_JSON);
HttpEntity httpEntity = Strings.isBlank(jsonEntity) ? null : new StringEntity(jsonEntity, ContentType.APPLICATION_JSON);
return makeRequest(client, method, endpoint, params, httpEntity, headers);
}

Expand Down Expand Up @@ -1396,11 +1394,11 @@ public static ADTask randomAdTask(
}

public static HttpEntity toHttpEntity(ToXContentObject object) throws IOException {
return new StringEntity(toJsonString(object), APPLICATION_JSON);
return new StringEntity(toJsonString(object), ContentType.APPLICATION_JSON);
}

public static HttpEntity toHttpEntity(String jsonString) throws IOException {
return new StringEntity(jsonString, APPLICATION_JSON);
return new StringEntity(jsonString, ContentType.APPLICATION_JSON);
}

public static String toJsonString(ToXContentObject object) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;

import org.apache.http.HttpHeaders;
import org.apache.http.message.BasicHeader;
import org.apache.hc.core5.http.HttpHeaders;
import org.apache.hc.core5.http.message.BasicHeader;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.core.Logger;
import org.junit.Ignore;
Expand Down
Loading

0 comments on commit f179359

Please sign in to comment.