Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport stable/operate-8.5] feat: allow for disabling cluster health checks #23372

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
import org.elasticsearch.client.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;
Expand All @@ -70,21 +69,24 @@ public class ElasticsearchConnector {

private static final Logger LOGGER = LoggerFactory.getLogger(ElasticsearchConnector.class);

@Autowired private OperateProperties operateProperties;

private final OperateProperties operateProperties;
private ElasticsearchClient elasticsearchClient;

public static void closeEsClient(RestHighLevelClient esClient) {
public ElasticsearchConnector(final OperateProperties operateProperties) {
this.operateProperties = operateProperties;
}

public static void closeEsClient(final RestHighLevelClient esClient) {
if (esClient != null) {
try {
esClient.close();
} catch (IOException e) {
} catch (final IOException e) {
LOGGER.error("Could not close esClient", e);
}
}
}

public static void closeEsClient(ElasticsearchClient esClient) {
public static void closeEsClient(final ElasticsearchClient esClient) {
if (esClient != null) {
esClient.shutdown();
}
Expand All @@ -111,15 +113,20 @@ public ElasticsearchClient elasticsearchClient() {

// And create the API client
elasticsearchClient = new ElasticsearchClient(transport);
if (!checkHealth(elasticsearchClient)) {
LOGGER.warn("Elasticsearch cluster is not accessible");

if (operateProperties.getElasticsearch().isHealthCheckEnabled()) {
if (!checkHealth(elasticsearchClient)) {
LOGGER.warn("Elasticsearch cluster is not accessible");
} else {
LOGGER.debug("Elasticsearch connection was successfully created.");
}
} else {
LOGGER.debug("Elasticsearch connection was successfully created.");
LOGGER.warn("Elasticsearch cluster health check is disabled.");
}
return elasticsearchClient;
}

public boolean checkHealth(ElasticsearchClient elasticsearchClient) {
public boolean checkHealth(final ElasticsearchClient elasticsearchClient) {
final ElasticsearchProperties elsConfig = operateProperties.getElasticsearch();
try {
return RetryOperation.<Boolean>newBuilder()
Expand All @@ -140,7 +147,7 @@ public boolean checkHealth(ElasticsearchClient elasticsearchClient) {
})
.build()
.retry();
} catch (Exception e) {
} catch (final Exception e) {
throw new OperateRuntimeException("Couldn't connect to Elasticsearch. Abort.", e);
}
}
Expand All @@ -166,13 +173,13 @@ public void tearDown() {
if (elasticsearchClient != null) {
try {
elasticsearchClient._transport().close();
} catch (IOException e) {
} catch (final IOException e) {
throw new RuntimeException(e);
}
}
}

public RestHighLevelClient createEsClient(ElasticsearchProperties elsConfig) {
public RestHighLevelClient createEsClient(final ElasticsearchProperties elsConfig) {
LOGGER.debug("Creating Elasticsearch connection...");
final RestClientBuilder restClientBuilder =
RestClient.builder(getHttpHost(elsConfig))
Expand All @@ -186,16 +193,21 @@ public RestHighLevelClient createEsClient(ElasticsearchProperties elsConfig) {
new RestHighLevelClientBuilder(restClientBuilder.build())
.setApiCompatibilityMode(true)
.build();
if (!checkHealth(esClient)) {
LOGGER.warn("Elasticsearch cluster is not accessible");
if (operateProperties.getElasticsearch().isHealthCheckEnabled()) {
if (!checkHealth(esClient)) {
LOGGER.warn("Elasticsearch cluster is not accessible");
} else {
LOGGER.debug("Elasticsearch connection was successfully created.");
}
} else {
LOGGER.debug("Elasticsearch connection was successfully created.");
LOGGER.warn("Elasticsearch cluster health check is disabled.");
}
return esClient;
}

protected HttpAsyncClientBuilder configureHttpClient(
HttpAsyncClientBuilder httpAsyncClientBuilder, ElasticsearchProperties elsConfig) {
final HttpAsyncClientBuilder httpAsyncClientBuilder,
final ElasticsearchProperties elsConfig) {
setupAuthentication(httpAsyncClientBuilder, elsConfig);
if (elsConfig.getSsl() != null) {
setupSSLContext(httpAsyncClientBuilder, elsConfig.getSsl());
Expand All @@ -204,18 +216,18 @@ protected HttpAsyncClientBuilder configureHttpClient(
}

private void setupSSLContext(
HttpAsyncClientBuilder httpAsyncClientBuilder, SslProperties sslConfig) {
final HttpAsyncClientBuilder httpAsyncClientBuilder, final SslProperties sslConfig) {
try {
httpAsyncClientBuilder.setSSLContext(getSSLContext(sslConfig));
if (!sslConfig.isVerifyHostname()) {
httpAsyncClientBuilder.setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE);
}
} catch (Exception e) {
} catch (final Exception e) {
LOGGER.error("Error in setting up SSLContext", e);
}
}

private SSLContext getSSLContext(SslProperties sslConfig)
private SSLContext getSSLContext(final SslProperties sslConfig)
throws KeyStoreException, NoSuchAlgorithmException, KeyManagementException {
final KeyStore truststore = loadCustomTrustStore(sslConfig);
final TrustStrategy trustStrategy =
Expand All @@ -228,7 +240,7 @@ private SSLContext getSSLContext(SslProperties sslConfig)
}
}

private KeyStore loadCustomTrustStore(SslProperties sslConfig) {
private KeyStore loadCustomTrustStore(final SslProperties sslConfig) {
try {
final KeyStore trustStore = KeyStore.getInstance(KeyStore.getDefaultType());
trustStore.load(null);
Expand All @@ -238,7 +250,7 @@ private KeyStore loadCustomTrustStore(SslProperties sslConfig) {
setCertificateInTrustStore(trustStore, serverCertificate);
}
return trustStore;
} catch (Exception e) {
} catch (final Exception e) {
final String message =
"Could not create certificate trustStore for the secured Elasticsearch Connection!";
throw new OperateRuntimeException(message, e);
Expand All @@ -250,7 +262,7 @@ private void setCertificateInTrustStore(
try {
final Certificate cert = loadCertificateFromPath(serverCertificate);
trustStore.setCertificateEntry("elasticsearch-host", cert);
} catch (Exception e) {
} catch (final Exception e) {
final String message =
"Could not load configured server certificate for the secured Elasticsearch Connection!";
throw new OperateRuntimeException(message, e);
Expand All @@ -260,7 +272,8 @@ private void setCertificateInTrustStore(
private Certificate loadCertificateFromPath(final String certificatePath)
throws IOException, CertificateException {
final Certificate cert;
try (BufferedInputStream bis = new BufferedInputStream(new FileInputStream(certificatePath))) {
try (final BufferedInputStream bis =
new BufferedInputStream(new FileInputStream(certificatePath))) {
final CertificateFactory cf = CertificateFactory.getInstance("X.509");

if (bis.available() > 0) {
Expand All @@ -284,17 +297,17 @@ private Builder setTimeouts(final Builder builder, final ElasticsearchProperties
return builder;
}

private HttpHost getHttpHost(ElasticsearchProperties elsConfig) {
private HttpHost getHttpHost(final ElasticsearchProperties elsConfig) {
try {
final URI uri = new URI(elsConfig.getUrl());
return new HttpHost(uri.getHost(), uri.getPort(), uri.getScheme());
} catch (URISyntaxException e) {
} catch (final URISyntaxException e) {
throw new OperateRuntimeException("Error in url: " + elsConfig.getUrl(), e);
}
}

private void setupAuthentication(
final HttpAsyncClientBuilder builder, ElasticsearchProperties elsConfig) {
final HttpAsyncClientBuilder builder, final ElasticsearchProperties elsConfig) {
final String username = elsConfig.getUsername();
final String password = elsConfig.getPassword();

Expand All @@ -309,7 +322,7 @@ private void setupAuthentication(
builder.setDefaultCredentialsProvider(credentialsProvider);
}

public boolean checkHealth(RestHighLevelClient esClient) {
public boolean checkHealth(final RestHighLevelClient esClient) {
final ElasticsearchProperties elsConfig = operateProperties.getElasticsearch();
try {
return RetryOperation.<Boolean>newBuilder()
Expand All @@ -328,7 +341,7 @@ public boolean checkHealth(RestHighLevelClient esClient) {
})
.build()
.retry();
} catch (Exception e) {
} catch (final Exception e) {
throw new OperateRuntimeException("Couldn't connect to Elasticsearch. Abort.", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,15 @@ public OpensearchConnector(
@Primary
public OpenSearchClient openSearchClient() {
final OpenSearchClient openSearchClient = createOsClient(operateProperties.getOpensearch());
try {
final HealthResponse response = openSearchClient.cluster().health();
LOGGER.info("OpenSearch cluster health: {}", response.status());
} catch (final IOException e) {
LOGGER.error("Error in getting health status from {}", "localhost:9205", e);
if (operateProperties.getOpensearch().isHealthCheckEnabled()) {
try {
final HealthResponse response = openSearchClient.cluster().health();
LOGGER.info("OpenSearch cluster health: {}", response.status());
} catch (final IOException e) {
LOGGER.error("Error in getting health status from {}", "localhost:9205", e);
}
} else {
LOGGER.warn("OpenSearch cluster health check is disabled.");
}
return openSearchClient;
}
Expand All @@ -112,19 +116,23 @@ public OpenSearchClient openSearchClient() {
public OpenSearchAsyncClient openSearchAsyncClient() {
final OpenSearchAsyncClient openSearchClient =
createAsyncOsClient(operateProperties.getOpensearch());
final CompletableFuture<HealthResponse> healthResponse;
try {
healthResponse = openSearchClient.cluster().health();
healthResponse.whenComplete(
(response, e) -> {
if (e != null) {
LOGGER.error("Error in getting health status from {}", "localhost:9205", e);
} else {
LOGGER.info("OpenSearch cluster health: {}", response.status());
}
});
} catch (final IOException e) {
throw new RuntimeException(e);
if (operateProperties.getOpensearch().isHealthCheckEnabled()) {
final CompletableFuture<HealthResponse> healthResponse;
try {
healthResponse = openSearchClient.cluster().health();
healthResponse.whenComplete(
(response, e) -> {
if (e != null) {
LOGGER.error("Error in getting health status from {}", "localhost:9205", e);
} else {
LOGGER.info("OpenSearch cluster health: {}", response.status());
}
});
} catch (final IOException e) {
throw new RuntimeException(e);
}
} else {
LOGGER.warn("OpenSearch cluster health check is disabled.");
}
return openSearchClient;
}
Expand Down Expand Up @@ -164,25 +172,28 @@ public OpenSearchAsyncClient createAsyncOsClient(final OpensearchProperties osCo
final OpenSearchTransport transport = builder.build();
final OpenSearchAsyncClient openSearchAsyncClient = new OpenSearchAsyncClient(transport);

final CompletableFuture<HealthResponse> healthResponse;
try {
healthResponse = openSearchAsyncClient.cluster().health();
healthResponse.whenComplete(
(response, e) -> {
if (e != null) {
LOGGER.error("Error in getting health status from {}", "localhost:9205", e);
} else {
LOGGER.info("OpenSearch cluster health: {}", response.status());
}
});
} catch (final IOException e) {
throw new OperateRuntimeException(e);
}

if (!checkHealth(openSearchAsyncClient)) {
LOGGER.warn("OpenSearch cluster is not accessible");
if (operateProperties.getOpensearch().isHealthCheckEnabled()) {
final CompletableFuture<HealthResponse> healthResponse;
try {
healthResponse = openSearchAsyncClient.cluster().health();
healthResponse.whenComplete(
(response, e) -> {
if (e != null) {
LOGGER.error("Error in getting health status from {}", "localhost:9205", e);
} else {
LOGGER.info("OpenSearch cluster health: {}", response.status());
}
});
} catch (final IOException e) {
throw new OperateRuntimeException(e);
}
if (!checkHealth(openSearchAsyncClient)) {
LOGGER.warn("OpenSearch cluster is not accessible");
} else {
LOGGER.debug("OpenSearch connection was successfully created.");
}
} else {
LOGGER.debug("OpenSearch connection was successfully created.");
LOGGER.warn("OpenSearch cluster health check is disabled.");
}
return openSearchAsyncClient;
}
Expand Down Expand Up @@ -229,17 +240,21 @@ public OpenSearchClient createOsClient(final OpensearchProperties osConfig) {

final OpenSearchTransport transport = builder.build();
final OpenSearchClient openSearchClient = new ExtendedOpenSearchClient(transport);
try {
final HealthResponse response = openSearchClient.cluster().health();
LOGGER.info("OpenSearch cluster health: {}", response.status());
} catch (final IOException e) {
LOGGER.error("Error in getting health status from {}", "localhost:9205", e);
}
if (operateProperties.getOpensearch().isHealthCheckEnabled()) {
try {
final HealthResponse response = openSearchClient.cluster().health();
LOGGER.info("OpenSearch cluster health: {}", response.status());
} catch (final IOException e) {
LOGGER.error("Error in getting health status from {}", "localhost:9205", e);
}

if (!checkHealth(openSearchClient)) {
LOGGER.warn("OpenSearch cluster is not accessible");
if (!checkHealth(openSearchClient)) {
LOGGER.warn("OpenSearch cluster is not accessible");
} else {
LOGGER.debug("OpenSearch connection was successfully created.");
}
} else {
LOGGER.debug("OpenSearch connection was successfully created.");
LOGGER.warn("OpenSearch cluster health check is disabled.");
}
return openSearchClient;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public class ElasticsearchProperties {

private boolean createSchema = true;

/** Indicates whether operate does a proper health check for ES clusters. */
private boolean healthCheckEnabled = true;

private String url;
private String username;
private String password;
Expand Down Expand Up @@ -125,6 +128,14 @@ public void setCreateSchema(final boolean createSchema) {
this.createSchema = createSchema;
}

public boolean isHealthCheckEnabled() {
return healthCheckEnabled;
}

public void setHealthCheckEnabled(final boolean healthCheckEnabled) {
this.healthCheckEnabled = healthCheckEnabled;
}

public String getPassword() {
return password;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ public class OpensearchProperties {

private boolean createSchema = true;

/** Indicates whether operate does a proper health check for ES/OS clusters. */
private boolean healthCheckEnabled = true;

private String url;
private String username;
private String password;
Expand Down Expand Up @@ -127,6 +130,14 @@ public void setCreateSchema(final boolean createSchema) {
this.createSchema = createSchema;
}

public boolean isHealthCheckEnabled() {
return healthCheckEnabled;
}

public void setHealthCheckEnabled(final boolean healthCheckEnabled) {
this.healthCheckEnabled = healthCheckEnabled;
}

public String getPassword() {
return password;
}
Expand Down
Loading
Loading