Skip to content

Commit

Permalink
CCS: Drop http address from remote cluster info (#29568)
Browse files Browse the repository at this point in the history
They are expensive to fetch and no longer needed by Kibana so they
*shouldn't* be needed by anyone else either.

Closes #29207
  • Loading branch information
nik9000 authored Apr 27, 2018
1 parent 912fbb2 commit f4ed902
Show file tree
Hide file tree
Showing 10 changed files with 71 additions and 197 deletions.
3 changes: 0 additions & 3 deletions docs/reference/cluster/remote-info.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ the configured remote cluster alias.
`seeds`::
The configured initial seed transport addresses of the remote cluster.

`http_addresses`::
The published http addresses of all connected remote nodes.

`connected`::
True if there is at least one connection to the remote cluster.

Expand Down
4 changes: 4 additions & 0 deletions docs/reference/release-notes/7.0.0-alpha1.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,9 @@ The changes listed below have been released for the first time in Elasticsearch
Core::
* Tribe node has been removed in favor of Cross-Cluster-Search

Cross-Cluster-Search::
* `http_addresses` has been removed from the <<cluster-remote-info>> API
because it is expensive to fetch and no longer needed by Kibana.

Rest API::
* The Clear Cache API only supports `POST` as HTTP method
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
- match: { my_remote_cluster.num_nodes_connected: 1}
- match: { my_remote_cluster.max_connections_per_cluster: 1}
- match: { my_remote_cluster.initial_connect_timeout: "30s" }
- is_true: my_remote_cluster.http_addresses.0

---
"Add transient remote cluster based on the preset cluster and check remote info":
Expand Down Expand Up @@ -38,9 +37,6 @@

- do:
cluster.remote_info: {}
- set: { my_remote_cluster.http_addresses.0: remote_http }
- match: { test_remote_cluster.http_addresses.0: $remote_http }

- match: { test_remote_cluster.connected: true }
- match: { my_remote_cluster.connected: true }

Expand Down Expand Up @@ -132,4 +128,3 @@
transient:
search.remote.remote1.seeds: null
search.remote.remote1.skip_unavailable: null

Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import static java.util.stream.Collectors.toList;

public final class TransportRemoteInfoAction extends HandledTransportAction<RemoteInfoRequest, RemoteInfoResponse> {

private final RemoteClusterService remoteClusterService;
Expand All @@ -45,7 +47,6 @@ public TransportRemoteInfoAction(Settings settings, ThreadPool threadPool, Trans

@Override
protected void doExecute(RemoteInfoRequest remoteInfoRequest, ActionListener<RemoteInfoResponse> listener) {
remoteClusterService.getRemoteConnectionInfos(ActionListener.wrap(remoteConnectionInfos
-> listener.onResponse(new RemoteInfoResponse(remoteConnectionInfos)), listener::onFailure));
listener.onResponse(new RemoteInfoResponse(remoteClusterService.getRemoteConnectionInfos().collect(toList())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.rest.action.RestToXContentListener;

import java.io.IOException;

Expand All @@ -50,16 +51,8 @@ public String getName() {
}

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client)
throws IOException {
return channel -> client.execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest(),
new RestBuilderListener<RemoteInfoResponse>(channel) {
@Override
public RestResponse buildResponse(RemoteInfoResponse response, XContentBuilder builder) throws Exception {
response.toXContent(builder, request);
return new BytesRestResponse(RestStatus.OK, builder);
}
});
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
return channel -> client.execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest(), new RestToXContentListener<>(channel));
}
@Override
public boolean canTripCircuitBreaker() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
Expand Down Expand Up @@ -602,66 +603,13 @@ void addConnectedNode(DiscoveryNode node) {
}

/**
* Fetches connection info for this connection
* Get the information about remote nodes to be rendered on {@code _remote/info} requests.
*/
public void getConnectionInfo(ActionListener<RemoteConnectionInfo> listener) {
final Optional<DiscoveryNode> anyNode = connectedNodes.getAny();
if (anyNode.isPresent() == false) {
// not connected we return immediately
RemoteConnectionInfo remoteConnectionStats = new RemoteConnectionInfo(clusterAlias,
Collections.emptyList(), Collections.emptyList(), maxNumRemoteConnections, 0,
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings), skipUnavailable);
listener.onResponse(remoteConnectionStats);
} else {
NodesInfoRequest request = new NodesInfoRequest();
request.clear();
request.http(true);

transportService.sendRequest(anyNode.get(), NodesInfoAction.NAME, request, new TransportResponseHandler<NodesInfoResponse>() {
@Override
public NodesInfoResponse newInstance() {
return new NodesInfoResponse();
}

@Override
public void handleResponse(NodesInfoResponse response) {
Collection<TransportAddress> httpAddresses = new HashSet<>();
for (NodeInfo info : response.getNodes()) {
if (connectedNodes.contains(info.getNode()) && info.getHttp() != null) {
httpAddresses.add(info.getHttp().getAddress().publishAddress());
}
}

if (httpAddresses.size() < maxNumRemoteConnections) {
// just in case non of the connected nodes have http enabled we get other http enabled nodes instead.
for (NodeInfo info : response.getNodes()) {
if (nodePredicate.test(info.getNode()) && info.getHttp() != null) {
httpAddresses.add(info.getHttp().getAddress().publishAddress());
}
if (httpAddresses.size() == maxNumRemoteConnections) {
break; // once we have enough return...
}
}
}
RemoteConnectionInfo remoteConnectionInfo = new RemoteConnectionInfo(clusterAlias,
seedNodes.stream().map(DiscoveryNode::getAddress).collect(Collectors.toList()), new ArrayList<>(httpAddresses),
maxNumRemoteConnections, connectedNodes.size(),
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings), skipUnavailable);
listener.onResponse(remoteConnectionInfo);
}

@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}

public RemoteConnectionInfo getConnectionInfo() {
List<TransportAddress> seedNodeAddresses = seedNodes.stream().map(DiscoveryNode::getAddress).collect(Collectors.toList());
TimeValue initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
return new RemoteConnectionInfo(clusterAlias, seedNodeAddresses, maxNumRemoteConnections, connectedNodes.size(),
initialConnectionTimeout, skipUnavailable);
}

int getNumNodesConnected() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -42,7 +41,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -56,6 +54,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.common.settings.Setting.boolSetting;

Expand Down Expand Up @@ -348,17 +347,8 @@ public void close() throws IOException {
IOUtils.close(remoteClusters.values());
}

public void getRemoteConnectionInfos(ActionListener<Collection<RemoteConnectionInfo>> listener) {
final Map<String, RemoteClusterConnection> remoteClusters = this.remoteClusters;
if (remoteClusters.isEmpty()) {
listener.onResponse(Collections.emptyList());
} else {
final GroupedActionListener<RemoteConnectionInfo> actionListener = new GroupedActionListener<>(listener,
remoteClusters.size(), Collections.emptyList());
for (RemoteClusterConnection connection : remoteClusters.values()) {
connection.getConnectionInfo(actionListener);
}
}
public Stream<RemoteConnectionInfo> getRemoteConnectionInfos() {
return remoteClusters.values().stream().map(RemoteClusterConnection::getConnectionInfo);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,29 @@
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;

import static java.util.Collections.emptyList;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

/**
* This class encapsulates all remote cluster information to be rendered on
* <tt>_remote/info</tt> requests.
* {@code _remote/info} requests.
*/
public final class RemoteConnectionInfo implements ToXContentFragment, Writeable {
final List<TransportAddress> seedNodes;
final List<TransportAddress> httpAddresses;
final int connectionsPerCluster;
final TimeValue initialConnectionTimeout;
final int numNodesConnected;
final String clusterAlias;
final boolean skipUnavailable;

RemoteConnectionInfo(String clusterAlias, List<TransportAddress> seedNodes,
List<TransportAddress> httpAddresses,
int connectionsPerCluster, int numNodesConnected,
TimeValue initialConnectionTimeout, boolean skipUnavailable) {
this.clusterAlias = clusterAlias;
this.seedNodes = seedNodes;
this.httpAddresses = httpAddresses;
this.connectionsPerCluster = connectionsPerCluster;
this.numNodesConnected = numNodesConnected;
this.initialConnectionTimeout = initialConnectionTimeout;
Expand All @@ -59,16 +58,45 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable

public RemoteConnectionInfo(StreamInput input) throws IOException {
seedNodes = input.readList(TransportAddress::new);
httpAddresses = input.readList(TransportAddress::new);
if (input.getVersion().before(Version.V_7_0_0_alpha1)) {
/*
* Versions before 7.0 sent the HTTP addresses of all nodes in the
* remote cluster here but it was expensive to fetch and we
* ultimately figured out how to do without it. So we removed it.
*
* We just throw any HTTP addresses received here on the floor
* because we don't need to do anything with them.
*/
input.readList(TransportAddress::new);
}
connectionsPerCluster = input.readVInt();
initialConnectionTimeout = input.readTimeValue();
numNodesConnected = input.readVInt();
clusterAlias = input.readString();
if (input.getVersion().onOrAfter(Version.V_6_1_0)) {
skipUnavailable = input.readBoolean();
} else {
skipUnavailable = false;
skipUnavailable = input.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(seedNodes);
if (out.getVersion().before(Version.V_7_0_0_alpha1)) {
/*
* Versions before 7.0 sent the HTTP addresses of all nodes in the
* remote cluster here but it was expensive to fetch and we
* ultimately figured out how to do without it. So we removed it.
*
* When sending this request to a node that expects HTTP addresses
* here we pretend that we didn't find any. This *should* be fine
* because, after all, we haven't been using this information for
* a while.
*/
out.writeList(emptyList());
}
out.writeVInt(connectionsPerCluster);
out.writeTimeValue(initialConnectionTimeout);
out.writeVInt(numNodesConnected);
out.writeString(clusterAlias);
out.writeBoolean(skipUnavailable);
}

@Override
Expand All @@ -80,11 +108,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.value(addr.toString());
}
builder.endArray();
builder.startArray("http_addresses");
for (TransportAddress addr : httpAddresses) {
builder.value(addr.toString());
}
builder.endArray();
builder.field("connected", numNodesConnected > 0);
builder.field("num_nodes_connected", numNodesConnected);
builder.field("max_connections_per_cluster", connectionsPerCluster);
Expand All @@ -95,19 +118,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(seedNodes);
out.writeList(httpAddresses);
out.writeVInt(connectionsPerCluster);
out.writeTimeValue(initialConnectionTimeout);
out.writeVInt(numNodesConnected);
out.writeString(clusterAlias);
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeBoolean(skipUnavailable);
}
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -116,15 +126,14 @@ public boolean equals(Object o) {
return connectionsPerCluster == that.connectionsPerCluster &&
numNodesConnected == that.numNodesConnected &&
Objects.equals(seedNodes, that.seedNodes) &&
Objects.equals(httpAddresses, that.httpAddresses) &&
Objects.equals(initialConnectionTimeout, that.initialConnectionTimeout) &&
Objects.equals(clusterAlias, that.clusterAlias) &&
skipUnavailable == that.skipUnavailable;
}

@Override
public int hashCode() {
return Objects.hash(seedNodes, httpAddresses, connectionsPerCluster, initialConnectionTimeout,
return Objects.hash(seedNodes, connectionsPerCluster, initialConnectionTimeout,
numNodesConnected, clusterAlias, skipUnavailable);
}
}
Loading

0 comments on commit f4ed902

Please sign in to comment.