Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

REST high-level client: add Cluster Health API #29331

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,16 @@

import org.apache.http.Header;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;

import static java.util.Collections.emptySet;
import static java.util.Collections.singleton;

/**
* A wrapper for the {@link RestHighLevelClient} that provides methods for accessing the Cluster API.
Expand Down Expand Up @@ -63,4 +67,29 @@ public void putSettingsAsync(ClusterUpdateSettingsRequest clusterUpdateSettingsR
restHighLevelClient.performRequestAsyncAndParseEntity(clusterUpdateSettingsRequest, Request::clusterPutSettings,
ClusterUpdateSettingsResponse::fromXContent, listener, emptySet(), headers);
}

/**
* Get cluster health using the Cluster Health API
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html"> Cluster Health API on elastic.co</a>
* <p>
* If timeout occurred, {@link ClusterHealthResponse} will have isTimedOut() == true and status() == RestStatus.REQUEST_TIMEOUT
*/
public ClusterHealthResponse health(ClusterHealthRequest healthRequest, Header... headers) throws IOException {
return restHighLevelClient.performRequestAndParseEntity(healthRequest, Request::clusterHealth, ClusterHealthResponse::fromXContent,
singleton(RestStatus.REQUEST_TIMEOUT.getStatus()), headers);
}

/**
* Asynchronously get cluster health using the Cluster Health API
* <p>
* See
* <a href="https://www.elastic.co/guide/en/elasticsearch/reference/current/cluster-health.html"> Cluster Health API on elastic.co</a>
* If timeout occurred, {@link ClusterHealthResponse} will have isTimedOut() == true and status() == RestStatus.REQUEST_TIMEOUT
*/
public void healthAsync(ClusterHealthRequest healthRequest, ActionListener<ClusterHealthResponse> listener, Header... headers) {
restHighLevelClient.performRequestAsyncAndParseEntity(healthRequest, Request::clusterHealth, ClusterHealthResponse::fromXContent,
listener, singleton(RestStatus.REQUEST_TIMEOUT.getStatus()), headers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.http.entity.ContentType;
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.get.GetAliasesRequest;
Expand Down Expand Up @@ -58,7 +59,9 @@
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.SuppressForbidden;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -583,6 +586,23 @@ static Request clusterPutSettings(ClusterUpdateSettingsRequest clusterUpdateSett
return new Request(HttpPut.METHOD_NAME, "/_cluster/settings", parameters.getParams(), entity);
}

static Request clusterHealth(ClusterHealthRequest healthRequest) {
Params params = Params.builder();
params.withWaitForStatus(healthRequest.waitForStatus());
params.withWaitForNoRelocatingShards(healthRequest.waitForNoRelocatingShards());
params.withWaitForNoInitializingShards(healthRequest.waitForNoInitializingShards());
params.withWaitForActiveShards(healthRequest.waitForActiveShards());
params.withWaitForNodes(healthRequest.waitForNodes());
params.withWaitForEvents(healthRequest.waitForEvents());
params.withTimeout(healthRequest.timeout());
params.withMasterTimeout(healthRequest.masterNodeTimeout());
params.withLocal(healthRequest.local());
params.withLevel(healthRequest.level());
String[] indices = healthRequest.indices() == null ? Strings.EMPTY_ARRAY : healthRequest.indices();
String endpoint = endpoint("_cluster/health", indices);
return new Request(HttpGet.METHOD_NAME, endpoint, params.getParams(), null);
}

static Request rollover(RolloverRequest rolloverRequest) throws IOException {
Params params = Params.builder();
params.withTimeout(rolloverRequest.timeout());
Expand Down Expand Up @@ -643,6 +663,10 @@ static String endpoint(String[] indices, String endpoint, String[] suffixes) {
.addCommaSeparatedPathParts(suffixes).build();
}

static String endpoint(String endpoint, String[] suffixes) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not loving that we have a similar endpoint method that takes Strings[] indices, String endpoint as arguments. Can we use directly EndpointBuilder here like we do in a few other places rather than adding this new method?

return new EndpointBuilder().addPathPartAsIs(endpoint).addCommaSeparatedPathParts(suffixes).build();
}

static String endpoint(String[] indices, String endpoint, String type) {
return new EndpointBuilder().addCommaSeparatedPathParts(indices).addPathPartAsIs(endpoint).addPathPart(type).build();
}
Expand Down Expand Up @@ -833,6 +857,42 @@ Params withIncludeDefaults(boolean includeDefaults) {
return this;
}

Params withWaitForStatus(ClusterHealthStatus status) {
if (status != null) {
return putParam("wait_for_status", status.name().toLowerCase(Locale.ROOT));
}
return this;
}

Params withWaitForNoRelocatingShards(boolean waitNoRelocatingShards) {
if (waitNoRelocatingShards) {
return putParam("wait_for_no_relocating_shards", Boolean.TRUE.toString());
}
return this;
}

Params withWaitForNoInitializingShards(boolean waitNoInitShards) {
if (waitNoInitShards) {
return putParam("wait_for_no_initializing_shards", Boolean.TRUE.toString());
}
return this;
}

Params withWaitForNodes(String waitForNodes) {
return putParam("wait_for_nodes", waitForNodes);
}

Params withLevel(String level) {
return putParam("level", level);
}

Params withWaitForEvents(Priority waitForEvents) {
if (waitForEvents != null) {
return putParam("wait_for_events", waitForEvents.name().toLowerCase(Locale.ROOT));
}
return this;
}

Map<String, String> getParams() {
return Collections.unmodifiableMap(params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,13 @@
package org.elasticsearch.client;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsResponse;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.cluster.health.ClusterIndexHealth;
import org.elasticsearch.cluster.health.ClusterShardHealth;
import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
Expand All @@ -34,6 +39,7 @@
import java.util.HashMap;
import java.util.Map;

import static java.util.Collections.emptyMap;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.notNullValue;
Expand Down Expand Up @@ -105,4 +111,107 @@ public void testClusterUpdateSettingNonExistent() {
assertThat(exception.getMessage(), equalTo(
"Elasticsearch exception [type=illegal_argument_exception, reason=transient setting [" + setting + "], not recognized]"));
}

public void testClusterHealthGreen() throws IOException {
ClusterHealthRequest request = new ClusterHealthRequest();
request.timeout("5s");
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);

assertThat(response, notNullValue());
assertThat(response.isTimedOut(), equalTo(false));
assertThat(response.status(), equalTo(RestStatus.OK));
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.GREEN));
emptyClusterAssertion(response);
}

public void testClusterHealthYellow() throws IOException {
createIndex("index", Settings.EMPTY);
createIndex("index2", Settings.EMPTY);
ClusterHealthRequest request = new ClusterHealthRequest();
request.timeout("5s");
boolean requestOneIndex = randomBoolean();
if (requestOneIndex) {
request.indices("index");
}
String level = randomFrom("default-shards", "cluster", "indices", "shards");
if (!"default-shards".equals(level)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

default-shards is a non supported value right? One more reason to have a specific method for each value here, it would clarify this test I think

request.level(level);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would prefer that we don't want randomize anything in this integration test. These are quite hard to debug if they fail, I would prefer to have different test methods here for the main code paths.

ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);

assertThat(response, notNullValue());
assertThat(response.isTimedOut(), equalTo(false));
assertThat(response.status(), equalTo(RestStatus.OK));
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
assertThat(response.getActivePrimaryShards(), equalTo(requestOneIndex? 5 : 10));
assertThat(response.getNumberOfDataNodes(), equalTo(1));
assertThat(response.getNumberOfNodes(), equalTo(1));
assertThat(response.getActiveShards(), equalTo(requestOneIndex? 5 : 10));
assertThat(response.getDelayedUnassignedShards(), equalTo(0));
assertThat(response.getInitializingShards(), equalTo(0));
assertThat(response.getUnassignedShards(), equalTo(requestOneIndex? 5 : 10));
assertThat(response.getActiveShardsPercent(), equalTo(50d));
if ("shards".equals(level) || "indices".equals(level) || "default-shards".equals(level)) {
assertThat(response.getIndices().size(), equalTo(requestOneIndex ? 1 : 2));
for (Map.Entry<String, ClusterIndexHealth> entry : response.getIndices().entrySet()) {
indexAssertion(entry.getKey(), entry.getValue(), level);
}
} else {
assertThat(response.getIndices().size(), equalTo(0));
}
}

private void indexAssertion(String indexName, ClusterIndexHealth indexHealth, String level) {
assertThat(indexHealth, notNullValue());
assertThat(indexHealth.getIndex(),equalTo(indexName));
assertThat(indexHealth.getActivePrimaryShards(),equalTo(5));
assertThat(indexHealth.getActiveShards(),equalTo(5));
assertThat(indexHealth.getNumberOfReplicas(),equalTo(1));
assertThat(indexHealth.getInitializingShards(),equalTo(0));
assertThat(indexHealth.getUnassignedShards(),equalTo(5));
assertThat(indexHealth.getRelocatingShards(),equalTo(0));
assertThat(indexHealth.getStatus(),equalTo(ClusterHealthStatus.YELLOW));
if ("shards".equals(level) || "default-shards".equals(level)) {
assertThat(indexHealth.getShards().size(), equalTo(5));
for (Map.Entry<Integer, ClusterShardHealth> entry : indexHealth.getShards().entrySet()) {
shardAssertion(entry.getKey(), entry.getValue());
}
} else {
assertThat(indexHealth.getShards().size(), equalTo(0));
}
}

private void shardAssertion(int shardId, ClusterShardHealth shardHealth) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe call it assertYellowShard ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, all these assert methods could be static?

assertThat(shardHealth, notNullValue());
assertThat(shardHealth.getShardId(), equalTo(shardId));
assertThat(shardHealth.getStatus(), equalTo(ClusterHealthStatus.YELLOW));
assertThat(shardHealth.getActiveShards(), equalTo(1));
assertThat(shardHealth.getInitializingShards(), equalTo(0));
assertThat(shardHealth.getUnassignedShards(), equalTo(1));
assertThat(shardHealth.getRelocatingShards(), equalTo(0));
}

