Always enforce cluster-wide shard limit (#34892)
This removes the option to run a cluster without enforcing the
cluster-wide shard limit, making strict enforcement the default and only
behavior.  The limit can still be adjusted as desired using the cluster
settings API.
gwbrown committed Nov 27, 2018
1 parent 4d525e3 commit 119835d
Showing 11 changed files with 300 additions and 92 deletions.
36 changes: 16 additions & 20 deletions docs/reference/modules/cluster/misc.asciidoc
@@ -26,44 +26,40 @@ API can make the cluster read-write again.
 
 ==== Cluster Shard Limit
 
-In a Elasticsearch 7.0 and later, there will be a soft limit on the number of
-shards in a cluster, based on the number of nodes in the cluster. This is
-intended to prevent operations which may unintentionally destabilize the
-cluster. Prior to 7.0, actions which would result in the cluster going over the
-limit will issue a deprecation warning.
-
-NOTE: You can set the system property `es.enforce_max_shards_per_node` to `true`
-to opt in to strict enforcement of the shard limit. If this system property is
-set, actions which would result in the cluster going over the limit will result
-in an error, rather than a deprecation warning. This property will be removed in
-Elasticsearch 7.0, as strict enforcement of the limit will be the default and
-only behavior.
+There is a soft limit on the number of shards in a cluster, based on the number
+of nodes in the cluster. This is intended to prevent operations which may
+unintentionally destabilize the cluster.
 
 If an operation, such as creating a new index, restoring a snapshot of an index,
 or opening a closed index would lead to the number of shards in the cluster
-going over this limit, the operation will issue a deprecation warning.
+going over this limit, the operation will fail with an error indicating the
+shard limit.
 
 If the cluster is already over the limit, due to changes in node membership or
-setting changes, all operations that create or open indices will issue warnings
-until either the limit is increased as described below, or some indices are
+setting changes, all operations that create or open indices will fail until
+either the limit is increased as described below, or some indices are
 <<indices-open-close,closed>> or <<indices-delete-index,deleted>> to bring the
 number of shards below the limit.
 
 Replicas count towards this limit, but closed indexes do not. An index with 5
-primary shards and 2 replicas will be counted as 15 shards.  Any closed index
+primary shards and 2 replicas will be counted as 15 shards. Any closed index
 is counted as 0, no matter how many shards and replicas it contains.
 
-The limit defaults to 1,000 shards per node, and be dynamically adjusted using
-the following property:
+The limit defaults to 1,000 shards per data node, and can be dynamically
+adjusted using the following property:
 
 `cluster.max_shards_per_node`::
 
-Controls the number of shards allowed in the cluster per node.
+Controls the number of shards allowed in the cluster per data node.
 
 For example, a 3-node cluster with the default setting would allow 3,000 shards
-total, across all open indexes.  If the above setting is changed to 1,500, then
+total, across all open indexes. If the above setting is changed to 1,500, then
 the cluster would allow 4,500 shards total.
 
+NOTE: If there are no data nodes in the cluster, the limit will not be enforced.
+This allows the creation of indices during cluster creation if dedicated master
+nodes are set up before data nodes.
+
 [[user-defined-data]]
 ==== User Defined Cluster Metadata
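As a quick illustration of the arithmetic in the docs change above, the standalone sketch below (not code from this commit; the method names are invented) computes a per-index shard count and the cluster-wide limit:

```java
public class ShardLimitMath {
    // An open index counts its primaries plus every replica copy;
    // a closed index counts as 0 toward the limit.
    static int shardsForIndex(int primaries, int replicas, boolean closed) {
        return closed ? 0 : primaries * (1 + replicas);
    }

    // The cluster-wide limit scales with the number of data nodes.
    static int clusterShardLimit(int maxShardsPerNode, int dataNodes) {
        return maxShardsPerNode * dataNodes;
    }

    public static void main(String[] args) {
        // 5 primaries with 2 replicas -> 15 shards, as in the docs example.
        System.out.println(shardsForIndex(5, 2, false)); // 15
        // 3 data nodes at the default of 1,000 shards per node -> 3,000 total.
        System.out.println(clusterShardLimit(1000, 3));  // 3000
    }
}
```

With `cluster.max_shards_per_node` raised to 1,500, the same 3-node cluster would allow 4,500 shards, matching the docs text.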
@@ -602,7 +602,7 @@ public void validateIndexSettings(String indexName, final Settings settings, fin
                                       final boolean forbidPrivateIndexSettings) throws IndexCreationException {
         List<String> validationErrors = getIndexSettingsValidationErrors(settings, forbidPrivateIndexSettings);
 
-        Optional<String> shardAllocation = checkShardLimit(settings, clusterState, deprecationLogger);
+        Optional<String> shardAllocation = checkShardLimit(settings, clusterState);
         shardAllocation.ifPresent(validationErrors::add);
 
         if (validationErrors.isEmpty() == false) {
@@ -617,14 +617,13 @@ public void validateIndexSettings(String indexName, final Settings settings, fin
      *
      * @param settings The settings of the index to be created.
      * @param clusterState The current cluster state.
-     * @param deprecationLogger The logger to use to emit a deprecation warning, if appropriate.
      * @return If present, an error message to be used to reject index creation. If empty, a signal that this operation may be carried out.
      */
-    static Optional<String> checkShardLimit(Settings settings, ClusterState clusterState, DeprecationLogger deprecationLogger) {
+    static Optional<String> checkShardLimit(Settings settings, ClusterState clusterState) {
         int shardsToCreate = IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(settings)
             * (1 + IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(settings));
 
-        return IndicesService.checkShardLimit(shardsToCreate, clusterState, deprecationLogger);
+        return IndicesService.checkShardLimit(shardsToCreate, clusterState);
     }
 
     List<String> getIndexSettingsValidationErrors(final Settings settings, final boolean forbidPrivateIndexSettings) {
@@ -19,8 +19,8 @@
 
 package org.elasticsearch.cluster.metadata;
 
-import org.apache.logging.log4j.Logger;
 import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.Version;
 import org.elasticsearch.action.ActionListener;
@@ -188,7 +188,7 @@ public ClusterState execute(ClusterState currentState) {
                     }
                 }
 
-                validateShardLimit(currentState, request.indices(), deprecationLogger);
+                validateShardLimit(currentState, request.indices());
 
                 if (indicesToOpen.isEmpty()) {
                     return currentState;
@@ -238,16 +238,15 @@ public ClusterState execute(ClusterState currentState) {
      *
      * @param currentState The current cluster state.
      * @param indices The indices which are to be opened.
-     * @param deprecationLogger The logger to use to emit a deprecation warning, if appropriate.
      * @throws ValidationException If this operation would take the cluster over the limit and enforcement is enabled.
      */
-    static void validateShardLimit(ClusterState currentState, Index[] indices, DeprecationLogger deprecationLogger) {
+    static void validateShardLimit(ClusterState currentState, Index[] indices) {
         int shardsToOpen = Arrays.stream(indices)
             .filter(index -> currentState.metaData().index(index).getState().equals(IndexMetaData.State.CLOSE))
             .mapToInt(index -> getTotalShardCount(currentState, index))
             .sum();
 
-        Optional<String> error = IndicesService.checkShardLimit(shardsToOpen, currentState, deprecationLogger);
+        Optional<String> error = IndicesService.checkShardLimit(shardsToOpen, currentState);
         if (error.isPresent()) {
             ValidationException ex = new ValidationException();
             ex.addValidationError(error.get());
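The `shardsToOpen` stream in `validateShardLimit` counts only indices that are currently closed, since already-open indices are covered by the running total. A standalone sketch of that filter (illustrative types, not code from this commit):

```java
import java.util.Arrays;

public class OpenIndicesMath {
    // Minimal stand-in for the index metadata the real code consults.
    static class IndexInfo {
        final int primaries, replicas;
        final boolean closed;
        IndexInfo(int primaries, int replicas, boolean closed) {
            this.primaries = primaries;
            this.replicas = replicas;
            this.closed = closed;
        }
    }

    // Mirrors the stream above: only currently closed indices contribute
    // shards when an open request targets them.
    static int shardsToOpen(IndexInfo[] indices) {
        return Arrays.stream(indices)
            .filter(i -> i.closed)
            .mapToInt(i -> i.primaries * (1 + i.replicas))
            .sum();
    }

    public static void main(String[] args) {
        IndexInfo[] indices = {
            new IndexInfo(5, 1, true),  // closed: contributes 10 once opened
            new IndexInfo(3, 0, false), // already open: contributes 0
        };
        System.out.println(shardsToOpen(indices)); // 10
    }
}
```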
@@ -155,7 +155,7 @@ public ClusterState execute(ClusterState currentState) {
                     int totalNewShards = Arrays.stream(request.indices())
                         .mapToInt(i -> getTotalNewShards(i, currentState, updatedNumberOfReplicas))
                         .sum();
-                    Optional<String> error = IndicesService.checkShardLimit(totalNewShards, currentState, deprecationLogger);
+                    Optional<String> error = IndicesService.checkShardLimit(totalNewShards, currentState);
                     if (error.isPresent()) {
                         ValidationException ex = new ValidationException();
                         ex.addValidationError(error.get());
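The `getTotalNewShards` helper called in this hunk is not shown in the diff; assuming it follows the obvious arithmetic, raising the replica count adds one copy of every primary per extra replica. A hedged sketch of that assumption:

```java
public class ReplicaIncreaseMath {
    // Assumed arithmetic for a replica-count update: each additional replica
    // adds one full copy of every primary shard; decreases add nothing.
    static int newShardsForReplicaIncrease(int primaries, int oldReplicas, int newReplicas) {
        return primaries * Math.max(0, newReplicas - oldReplicas);
    }

    public static void main(String[] args) {
        // Raising replicas from 1 to 3 on a 5-primary index adds 10 shards.
        System.out.println(newShardsForReplicaIncrease(5, 1, 3)); // 10
        // Lowering replicas adds nothing for the limit check to count.
        System.out.println(newShardsForReplicaIncrease(5, 2, 1)); // 0
    }
}
```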
12 changes: 2 additions & 10 deletions server/src/main/java/org/elasticsearch/indices/IndicesService.java
@@ -54,7 +54,6 @@
 import org.elasticsearch.common.io.stream.StreamInput;
 import org.elasticsearch.common.io.stream.StreamOutput;
 import org.elasticsearch.common.lease.Releasable;
-import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.settings.IndexScopedSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.settings.Setting.Property;
@@ -1401,11 +1400,10 @@ public boolean isMetaDataField(String field) {
      *
      * @param newShards The number of shards to be added by this operation
      * @param state The current cluster state
-     * @param deprecationLogger The logger to use for deprecation warnings
      * @return If present, an error message to be given as the reason for failing
      *         an operation. If empty, a sign that the operation is valid.
      */
-    public static Optional<String> checkShardLimit(int newShards, ClusterState state, DeprecationLogger deprecationLogger) {
+    public static Optional<String> checkShardLimit(int newShards, ClusterState state) {
         Settings theseSettings = state.metaData().settings();
         int nodeCount = state.getNodes().getDataNodes().size();
 
@@ -1421,13 +1419,7 @@ public static Optional<String> checkShardLimit(int newShards, ClusterState state
         if ((currentOpenShards + newShards) > maxShardsInCluster) {
             String errorMessage = "this action would add [" + newShards + "] total shards, but this cluster currently has [" +
                 currentOpenShards + "]/[" + maxShardsInCluster + "] maximum shards open";
-            if (ENFORCE_MAX_SHARDS_PER_NODE) {
-                return Optional.of(errorMessage);
-            } else {
-                deprecationLogger.deprecated("In a future major version, this request will fail because {}. Before upgrading, " +
-                    "reduce the number of shards in your cluster or adjust the cluster setting [{}].",
-                    errorMessage, MetaData.SETTING_CLUSTER_MAX_SHARDS_PER_NODE.getKey());
-            }
+            return Optional.of(errorMessage);
         }
         return Optional.empty();
     }
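Condensed, the strict behavior this commit leaves in place can be sketched as a pure function (a standalone illustration with invented parameters, not the actual `IndicesService` signature):

```java
import java.util.Optional;

public class StrictShardLimitCheck {
    // After this commit there is no deprecation-warning branch: exceeding the
    // limit always yields an error message for the caller to reject the action.
    static Optional<String> checkShardLimit(int newShards, int currentOpenShards,
                                            int maxShardsPerNode, int dataNodeCount) {
        // Without data nodes the limit is not enforced (matches the docs NOTE).
        if (dataNodeCount == 0) {
            return Optional.empty();
        }
        int maxShardsInCluster = maxShardsPerNode * dataNodeCount;
        if (currentOpenShards + newShards > maxShardsInCluster) {
            return Optional.of("this action would add [" + newShards
                + "] total shards, but this cluster currently has [" + currentOpenShards
                + "]/[" + maxShardsInCluster + "] maximum shards open");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // 2,950 open shards + 100 new > 3 nodes x 1,000 -> rejected.
        System.out.println(checkShardLimit(100, 2950, 1000, 3).isPresent()); // true
        // Landing exactly on the limit is still allowed.
        System.out.println(checkShardLimit(50, 2950, 1000, 3).isPresent());  // false
    }
}
```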
@@ -52,6 +52,7 @@
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Optional;
 import java.util.Set;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
@@ -471,7 +472,7 @@ public void testCalculateNumRoutingShards() {
         }
     }
 
-    public void testShardLimitDeprecationWarning() {
+    public void testShardLimit() {
         int nodesInCluster = randomIntBetween(2,100);
         ClusterShardLimitIT.ShardCounts counts = forDataNodeCount(nodesInCluster);
         Settings clusterSettings = Settings.builder()
@@ -487,13 +488,13 @@ public void testShardLimitDeprecationWarning() {
             .build();
 
-        DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
-        MetaDataCreateIndexService.checkShardLimit(indexSettings, state, deprecationLogger);
+        Optional<String> errorMessage = MetaDataCreateIndexService.checkShardLimit(indexSettings, state);
         int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
         int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
         int maxShards = counts.getShardsPerNode() * nodesInCluster;
-        assertWarnings("In a future major version, this request will fail because this action would add [" +
-            totalShards + "] total shards, but this cluster currently has [" + currentShards + "]/[" + maxShards + "] maximum shards open."+
-            " Before upgrading, reduce the number of shards in your cluster or adjust the cluster setting [cluster.max_shards_per_node].");
+        assertTrue(errorMessage.isPresent());
+        assertEquals("this action would add [" + totalShards + "] total shards, but this cluster currently has [" + currentShards
+            + "]/[" + maxShards + "] maximum shards open", errorMessage.get());
     }
 
 }
@@ -25,6 +25,7 @@
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.cluster.shards.ClusterShardLimitIT;
+import org.elasticsearch.common.ValidationException;
 import org.elasticsearch.common.collect.ImmutableOpenMap;
 import org.elasticsearch.common.logging.DeprecationLogger;
 import org.elasticsearch.common.settings.Settings;
@@ -40,7 +41,7 @@
 
 public class MetaDataIndexStateServiceTests extends ESTestCase {
 
-    public void testValidateShardLimitDeprecationWarning() {
+    public void testValidateShardLimit() {
         int nodesInCluster = randomIntBetween(2,100);
         ClusterShardLimitIT.ShardCounts counts = forDataNodeCount(nodesInCluster);
         Settings clusterSettings = Settings.builder()
@@ -55,13 +56,13 @@ public void testValidateShardLimitDeprecationWarning() {
             .toArray(new Index[2]);
 
-        DeprecationLogger deprecationLogger = new DeprecationLogger(logger);
-        MetaDataIndexStateService.validateShardLimit(state, indices, deprecationLogger);
         int totalShards = counts.getFailingIndexShards() * (1 + counts.getFailingIndexReplicas());
         int currentShards = counts.getFirstIndexShards() * (1 + counts.getFirstIndexReplicas());
         int maxShards = counts.getShardsPerNode() * nodesInCluster;
-        assertWarnings("In a future major version, this request will fail because this action would add [" +
-            totalShards + "] total shards, but this cluster currently has [" + currentShards + "]/[" + maxShards + "] maximum shards open."+
-            " Before upgrading, reduce the number of shards in your cluster or adjust the cluster setting [cluster.max_shards_per_node].");
+        ValidationException exception = expectThrows(ValidationException.class,
+            () -> MetaDataIndexStateService.validateShardLimit(state, indices));
+        assertEquals("Validation Failed: 1: this action would add [" + totalShards + "] total shards, but this cluster currently has [" +
+            currentShards + "]/[" + maxShards + "] maximum shards open;", exception.getMessage());
     }
 
     public static ClusterState createClusterForShardLimitTest(int nodesInCluster, int openIndexShards, int openIndexReplicas,
