Skip to content

Commit

Permalink
Tracking indicesToMarkIneligibleForAutoRelease instead of a Map and a…
Browse files Browse the repository at this point in the history
…ddressing other minor comments
  • Loading branch information
Bukhtawar committed May 31, 2019
1 parent c90c84f commit 92e7d7b
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@

package org.elasticsearch.cluster.routing.allocation;

import java.util.HashMap;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -116,20 +115,18 @@ public void onNewInfo(ClusterInfo info) {
ClusterState state = clusterStateSupplier.get();
Set<String> indicesToMarkReadOnly = new HashSet<>();
RoutingNodes routingNodes = state.getRoutingNodes();
Map<String, Boolean> indexAutoReleaseEligibility = new HashMap<>();
// Ensure we release indices on nodes that have a usage response from node stats
markNodesMissingUsageIneligibleForRelease(routingNodes, usages, indexAutoReleaseEligibility);
Set<String> indicesToMarkIneligibleForAutoRelease = new HashSet<>();
//Ensure we release indices on nodes that have a usage response from node stats
markNodesMissingUsageIneligibleForRelease(routingNodes, usages, indicesToMarkIneligibleForAutoRelease);
for (ObjectObjectCursor<String, DiskUsage> entry : usages) {
String node = entry.key;
DiskUsage usage = entry.value;
warnAboutDiskIfNeeded(usage);
RoutingNode routingNode = state.getRoutingNodes().node(node);
RoutingNode routingNode = routingNodes.node(node);
// Only unblock index if all nodes that contain shards of it are below the high disk watermark
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes()
|| usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) {
markEligiblityForAutoRelease(routingNode, indexAutoReleaseEligibility, false);
} else {
markEligiblityForAutoRelease(routingNode, indexAutoReleaseEligibility, true);
markIneligiblityForAutoRelease(routingNode, indicesToMarkIneligibleForAutoRelease);
}
if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdFloodStage().getBytes() ||
usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdFloodStage()) {
Expand Down Expand Up @@ -179,9 +176,9 @@ public void onNewInfo(ClusterInfo info) {

// Get set of indices that are eligible to be automatically unblocked
// Only collect indices that are currently blocked
Set<String> indicesToAutoRelease = indexAutoReleaseEligibility.entrySet().stream()
.filter(Map.Entry::getValue)
.map(Map.Entry::getKey)
final String[] indices = state.routingTable().indicesRouting().keys().toArray(String.class);
Set<String> indicesToAutoRelease = Arrays.stream(indices)
.filter(index -> indicesToMarkIneligibleForAutoRelease.contains(index) == false)
.filter(index -> state.getBlocks().hasIndexBlock(index, IndexMetaData.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK))
.collect(Collectors.toCollection(HashSet::new));

Expand All @@ -190,7 +187,7 @@ public void onNewInfo(ClusterInfo info) {
logger.info("Releasing read-only allow delete block on indices: [{}]", indicesToAutoRelease);
updateIndicesReadOnly(indicesToAutoRelease, false);
} else {
deprecationLogger.deprecated("es.disk.auto_release_flood_stage_block will be removed in 8.0.0");
deprecationLogger.deprecated("[{}] will be removed in 8.0.0", DiskThresholdSettings.AUTO_RELEASE_INDEX_ENABLED_KEY);
}
}
indicesToMarkReadOnly.removeIf(index -> state.getBlocks().indexBlocked(ClusterBlockLevel.WRITE, index));
Expand All @@ -202,24 +199,20 @@ public void onNewInfo(ClusterInfo info) {


private void markNodesMissingUsageIneligibleForRelease(RoutingNodes routingNodes, ImmutableOpenMap<String, DiskUsage> usages,
Map<String, Boolean> indexAutoReleaseEligibility) {
if (routingNodes.size() != usages.size()) {
for (RoutingNode routingNode : routingNodes) {
if (!usages.keys().contains(routingNode.nodeId())) {
markEligiblityForAutoRelease(routingNode, indexAutoReleaseEligibility, false);
}
Set<String> indicesToMarkIneligibleForAutoRelease) {
for (RoutingNode routingNode : routingNodes) {
if (usages.containsKey(routingNode.nodeId()) == false) {
markIneligiblityForAutoRelease(routingNode, indicesToMarkIneligibleForAutoRelease);
}
}

}

private void markEligiblityForAutoRelease(RoutingNode routingNode, Map<String, Boolean> indexAutoReleaseEligibility,
boolean eligible) {
private void markIneligiblityForAutoRelease(RoutingNode routingNode, Set<String> indicesToMarkIneligibleForAutoRelease) {
if (routingNode != null) {
for (ShardRouting routing : routingNode) {
String indexName = routing.index().getName();
boolean value = indexAutoReleaseEligibility.getOrDefault(indexName, true);
indexAutoReleaseEligibility.put(indexName, value && eligible);
indicesToMarkIneligibleForAutoRelease.add(indexName);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ public class DiskThresholdSettings {
private volatile Double freeDiskThresholdFloodStage;
private volatile ByteSizeValue freeBytesThresholdFloodStage;
private static boolean autoReleaseIndexEnabled;
public static final String AUTO_RELEASE_INDEX_ENABLED_KEY = "es.disk.auto_release_flood_stage_block";

static {
final String AUTO_RELEASE_INDEX_ENABLED_KEY = "es.disk.auto_release_flood_stage_block";
final String property = System.getProperty(AUTO_RELEASE_INDEX_ENABLED_KEY);
if (property == null) {
autoReleaseIndexEnabled = true;
Expand Down

0 comments on commit 92e7d7b

Please sign in to comment.