Skip to content

Commit

Permalink
REST high-level client: add Cluster Health API (#29331)
Browse files Browse the repository at this point in the history
Relates to #27205
  • Loading branch information
Van0SS authored and javanna committed Jun 12, 2018
1 parent 5f84e18 commit d5e8a5c
Show file tree
Hide file tree
Showing 21 changed files with 1,588 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@

import org.apache.http.Header;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;

import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;

/**
* A wrapper for the {@link RestHighLevelClient} that provides methods for accessing the Cluster API.
Expand Down Expand Up @@ -95,4 +99,34 @@ public void putSettingsAsync(ClusterUpdateSettingsRequest clusterUpdateSettingsR
restHighLevelClient.performRequestAsyncAndParseEntity(clusterUpdateSettingsRequest, RequestConverters::clusterPutSettings,
ClusterUpdateSettingsResponse::fromXContent, listener, emptySet(), headers);
}

/**
* Get cluster health using the Cluster Health API.
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html"> Cluster Health API on elastic.co</a>
* <p>
* If timeout occurred, {@link ClusterHealthResponse} will have isTimedOut() == true and status() == RestStatus.REQUEST_TIMEOUT
* @param healthRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @return the response
* @throws IOException in case there is a problem sending the request or parsing back the response
*/
public ClusterHealthResponse health(ClusterHealthRequest healthRequest, RequestOptions options) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(healthRequest, RequestConverters::clusterHealth, options,
ClusterHealthResponse::fromXContent, singleton(RestStatus.REQUEST_TIMEOUT.getStatus()));
}

