Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REST high-level client: add Cluster Health API #29331

Merged
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@

import org.apache.http.Header;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;

import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;

/**
* A wrapper for the {@link RestHighLevelClient} that provides methods for accessing the Cluster API.
Expand Down Expand Up @@ -63,4 +67,29 @@ public void putSettingsAsync(ClusterUpdateSettingsRequest clusterUpdateSettingsR
restHighLevelClient.performRequestAsyncAndParseEntity(clusterUpdateSettingsRequest, RequestConverters::clusterPutSettings,
ClusterUpdateSettingsResponse::fromXContent, listener, emptySet(), headers);
}

/**
* Get cluster health using the Cluster Health API
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html"> Cluster Health API on elastic.co</a>
* <p>
* If timeout occurred, {@link ClusterHealthResponse} will have isTimedOut() == true and status() == RestStatus.REQUEST_TIMEOUT
*/
public ClusterHealthResponse health(ClusterHealthRequest healthRequest, Header... headers) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(healthRequest, RequestConverters::clusterHealth,
ClusterHealthResponse::fromXContent, singleton(RestStatus.REQUEST_TIMEOUT.getStatus()), headers);
}

/**
* Asynchronously get cluster health using the Cluster Health API
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html"> Cluster Health API on elastic.co</a>
* If timeout occurred, {@link ClusterHealthResponse} will have isTimedOut() == true and status() == RestStatus.REQUEST_TIMEOUT
*/
public void healthAsync(ClusterHealthRequest healthRequest, ActionListener<ClusterHealthResponse> listener, Header... headers) {
restHighLevelClient.performRequestAsyncAndParseEntity(healthRequest, RequestConverters::clusterHealth,
ClusterHealthResponse::fromXContent, listener, singleton(RestStatus.REQUEST_TIMEOUT.getStatus()), headers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.http.entity.ContentType;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
import org.elasticsearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
Expand Down Expand Up @@ -72,7 +73,9 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -693,6 +696,28 @@ static Request listTasks(ListTasksRequest listTaskRequest) {
return request;
}

static Request clusterHealth(ClusterHealthRequest healthRequest) {
String[] indices = healthRequest.indices() == null ? Strings.EMPTY_ARRAY : healthRequest.indices();
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_cluster/health")
.addCommaSeparatedPathParts(indices)
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);

new Params(request)
.withWaitForStatus(healthRequest.waitForStatus())
.withWaitForNoRelocatingShards(healthRequest.waitForNoRelocatingShards())
.withWaitForNoInitializingShards(healthRequest.waitForNoInitializingShards())
.withWaitForActiveShards(healthRequest.waitForActiveShards())
.withWaitForNodes(healthRequest.waitForNodes())
.withWaitForEvents(healthRequest.waitForEvents())
.withTimeout(healthRequest.timeout())
.withMasterTimeout(healthRequest.masterNodeTimeout())
.withLocal(healthRequest.local())
.withLevel(healthRequest.level());
return request;
}

static Request rollover(RolloverRequest rolloverRequest) throws IOException {
String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover")
.addPathPart(rolloverRequest.getNewIndexName()).build();
Expand Down Expand Up @@ -1083,6 +1108,42 @@ Params withVerify(boolean verify) {
}
return this;
}

Params withWaitForStatus(ClusterHealthStatus status) {
if (status != null) {
return putParam("wait_for_status", status.name().toLowerCase(Locale.ROOT));
}
return this;
}

Params withWaitForNoRelocatingShards(boolean waitNoRelocatingShards) {
if (waitNoRelocatingShards) {
return putParam("wait_for_no_relocating_shards", Boolean.TRUE.toString());
}
return this;
}

Params withWaitForNoInitializingShards(boolean waitNoInitShards) {
if (waitNoInitShards) {
return putParam("wait_for_no_initializing_shards", Boolean.TRUE.toString());
}
return this;
}

Params withWaitForNodes(String waitForNodes) {
return putParam("wait_for_nodes", waitForNodes);
}

Params withLevel(ClusterHealthRequest.Level level) {
return putParam("level", level.name().toLowerCase(Locale.ROOT));
}

Params withWaitForEvents(Priority waitForEvents) {
if (waitForEvents != null) {
return putParam("wait_for_events", waitForEvents.name().toLowerCase(Locale.ROOT));
}
return this;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
package org.elasticsearch.client;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.cluster.health.ClusterShardHealth;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
Expand All @@ -34,6 +39,7 @@
import java.util.HashMap;
import java.util.Map;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -105,4 +111,136 @@ public void testClusterUpdateSettingNonExistent() {
assertThat(exception.getMessage(), equalTo(
"Elasticsearch exception [type=illegal_argument_exception, reason=transient setting [" + setting + "], not recognized]"));
}

public void testClusterHealthGreen() throws IOException {
ClusterHealthRequest request = new ClusterHealthRequest();
request.timeout("5s");
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);

assertThat(response, notNullValue());
assertThat(response.isTimedOut(), equalTo(false));
assertThat(response.status(), equalTo(RestStatus.OK));
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.GREEN));
assertNoIndices(response);
}

public void testClusterHealthYellowClusterLevel() throws IOException {
createIndex("index", Settings.EMPTY);
createIndex("index2", Settings.EMPTY);
ClusterHealthRequest request = new ClusterHealthRequest();
request.timeout("5s");
request.level(ClusterHealthRequest.Level.CLUSTER);
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);

