From d440aae525eabd6f56fd684f74897687a6f4890e Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 1 Jul 2020 08:04:45 +0100 Subject: [PATCH 1/3] Account for remaining recovery in disk allocator Today the disk-based shard allocator accounts for incoming shards by subtracting the estimated size of the incoming shard from the free space on the node. This is an overly conservative estimate if the incoming shard has almost finished its recovery since in that case it is already consuming most of the disk space it needs. This change adds to the shard stats a measure of how much larger each store is expected to grow, computed from the ongoing recovery, and uses this to account for the disk usage of incoming shards more accurately. Backport of #58029 to 7.x --- docs/reference/cluster/nodes-stats.asciidoc | 12 ++ docs/reference/cluster/stats.asciidoc | 15 +- .../test/nodes.stats/40_store_stats.yml | 22 +++ .../cluster/ClusterInfoServiceIT.java | 9 + .../indices/recovery/IndexRecoveryIT.java | 58 ++++++ .../elasticsearch/cluster/ClusterInfo.java | 175 +++++++++++++++++- .../cluster/InternalClusterInfoService.java | 80 +++++--- .../allocation/DiskThresholdMonitor.java | 24 ++- .../decider/DiskThresholdDecider.java | 14 +- .../org/elasticsearch/index/IndexService.java | 2 +- .../elasticsearch/index/shard/IndexShard.java | 6 +- .../elasticsearch/index/shard/ShardPath.java | 2 +- .../index/shard/StoreRecovery.java | 5 +- .../org/elasticsearch/index/store/Store.java | 7 +- .../elasticsearch/index/store/StoreStats.java | 41 +++- .../indices/NodeIndicesStats.java | 8 + .../indices/recovery/RecoveryState.java | 37 ++++ .../indices/recovery/RecoveryTarget.java | 2 + .../blobstore/FileRestoreContext.java | 2 + .../cluster/ClusterInfoTests.java | 20 +- .../elasticsearch/cluster/DiskUsageTests.java | 7 +- .../allocation/DiskThresholdMonitorTests.java | 87 ++++++--- .../decider/DiskThresholdDeciderTests.java | 34 +++- .../DiskThresholdDeciderUnitTests.java | 6 +- .../index/shard/IndexShardTests.java | 2 + .../elasticsearch/index/store/StoreTests.java | 20 +- .../indices/recovery/RecoveryTargetTests.java | 15 +- .../MockInternalClusterInfoService.java | 2 +- .../ShardFollowTaskReplicationTests.java | 1 + .../engine/FollowEngineIndexShardTests.java | 1 + .../ClusterStatsMonitoringDocTests.java | 3 +- .../indices/IndexStatsMonitoringDocTests.java | 2 +- .../IndicesStatsMonitoringDocTests.java | 2 +- .../node/NodeStatsMonitoringDocTests.java | 2 +- 34 files changed, 641 insertions(+), 84 deletions(-) create mode 100644 rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/40_store_stats.yml diff --git a/docs/reference/cluster/nodes-stats.asciidoc b/docs/reference/cluster/nodes-stats.asciidoc index a1b1a2fd5268b..34bfdffc89655 100644 --- a/docs/reference/cluster/nodes-stats.asciidoc +++ b/docs/reference/cluster/nodes-stats.asciidoc @@ -245,6 +245,18 @@ Total size of all shards assigned to the node. `size_in_bytes`:: (integer) Total size, in bytes, of all shards assigned to the node. + +`reserved`:: +(<>) +A prediction of how much larger the shard stores on this node will eventually +grow due to ongoing peer recoveries, restoring snapshots, and similar +activities. A value of `-1b` indicates that this is not available. + +`reserved_in_bytes`:: +(integer) +A prediction, in bytes, of how much larger the shard stores on this node will +eventually grow due to ongoing peer recoveries, restoring snapshots, and +similar activities. A value of `-1` indicates that this is not available. ======= `indexing`:: diff --git a/docs/reference/cluster/stats.asciidoc b/docs/reference/cluster/stats.asciidoc index db725d2c6a088..6c243f7d55935 100644 --- a/docs/reference/cluster/stats.asciidoc +++ b/docs/reference/cluster/stats.asciidoc @@ -231,6 +231,17 @@ Total size of all shards assigned to selected nodes. `size_in_bytes`:: (integer) Total size, in bytes, of all shards assigned to selected nodes. + +`reserved`:: +(<>) +A prediction of how much larger the shard stores will eventually grow due to +ongoing peer recoveries, restoring snapshots, and similar activities. + +`reserved_in_bytes`:: +(integer) +A prediction, in bytes, of how much larger the shard stores will eventually +grow due to ongoing peer recoveries, restoring snapshots, and similar +activities. ===== `fielddata`:: @@ -1135,7 +1146,9 @@ The API returns the following response: }, "store": { "size": "16.2kb", - "size_in_bytes": 16684 + "size_in_bytes": 16684, + "reserved": "0b", + "reserved_in_bytes": 0 }, "fielddata": { "memory_size": "0b", diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/40_store_stats.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/40_store_stats.yml new file mode 100644 index 0000000000000..accfbd4cd7cda --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/nodes.stats/40_store_stats.yml @@ -0,0 +1,22 @@ +--- +"Store stats": + - skip: + version: " - 7.99.99" + reason: "reserved_in_bytes field is not returned in prior versions" + features: [arbitrary_key] + + - do: + nodes.info: + node_id: _master + - set: + nodes._arbitrary_key_: master + + - do: + nodes.stats: + metric: [ indices ] + index_metric: [ store ] + + - is_false: nodes.$master.discovery + - is_true: nodes.$master.indices.store + - gte: { nodes.$master.indices.store.size_in_bytes: 0 } + - gte: { nodes.$master.indices.store.reserved_in_bytes: -1 } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java index 8878afc690be1..07e221a7eacce 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/cluster/ClusterInfoServiceIT.java @@ -29,6 +29,7 @@ import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.routing.RoutingTable; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.allocation.decider.EnableAllocationDecider; import org.elasticsearch.cluster.service.ClusterService; @@ -162,6 +163,8 @@ public void testClusterInfoServiceCollectsInformation() { IndexService indexService = indicesService.indexService(shard.index()); IndexShard indexShard = indexService.getShardOrNull(shard.id()); assertEquals(indexShard.shardPath().getRootDataPath().toString(), dataPath); + + assertTrue(info.getReservedSpace(nodeId, dataPath).containsShardId(shard.shardId())); } } @@ -232,6 +235,7 @@ public void testClusterInfoServiceInformationClearOnError() { assertThat(info.getNodeLeastAvailableDiskUsages().size(), equalTo(0)); assertThat(info.getNodeMostAvailableDiskUsages().size(), equalTo(0)); assertThat(info.shardSizes.size(), equalTo(0)); + assertThat(info.reservedSpace.size(), equalTo(0)); // check we recover blockingActionFilter.blockActions(); @@ -242,5 +246,10 @@ public void testClusterInfoServiceInformationClearOnError() { assertThat(info.getNodeMostAvailableDiskUsages().size(), equalTo(2)); assertThat(info.shardSizes.size(), greaterThan(0)); + RoutingTable routingTable = client().admin().cluster().prepareState().clear().setRoutingTable(true).get().getState().routingTable(); + for (ShardRouting shard : routingTable.allShards()) { + assertTrue(info.getReservedSpace(shard.currentNodeId(), info.getDataPath(shard)).containsShardId(shard.shardId())); + } + } } diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 34551bd89551f..40946ad020b22 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -83,7 +83,9 @@ import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.store.Store; +import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.indices.IndicesService; +import org.elasticsearch.indices.NodeIndicesStats; import org.elasticsearch.indices.analysis.AnalysisModule; import org.elasticsearch.indices.flush.SyncedFlushUtil; import org.elasticsearch.indices.recovery.RecoveryState.Stage; @@ -146,6 +148,7 @@ import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.everyItem; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.hasSize; @@ -1834,4 +1837,59 @@ public void testCancelRecoveryWithAutoExpandReplicas() throws Exception { } }); } + + public void testReservesBytesDuringPeerRecoveryPhaseOne() throws Exception { + internalCluster().startNode(); + List dataNodes = internalCluster().startDataOnlyNodes(2); + String indexName = "test-index"; + createIndex(indexName, Settings.builder() + .put("index.number_of_shards", 1).put("index.number_of_replicas", 0) + .put("index.routing.allocation.include._name", String.join(",", dataNodes)).build()); + ensureGreen(indexName); + final List indexRequests = IntStream.range(0, between(10, 500)) + .mapToObj(n -> client().prepareIndex(indexName).setSource("foo", "bar")) + .collect(Collectors.toList()); + indexRandom(randomBoolean(), true, true, indexRequests); + assertThat(client().admin().indices().prepareFlush(indexName).get().getFailedShards(), equalTo(0)); + + ClusterState clusterState = client().admin().cluster().prepareState().get().getState(); + DiscoveryNode nodeWithPrimary = clusterState.nodes().get(clusterState.routingTable() + .index(indexName).shard(0).primaryShard().currentNodeId()); + MockTransportService transportService = (MockTransportService) internalCluster() + .getInstance(TransportService.class, nodeWithPrimary.getName()); + + final AtomicBoolean fileInfoIntercepted = new AtomicBoolean(); + final AtomicBoolean fileChunkIntercepted = new AtomicBoolean(); + transportService.addSendBehavior((connection, requestId, action, request, options) -> { + if (action.equals(PeerRecoveryTargetService.Actions.FILES_INFO)) { + if (fileInfoIntercepted.compareAndSet(false, true)) { + final NodeIndicesStats nodeIndicesStats = client().admin().cluster().prepareNodesStats(connection.getNode().getId()) + .clear().setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Store)).get().getNodes().get(0).getIndices(); + assertThat(nodeIndicesStats.getStore().getReservedSize().getBytes(), equalTo(0L)); + assertThat(nodeIndicesStats.getShardStats(clusterState.metadata().index(indexName).getIndex()) + .stream().flatMap(s -> Arrays.stream(s.getShards())).map(s -> s.getStats().getStore().getReservedSize().getBytes()) + .collect(Collectors.toList()), + everyItem(equalTo(StoreStats.UNKNOWN_RESERVED_BYTES))); + } + } else if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) { + if (fileChunkIntercepted.compareAndSet(false, true)) { + assertThat(client().admin().cluster().prepareNodesStats(connection.getNode().getId()).clear() + .setIndices(new CommonStatsFlags(CommonStatsFlags.Flag.Store)).get().getNodes().get(0) + .getIndices().getStore().getReservedSize().getBytes(), + greaterThan(0L)); + } + } + connection.sendRequest(requestId, action, request, options); + }); + + assertAcked(client().admin().indices().prepareUpdateSettings(indexName) + .setSettings(Settings.builder().put("index.number_of_replicas", 1))); + ensureGreen(); + assertTrue(fileInfoIntercepted.get()); + assertTrue(fileChunkIntercepted.get()); + + assertThat(client().admin().cluster().prepareNodesStats().get().getNodes().stream() + .mapToLong(n -> n.getIndices().getStore().getReservedSize().getBytes()).sum(), equalTo(0L)); + } + } diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index d004e34d06efd..8444425fce07a 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -19,6 +19,8 @@ package org.elasticsearch.cluster; +import com.carrotsearch.hppc.ObjectHashSet; +import com.carrotsearch.hppc.cursors.ObjectCursor; import com.carrotsearch.hppc.cursors.ObjectObjectCursor; import org.elasticsearch.cluster.routing.ShardRouting; @@ -29,9 +31,13 @@ import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.xcontent.ToXContentFragment; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.StoreStats; import java.io.IOException; import java.util.Map; +import java.util.Objects; + /** * ClusterInfo is an object representing a map of nodes to {@link DiskUsage} * and a map of shard ids to shard sizes, see @@ -44,9 +50,10 @@ public class ClusterInfo implements ToXContentFragment, Writeable { final ImmutableOpenMap shardSizes; public static final ClusterInfo EMPTY = new ClusterInfo(); final ImmutableOpenMap routingToDataPath; + final ImmutableOpenMap reservedSpace; protected ClusterInfo() { - this(ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of()); + this(ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of()); } /** @@ -56,15 +63,18 @@ protected ClusterInfo() { * @param mostAvailableSpaceUsage a node id to disk usage mapping for the path that has the most available space on the node. * @param shardSizes a shardkey to size in bytes mapping per shard. * @param routingToDataPath the shard routing to datapath mapping + * @param reservedSpace reserved space per shard broken down by node and data path * @see #shardIdentifierFromRouting */ public ClusterInfo(ImmutableOpenMap leastAvailableSpaceUsage, - ImmutableOpenMap mostAvailableSpaceUsage, ImmutableOpenMap shardSizes, - ImmutableOpenMap routingToDataPath) { + ImmutableOpenMap mostAvailableSpaceUsage, ImmutableOpenMap shardSizes, + ImmutableOpenMap routingToDataPath, + ImmutableOpenMap reservedSpace) { this.leastAvailableSpaceUsage = leastAvailableSpaceUsage; this.shardSizes = shardSizes; this.mostAvailableSpaceUsage = mostAvailableSpaceUsage; this.routingToDataPath = routingToDataPath; + this.reservedSpace = reservedSpace; } public ClusterInfo(StreamInput in) throws IOException { @@ -72,6 +82,12 @@ public ClusterInfo(StreamInput in) throws IOException { Map mostMap = in.readMap(StreamInput::readString, DiskUsage::new); Map sizeMap = in.readMap(StreamInput::readString, StreamInput::readLong); Map routingMap = in.readMap(ShardRouting::new, StreamInput::readString); + Map reservedSpaceMap; + if (in.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) { + reservedSpaceMap = in.readMap(NodeAndPath::new, ReservedSpace::new); + } else { + reservedSpaceMap = Map.of(); + } ImmutableOpenMap.Builder leastBuilder = ImmutableOpenMap.builder(); this.leastAvailableSpaceUsage = leastBuilder.putAll(leastMap).build(); @@ -81,6 +97,8 @@ public ClusterInfo(StreamInput in) throws IOException { this.shardSizes = sizeBuilder.putAll(sizeMap).build(); ImmutableOpenMap.Builder routingBuilder = ImmutableOpenMap.builder(); this.routingToDataPath = routingBuilder.putAll(routingMap).build(); + ImmutableOpenMap.Builder reservedSpaceBuilder = ImmutableOpenMap.builder(); + this.reservedSpace = reservedSpaceBuilder.putAll(reservedSpaceMap).build(); } @Override @@ -109,6 +127,14 @@ public void writeTo(StreamOutput out) throws IOException { c.key.writeTo(out); out.writeString(c.value); } + + if (out.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) { + out.writeVInt(this.reservedSpace.size()); + for (ObjectObjectCursor c : this.reservedSpace) { + c.key.writeTo(out); + c.value.writeTo(out); + } + } } public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { @@ -144,11 +170,23 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws } } builder.endObject(); // end "shard_paths" + builder.startArray("reserved_sizes"); { + for (ObjectObjectCursor c : this.reservedSpace) { + builder.startObject(); { + builder.field("node_id", c.key.nodeId); + builder.field("path", c.key.path); + c.value.toXContent(builder, params); + } + builder.endObject(); // NodeAndPath + } + } + builder.endArray(); // end "reserved_sizes" return builder; } /** * Returns a node id to disk usage mapping for the path that has the least available space on the node. + * Note that this does not take account of reserved space: there may be another path with less available _and unreserved_ space. */ public ImmutableOpenMap getNodeLeastAvailableDiskUsages() { return this.leastAvailableSpaceUsage; @@ -156,6 +194,7 @@ public ImmutableOpenMap getNodeLeastAvailableDiskUsages() { /** * Returns a node id to disk usage mapping for the path that has the most available space on the node. + * Note that this does not take account of reserved space: there may be another path with more available _and unreserved_ space. */ public ImmutableOpenMap getNodeMostAvailableDiskUsages() { return this.mostAvailableSpaceUsage; @@ -183,6 +222,14 @@ public long getShardSize(ShardRouting shardRouting, long defaultValue) { return shardSize == null ? defaultValue : shardSize; } + /** + * Returns the reserved space for each shard on the given node/path pair + */ + public ReservedSpace getReservedSpace(String nodeId, String dataPath) { + final ReservedSpace result = reservedSpace.get(new NodeAndPath(nodeId, dataPath)); + return result == null ? ReservedSpace.EMPTY : result; + } + /** * Method that incorporates the ShardId for the shard into a string that * includes a 'p' or 'r' depending on whether the shard is a primary. @@ -190,4 +237,126 @@ public long getShardSize(ShardRouting shardRouting, long defaultValue) { static String shardIdentifierFromRouting(ShardRouting shardRouting) { return shardRouting.shardId().toString() + "[" + (shardRouting.primary() ? "p" : "r") + "]"; } + + /** + * Represents a data path on a node + */ + public static class NodeAndPath { + public final String nodeId; + public final String path; + + public NodeAndPath(String nodeId, String path) { + this.nodeId = Objects.requireNonNull(nodeId); + this.path = Objects.requireNonNull(path); + } + + public NodeAndPath(StreamInput in) throws IOException { + this.nodeId = in.readString(); + this.path = in.readString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + NodeAndPath that = (NodeAndPath) o; + return nodeId.equals(that.nodeId) && path.equals(that.path); + } + + @Override + public int hashCode() { + return Objects.hash(nodeId, path); + } + + public void writeTo(StreamOutput out) throws IOException { + out.writeString(nodeId); + out.writeString(path); + } + } + + /** + * Represents the total amount of "reserved" space on a particular data path, together with the set of shards considered. + */ + public static class ReservedSpace { + + public static final ReservedSpace EMPTY = new ReservedSpace(0, new ObjectHashSet<>()); + + private final long total; + private final ObjectHashSet shardIds; + + private ReservedSpace(long total, ObjectHashSet shardIds) { + this.total = total; + this.shardIds = shardIds; + } + + ReservedSpace(StreamInput in) throws IOException { + total = in.readVLong(); + final int shardIdCount = in.readVInt(); + shardIds = new ObjectHashSet<>(shardIdCount); + for (int i = 0; i < shardIdCount; i++) { + shardIds.add(new ShardId(in)); + } + } + + void writeTo(StreamOutput out) throws IOException { + out.writeVLong(total); + out.writeVInt(shardIds.size()); + for (ObjectCursor shardIdCursor : shardIds) { + shardIdCursor.value.writeTo(out); + } + } + + public long getTotal() { + return total; + } + + public boolean containsShardId(ShardId shardId) { + return shardIds.contains(shardId); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ReservedSpace that = (ReservedSpace) o; + return total == that.total && + shardIds.equals(that.shardIds); + } + + @Override + public int hashCode() { + return Objects.hash(total, shardIds); + } + + void toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field("total", total); + builder.startArray("shards"); { + for (ObjectCursor shardIdCursor : shardIds) { + shardIdCursor.value.toXContent(builder, params); + } + } + builder.endArray(); // end "shards" + } + + public static class Builder { + private long total; + private ObjectHashSet shardIds = new ObjectHashSet<>(); + + public ReservedSpace build() { + assert shardIds != null : "already built"; + final ReservedSpace reservedSpace = new ReservedSpace(total, shardIds); + shardIds = null; + return reservedSpace; + } + + public Builder add(ShardId shardId, long reservedBytes) { + assert shardIds != null : "already built"; + assert reservedBytes >= 0 : reservedBytes; + shardIds.add(shardId); + total += reservedBytes; + return this; + } + } + } + } diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index de25666178b46..47cb2f051b38c 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -44,11 +44,14 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException; +import org.elasticsearch.index.store.StoreStats; import org.elasticsearch.monitor.fs.FsInfo; import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.ReceiveTimeoutTransportException; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -80,8 +83,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode private volatile ImmutableOpenMap leastAvailableSpaceUsages; private volatile ImmutableOpenMap mostAvailableSpaceUsages; - private volatile ImmutableOpenMap shardRoutingToDataPath; - private volatile ImmutableOpenMap shardSizes; + private volatile IndicesStatsSummary indicesStatsSummary; private volatile boolean isMaster = false; private volatile boolean enabled; private volatile TimeValue fetchTimeout; @@ -93,8 +95,7 @@ public class InternalClusterInfoService implements ClusterInfoService, LocalNode public InternalClusterInfoService(Settings settings, ClusterService clusterService, ThreadPool threadPool, NodeClient client) { this.leastAvailableSpaceUsages = ImmutableOpenMap.of(); this.mostAvailableSpaceUsages = ImmutableOpenMap.of(); - this.shardRoutingToDataPath = ImmutableOpenMap.of(); - this.shardSizes = ImmutableOpenMap.of(); + this.indicesStatsSummary = IndicesStatsSummary.EMPTY; this.clusterService = clusterService; this.threadPool = threadPool; this.client = client; @@ -200,7 +201,9 @@ public void clusterChanged(ClusterChangedEvent event) { @Override public ClusterInfo getClusterInfo() { - return new ClusterInfo(leastAvailableSpaceUsages, mostAvailableSpaceUsages, shardSizes, shardRoutingToDataPath); + final IndicesStatsSummary indicesStatsSummary = this.indicesStatsSummary; // single volatile read + return new ClusterInfo(leastAvailableSpaceUsages, mostAvailableSpaceUsages, + indicesStatsSummary.shardSizes, indicesStatsSummary.shardRoutingToDataPath, indicesStatsSummary.reservedSpace); } /** @@ -318,15 +321,22 @@ public void onFailure(Exception e) { } }); - final CountDownLatch indicesLatch = updateIndicesStats(new ActionListener() { + final CountDownLatch indicesLatch = updateIndicesStats(new ActionListener<>() { @Override public void onResponse(IndicesStatsResponse indicesStatsResponse) { - ShardStats[] stats = indicesStatsResponse.getShards(); - ImmutableOpenMap.Builder newShardSizes = ImmutableOpenMap.builder(); - ImmutableOpenMap.Builder newShardRoutingToDataPath = ImmutableOpenMap.builder(); - buildShardLevelInfo(logger, stats, newShardSizes, newShardRoutingToDataPath); - shardSizes = newShardSizes.build(); - shardRoutingToDataPath = newShardRoutingToDataPath.build(); + final ShardStats[] stats = indicesStatsResponse.getShards(); + final ImmutableOpenMap.Builder shardSizeByIdentifierBuilder = ImmutableOpenMap.builder(); + final ImmutableOpenMap.Builder dataPathByShardRoutingBuilder = ImmutableOpenMap.builder(); + final Map reservedSpaceBuilders = new HashMap<>(); + buildShardLevelInfo(logger, stats, shardSizeByIdentifierBuilder, dataPathByShardRoutingBuilder, reservedSpaceBuilders); + + final ImmutableOpenMap.Builder rsrvdSpace = ImmutableOpenMap.builder(); + reservedSpaceBuilders.forEach((nodeAndPath, builder) -> rsrvdSpace.put(nodeAndPath, builder.build())); + + indicesStatsSummary = new IndicesStatsSummary( + shardSizeByIdentifierBuilder.build(), + dataPathByShardRoutingBuilder.build(), + rsrvdSpace.build()); } @Override @@ -342,8 +352,7 @@ public void onFailure(Exception e) { logger.warn("Failed to execute IndicesStatsAction for ClusterInfoUpdateJob", e); } // we empty the usages list, to be safe - we don't know what's going on. - shardSizes = ImmutableOpenMap.of(); - shardRoutingToDataPath = ImmutableOpenMap.of(); + indicesStatsSummary = IndicesStatsSummary.EMPTY; } } }); @@ -383,16 +392,27 @@ public void addListener(Consumer clusterInfoConsumer) { listeners.add(clusterInfoConsumer); } - static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder newShardSizes, - ImmutableOpenMap.Builder newShardRoutingToDataPath) { + static void buildShardLevelInfo(Logger logger, ShardStats[] stats, ImmutableOpenMap.Builder shardSizes, + ImmutableOpenMap.Builder newShardRoutingToDataPath, + Map reservedSpaceByShard) { for (ShardStats s : stats) { - newShardRoutingToDataPath.put(s.getShardRouting(), s.getDataPath()); - long size = s.getStats().getStore().sizeInBytes(); - String sid = ClusterInfo.shardIdentifierFromRouting(s.getShardRouting()); - if (logger.isTraceEnabled()) { - logger.trace("shard: {} size: {}", sid, size); + final ShardRouting shardRouting = s.getShardRouting(); + newShardRoutingToDataPath.put(shardRouting, s.getDataPath()); + + final StoreStats storeStats = s.getStats().getStore(); + final long size = storeStats.sizeInBytes(); + final long reserved = storeStats.getReservedSize().getBytes(); + + final String shardIdentifier = ClusterInfo.shardIdentifierFromRouting(shardRouting); + logger.trace("shard: {} size: {} reserved: {}", shardIdentifier, size, reserved); + shardSizes.put(shardIdentifier, size); + + if (reserved != StoreStats.UNKNOWN_RESERVED_BYTES) { + final ClusterInfo.ReservedSpace.Builder reservedSpaceBuilder = reservedSpaceByShard.computeIfAbsent( + new ClusterInfo.NodeAndPath(shardRouting.currentNodeId(), s.getDataPath()), + t -> new ClusterInfo.ReservedSpace.Builder()); + reservedSpaceBuilder.add(shardRouting.shardId(), reserved); } - newShardSizes.put(sid, size); } } @@ -446,5 +466,21 @@ static void fillDiskUsagePerNode(Logger logger, List nodeStatsArray, } } + private static class IndicesStatsSummary { + static final IndicesStatsSummary EMPTY + = new IndicesStatsSummary(ImmutableOpenMap.of(), ImmutableOpenMap.of(), ImmutableOpenMap.of()); + + final ImmutableOpenMap shardSizes; + final ImmutableOpenMap shardRoutingToDataPath; + final ImmutableOpenMap reservedSpace; + + IndicesStatsSummary(ImmutableOpenMap shardSizes, + ImmutableOpenMap shardRoutingToDataPath, + ImmutableOpenMap reservedSpace) { + this.shardSizes = shardSizes; + this.shardRoutingToDataPath = shardRoutingToDataPath; + this.reservedSpace = reservedSpace; + } + } } diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java index aa880af74249a..239725ec1a4be 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitor.java @@ -166,11 +166,11 @@ public void onNewInfo(ClusterInfo info) { logger.warn("flood stage disk watermark [{}] exceeded on {}, all indices on this node will be marked read-only", diskThresholdSettings.describeFloodStageThreshold(), usage); - } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() || - usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { + continue; + } - nodesOverLowThreshold.add(node); - nodesOverHighThreshold.add(node); + if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() || + usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { if (routingNode != null) { // might be temporarily null if the ClusterInfoService and the ClusterService are out of step for (ShardRouting routing : routingNode) { @@ -178,6 +178,18 @@ public void onNewInfo(ClusterInfo info) { indicesNotToAutoRelease.add(indexName); } } + } + + final long reservedSpace = info.getReservedSpace(usage.getNodeId(), usage.getPath()).getTotal(); + final DiskUsage usageWithReservedSpace = new DiskUsage(usage.getNodeId(), usage.getNodeName(), usage.getPath(), + usage.getTotalBytes(), Math.max(0L, usage.getFreeBytes() - reservedSpace)); + + if (usageWithReservedSpace.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdHigh().getBytes() || + usageWithReservedSpace.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdHigh()) { + + nodesOverLowThreshold.add(node); + nodesOverHighThreshold.add(node); + if (lastRunTimeMillis.get() < currentTimeMillis - diskThresholdSettings.getRerouteInterval().millis()) { reroute = true; explanation = "high disk watermark exceeded on one or more nodes"; @@ -189,8 +201,8 @@ public void onNewInfo(ClusterInfo info) { node, diskThresholdSettings.getRerouteInterval()); } - } else if (usage.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes() || - usage.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) { + } else if (usageWithReservedSpace.getFreeBytes() < diskThresholdSettings.getFreeBytesThresholdLow().getBytes() || + usageWithReservedSpace.getFreeDiskAsPercentage() < diskThresholdSettings.getFreeDiskThresholdLow()) { nodesOverHighThresholdAndRelocating.remove(node); diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java index 8cd147508771c..6a36cacf2e212 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java @@ -44,6 +44,7 @@ import org.elasticsearch.index.Index; import org.elasticsearch.index.shard.ShardId; +import java.util.List; import java.util.Set; import static org.elasticsearch.cluster.routing.allocation.DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_HIGH_DISK_WATERMARK_SETTING; @@ -98,9 +99,16 @@ public DiskThresholdDecider(Settings settings, ClusterSettings clusterSettings) */ public static long sizeOfRelocatingShards(RoutingNode node, boolean subtractShardsMovingAway, String dataPath, ClusterInfo clusterInfo, Metadata metadata, RoutingTable routingTable) { - long totalSize = 0L; - - for (ShardRouting routing : node.shardsWithState(ShardRoutingState.INITIALIZING)) { + // Account for reserved space wherever it is available + final ClusterInfo.ReservedSpace reservedSpace = clusterInfo.getReservedSpace(node.nodeId(), dataPath); + long totalSize = reservedSpace.getTotal(); + // NB this counts all shards on the node when the ClusterInfoService retrieved the node stats, which may include shards that are + // no longer initializing because their recovery failed or was cancelled. + + // Where reserved space is unavailable (e.g. stats are out-of-sync) compute a conservative estimate for initialising shards + final List initializingShards = node.shardsWithState(ShardRoutingState.INITIALIZING); + initializingShards.removeIf(shardRouting -> reservedSpace.containsShardId(shardRouting.shardId())); + for (ShardRouting routing : initializingShards) { if (routing.relocatingNodeId() == null) { // in practice the only initializing-but-not-relocating shards with a nonzero expected shard size will be ones created // by a resize (shrink/split/clone) operation which we expect to happen using hard links, so they shouldn't be taking diff --git a/server/src/main/java/org/elasticsearch/index/IndexService.java b/server/src/main/java/org/elasticsearch/index/IndexService.java index 1546afcfa7851..d9fb875528803 100644 --- a/server/src/main/java/org/elasticsearch/index/IndexService.java +++ b/server/src/main/java/org/elasticsearch/index/IndexService.java @@ -372,7 +372,7 @@ private long getAvgShardSizeInBytes() throws IOException { long sum = 0; int count = 0; for (IndexShard indexShard : this) { - sum += indexShard.store().stats().sizeInBytes(); + sum += indexShard.store().stats(0L).sizeInBytes(); count++; } if (count == 0) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 490fa2034969f..23d42f39f63d7 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -236,7 +236,7 @@ Runnable getGlobalCheckpointSyncer() { private final RetentionLeaseSyncer retentionLeaseSyncer; @Nullable - private RecoveryState recoveryState; + private volatile RecoveryState recoveryState; private final RecoveryStats recoveryStats = new RecoveryStats(); private final MeanMetric refreshMetric = new MeanMetric(); @@ -1033,7 +1033,9 @@ public GetStats getStats() { public StoreStats storeStats() { try { - return store.stats(); + final RecoveryState recoveryState = this.recoveryState; + final long bytesStillToRecover = recoveryState == null ? -1L : recoveryState.getIndex().bytesStillToRecover(); + return store.stats(bytesStillToRecover == -1 ? StoreStats.UNKNOWN_RESERVED_BYTES : bytesStillToRecover); } catch (IOException e) { failShard("Failing shard because of exception during storeStats", e); throw new ElasticsearchException("io exception while building 'store stats'", e); diff --git a/server/src/main/java/org/elasticsearch/index/shard/ShardPath.java b/server/src/main/java/org/elasticsearch/index/shard/ShardPath.java index cec6f0a7617fe..48b47a64330cc 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/ShardPath.java +++ b/server/src/main/java/org/elasticsearch/index/shard/ShardPath.java @@ -266,7 +266,7 @@ static NodeEnvironment.NodePath getPathWithMostFreeSpace(NodeEnvironment env) th long maxUsableBytes = Long.MIN_VALUE; for (NodeEnvironment.NodePath nodePath : paths) { FileStore fileStore = nodePath.fileStore; - long usableBytes = fileStore.getUsableSpace(); + long usableBytes = fileStore.getUsableSpace(); // NB usable bytes doesn't account for reserved space (e.g. incoming recoveries) assert usableBytes >= 0 : "usable bytes must be >= 0, got: " + usableBytes; if (bestPath == null || usableBytes > maxUsableBytes) { diff --git a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java index 5d97f623cd5ff..5eb2c587b3c42 100644 --- a/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java +++ b/server/src/main/java/org/elasticsearch/index/shard/StoreRecovery.java @@ -175,6 +175,7 @@ void addIndices(final RecoveryState.Index indexRecoveryStats, final Directory ta try (IndexWriter writer = new IndexWriter(new StatsDirectoryWrapper(hardLinkOrCopyTarget, indexRecoveryStats), iwc)) { writer.addIndexes(sources); + indexRecoveryStats.setFileDetailsComplete(); if (split) { writer.deleteDocuments(new ShardSplittingQuery(indexMetadata, shardId, hasNested)); } @@ -414,14 +415,15 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe writeEmptyRetentionLeasesFile(indexShard); } // since we recover from local, just fill the files and size + final RecoveryState.Index index = recoveryState.getIndex(); try { - final RecoveryState.Index index = recoveryState.getIndex(); if (si != null) { addRecoveredFileDetails(si, store, index); } } catch (IOException e) { logger.debug("failed to list file details", e); } + index.setFileDetailsComplete(); } else { store.createEmpty(indexShard.indexSettings().getIndexVersionCreated().luceneVersion); final String translogUUID = Translog.createEmptyTranslog( @@ -429,6 +431,7 @@ private void internalRecoverFromStore(IndexShard indexShard) throws IndexShardRe indexShard.getPendingPrimaryTerm()); store.associateIndexWithNewTranslog(translogUUID); writeEmptyRetentionLeasesFile(indexShard); + indexShard.recoveryState().getIndex().setFileDetailsComplete(); } indexShard.openEngineAndRecoverFromTranslog(); indexShard.getEngine().fillSeqNoGaps(indexShard.getPendingPrimaryTerm()); diff --git a/server/src/main/java/org/elasticsearch/index/store/Store.java b/server/src/main/java/org/elasticsearch/index/store/Store.java index b6e7dae375d75..79a4e440ee0ca 100644 --- a/server/src/main/java/org/elasticsearch/index/store/Store.java +++ b/server/src/main/java/org/elasticsearch/index/store/Store.java @@ -355,9 +355,12 @@ public CheckIndex.Status checkIndex(PrintStream out) throws IOException { } } - public StoreStats stats() throws IOException { + /** + * @param reservedBytes a prediction of how much larger the store is expected to grow, or {@link StoreStats#UNKNOWN_RESERVED_BYTES}. + */ + public StoreStats stats(long reservedBytes) throws IOException { ensureOpen(); - return new StoreStats(directory.estimateSize()); + return new StoreStats(directory.estimateSize(), reservedBytes); } /** diff --git a/server/src/main/java/org/elasticsearch/index/store/StoreStats.java b/server/src/main/java/org/elasticsearch/index/store/StoreStats.java index 0f5998f746c06..e1ab5b526f018 100644 --- a/server/src/main/java/org/elasticsearch/index/store/StoreStats.java +++ b/server/src/main/java/org/elasticsearch/index/store/StoreStats.java @@ -31,7 +31,16 @@ public class StoreStats implements Writeable, ToXContentFragment { + /** + * Sentinel value for cases where the shard does not yet know its reserved size so we must fall back to an estimate, for instance + * prior to receiving the list of files in a peer recovery. + */ + public static final long UNKNOWN_RESERVED_BYTES = -1L; + + public static final Version RESERVED_BYTES_VERSION = Version.V_7_9_0; + private long sizeInBytes; + private long reservedSize; public StoreStats() { @@ -42,10 +51,21 @@ public StoreStats(StreamInput in) throws IOException { if (in.getVersion().before(Version.V_6_0_0_alpha1)) { in.readVLong(); // throttleTimeInNanos } + if (in.getVersion().onOrAfter(RESERVED_BYTES_VERSION)) { + reservedSize = in.readZLong(); + } else { + reservedSize = UNKNOWN_RESERVED_BYTES; + } } - public StoreStats(long sizeInBytes) { + /** + * @param sizeInBytes the size of the store in bytes + * @param reservedSize a prediction of how much larger the store is expected to grow, or {@link StoreStats#UNKNOWN_RESERVED_BYTES}. + */ + public StoreStats(long sizeInBytes, long reservedSize) { + assert reservedSize == UNKNOWN_RESERVED_BYTES || reservedSize >= 0 : reservedSize; this.sizeInBytes = sizeInBytes; + this.reservedSize = reservedSize; } public void add(StoreStats stats) { @@ -53,8 +73,12 @@ public void add(StoreStats stats) { return; } sizeInBytes += stats.sizeInBytes; + reservedSize = ignoreIfUnknown(reservedSize) + ignoreIfUnknown(stats.reservedSize); } + private static long ignoreIfUnknown(long reservedSize) { + return reservedSize == UNKNOWN_RESERVED_BYTES ? 0L : reservedSize; + } public long sizeInBytes() { return sizeInBytes; @@ -72,18 +96,31 @@ public ByteSizeValue getSize() { return size(); } + /** + * A prediction of how much larger this store will eventually grow. For instance, if we are currently doing a peer recovery or restoring + * a snapshot into this store then we can account for the rest of the recovery using this field. A value of {@code -1B} indicates that + * the reserved size is unknown. + */ + public ByteSizeValue getReservedSize() { + return new ByteSizeValue(reservedSize); + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(sizeInBytes); if (out.getVersion().before(Version.V_6_0_0_alpha1)) { out.writeVLong(0L); // throttleTimeInNanos } + if (out.getVersion().onOrAfter(RESERVED_BYTES_VERSION)) { + out.writeZLong(reservedSize); + } } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Fields.STORE); builder.humanReadableField(Fields.SIZE_IN_BYTES, Fields.SIZE, size()); + builder.humanReadableField(Fields.RESERVED_IN_BYTES, Fields.RESERVED, getReservedSize()); builder.endObject(); return builder; } @@ -92,5 +129,7 @@ static final class Fields { static final String STORE = "store"; static final String SIZE = "size"; static final String SIZE_IN_BYTES = "size_in_bytes"; + static final String RESERVED = "reserved"; + static final String RESERVED_IN_BYTES = "reserved_in_bytes"; } } diff --git a/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java b/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java index aa9e880180327..9e47d78d65550 100644 --- a/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java +++ b/server/src/main/java/org/elasticsearch/indices/NodeIndicesStats.java @@ -247,6 +247,14 @@ private Map createStatsByIndex() { return statsMap; } + public List getShardStats(Index index) { + if (statsByShard == null) { + return null; + } else { + return statsByShard.get(index); + } + } + static final class Fields { static final String INDICES = "indices"; } diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java index 002c22442a599..5bef3ddd006d5 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java @@ -203,6 +203,7 @@ public synchronized RecoveryState setStage(Stage stage) { getTranslog().start(); break; case FINALIZE: + assert getIndex().bytesStillToRecover() >= 0 : "moving to stage FINALIZE without completing file details"; validateAndSetStage(Stage.TRANSLOG, stage); getTranslog().stop(); break; @@ -693,6 +694,7 @@ public String toString() { public static class Index extends Timer implements ToXContentFragment, Writeable { private final Map fileDetails = new HashMap<>(); + private boolean fileDetailsComplete; public static final long UNKNOWN = -1L; @@ -709,6 +711,15 @@ public Index(StreamInput in) throws IOException { File file = new File(in); fileDetails.put(file.name, file); } + if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + fileDetailsComplete = in.readBoolean(); + } else { + // This flag is used by disk-based allocation to decide whether the remaining bytes measurement is accurate or not; if not + // then it falls back on an estimate. There's only a very short window in which the file details are present but incomplete + // so this is a reasonable approximation, and the stats reported to the disk-based allocator don't hit this code path + // anyway since they always use IndexShard#getRecoveryState which is never transported over the wire. + fileDetailsComplete = fileDetails.isEmpty() == false; + } sourceThrottlingInNanos = in.readLong(); targetThrottleTimeInNanos = in.readLong(); } @@ -721,6 +732,9 @@ public synchronized void writeTo(StreamOutput out) throws IOException { for (File file : files) { file.writeTo(out); } + if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + out.writeBoolean(fileDetailsComplete); + } out.writeLong(sourceThrottlingInNanos); out.writeLong(targetThrottleTimeInNanos); } @@ -732,16 +746,22 @@ public synchronized List fileDetails() { public synchronized void reset() { super.reset(); fileDetails.clear(); + fileDetailsComplete = false; sourceThrottlingInNanos = UNKNOWN; targetThrottleTimeInNanos = UNKNOWN; } public synchronized void addFileDetail(String name, long length, boolean reused) { + assert fileDetailsComplete == false : "addFileDetail for [" + name + "] when file details are already complete"; File file = new File(name, length, reused); File existing = fileDetails.put(name, file); assert existing == null : "file [" + name + "] is already reported"; } + public synchronized void setFileDetailsComplete() { + fileDetailsComplete = true; + } + public synchronized void addRecoveredBytesToFile(String name, long bytes) { File file = fileDetails.get(name); file.addRecoveredBytes(bytes); @@ -865,6 +885,23 @@ public synchronized long totalRecoverBytes() { return total; } + /** + * @return number of bytes still to recover, i.e. {@link Index#totalRecoverBytes()} minus {@link Index#recoveredBytes()}, or + * {@code -1} if the full set of files to recover is not yet known + */ + public synchronized long bytesStillToRecover() { + if (fileDetailsComplete == false) { + return -1L; + } + long total = 0L; + for (File file : fileDetails.values()) { + if (file.reused() == false) { + total += file.length() - file.recovered(); + } + } + return total; + } + /** * percent of bytes recovered out of total files bytes *to be* recovered */ diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java index 150fcaeeea77c..89059c8f734f6 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryTarget.java @@ -288,6 +288,7 @@ private void ensureRefCount() { @Override public void prepareForTranslogOperations(int totalTranslogOps, ActionListener listener) { ActionListener.completeWith(listener, () -> { + state().getIndex().setFileDetailsComplete(); // ops-based recoveries don't send the file details state().getTranslog().totalOperations(totalTranslogOps); indexShard().openEngineAndSkipTranslogRecovery(); return null; @@ -403,6 +404,7 @@ public void receiveFileInfo(List phase1FileNames, for (int i = 0; i < phase1FileNames.size(); i++) { index.addFileDetail(phase1FileNames.get(i), phase1FileSizes.get(i), false); } + index.setFileDetailsComplete(); state().getTranslog().totalOperations(totalTranslogOps); state().getTranslog().totalOperationsOnStart(totalTranslogOps); return null; diff --git a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java index 1255071bb9908..4faea615a38ef 100644 --- a/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java +++ b/server/src/main/java/org/elasticsearch/repositories/blobstore/FileRestoreContext.java @@ -129,6 +129,8 @@ public void restore(SnapshotFiles snapshotFiles, Store store, ActionListener randomDiskUsage() { @@ -78,4 +79,19 @@ private static ImmutableOpenMap randomRoutingToDataPath() return builder.build(); } + private static ImmutableOpenMap randomReservedSpace() { + int numEntries = randomIntBetween(0, 128); + ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(numEntries); + for (int i = 0; i < numEntries; i++) { + final ClusterInfo.NodeAndPath key = new ClusterInfo.NodeAndPath(randomAlphaOfLength(10), randomAlphaOfLength(10)); + final ClusterInfo.ReservedSpace.Builder valueBuilder = new ClusterInfo.ReservedSpace.Builder(); + for (int j = between(0,10); j > 0; j--) { + ShardId shardId = new ShardId(randomAlphaOfLength(32), randomAlphaOfLength(32), randomIntBetween(0, Integer.MAX_VALUE)); + valueBuilder.add(shardId, between(0, Integer.MAX_VALUE)); + } + builder.put(key, valueBuilder.build()); + } + return builder.build(); + } + } diff --git a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java index a03337e41dbe6..a5674994b61b0 100644 --- a/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/DiskUsageTests.java @@ -38,6 +38,7 @@ import java.nio.file.Path; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import static java.util.Collections.emptyMap; @@ -104,14 +105,14 @@ public void testFillShardLevelInfo() { test_0 = ShardRoutingHelper.moveToStarted(test_0); Path test0Path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve("0"); CommonStats commonStats0 = new CommonStats(); - commonStats0.store = new StoreStats(100); + commonStats0.store = new StoreStats(100, 0L); ShardRouting test_1 = ShardRouting.newUnassigned(new ShardId(index, 1), false, PeerRecoverySource.INSTANCE, new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "foo")); test_1 = ShardRoutingHelper.initialize(test_1, "node2"); test_1 = ShardRoutingHelper.moveToStarted(test_1); Path test1Path = createTempDir().resolve("indices").resolve(index.getUUID()).resolve("1"); CommonStats commonStats1 = new CommonStats(); - commonStats1.store = new StoreStats(1000); + commonStats1.store = new StoreStats(1000, 0L); ShardStats[] stats = new ShardStats[] { new ShardStats(test_0, new ShardPath(false, test0Path, test0Path, test_0.shardId()), commonStats0 , null, null, null), new ShardStats(test_1, new ShardPath(false, test1Path, test1Path, test_1.shardId()), commonStats1 , null, null, null) @@ -119,7 +120,7 @@ public void testFillShardLevelInfo() { ImmutableOpenMap.Builder shardSizes = ImmutableOpenMap.builder(); ImmutableOpenMap.Builder routingToPath = ImmutableOpenMap.builder(); ClusterState state = ClusterState.builder(new ClusterName("blarg")).version(0).build(); - InternalClusterInfoService.buildShardLevelInfo(logger, stats, shardSizes, routingToPath); + InternalClusterInfoService.buildShardLevelInfo(logger, stats, shardSizes, routingToPath, new HashMap<>()); assertEquals(2, shardSizes.size()); assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(test_0))); assertTrue(shardSizes.containsKey(ClusterInfo.shardIdentifierFromRouting(test_1))); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java index 9961cbefe05c7..3356ebee64aec 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/DiskThresholdMonitorTests.java @@ -41,6 +41,7 @@ import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.test.MockLogAppender; import org.elasticsearch.test.junit.annotations.TestLogging; @@ -100,7 +101,7 @@ protected void updateIndicesReadOnly(Set indicesToMarkReadOnly, ActionLi ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4)); builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 30)); - monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null)); + monitor.onNewInfo(clusterInfo(builder.build())); assertFalse(reroute.get()); assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indices.get()); @@ -109,7 +110,7 @@ protected void updateIndicesReadOnly(Set indicesToMarkReadOnly, ActionLi builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4)); builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 5)); currentTime.addAndGet(randomLongBetween(60001, 120000)); - monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null)); + monitor.onNewInfo(clusterInfo(builder.build())); assertTrue(reroute.get()); assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indices.get()); IndexMetadata indexMetadata = IndexMetadata.builder(clusterState.metadata().index("test_2")).settings(Settings.builder() @@ -145,7 +146,7 @@ protected void updateIndicesReadOnly(Set indicesToMarkReadOnly, ActionLi builder = ImmutableOpenMap.builder(); builder.put("node1", new DiskUsage("node1","node1", "/foo/bar", 100, 4)); builder.put("node2", new DiskUsage("node2","node2", "/foo/bar", 100, 5)); - monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null)); + monitor.onNewInfo(clusterInfo(builder.build())); assertTrue(reroute.get()); assertEquals(Collections.singleton("test_1"), indices.get()); } @@ -181,12 +182,12 @@ protected void updateIndicesReadOnly(Set indicesToMarkReadOnly, ActionLi // should not reroute when all disks are ok currentTime.addAndGet(randomLongBetween(0, 120000)); - monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null)); + monitor.onNewInfo(clusterInfo(allDisksOk)); assertNull(listenerReference.get()); // should reroute when one disk goes over the watermark currentTime.addAndGet(randomLongBetween(0, 120000)); - monitor.onNewInfo(new ClusterInfo(oneDiskAboveWatermark, null, null, null)); + monitor.onNewInfo(clusterInfo(oneDiskAboveWatermark)); assertNotNull(listenerReference.get()); listenerReference.getAndSet(null).onResponse(clusterState); @@ -194,20 +195,20 @@ protected void updateIndicesReadOnly(Set indicesToMarkReadOnly, ActionLi // should not re-route again within the reroute interval currentTime.addAndGet(randomLongBetween(0, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis())); - monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null)); + monitor.onNewInfo(clusterInfo(allDisksOk)); assertNull(listenerReference.get()); } // should reroute again when one disk is still over the watermark currentTime.addAndGet(randomLongBetween( DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis() + 1, 120000)); - monitor.onNewInfo(new ClusterInfo(oneDiskAboveWatermark, null, null, null)); + monitor.onNewInfo(clusterInfo(oneDiskAboveWatermark)); assertNotNull(listenerReference.get()); final ActionListener rerouteListener1 = listenerReference.getAndSet(null); // should not re-route again before reroute has completed currentTime.addAndGet(randomLongBetween(0, 120000)); - monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null)); + monitor.onNewInfo(clusterInfo(allDisksOk)); assertNull(listenerReference.get()); // complete reroute @@ -217,21 +218,34 @@ protected void updateIndicesReadOnly(Set indicesToMarkReadOnly, ActionLi // should not re-route again within the reroute interval currentTime.addAndGet(randomLongBetween(0, DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis())); - monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null)); + monitor.onNewInfo(clusterInfo(allDisksOk)); assertNull(listenerReference.get()); } // should reroute again after the reroute interval currentTime.addAndGet(randomLongBetween( DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis() + 1, 120000)); - monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null)); + monitor.onNewInfo(clusterInfo(allDisksOk)); assertNotNull(listenerReference.get()); listenerReference.getAndSet(null).onResponse(null); // should not reroute again when it is not required currentTime.addAndGet(randomLongBetween(0, 120000)); - monitor.onNewInfo(new ClusterInfo(allDisksOk, null, null, null)); + monitor.onNewInfo(clusterInfo(allDisksOk)); assertNull(listenerReference.get()); + + // should reroute again when one disk has reserved space that pushes it over the high watermark + final ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(1); + builder.put(new ClusterInfo.NodeAndPath("node1", "/foo/bar"), + new ClusterInfo.ReservedSpace.Builder().add(new ShardId("baz", "quux", 0), between(41, 100)).build()); + final ImmutableOpenMap reservedSpaces = builder.build(); + + currentTime.addAndGet(randomLongBetween( + DiskThresholdSettings.CLUSTER_ROUTING_ALLOCATION_REROUTE_INTERVAL_SETTING.get(Settings.EMPTY).millis() + 1, 120000)); + monitor.onNewInfo(clusterInfo(allDisksOk, reservedSpaces)); + assertNotNull(listenerReference.get()); + listenerReference.getAndSet(null).onResponse(null); + } public void testAutoReleaseIndices() { @@ -253,6 +267,16 @@ public void testAutoReleaseIndices() { .nodes(DiscoveryNodes.builder().add(newNode("node1")).add(newNode("node2"))).build(), allocation); assertThat(clusterState.getRoutingTable().shardsWithState(ShardRoutingState.STARTED).size(), equalTo(8)); + final ImmutableOpenMap.Builder reservedSpacesBuilder + = ImmutableOpenMap.builder(); + final int reservedSpaceNode1 = between(0, 10); + reservedSpacesBuilder.put(new ClusterInfo.NodeAndPath("node1", "/foo/bar"), + new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), reservedSpaceNode1).build()); + final int reservedSpaceNode2 = between(0, 10); + reservedSpacesBuilder.put(new ClusterInfo.NodeAndPath("node2", "/foo/bar"), + new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), reservedSpaceNode2).build()); + ImmutableOpenMap reservedSpaces = reservedSpacesBuilder.build(); + DiskThresholdMonitor monitor = new DiskThresholdMonitor(Settings.EMPTY, () -> clusterState, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, () -> 0L, (reason, priority, listener) -> { @@ -275,10 +299,20 @@ protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener ImmutableOpenMap.Builder builder = ImmutableOpenMap.builder(); builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4))); builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4))); - monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null)); + monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces)); assertEquals(new HashSet<>(Arrays.asList("test_1", "test_2")), indicesToMarkReadOnly.get()); assertNull(indicesToRelease.get()); + // Reserved space is ignored when applying block + indicesToMarkReadOnly.set(null); + indicesToRelease.set(null); + builder = ImmutableOpenMap.builder(); + builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(5, 90))); + builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(5, 90))); + monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces)); + assertNull(indicesToMarkReadOnly.get()); + assertNull(indicesToRelease.get()); + // Change cluster state so that "test_2" index is blocked (read only) IndexMetadata indexMetadata = IndexMetadata.builder(clusterState.metadata().index("test_2")).settings(Settings.builder() .put(clusterState.metadata() @@ -313,17 +347,17 @@ protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener builder = ImmutableOpenMap.builder(); builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 100))); builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(0, 4))); - monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null)); + monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces)); assertThat(indicesToMarkReadOnly.get(), contains("test_1")); assertNull(indicesToRelease.get()); - // When free disk on node1 and node2 goes above 10% high watermark, then only release index block + // When free disk on node1 and node2 goes above 10% high watermark then release index block, ignoring reserved space indicesToMarkReadOnly.set(null); indicesToRelease.set(null); builder = ImmutableOpenMap.builder(); builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(10, 100))); builder.put("node2", new DiskUsage("node2", "node2", "/foo/bar", 100, between(10, 100))); - monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null)); + monitor.onNewInfo(clusterInfo(builder.build(), reservedSpaces)); assertNull(indicesToMarkReadOnly.get()); assertThat(indicesToRelease.get(), contains("test_2")); @@ -332,7 +366,7 @@ protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener indicesToRelease.set(null); builder = ImmutableOpenMap.builder(); builder.put("node1", new DiskUsage("node1", "node1", "/foo/bar", 100, between(0, 4))); - monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null)); + monitor.onNewInfo(clusterInfo(builder.build())); assertThat(indicesToMarkReadOnly.get(), contains("test_1")); assertNull(indicesToRelease.get()); @@ -345,7 +379,7 @@ protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener if (randomBoolean()) { builder.put("node3", new DiskUsage("node3", "node3", "/foo/bar", 100, between(0, 100))); } - monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null)); + monitor.onNewInfo(clusterInfo(builder.build())); assertNull(indicesToMarkReadOnly.get()); assertNull(indicesToRelease.get()); @@ -357,7 +391,7 @@ protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener if (randomBoolean()) { builder.put("node3", new DiskUsage("node3", "node3", "/foo/bar", 100, between(0, 100))); } - monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null)); + monitor.onNewInfo(clusterInfo(builder.build())); assertNull(indicesToMarkReadOnly.get()); assertNull(indicesToRelease.get()); @@ -369,7 +403,7 @@ protected void updateIndicesReadOnly(Set indicesToUpdate, ActionListener if (randomBoolean()) { builder.put("node3", new DiskUsage("node3", "node3", "/foo/bar", 100, between(0, 100))); } - monitor.onNewInfo(new ClusterInfo(builder.build(), null, null, null)); + monitor.onNewInfo(clusterInfo(builder.build())); assertThat(indicesToMarkReadOnly.get(), contains("test_1")); assertNull(indicesToRelease.get()); } @@ -492,7 +526,6 @@ long sizeOfRelocatingShards(RoutingNode routingNode, DiskUsage diskUsage, Cluste assertSingleInfoMessage(monitor, aboveLowWatermark, "high disk watermark [90%] no longer exceeded on * but low disk watermark [85%] is still exceeded"); - } private void assertNoLogging(DiskThresholdMonitor monitor, @@ -514,7 +547,7 @@ private void assertNoLogging(DiskThresholdMonitor monitor, Loggers.addAppender(diskThresholdMonitorLogger, mockAppender); for (int i = between(1, 3); i >= 0; i--) { - monitor.onNewInfo(new ClusterInfo(diskUsages, null, null, null)); + monitor.onNewInfo(clusterInfo(diskUsages)); } mockAppender.assertAllExpectationsMatched(); @@ -564,10 +597,20 @@ private void assertLogging(DiskThresholdMonitor monitor, Logger diskThresholdMonitorLogger = LogManager.getLogger(DiskThresholdMonitor.class); Loggers.addAppender(diskThresholdMonitorLogger, mockAppender); - monitor.onNewInfo(new ClusterInfo(diskUsages, null, null, null)); + monitor.onNewInfo(clusterInfo(diskUsages)); mockAppender.assertAllExpectationsMatched(); Loggers.removeAppender(diskThresholdMonitorLogger, mockAppender); mockAppender.stop(); } + + private static ClusterInfo clusterInfo(ImmutableOpenMap diskUsages) { + return clusterInfo(diskUsages, ImmutableOpenMap.of()); + } + + private static ClusterInfo clusterInfo(ImmutableOpenMap diskUsages, + ImmutableOpenMap reservedSpace) { + return new ClusterInfo(diskUsages, null, null, null, reservedSpace); + } + } diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java index 665e5b2cd646a..f49607aba1653 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderTests.java @@ -56,6 +56,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicReference; @@ -69,6 +70,7 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; +import static org.hamcrest.Matchers.oneOf; public class DiskThresholdDeciderTests extends ESAllocationTestCase { @@ -763,6 +765,30 @@ public void testShardRelocationsTakenIntoAccount() { strategy.reroute(clusterState, "foo"); // ensure reroute doesn't fail even though there is negative free space } + + { + clusterInfoReference.set(overfullClusterInfo); + clusterState = applyStartedShardsUntilNoChange(clusterState, strategy); + final List startedShardsWithOverfullDisk = clusterState.getRoutingNodes().shardsWithState(STARTED); + assertThat(startedShardsWithOverfullDisk.size(), equalTo(4)); + for (ShardRouting shardRouting : startedShardsWithOverfullDisk) { + // no shards on node3 since it has no free space + assertThat(shardRouting.toString(), shardRouting.currentNodeId(), oneOf("node1", "node2")); + } + + // reset free space on node 3 and reserve space on node1 + clusterInfoReference.set(new DevNullClusterInfo(usages, usages, shardSizes, + (new ImmutableOpenMap.Builder()).fPut( + new ClusterInfo.NodeAndPath("node1", "/dev/null"), + new ClusterInfo.ReservedSpace.Builder().add(new ShardId("", "", 0), between(51, 200)).build()).build())); + clusterState = applyStartedShardsUntilNoChange(clusterState, strategy); + final List startedShardsWithReservedSpace = clusterState.getRoutingNodes().shardsWithState(STARTED); + assertThat(startedShardsWithReservedSpace.size(), equalTo(4)); + for (ShardRouting shardRouting : startedShardsWithReservedSpace) { + // no shards on node1 since all its free space is reserved + assertThat(shardRouting.toString(), shardRouting.currentNodeId(), oneOf("node2", "node3")); + } + } } public void testCanRemainWithShardRelocatingAway() { @@ -1123,7 +1149,13 @@ static class DevNullClusterInfo extends ClusterInfo { DevNullClusterInfo(ImmutableOpenMap leastAvailableSpaceUsage, ImmutableOpenMap mostAvailableSpaceUsage, ImmutableOpenMap shardSizes) { - super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null); + this(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, ImmutableOpenMap.of()); + } + + DevNullClusterInfo(ImmutableOpenMap leastAvailableSpaceUsage, + ImmutableOpenMap mostAvailableSpaceUsage, + ImmutableOpenMap shardSizes, ImmutableOpenMap reservedSpace) { + super(leastAvailableSpaceUsage, mostAvailableSpaceUsage, shardSizes, null, reservedSpace); } @Override diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java index d890a294eb060..aad58f596fa75 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java @@ -104,7 +104,7 @@ public void testCanAllocateUsesMaxAvailableSpace() { ImmutableOpenMap.Builder shardSizes = ImmutableOpenMap.builder(); shardSizes.put("[test][0][p]", 10L); // 10 bytes final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), - mostAvailableUsage.build(), shardSizes.build(), ImmutableOpenMap.of()); + mostAvailableUsage.build(), shardSizes.build(), ImmutableOpenMap.of(), ImmutableOpenMap.of()); RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Collections.singleton(decider)), clusterState.getRoutingNodes(), clusterState, clusterInfo, System.nanoTime()); allocation.debugDecision(true); @@ -159,7 +159,7 @@ public void testCannotAllocateDueToLackOfDiskResources() { final long shardSize = randomIntBetween(110, 1000); shardSizes.put("[test][0][p]", shardSize); ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(), - shardSizes.build(), ImmutableOpenMap.of()); + shardSizes.build(), ImmutableOpenMap.of(), ImmutableOpenMap.of()); RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Collections.singleton(decider)), clusterState.getRoutingNodes(), clusterState, clusterInfo, System.nanoTime()); allocation.debugDecision(true); @@ -240,7 +240,7 @@ public void testCanRemainUsesLeastAvailableSpace() { shardSizes.put("[test][2][p]", 10L); final ClusterInfo clusterInfo = new ClusterInfo(leastAvailableUsages.build(), mostAvailableUsage.build(), - shardSizes.build(), shardRoutingMap.build()); + shardSizes.build(), shardRoutingMap.build(), ImmutableOpenMap.of()); RoutingAllocation allocation = new RoutingAllocation(new AllocationDeciders(Collections.singleton(decider)), clusterState.getRoutingNodes(), clusterState, clusterInfo, System.nanoTime()); allocation.debugDecision(true); diff --git a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java index 4dcf0734f89b5..70d6cb05a3dfe 100644 --- a/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/elasticsearch/index/shard/IndexShardTests.java @@ -2365,6 +2365,7 @@ public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, Sh } targetStore.directory().copyFrom(sourceStore.directory(), file, file, IOContext.DEFAULT); } + recoveryState.getIndex().setFileDetailsComplete(); return null; }); } @@ -2659,6 +2660,7 @@ public void testShardActiveDuringInternalRecovery() throws IOException { shard.prepareForIndexRecovery(); // Shard is still inactive since we haven't started recovering yet assertFalse(shard.isActive()); + shard.recoveryState().getIndex().setFileDetailsComplete(); shard.openEngineAndRecoverFromTranslog(); // Shard should now be active since we did recover: assertTrue(shard.isActive()); diff --git a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java index 979710fc0df5f..24524343b3f17 100644 --- a/server/src/test/java/org/elasticsearch/index/store/StoreTests.java +++ b/server/src/test/java/org/elasticsearch/index/store/StoreTests.java @@ -463,7 +463,7 @@ private void corruptFile(Directory dir, String fileIn, String fileOut) throws IO public void assertDeleteContent(Store store, Directory dir) throws IOException { deleteContent(store.directory()); assertThat(Arrays.toString(store.directory().listAll()), store.directory().listAll().length, equalTo(0)); - assertThat(store.stats().sizeInBytes(), equalTo(0L)); + assertThat(store.stats(0L).sizeInBytes(), equalTo(0L)); assertThat(dir.listAll().length, equalTo(0)); } @@ -748,8 +748,20 @@ public void testStoreStats() throws IOException { assertTrue("expected extraFS file but got: " + extraFiles, extraFiles.startsWith("extra")); initialStoreSize += store.directory().fileLength(extraFiles); } - StoreStats stats = store.stats(); - assertEquals(stats.getSize().getBytes(), initialStoreSize); + final long reservedBytes = randomBoolean() ? StoreStats.UNKNOWN_RESERVED_BYTES :randomLongBetween(0L, Integer.MAX_VALUE); + StoreStats stats = store.stats(reservedBytes); + assertEquals(initialStoreSize, stats.getSize().getBytes()); + assertEquals(reservedBytes, stats.getReservedSize().getBytes()); + + stats.add(null); + assertEquals(initialStoreSize, stats.getSize().getBytes()); + assertEquals(reservedBytes, stats.getReservedSize().getBytes()); + + final long otherStatsBytes = randomLongBetween(0L, Integer.MAX_VALUE); + final long otherStatsReservedBytes = randomBoolean() ? StoreStats.UNKNOWN_RESERVED_BYTES :randomLongBetween(0L, Integer.MAX_VALUE); + stats.add(new StoreStats(otherStatsBytes, otherStatsReservedBytes)); + assertEquals(initialStoreSize + otherStatsBytes, stats.getSize().getBytes()); + assertEquals(Math.max(reservedBytes, 0L) + Math.max(otherStatsReservedBytes, 0L), stats.getReservedSize().getBytes()); Directory dir = store.directory(); final long length; @@ -763,7 +775,7 @@ public void testStoreStats() throws IOException { } assertTrue(numNonExtraFiles(store) > 0); - stats = store.stats(); + stats = store.stats(0L); assertEquals(stats.getSizeInBytes(), length + initialStoreSize); deleteContent(store.directory()); diff --git a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java index 1c2b5331fef30..fbb93e2b0d240 100644 --- a/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java +++ b/server/src/test/java/org/elasticsearch/indices/recovery/RecoveryTargetTests.java @@ -200,6 +200,7 @@ public void testIndex() throws Throwable { Collections.shuffle(Arrays.asList(files), random()); final RecoveryState.Index index = new RecoveryState.Index(); + assertThat(index.bytesStillToRecover(), equalTo(-1L)); if (randomBoolean()) { // initialize with some data and then reset @@ -213,13 +214,15 @@ public void testIndex() throws Throwable { index.addTargetThrottling(randomIntBetween(0, 20)); } } + if (randomBoolean()) { + index.setFileDetailsComplete(); + } if (randomBoolean()) { index.stop(); } index.reset(); } - // before we start we must report 0 assertThat(index.recoveredFilesPercent(), equalTo((float) 0.0)); assertThat(index.recoveredBytesPercent(), equalTo((float) 0.0)); @@ -242,7 +245,10 @@ public void testIndex() throws Throwable { assertThat(index.recoveredBytes(), equalTo(0L)); assertThat(index.recoveredFilesPercent(), equalTo(filesToRecover.size() == 0 ? 100.0f : 0.0f)); assertThat(index.recoveredBytesPercent(), equalTo(filesToRecover.size() == 0 ? 100.0f : 0.0f)); + assertThat(index.bytesStillToRecover(), equalTo(-1L)); + index.setFileDetailsComplete(); + assertThat(index.bytesStillToRecover(), equalTo(totalFileBytes - totalReusedBytes)); long bytesToRecover = totalFileBytes - totalReusedBytes; boolean completeRecovery = bytesToRecover == 0 || randomBoolean(); @@ -322,6 +328,7 @@ Index createObj(StreamInput in) throws IOException { assertThat(index.recoveredBytes(), equalTo(recoveredBytes)); assertThat(index.targetThrottling().nanos(), equalTo(targetThrottling)); assertThat(index.sourceThrottling().nanos(), equalTo(sourceThrottling)); + assertThat(index.bytesStillToRecover(), equalTo(totalFileBytes - totalReusedBytes - recoveredBytes)); if (index.totalRecoverFiles() == 0) { assertThat((double) index.recoveredFilesPercent(), equalTo(100.0)); assertThat((double) index.recoveredBytesPercent(), equalTo(100.0)); @@ -351,6 +358,9 @@ public void testStageSequenceEnforcement() { RecoveryState state = new RecoveryState(shardRouting, discoveryNode, shardRouting.recoverySource().getType() == RecoverySource.Type.PEER ? discoveryNode : null); for (Stage stage : stages) { + if (stage == Stage.FINALIZE) { + state.getIndex().setFileDetailsComplete(); + } state.setStage(stage); } fail("succeeded in performing the illegal sequence [" + Strings.arrayToCommaDelimitedString(stages) + "]"); @@ -369,6 +379,9 @@ public void testStageSequenceEnforcement() { shardRouting.recoverySource().getType() == RecoverySource.Type.PEER ? discoveryNode : null); for (Stage stage : list) { state.setStage(stage); + if (stage == Stage.INDEX) { + state.getIndex().setFileDetailsComplete(); + } } assertThat(state.getStage(), equalTo(Stage.DONE)); diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java index d740e2f5095d8..9b0181f355005 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/MockInternalClusterInfoService.java @@ -91,7 +91,7 @@ List adjustNodesStats(List nodesStats) { class SizeFakingClusterInfo extends ClusterInfo { SizeFakingClusterInfo(ClusterInfo delegate) { super(delegate.getNodeLeastAvailableDiskUsages(), delegate.getNodeMostAvailableDiskUsages(), - delegate.shardSizes, delegate.routingToDataPath); + delegate.shardSizes, delegate.routingToDataPath, delegate.reservedSpace); } @Override diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java index b81b29d2c1214..0290029e74d9a 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/action/ShardFollowTaskReplicationTests.java @@ -473,6 +473,7 @@ public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, Sh leader.store().directory(), md.name(), md.name(), IOContext.DEFAULT); } } + recoveryState.getIndex().setFileDetailsComplete(); return null; }); } diff --git a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java index 2124eb57815a6..1a57e7b9a91af 100644 --- a/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java +++ b/x-pack/plugin/ccr/src/test/java/org/elasticsearch/xpack/ccr/index/engine/FollowEngineIndexShardTests.java @@ -142,6 +142,7 @@ public void restoreShard(Store store, SnapshotId snapshotId, IndexId indexId, Sh } targetStore.directory().copyFrom(sourceStore.directory(), file, file, IOContext.DEFAULT); } + recoveryState.getIndex().setFileDetailsComplete(); return null; }); } diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java index 7486f1f0bbad2..bb763377edf5d 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/cluster/ClusterStatsMonitoringDocTests.java @@ -409,7 +409,8 @@ public void testToXContent() throws IOException { + "\"deleted\":0" + "}," + "\"store\":{" - + "\"size_in_bytes\":0" + + "\"size_in_bytes\":0," + + "\"reserved_in_bytes\":0" + "}," + "\"fielddata\":{" + "\"memory_size_in_bytes\":0," diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java index 4ee60263df311..1b3bc3712961d 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndexStatsMonitoringDocTests.java @@ -325,7 +325,7 @@ private static CommonStats mockCommonStats() { commonStats.getMerge().add(no, no, no, ++iota, no, no, no, no, no, no); commonStats.getQueryCache().add(new QueryCacheStats(++iota, ++iota, ++iota, ++iota, no)); commonStats.getRequestCache().add(new RequestCacheStats(++iota, ++iota, ++iota, ++iota)); - commonStats.getStore().add(new StoreStats(++iota)); + commonStats.getStore().add(new StoreStats(++iota, no)); commonStats.getRefresh().add(new RefreshStats(no, ++iota, no, ++iota, (int) no)); final IndexingStats.Stats indexingStats = new IndexingStats.Stats(++iota, ++iota, no, no, no, no, no, no, false, ++iota); diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java index 1e90e6b38e62d..759d69e106bdf 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/indices/IndicesStatsMonitoringDocTests.java @@ -144,7 +144,7 @@ public void testToXContent() throws IOException { private CommonStats mockCommonStats() { final CommonStats commonStats = new CommonStats(CommonStatsFlags.ALL); commonStats.getDocs().add(new DocsStats(1L, 0L, randomNonNegativeLong())); - commonStats.getStore().add(new StoreStats(2L)); + commonStats.getStore().add(new StoreStats(2L, 0L)); final IndexingStats.Stats indexingStats = new IndexingStats.Stats(3L, 4L, 0L, 0L, 0L, 0L, 0L, 0L, true, 5L); commonStats.getIndexing().add(new IndexingStats(indexingStats, null)); diff --git a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java index f58c31ab7769e..a11ad4a325c24 100644 --- a/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java +++ b/x-pack/plugin/monitoring/src/test/java/org/elasticsearch/xpack/monitoring/collector/node/NodeStatsMonitoringDocTests.java @@ -291,7 +291,7 @@ private static NodeStats mockNodeStats() { final CommonStats indicesCommonStats = new CommonStats(CommonStatsFlags.ALL); indicesCommonStats.getDocs().add(new DocsStats(++iota, no, randomNonNegativeLong())); indicesCommonStats.getFieldData().add(new FieldDataStats(++iota, ++iota, null)); - indicesCommonStats.getStore().add(new StoreStats(++iota)); + indicesCommonStats.getStore().add(new StoreStats(++iota, no)); final IndexingStats.Stats indexingStats = new IndexingStats.Stats(++iota, ++iota, ++iota, no, no, no, no, no, false, ++iota); indicesCommonStats.getIndexing().add(new IndexingStats(indexingStats, null)); From acb6777474470a7adda34c8922050908ab8dc814 Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 1 Jul 2020 08:19:52 +0100 Subject: [PATCH 2/3] Picky picky --- .../src/main/java/org/elasticsearch/cluster/ClusterInfo.java | 2 +- .../elasticsearch/cluster/InternalClusterInfoService.java | 2 +- .../org/elasticsearch/indices/recovery/RecoveryState.java | 5 +++-- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java index 8444425fce07a..9b0c5738055cf 100644 --- a/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java +++ b/server/src/main/java/org/elasticsearch/cluster/ClusterInfo.java @@ -86,7 +86,7 @@ public ClusterInfo(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) { reservedSpaceMap = in.readMap(NodeAndPath::new, ReservedSpace::new); } else { - reservedSpaceMap = Map.of(); + reservedSpaceMap = org.elasticsearch.common.collect.Map.of(); } ImmutableOpenMap.Builder leastBuilder = ImmutableOpenMap.builder(); diff --git a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java index 47cb2f051b38c..b56ba6dc3d113 100644 --- a/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java +++ b/server/src/main/java/org/elasticsearch/cluster/InternalClusterInfoService.java @@ -321,7 +321,7 @@ public void onFailure(Exception e) { } }); - final CountDownLatch indicesLatch = updateIndicesStats(new ActionListener<>() { + final CountDownLatch indicesLatch = updateIndicesStats(new ActionListener() { @Override public void onResponse(IndicesStatsResponse indicesStatsResponse) { final ShardStats[] stats = indicesStatsResponse.getShards(); diff --git a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java index 5bef3ddd006d5..fe756daf240ac 100644 --- a/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java +++ b/server/src/main/java/org/elasticsearch/indices/recovery/RecoveryState.java @@ -36,6 +36,7 @@ import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.index.shard.ShardId; +import org.elasticsearch.index.store.StoreStats; import java.io.IOException; import java.util.ArrayList; @@ -711,7 +712,7 @@ public Index(StreamInput in) throws IOException { File file = new File(in); fileDetails.put(file.name, file); } - if (in.getVersion().onOrAfter(Version.V_8_0_0)) { + if (in.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) { fileDetailsComplete = in.readBoolean(); } else { // This flag is used by disk-based allocation to decide whether the remaining bytes measurement is accurate or not; if not @@ -732,7 +733,7 @@ public synchronized void writeTo(StreamOutput out) throws IOException { for (File file : files) { file.writeTo(out); } - if (out.getVersion().onOrAfter(Version.V_8_0_0)) { + if (out.getVersion().onOrAfter(StoreStats.RESERVED_BYTES_VERSION)) { out.writeBoolean(fileDetailsComplete); } out.writeLong(sourceThrottlingInNanos); From 56f6dec6d1e957da6e6171bf685bdd8cb081003d Mon Sep 17 00:00:00 2001 From: David Turner Date: Wed, 1 Jul 2020 08:48:41 +0100 Subject: [PATCH 3/3] Missing type --- .../org/elasticsearch/indices/recovery/IndexRecoveryIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java index 40946ad020b22..11993d88738c8 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java @@ -1847,7 +1847,7 @@ public void testReservesBytesDuringPeerRecoveryPhaseOne() throws Exception { .put("index.routing.allocation.include._name", String.join(",", dataNodes)).build()); ensureGreen(indexName); final List indexRequests = IntStream.range(0, between(10, 500)) - .mapToObj(n -> client().prepareIndex(indexName).setSource("foo", "bar")) + .mapToObj(n -> client().prepareIndex(indexName, "_doc").setSource("foo", "bar")) .collect(Collectors.toList()); indexRandom(randomBoolean(), true, true, indexRequests); assertThat(client().admin().indices().prepareFlush(indexName).get().getFailedShards(), equalTo(0));