/**
* Asynchronously get cluster health using the Cluster Health API.
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html"> Cluster Health API on elastic.co</a>
* If timeout occurred, {@link ClusterHealthResponse} will have isTimedOut() == true and status() == RestStatus.REQUEST_TIMEOUT
* @param healthRequest the request
* @param options the request options (e.g. headers), use {@link RequestOptions#DEFAULT} if nothing needs to be customized
* @param listener the listener to be notified upon request completion
*/
public void healthAsync(ClusterHealthRequest healthRequest, RequestOptions options, ActionListener<ClusterHealthResponse> listener) {
restHighLevelClient.performRequestAsyncAndParseEntity(healthRequest, RequestConverters::clusterHealth, options,
ClusterHealthResponse::fromXContent, listener, singleton(RestStatus.REQUEST_TIMEOUT.getStatus()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.http.entity.ContentType;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.list.ListTasksRequest;
import org.elasticsearch.action.admin.cluster.repositories.delete.DeleteRepositoryRequest;
Expand Down Expand Up @@ -74,7 +75,9 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -706,6 +709,28 @@ static Request listTasks(ListTasksRequest listTaskRequest) {
return request;
}

static Request clusterHealth(ClusterHealthRequest healthRequest) {
String[] indices = healthRequest.indices() == null ? Strings.EMPTY_ARRAY : healthRequest.indices();
String endpoint = new EndpointBuilder()
.addPathPartAsIs("_cluster/health")
.addCommaSeparatedPathParts(indices)
.build();
Request request = new Request(HttpGet.METHOD_NAME, endpoint);

new Params(request)
.withWaitForStatus(healthRequest.waitForStatus())
.withWaitForNoRelocatingShards(healthRequest.waitForNoRelocatingShards())
.withWaitForNoInitializingShards(healthRequest.waitForNoInitializingShards())
.withWaitForActiveShards(healthRequest.waitForActiveShards())
.withWaitForNodes(healthRequest.waitForNodes())
.withWaitForEvents(healthRequest.waitForEvents())
.withTimeout(healthRequest.timeout())
.withMasterTimeout(healthRequest.masterNodeTimeout())
.withLocal(healthRequest.local())
.withLevel(healthRequest.level());
return request;
}

static Request rollover(RolloverRequest rolloverRequest) throws IOException {
String endpoint = new EndpointBuilder().addPathPart(rolloverRequest.getAlias()).addPathPartAsIs("_rollover")
.addPathPart(rolloverRequest.getNewIndexName()).build();
Expand Down Expand Up @@ -1124,6 +1149,42 @@ Params withVerify(boolean verify) {
}
return this;
}

Params withWaitForStatus(ClusterHealthStatus status) {
if (status != null) {
return putParam("wait_for_status", status.name().toLowerCase(Locale.ROOT));
}
return this;
}

Params withWaitForNoRelocatingShards(boolean waitNoRelocatingShards) {
if (waitNoRelocatingShards) {
return putParam("wait_for_no_relocating_shards", Boolean.TRUE.toString());
}
return this;
}

Params withWaitForNoInitializingShards(boolean waitNoInitShards) {
if (waitNoInitShards) {
return putParam("wait_for_no_initializing_shards", Boolean.TRUE.toString());
}
return this;
}

Params withWaitForNodes(String waitForNodes) {
return putParam("wait_for_nodes", waitForNodes);
}

Params withLevel(ClusterHealthRequest.Level level) {
return putParam("level", level.name().toLowerCase(Locale.ROOT));
}

Params withWaitForEvents(Priority waitForEvents) {
if (waitForEvents != null) {
return putParam("wait_for_events", waitForEvents.name().toLowerCase(Locale.ROOT));
}
return this;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
package org.elasticsearch.client;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.cluster.health.ClusterShardHealth;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
Expand All @@ -34,6 +39,7 @@
import java.util.HashMap;
import java.util.Map;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -108,4 +114,136 @@ public void testClusterUpdateSettingNonExistent() {
assertThat(exception.getMessage(), equalTo(
"Elasticsearch exception [type=illegal_argument_exception, reason=transient setting [" + setting + "], not recognized]"));
}

public void testClusterHealthGreen() throws IOException {
ClusterHealthRequest request = new ClusterHealthRequest();
request.timeout("5s");
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);

assertThat(response, notNullValue());
assertThat(response.isTimedOut(), equalTo(false));
assertThat(response.status(), equalTo(RestStatus.OK));
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.GREEN));
assertNoIndices(response);
}

public void testClusterHealthYellowClusterLevel() throws IOException {
createIndex("index", Settings.EMPTY);
createIndex("index2", Settings.EMPTY);
ClusterHealthRequest request = new ClusterHealthRequest();
request.timeout("5s");
request.level(ClusterHealthRequest.Level.CLUSTER);
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);

assertYellowShards(response);
assertThat(response.getIndices().size(), equalTo(0));
}

public void testClusterHealthYellowIndicesLevel() throws IOException {
createIndex("index", Settings.EMPTY);
createIndex("index2", Settings.EMPTY);
ClusterHealthRequest request = new ClusterHealthRequest();
request.timeout("5s");
request.level(ClusterHealthRequest.Level.INDICES);
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);

assertYellowShards(response);
assertThat(response.getIndices().size(), equalTo(2));
for (Map.Entry<String, ClusterIndexHealth> entry : response.getIndices().entrySet()) {
assertYellowIndex(entry.getKey(), entry.getValue(), true);
}
}

private static void assertYellowShards(ClusterHealthResponse response) {
assertThat(response, notNullValue());
assertThat(response.isTimedOut(), equalTo(false));
assertThat(response.status(), equalTo(RestStatus.OK));
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
assertThat(response.getActivePrimaryShards(), equalTo(2));
assertThat(response.getNumberOfDataNodes(), equalTo(1));
assertThat(response.getNumberOfNodes(), equalTo(1));
assertThat(response.getActiveShards(), equalTo(2));
assertThat(response.getDelayedUnassignedShards(), equalTo(0));
assertThat(response.getInitializingShards(), equalTo(0));
assertThat(response.getUnassignedShards(), equalTo(2));
assertThat(response.getActiveShardsPercent(), equalTo(50d));
}

