diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodeResponse.java b/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodeResponse.java index be28bbc9d87a6..3b77886110a9e 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodeResponse.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/BaseNodeResponse.java @@ -20,11 +20,11 @@ */ public abstract class BaseNodeResponse extends TransportResponse { - private DiscoveryNode node; + private final DiscoveryNode node; protected BaseNodeResponse(StreamInput in) throws IOException { super(in); - node = new DiscoveryNode(in); + node = DiscoveryNode.readFrom(in); } protected BaseNodeResponse(DiscoveryNode node) { diff --git a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java index 0cb214c305d1b..ec975275da36c 100644 --- a/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/nodes/TransportNodesAction.java @@ -16,6 +16,7 @@ import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.ClusterStateReusingStreamInput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.tasks.CancellableTask; @@ -185,13 +186,15 @@ class AsyncAction { private final AtomicReferenceArray responses; private final AtomicInteger counter = new AtomicInteger(); private final Task task; + private final ClusterState clusterState; AsyncAction(Task task, NodesRequest request, ActionListener listener) { this.task = task; this.request = request; this.listener = listener; + this.clusterState = clusterService.state(); if (request.concreteNodes() == null) { - resolveRequest(request, clusterService.state()); + resolveRequest(request, clusterState); assert request.concreteNodes() != null; } this.responses = new AtomicReferenceArray<>(request.concreteNodes().length); @@ -218,7 +221,7 @@ void start() { new TransportResponseHandler() { @Override public NodeResponse read(StreamInput in) throws IOException { - return newNodeResponse(in); + return newNodeResponse(new ClusterStateReusingStreamInput(in, clusterState)); } @Override diff --git a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java index 398feeb2c479b..c9a27eb1aac62 100644 --- a/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java +++ b/server/src/main/java/org/elasticsearch/cluster/node/DiscoveryNode.java @@ -10,6 +10,7 @@ import org.elasticsearch.Version; import org.elasticsearch.common.UUIDs; +import org.elasticsearch.common.io.stream.ClusterStateReusingStreamInput; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; @@ -218,6 +219,14 @@ public static Set getRolesFromSettings(final Settings setting return Set.copyOf(NODE_ROLES_SETTING.get(settings)); } + public static DiscoveryNode readFrom(StreamInput in) throws IOException { + if (in instanceof ClusterStateReusingStreamInput) { + return ((ClusterStateReusingStreamInput)in).readDiscoveryNode(); + } else { + return new DiscoveryNode(in); + } + } + /** * Creates a new {@link DiscoveryNode} by reading from the stream provided as argument * @param in the stream diff --git a/server/src/main/java/org/elasticsearch/common/io/stream/ClusterStateReusingStreamInput.java b/server/src/main/java/org/elasticsearch/common/io/stream/ClusterStateReusingStreamInput.java new file mode 100644 index 0000000000000..d366f2a6d2ce1 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/io/stream/ClusterStateReusingStreamInput.java @@ -0,0 +1,33 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0 and the Server Side Public License, v 1; you may not use this file except + * in compliance with, at your election, the Elastic License 2.0 or the Server + * Side Public License, v 1. + */ + +package org.elasticsearch.common.io.stream; + +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.node.DiscoveryNodes; + +import java.io.IOException; + +public class ClusterStateReusingStreamInput extends FilterStreamInput { + + private final DiscoveryNodes discoveryNodes; + + public ClusterStateReusingStreamInput(StreamInput delegate, ClusterState clusterState) { + super(delegate); + this.discoveryNodes = clusterState.nodes(); + } + + public DiscoveryNode readDiscoveryNode() throws IOException { + final DiscoveryNode fromStream = new DiscoveryNode(delegate); + final DiscoveryNode fromClusterState = discoveryNodes.get(fromStream.getId()); + return fromStream.equals(fromClusterState) && fromStream.getRoles().equals(fromClusterState.getRoles()) + ? fromClusterState + : fromStream; + } +} diff --git a/server/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java b/server/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java index c6482ac7b50d3..f92d5b6e8019f 100644 --- a/server/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java +++ b/server/src/main/java/org/elasticsearch/gateway/TransportNodesListGatewayStartedShards.java @@ -199,6 +199,7 @@ public static class NodesGatewayStartedShards extends BaseNodesResponse nodes, diff --git a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetadata.java b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetadata.java index cbfc3ba500204..4f47b07e25f15 100644 --- a/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetadata.java +++ b/server/src/main/java/org/elasticsearch/indices/store/TransportNodesListShardStoreMetadata.java @@ -285,6 +285,7 @@ public static class NodesStoreFilesMetadata extends BaseNodesResponse nodes, List failures) {