assertYellowShards(response);
assertThat(response.getIndices().size(), equalTo(0));
}

public void testClusterHealthYellowIndicesLevel() throws IOException {
createIndex("index", Settings.EMPTY);
createIndex("index2", Settings.EMPTY);
ClusterHealthRequest request = new ClusterHealthRequest();
request.timeout("5s");
request.level(ClusterHealthRequest.Level.INDICES);
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);

assertYellowShards(response);
assertThat(response.getIndices().size(), equalTo(2));
for (Map.Entry<String, ClusterIndexHealth> entry : response.getIndices().entrySet()) {
assertYellowIndex(entry.getKey(), entry.getValue(), true);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer that we don't want randomize anything in this integration test. These are quite hard to debug if they fail, I would prefer to have different test methods here for the main code paths.

}

private static void assertYellowShards(ClusterHealthResponse response) {
assertThat(response, notNullValue());
assertThat(response.isTimedOut(), equalTo(false));
assertThat(response.status(), equalTo(RestStatus.OK));
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
assertThat(response.getActivePrimaryShards(), equalTo(2));
assertThat(response.getNumberOfDataNodes(), equalTo(1));
assertThat(response.getNumberOfNodes(), equalTo(1));
assertThat(response.getActiveShards(), equalTo(2));
assertThat(response.getDelayedUnassignedShards(), equalTo(0));
assertThat(response.getInitializingShards(), equalTo(0));
assertThat(response.getUnassignedShards(), equalTo(2));
assertThat(response.getActiveShardsPercent(), equalTo(50d));
}

public void testClusterHealthYellowSpecificIndex() throws IOException {
createIndex("index", Settings.EMPTY);
createIndex("index2", Settings.EMPTY);
ClusterHealthRequest request = new ClusterHealthRequest("index");
request.timeout("5s");
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);

assertThat(response, notNullValue());
assertThat(response.isTimedOut(), equalTo(false));
assertThat(response.status(), equalTo(RestStatus.OK));
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
assertThat(response.getActivePrimaryShards(), equalTo(1));
assertThat(response.getNumberOfDataNodes(), equalTo(1));
assertThat(response.getNumberOfNodes(), equalTo(1));
assertThat(response.getActiveShards(), equalTo(1));
assertThat(response.getDelayedUnassignedShards(), equalTo(0));
assertThat(response.getInitializingShards(), equalTo(0));
assertThat(response.getUnassignedShards(), equalTo(1));
assertThat(response.getActiveShardsPercent(), equalTo(50d));
assertThat(response.getIndices().size(), equalTo(1));
Map.Entry<String, ClusterIndexHealth> index = response.getIndices().entrySet().iterator().next();
assertYellowIndex(index.getKey(), index.getValue(), false);
}

private static void assertYellowIndex(String indexName, ClusterIndexHealth indexHealth, boolean emptyShards) {
assertThat(indexHealth, notNullValue());
assertThat(indexHealth.getIndex(),equalTo(indexName));
assertThat(indexHealth.getActivePrimaryShards(),equalTo(1));
assertThat(indexHealth.getActiveShards(),equalTo(1));
assertThat(indexHealth.getNumberOfReplicas(),equalTo(1));
assertThat(indexHealth.getInitializingShards(),equalTo(0));
assertThat(indexHealth.getUnassignedShards(),equalTo(1));
assertThat(indexHealth.getRelocatingShards(),equalTo(0));
assertThat(indexHealth.getStatus(),equalTo(ClusterHealthStatus.YELLOW));
if (emptyShards) {
assertThat(indexHealth.getShards().size(), equalTo(0));
} else {
assertThat(indexHealth.getShards().size(), equalTo(1));
for (Map.Entry<Integer, ClusterShardHealth> entry : indexHealth.getShards().entrySet()) {
assertYellowShard(entry.getKey(), entry.getValue());
}
}
}

private static void assertYellowShard(int shardId, ClusterShardHealth shardHealth) {
assertThat(shardHealth, notNullValue());
assertThat(shardHealth.getShardId(), equalTo(shardId));
assertThat(shardHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
assertThat(shardHealth.getActiveShards(), equalTo(1));
assertThat(shardHealth.getInitializingShards(), equalTo(0));
assertThat(shardHealth.getUnassignedShards(), equalTo(1));
assertThat(shardHealth.getRelocatingShards(), equalTo(0));
}

public void testClusterHealthNotFoundIndex() throws IOException {
ClusterHealthRequest request = new ClusterHealthRequest("notexisted-index");
request.timeout("5s");
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);

assertThat(response, notNullValue());
assertThat(response.isTimedOut(), equalTo(true));
assertThat(response.status(), equalTo(RestStatus.REQUEST_TIMEOUT));
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.RED));
assertNoIndices(response);
}

private static void assertNoIndices(ClusterHealthResponse response) {
assertThat(response.getIndices(), equalTo(emptyMap()));
assertThat(response.getActivePrimaryShards(), equalTo(0));
assertThat(response.getNumberOfDataNodes(), equalTo(1));
assertThat(response.getNumberOfNodes(), equalTo(1));
assertThat(response.getActiveShards(), equalTo(0));
assertThat(response.getDelayedUnassignedShards(), equalTo(0));
assertThat(response.getInitializingShards(), equalTo(0));
assertThat(response.getUnassignedShards(), equalTo(0));
assertThat(response.getActiveShardsPercent(), equalTo(100d));
}
}
Loading