public void testClusterHealthYellowSpecificIndex() throws IOException {
createIndex("index", Settings.EMPTY);
createIndex("index2", Settings.EMPTY);
ClusterHealthRequest request = new ClusterHealthRequest("index");
request.timeout("5s");
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);

assertThat(response, notNullValue());
assertThat(response.isTimedOut(), equalTo(false));
assertThat(response.status(), equalTo(RestStatus.OK));
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
assertThat(response.getActivePrimaryShards(), equalTo(1));
assertThat(response.getNumberOfDataNodes(), equalTo(1));
assertThat(response.getNumberOfNodes(), equalTo(1));
assertThat(response.getActiveShards(), equalTo(1));
assertThat(response.getDelayedUnassignedShards(), equalTo(0));
assertThat(response.getInitializingShards(), equalTo(0));
assertThat(response.getUnassignedShards(), equalTo(1));
assertThat(response.getActiveShardsPercent(), equalTo(50d));
assertThat(response.getIndices().size(), equalTo(1));
Map.Entry<String, ClusterIndexHealth> index = response.getIndices().entrySet().iterator().next();
assertYellowIndex(index.getKey(), index.getValue(), false);
}

private static void assertYellowIndex(String indexName, ClusterIndexHealth indexHealth, boolean emptyShards) {
assertThat(indexHealth, notNullValue());
assertThat(indexHealth.getIndex(),equalTo(indexName));
assertThat(indexHealth.getActivePrimaryShards(),equalTo(1));
assertThat(indexHealth.getActiveShards(),equalTo(1));
assertThat(indexHealth.getNumberOfReplicas(),equalTo(1));
assertThat(indexHealth.getInitializingShards(),equalTo(0));
assertThat(indexHealth.getUnassignedShards(),equalTo(1));
assertThat(indexHealth.getRelocatingShards(),equalTo(0));
assertThat(indexHealth.getStatus(),equalTo(ClusterHealthStatus.YELLOW));
if (emptyShards) {
assertThat(indexHealth.getShards().size(), equalTo(0));
} else {
assertThat(indexHealth.getShards().size(), equalTo(1));
for (Map.Entry<Integer, ClusterShardHealth> entry : indexHealth.getShards().entrySet()) {
assertYellowShard(entry.getKey(), entry.getValue());
}
}
}

private static void assertYellowShard(int shardId, ClusterShardHealth shardHealth) {
assertThat(shardHealth, notNullValue());
assertThat(shardHealth.getShardId(), equalTo(shardId));
assertThat(shardHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
assertThat(shardHealth.getActiveShards(), equalTo(1));
assertThat(shardHealth.getInitializingShards(), equalTo(0));
assertThat(shardHealth.getUnassignedShards(), equalTo(1));
assertThat(shardHealth.getRelocatingShards(), equalTo(0));
}

public void testClusterHealthNotFoundIndex() throws IOException {
ClusterHealthRequest request = new ClusterHealthRequest("notexisted-index");
request.timeout("5s");
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);

assertThat(response, notNullValue());
assertThat(response.isTimedOut(), equalTo(true));
assertThat(response.status(), equalTo(RestStatus.REQUEST_TIMEOUT));
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.RED));
assertNoIndices(response);
}

private static void assertNoIndices(ClusterHealthResponse response) {
assertThat(response.getIndices(), equalTo(emptyMap()));
assertThat(response.getActivePrimaryShards(), equalTo(0));
assertThat(response.getNumberOfDataNodes(), equalTo(1));
assertThat(response.getNumberOfNodes(), equalTo(1));
assertThat(response.getActiveShards(), equalTo(0));
assertThat(response.getDelayedUnassignedShards(), equalTo(0));
assertThat(response.getInitializingShards(), equalTo(0));
assertThat(response.getUnassignedShards(), equalTo(0));
assertThat(response.getActiveShardsPercent(), equalTo(100d));
}
}
Loading

0 comments on commit d5e8a5c

Please sign in to comment.