public void testClusterHealthNotFoundIndex() throws IOException {
ClusterHealthRequest request = new ClusterHealthRequest("notexisted-index");
request.timeout("5s");
ClusterHealthResponse response = execute(request, highLevelClient().cluster()::health, highLevelClient().cluster()::healthAsync);

assertThat(response, notNullValue());
assertThat(response.isTimedOut(), equalTo(true));
assertThat(response.status(), equalTo(RestStatus.REQUEST_TIMEOUT));
assertThat(response.getStatus(), equalTo(ClusterHealthStatus.RED));
emptyClusterAssertion(response);
}

public static void emptyClusterAssertion(ClusterHealthResponse response) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe call it assertNoIndices ? that is the empty part right?

assertThat(response.getIndices(), equalTo(emptyMap()));
assertThat(response.getActivePrimaryShards(), equalTo(0));
assertThat(response.getNumberOfDataNodes(), equalTo(1));
assertThat(response.getNumberOfNodes(), equalTo(1));
assertThat(response.getActiveShards(), equalTo(0));
assertThat(response.getDelayedUnassignedShards(), equalTo(0));
assertThat(response.getInitializingShards(), equalTo(0));
assertThat(response.getUnassignedShards(), equalTo(0));
assertThat(response.getActiveShardsPercent(), equalTo(100d));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.http.util.EntityUtils;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest.AliasActions;
Expand Down Expand Up @@ -67,8 +68,10 @@
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.action.support.replication.ReplicationRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.cluster.health.ClusterHealthStatus;
import org.elasticsearch.common.CheckedBiConsumer;
import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -127,6 +130,7 @@
import static org.elasticsearch.search.RandomSearchRequestGenerator.randomSearchRequest;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertToXContentEquivalent;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;

