
Purge unused node info and transport addr info after fetching shard stats. #77266

Closed

Conversation

howardhuanghua
Contributor

Related to #76218.
To prevent memory explosion, purge unused node info and transport addr info after fetching shard stats.

@elasticsearchmachine elasticsearchmachine added v8.0.0 external-contributor Pull request authored by a developer outside the Elasticsearch team labels Sep 3, 2021
@DaveCTurner DaveCTurner added the :Distributed Coordination/Allocation All issues relating to the decision making around placing a shard (both master logic & on the nodes) label Sep 3, 2021
@elasticmachine elasticmachine added the Team:Distributed (Obsolete) Meta label for distributed team (obsolete). Replaced by Distributed Indexing/Coordination. label Sep 3, 2021
@elasticmachine
Collaborator

Pinging @elastic/es-distributed (Team:Distributed)

Contributor

@DaveCTurner DaveCTurner left a comment


I think we should do this differently, see comment below.

@@ -32,10 +33,25 @@ protected BaseNodeResponse(DiscoveryNode node) {
this.node = node;
}

public void purgeNodeInfo() {
Contributor


I'd rather not add to this class, nor make it any more mutable than it already is. Instead, let's copy the information we need out of the response and into a different object which is smaller and can be kept around for longer.
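The suggestion above — copy only the needed fields out of the response into a small immutable holder, rather than mutating the response — can be sketched roughly like this. This is a hypothetical illustration, not the actual Elasticsearch classes; `CachedShardData` and its fields (`allocationId`, `primary`) are invented stand-ins for whatever the allocator actually needs:

```java
import java.util.Objects;

// Small immutable cache entry: keeps a node id string instead of the full
// DiscoveryNode and drops everything else from the wire response, so it is
// cheap to retain for a long time.
final class CachedShardData {
    private final String nodeId;
    private final String allocationId;
    private final boolean primary;

    CachedShardData(String nodeId, String allocationId, boolean primary) {
        this.nodeId = Objects.requireNonNull(nodeId);
        this.allocationId = allocationId;
        this.primary = primary;
    }

    String nodeId() { return nodeId; }
    String allocationId() { return allocationId; }
    boolean primary() { return primary; }
}
```

The point is that the cache never holds a reference to the original `BaseNodeResponse`, so the large response object becomes garbage as soon as it has been processed.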

Contributor Author


OK. I thought about adding new objects to replace NodeGatewayStartedShards and NodeStoreFilesMetadata, changing their base class so it no longer extends BaseNodesResponse. But it seems that would touch a lot of dependencies; I will check them.

Contributor


I don't think we need to change NodeGatewayStartedShards or NodeStoreFilesMetadata either. My understanding is that the memory is all held by this field:

private final Map<String, NodeEntry<T>> cache = new HashMap<>();

Its type parameter T is required to be a BaseNodeResponse, but this parameter really represents both the response type and the cache entry type, and I don't see any reason for those two types to be the same. Something in this direction should work, I think:

diff --git a/server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java b/server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java
index 3a7cdfee84e..10c022d44c3 100644
--- a/server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java
+++ b/server/src/main/java/org/elasticsearch/gateway/AsyncShardFetch.java
@@ -45,7 +45,7 @@ import static java.util.Collections.emptySet;
  * and once the results are back, it makes sure to schedule a reroute to make sure those results will
  * be taken into account.
  */
-public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Releasable {
+public abstract class AsyncShardFetch<Response extends BaseNodeResponse, Cached> implements Releasable {
 
     /**
      * An action that lists the relevant shard data that needs to be fetched.
@@ -58,20 +58,20 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
     protected final String type;
     protected final ShardId shardId;
     protected final String customDataPath;
-    private final Lister<BaseNodesResponse<T>, T> action;
-    private final Map<String, NodeEntry<T>> cache = new HashMap<>();
+    private final Lister<BaseNodesResponse<Response>, Response> action;
+    private final Map<String, NodeEntry<Cached>> cache = new HashMap<>();
     private final Set<String> nodesToIgnore = new HashSet<>();
     private final AtomicLong round = new AtomicLong();
     private boolean closed;
 
     @SuppressWarnings("unchecked")
     protected AsyncShardFetch(Logger logger, String type, ShardId shardId, String customDataPath,
-                              Lister<? extends BaseNodesResponse<T>, T> action) {
+                              Lister<? extends BaseNodesResponse<Response>, Response> action) {
         this.logger = logger;
         this.type = type;
         this.shardId = Objects.requireNonNull(shardId);
         this.customDataPath = Objects.requireNonNull(customDataPath);
-        this.action = (Lister<BaseNodesResponse<T>, T>) action;
+        this.action = (Lister<BaseNodesResponse<Response>, Response>) action;
     }
 
     @Override
@@ -84,7 +84,7 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
      */
     public synchronized int getNumberOfInFlightFetches() {
         int count = 0;
-        for (NodeEntry<T> nodeEntry : cache.values()) {
+        for (NodeEntry<Cached> nodeEntry : cache.values()) {
             if (nodeEntry.isFetching()) {
                 count++;
             }
@@ -99,18 +99,18 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
      * The ignoreNodes are nodes that are supposed to be ignored for this round, since fetching is async, we need
      * to keep them around and make sure we add them back when all the responses are fetched and returned.
      */
-    public synchronized FetchResult<T> fetchData(DiscoveryNodes nodes, Set<String> ignoreNodes) {
+    public synchronized FetchResult<Cached> fetchData(DiscoveryNodes nodes, Set<String> ignoreNodes) {
         if (closed) {
             throw new IllegalStateException(shardId + ": can't fetch data on closed async fetch");
         }
         nodesToIgnore.addAll(ignoreNodes);
         fillShardCacheWithDataNodes(cache, nodes);
-        List<NodeEntry<T>> nodesToFetch = findNodesToFetch(cache);
+        List<NodeEntry<Cached>> nodesToFetch = findNodesToFetch(cache);
         if (nodesToFetch.isEmpty() == false) {
             // mark all node as fetching and go ahead and async fetch them
             // use a unique round id to detect stale responses in processAsyncFetch
             final long fetchingRound = round.incrementAndGet();
-            for (NodeEntry<T> nodeEntry : nodesToFetch) {
+            for (NodeEntry<Cached> nodeEntry : nodesToFetch) {
                 nodeEntry.markAsFetching(fetchingRound);
             }
             DiscoveryNode[] discoNodesToFetch = nodesToFetch.stream().map(NodeEntry::getNodeId).map(nodes::get)
@@ -123,12 +123,12 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
             return new FetchResult<>(shardId, null, emptySet());
         } else {
             // nothing to fetch, yay, build the return value
-            Map<DiscoveryNode, T> fetchData = new HashMap<>();
+            Map<DiscoveryNode, Cached> fetchData = new HashMap<>();
             Set<String> failedNodes = new HashSet<>();
-            for (Iterator<Map.Entry<String, NodeEntry<T>>> it = cache.entrySet().iterator(); it.hasNext(); ) {
-                Map.Entry<String, NodeEntry<T>> entry = it.next();
+            for (Iterator<Map.Entry<String, NodeEntry<Cached>>> it = cache.entrySet().iterator(); it.hasNext(); ) {
+                Map.Entry<String, NodeEntry<Cached>> entry = it.next();
                 String nodeId = entry.getKey();
-                NodeEntry<T> nodeEntry = entry.getValue();
+                NodeEntry<Cached> nodeEntry = entry.getValue();
 
                 DiscoveryNode node = nodes.get(nodeId);
                 if (node != null) {
@@ -153,17 +153,19 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
             if (failedNodes.isEmpty() == false || allIgnoreNodes.isEmpty() == false) {
                 reroute(shardId, "nodes failed [" + failedNodes.size() + "], ignored [" + allIgnoreNodes.size() + "]");
             }
-            return new FetchResult<>(shardId, fetchData, allIgnoreNodes);
+            return new FetchResult<Cached>(shardId, fetchData, allIgnoreNodes);
         }
     }
 
+    protected abstract Cached extract(Response value);
+
     /**
      * Called by the response handler of the async action to fetch data. Verifies that its still working
      * on the same cache generation, otherwise the results are discarded. It then goes and fills the relevant data for
      * the shard (response + failures), issuing a reroute at the end of it to make sure there will be another round
      * of allocations taking this new data into account.
      */
-    protected synchronized void processAsyncFetch(List<T> responses, List<FailedNodeException> failures, long fetchingRound) {
+    protected synchronized void processAsyncFetch(List<Response> responses, List<FailedNodeException> failures, long fetchingRound) {
         if (closed) {
             // we are closed, no need to process this async fetch at all
             logger.trace("{} ignoring fetched [{}] results, already closed", shardId, type);
@@ -172,8 +174,8 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
         logger.trace("{} processing fetched [{}] results", shardId, type);
 
         if (responses != null) {
-            for (T response : responses) {
-                NodeEntry<T> nodeEntry = cache.get(response.getNode().getId());
+            for (Response response : responses) {
+                NodeEntry<Cached> nodeEntry = cache.get(response.getNode().getId());
                 if (nodeEntry != null) {
                     if (nodeEntry.getFetchingRound() != fetchingRound) {
                         assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds";
@@ -185,7 +187,7 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
                     } else {
                         // if the entry is there, for the right fetching round and not marked as failed already, process it
                         logger.trace("{} marking {} as done for [{}], result is [{}]", shardId, nodeEntry.getNodeId(), type, response);
-                        nodeEntry.doneFetching(response);
+                        nodeEntry.doneFetching(extract(response));
                     }
                 }
             }
@@ -193,7 +195,7 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
         if (failures != null) {
             for (FailedNodeException failure : failures) {
                 logger.trace("{} processing failure {} for [{}]", shardId, failure, type);
-                NodeEntry<T> nodeEntry = cache.get(failure.nodeId());
+                NodeEntry<Cached> nodeEntry = cache.get(failure.nodeId());
                 if (nodeEntry != null) {
                     if (nodeEntry.getFetchingRound() != fetchingRound) {
                         assert nodeEntry.getFetchingRound() > fetchingRound : "node entries only replaced by newer rounds";
@@ -235,12 +237,12 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
      * Fills the shard fetched data with new (data) nodes and a fresh NodeEntry, and removes from
      * it nodes that are no longer part of the state.
      */
-    private void fillShardCacheWithDataNodes(Map<String, NodeEntry<T>> shardCache, DiscoveryNodes nodes) {
+    private void fillShardCacheWithDataNodes(Map<String, NodeEntry<Cached>> shardCache, DiscoveryNodes nodes) {
         // verify that all current data nodes are there
         for (ObjectObjectCursor<String, DiscoveryNode> cursor : nodes.getDataNodes()) {
             DiscoveryNode node = cursor.value;
             if (shardCache.containsKey(node.getId()) == false) {
-                shardCache.put(node.getId(), new NodeEntry<T>(node.getId()));
+                shardCache.put(node.getId(), new NodeEntry<Cached>(node.getId()));
             }
         }
         // remove nodes that are not longer part of the data nodes set
@@ -251,9 +253,9 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
      * Finds all the nodes that need to be fetched. Those are nodes that have no
      * data, and are not in fetch mode.
      */
-    private List<NodeEntry<T>> findNodesToFetch(Map<String, NodeEntry<T>> shardCache) {
-        List<NodeEntry<T>> nodesToFetch = new ArrayList<>();
-        for (NodeEntry<T> nodeEntry : shardCache.values()) {
+    private List<NodeEntry<Cached>> findNodesToFetch(Map<String, NodeEntry<Cached>> shardCache) {
+        List<NodeEntry<Cached>> nodesToFetch = new ArrayList<>();
+        for (NodeEntry<Cached> nodeEntry : shardCache.values()) {
             if (nodeEntry.hasData() == false && nodeEntry.isFetching() == false) {
                 nodesToFetch.add(nodeEntry);
             }
@@ -264,8 +266,8 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
     /**
      * Are there any nodes that are fetching data?
      */
-    private boolean hasAnyNodeFetching(Map<String, NodeEntry<T>> shardCache) {
-        for (NodeEntry<T> nodeEntry : shardCache.values()) {
+    private boolean hasAnyNodeFetching(Map<String, NodeEntry<Cached>> shardCache) {
+        for (NodeEntry<Cached> nodeEntry : shardCache.values()) {
             if (nodeEntry.isFetching()) {
                 return true;
             }
@@ -279,9 +281,9 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
     // visible for testing
     void asyncFetch(final DiscoveryNode[] nodes, long fetchingRound) {
         logger.trace("{} fetching [{}] from {}", shardId, type, nodes);
-        action.list(shardId, customDataPath, nodes, new ActionListener<BaseNodesResponse<T>>() {
+        action.list(shardId, customDataPath, nodes, new ActionListener<BaseNodesResponse<Response>>() {
             @Override
-            public void onResponse(BaseNodesResponse<T> response) {
+            public void onResponse(BaseNodesResponse<Response> response) {
                 processAsyncFetch(response.getNodes(), response.failures(), fetchingRound);
             }
 
@@ -300,13 +302,13 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
      * The result of a fetch operation. Make sure to first check {@link #hasData()} before
      * fetching the actual data.
      */
-    public static class FetchResult<T extends BaseNodeResponse> {
+    public static class FetchResult<U> {
 
         private final ShardId shardId;
-        private final Map<DiscoveryNode, T> data;
+        private final Map<DiscoveryNode, U> data;
         private final Set<String> ignoreNodes;
 
-        public FetchResult(ShardId shardId, Map<DiscoveryNode, T> data, Set<String> ignoreNodes) {
+        public FetchResult(ShardId shardId, Map<DiscoveryNode, U> data, Set<String> ignoreNodes) {
             this.shardId = shardId;
             this.data = data;
             this.ignoreNodes = ignoreNodes;
@@ -324,7 +326,7 @@ public abstract class AsyncShardFetch<T extends BaseNodeResponse> implements Rel
          * Returns the actual data, note, make sure to check {@link #hasData()} first and
          * only use this when there is an actual data.
          */
-        public Map<DiscoveryNode, T> getData() {
+        public Map<DiscoveryNode, U> getData() {
             assert data != null : "getData should only be called if there is data to be fetched, please check hasData first";
             return this.data;
         }

@howardhuanghua
Contributor Author

Hi @DaveCTurner, I have implemented CachedNodeGatewayStartedShards and CachedNodeStoreFilesMetadata; they are extracted from the original objects that extend BaseNodeResponse. Please review again, thank you.

@howardhuanghua
Contributor Author

@DaveCTurner Would you please review this again? Thank you.

Contributor

@DaveCTurner DaveCTurner left a comment


Thanks @howardhuanghua this looks like the right sort of shape. I will get to review it properly in the coming days but I left one comment from a quick scan.

@DaveCTurner DaveCTurner self-requested a review September 15, 2021 08:49
@DaveCTurner
Contributor

@elasticmachine ok to test

@DaveCTurner
Contributor

I'm going to discuss this with some colleagues, as I remain concerned about the memory usage while the master hasn't yet received a response from each node. I think we could reduce that memory usage overall (and in more places too) by re-using the DiscoveryNode objects we already have instead of making new ones. That would mean we don't need this change: rather than an extra ~2kB for our own DiscoveryNode, we'd just need a 4-byte reference to an existing object. Bear with us.
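The re-use idea described above amounts to interning: on deserialisation, look the node up (e.g. by id) among the objects the master already holds, and keep the existing reference instead of the ~2kB fresh copy. A minimal, hypothetical sketch of that lookup — `reuseIfKnown` and the map of known nodes are illustrative, not Elasticsearch API:

```java
import java.util.Map;

final class NodeInterner {
    // Return the already-known object for this id if present, so callers hold
    // one shared reference instead of a freshly deserialised duplicate.
    static <T> T reuseIfKnown(Map<String, T> knownNodes, String nodeId, T freshCopy) {
        T existing = knownNodes.get(nodeId);
        return existing != null ? existing : freshCopy;
    }
}
```

In the real change this lookup would happen against the current cluster state during stream deserialisation, so duplicates never survive past the read.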

@howardhuanghua
Contributor Author

OK. Then we need to consider DiscoveryNode's multiple usage scenarios and the capabilities of different versions. Looking forward to the results of your discussion.

DaveCTurner added a commit to DaveCTurner/elasticsearch that referenced this pull request Sep 19, 2021
We read various objects from the wire that already exist in the cluster
state. The most notable is `DiscoveryNode` which can consume ~2kB in
heap for each fresh object, but rarely changes, so it's pretty wasteful
to use fresh objects here. There could be thousands (millions?) of
`DiscoveryNode` objects in flight from various `TransportNodesAction`
responses.

This branch introduces `ClusterStateReusingStreamInput` which lets the
caller capture an appropriate `ClusterState` from which to re-use
`DiscoveryNode` objects if appropriate.

Relates elastic#77266
DaveCTurner added a commit to DaveCTurner/elasticsearch that referenced this pull request Sep 20, 2021
We read various objects from the wire that already exist in the cluster
state. The most notable is `DiscoveryNode` which can consume ~2kB in
heap for each fresh object, but rarely changes, so it's pretty wasteful
to use fresh objects here. There could be thousands (millions?) of
`DiscoveryNode` objects in flight from various `TransportNodesAction`
responses.

This branch adds a `DiscoveryNode` parameter to the response
deserialisation method and makes sure that the worst offenders re-use
the local object rather than creating a fresh one:

- `TransportNodesListShardStoreMetadata`
- `TransportNodesListGatewayStartedShards`

Relates elastic#77266
@DaveCTurner
Contributor

@howardhuanghua we would be interested in your opinion on #77991 which cuts the memory usage drastically at deserialisation time. Could you confirm whether this solves the memory problem in your system and, if not, quantify the extra savings that this PR offers?

DaveCTurner added a commit that referenced this pull request Sep 20, 2021
We read various objects from the wire that already exist in the cluster
state. The most notable is `DiscoveryNode` which can consume ~2kB in
heap for each fresh object, but rarely changes, so it's pretty wasteful
to use fresh objects here. There could be thousands (millions?) of
`DiscoveryNode` objects in flight from various `TransportNodesAction`
responses.

This branch adds a `DiscoveryNode` parameter to the response
deserialisation method and makes sure that the worst offenders re-use
the local object rather than creating a fresh one:

- `TransportNodesListShardStoreMetadata`
- `TransportNodesListGatewayStartedShards`

Relates #77266
DaveCTurner added a commit that referenced this pull request Sep 20, 2021
We read various objects from the wire that already exist in the cluster
state. The most notable is `DiscoveryNode` which can consume ~2kB in
heap for each fresh object, but rarely changes, so it's pretty wasteful
to use fresh objects here. There could be thousands (millions?) of
`DiscoveryNode` objects in flight from various `TransportNodesAction`
responses.

This branch adds a `DiscoveryNode` parameter to the response
deserialisation method and makes sure that the worst offenders re-use
the local object rather than creating a fresh one:

- `TransportNodesListShardStoreMetadata`
- `TransportNodesListGatewayStartedShards`

Relates #77266
@howardhuanghua
Contributor Author

Could you confirm whether this solves the memory problem in your system and, if not, quantify the extra savings that this PR offers?

In our test environment we could not simulate hundreds of nodes, but in a test cluster with 10 nodes and 300,000 shards, this PR produced a noticeably smaller per-shard fetch result without the DiscoveryNode objects. I have checked your fix in #77991, and I will try to backport it to 7.14 and verify it in our production cluster.

@DaveCTurner
Contributor

No feedback yet so I'm closing this for now. If benchmarking shows that this PR yields nontrivial improvements over #77991 then we'll reopen this for sure. Thanks for working on this in any case @howardhuanghua: I think this change makes sense, we just need some figures to justify the effort needed to review it properly.
