Skip to content

Commit

Permalink
WIP draft for ClusterStateReusingStreamInput
Browse files Browse the repository at this point in the history
We read various objects from the wire that already exist in the cluster
state. The most notable is `DiscoveryNode` which can consume ~2kB in
heap for each fresh object, but rarely changes, so it's pretty wasteful
to use fresh objects here. There could be thousands (millions?) of
`DiscoveryNode` objects in flight from various `TransportNodesAction`
responses.

This branch introduces `ClusterStateReusingStreamInput` which lets the
caller capture an appropriate `ClusterState` from which to re-use
`DiscoveryNode` objects if appropriate.

Relates elastic#77266
  • Loading branch information
DaveCTurner committed Sep 19, 2021
1 parent be4bd68 commit aa61253
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@
*/
public abstract class BaseNodeResponse extends TransportResponse {

private DiscoveryNode node;
private final DiscoveryNode node;

protected BaseNodeResponse(StreamInput in) throws IOException {
super(in);
node = new DiscoveryNode(in);
node = DiscoveryNode.readFrom(in);
}

protected BaseNodeResponse(DiscoveryNode node) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.ClusterStateReusingStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.tasks.CancellableTask;
Expand Down Expand Up @@ -185,13 +186,15 @@ class AsyncAction {
private final AtomicReferenceArray<Object> responses;
private final AtomicInteger counter = new AtomicInteger();
private final Task task;
private final ClusterState clusterState;

AsyncAction(Task task, NodesRequest request, ActionListener<NodesResponse> listener) {
this.task = task;
this.request = request;
this.listener = listener;
this.clusterState = clusterService.state();
if (request.concreteNodes() == null) {
resolveRequest(request, clusterService.state());
resolveRequest(request, clusterState);
assert request.concreteNodes() != null;
}
this.responses = new AtomicReferenceArray<>(request.concreteNodes().length);
Expand All @@ -218,7 +221,7 @@ void start() {
new TransportResponseHandler<NodeResponse>() {
@Override
public NodeResponse read(StreamInput in) throws IOException {
return newNodeResponse(in);
return newNodeResponse(new ClusterStateReusingStreamInput(in, clusterState));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import org.elasticsearch.Version;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.io.stream.ClusterStateReusingStreamInput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -218,6 +219,14 @@ public static Set<DiscoveryNodeRole> getRolesFromSettings(final Settings setting
return Set.copyOf(NODE_ROLES_SETTING.get(settings));
}

public static DiscoveryNode readFrom(StreamInput in) throws IOException {
if (in instanceof ClusterStateReusingStreamInput) {
return ((ClusterStateReusingStreamInput)in).readDiscoveryNode();
} else {
return new DiscoveryNode(in);
}
}

/**
* Creates a new {@link DiscoveryNode} by reading from the stream provided as argument
* @param in the stream
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.common.io.stream;

import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;

import java.io.IOException;

public class ClusterStateReusingStreamInput extends FilterStreamInput {

private final DiscoveryNodes discoveryNodes;

public ClusterStateReusingStreamInput(StreamInput delegate, ClusterState clusterState) {
super(delegate);
this.discoveryNodes = clusterState.nodes();
}

public DiscoveryNode readDiscoveryNode() throws IOException {
final DiscoveryNode fromStream = new DiscoveryNode(delegate);
final DiscoveryNode fromClusterState = discoveryNodes.get(fromStream.getId());
return fromStream.equals(fromClusterState) && fromStream.getRoles().equals(fromClusterState.getRoles())
? fromClusterState
: fromStream;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ public static class NodesGatewayStartedShards extends BaseNodesResponse<NodeGate

public NodesGatewayStartedShards(StreamInput in) throws IOException {
super(in);
assert false : "only ever executed locally";
}

public NodesGatewayStartedShards(ClusterName clusterName, List<NodeGatewayStartedShards> nodes,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,7 @@ public static class NodesStoreFilesMetadata extends BaseNodesResponse<NodeStoreF

public NodesStoreFilesMetadata(StreamInput in) throws IOException {
super(in);
assert false : "only ever executed locally";
}

public NodesStoreFilesMetadata(ClusterName clusterName, List<NodeStoreFilesMetadata> nodes, List<FailedNodeException> failures) {
Expand Down

0 comments on commit aa61253

Please sign in to comment.