public class RequestTests extends ESTestCase {
Expand Down Expand Up @@ -1325,6 +1329,78 @@ public void testClusterPutSettings() throws IOException {
assertEquals(expectedParams, expectedRequest.getParameters());
}

public void testClusterHealth() {
ClusterHealthRequest healthRequest = new ClusterHealthRequest();
Map<String, String> expectedParams = new HashMap<>();
setRandomLocal(healthRequest, expectedParams);
if (randomBoolean()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we also test the case where we set masterNodeTimeout but not timeout?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe a switch which covers all the 4 permutations (none set, both set, only timeout set, only masterNodeTimeout set) would be easier to read?

String timeout = randomTimeValue();
healthRequest.timeout(timeout);
expectedParams.put("timeout", timeout);
if (randomBoolean()) {
String masterTimeout = randomTimeValue();
healthRequest.masterNodeTimeout(masterTimeout);
expectedParams.put("master_timeout", masterTimeout);
} else {
// If Master Timeout wasn't set it uses the same value as Timeout
expectedParams.put("master_timeout", timeout);
}
} else {
expectedParams.put("timeout", "30s");
expectedParams.put("master_timeout", "30s");
}
setRandomWaitForActiveShards(healthRequest::waitForActiveShards, expectedParams);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we add the default value as argument here instead?

if (!expectedParams.containsKey("wait_for_active_shards")) {
expectedParams.put("wait_for_active_shards", "0");
}
if (randomBoolean()) {
String level = randomFrom("cluster", "indices", "shards");
healthRequest.level(level);
expectedParams.put("level", level);
} else {
expectedParams.put("level", "shards");
}
if (randomBoolean()) {
Priority priority = randomFrom(Priority.values());
healthRequest.waitForEvents(priority);
expectedParams.put("wait_for_events", priority.name().toLowerCase(Locale.ROOT));
}
if (randomBoolean()) {
ClusterHealthStatus status = randomFrom(ClusterHealthStatus.values());
healthRequest.waitForStatus(status);
expectedParams.put("wait_for_status", status.name().toLowerCase(Locale.ROOT));
}
if (randomBoolean()) {
boolean waitForNoInitializingShards = randomBoolean();
healthRequest.waitForNoInitializingShards(waitForNoInitializingShards);
if (waitForNoInitializingShards) {
expectedParams.put("wait_for_no_initializing_shards", Boolean.TRUE.toString());
}
}
if (randomBoolean()) {
boolean waitForNoRelocatingShards = randomBoolean();
healthRequest.waitForNoRelocatingShards(waitForNoRelocatingShards);
if (waitForNoRelocatingShards) {
expectedParams.put("wait_for_no_relocating_shards", Boolean.TRUE.toString());
}
}
String[] indices = randomIndicesNames(0, 5);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

like we do in other places, could you also the case where indices is assigned to null? e.g. String[] indices = randomBoolean() ? null : randomIndicesNames(0, 5);

if (indices.length > 0) {
healthRequest.indices(indices);
}

Request request = Request.clusterHealth(healthRequest);
assertThat(request, notNullValue());
assertThat(request.getMethod(), equalTo(HttpGet.METHOD_NAME));
assertThat(request.getEntity(), nullValue());
if (indices.length > 0) {
assertThat(request.getEndpoint(), equalTo("/_cluster/health/" + String.join(",", indices)));
} else {
assertThat(request.getEndpoint(), equalTo("/_cluster/health"));
}
assertThat(request.getParameters(), equalTo(expectedParams));
}

public void testRollover() throws IOException {
RolloverRequest rolloverRequest = new RolloverRequest(randomAlphaOfLengthBetween(3, 10),
randomBoolean() ? null : randomAlphaOfLengthBetween(3, 10));
Expand Down Expand Up @@ -1501,6 +1577,7 @@ public void testEndpoint() {
new String[]{"type1", "type2"}, "_endpoint"));
assertEquals("/index1,index2/_endpoint/suffix1,suffix2", Request.endpoint(new String[]{"index1", "index2"},
"_endpoint", new String[]{"suffix1", "suffix2"}));
assertEquals("/_endpoint/suffix1,suffix2", Request.endpoint("_endpoint", new String[]{"suffix1", "suffix2"}));
}

public void testCreateContentType() {
Expand Down
Loading