Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CCS: Drop http address from remote cluster info #29568

Merged
merged 5 commits into from
Apr 27, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions docs/reference/cluster/remote-info.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ the configured remote cluster alias.
`seeds`::
The configured initial seed transport addresses of the remote cluster.

`http_addresses`::
The published http addresses of all connected remote nodes.

`connected`::
True if there is at least one connection to the remote cluster.

Expand Down
6 changes: 5 additions & 1 deletion docs/reference/release-notes/7.0.0-alpha1.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,8 @@ The changes listed below have been released for the first time in Elasticsearch
=== Breaking changes

Core::
* Tribe node has been removed in favor of Cross-Cluster-Search
* Tribe node has been removed in favor of Cross-Cluster-Search

Cross-Cluster-Search::
* `http_addresses` has been removed from the <<cluster-remote-info>> API
because it is expensive to fetch and no longer needed by Kibana.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import static java.util.stream.Collectors.toList;

public final class TransportRemoteInfoAction extends HandledTransportAction<RemoteInfoRequest, RemoteInfoResponse> {

private final RemoteClusterService remoteClusterService;
Expand All @@ -45,7 +47,6 @@ public TransportRemoteInfoAction(Settings settings, ThreadPool threadPool, Trans

@Override
protected void doExecute(RemoteInfoRequest remoteInfoRequest, ActionListener<RemoteInfoResponse> listener) {
remoteClusterService.getRemoteConnectionInfos(ActionListener.wrap(remoteConnectionInfos
-> listener.onResponse(new RemoteInfoResponse(remoteConnectionInfos)), listener::onFailure));
listener.onResponse(new RemoteInfoResponse(remoteClusterService.getRemoteConnectionInfos().collect(toList())));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.elasticsearch.rest.RestResponse;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestBuilderListener;
import org.elasticsearch.rest.action.RestToXContentListener;

import java.io.IOException;

Expand All @@ -50,16 +51,8 @@ public String getName() {
}

@Override
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client)
throws IOException {
return channel -> client.execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest(),
new RestBuilderListener<RemoteInfoResponse>(channel) {
@Override
public RestResponse buildResponse(RemoteInfoResponse response, XContentBuilder builder) throws Exception {
response.toXContent(builder, request);
return new BytesRestResponse(RestStatus.OK, builder);
}
});
public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
return channel -> client.execute(RemoteInfoAction.INSTANCE, new RemoteInfoRequest(), new RestToXContentListener<>(channel));
}
@Override
public boolean canTripCircuitBreaker() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.CancellableThreads;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ThreadContext;
Expand Down Expand Up @@ -602,66 +603,13 @@ void addConnectedNode(DiscoveryNode node) {
}

/**
* Fetches connection info for this connection
* Get the information about remote nodes to be rendered on {@code _remote/info} requests.
*/
public void getConnectionInfo(ActionListener<RemoteConnectionInfo> listener) {
final Optional<DiscoveryNode> anyNode = connectedNodes.getAny();
if (anyNode.isPresent() == false) {
// not connected we return immediately
RemoteConnectionInfo remoteConnectionStats = new RemoteConnectionInfo(clusterAlias,
Collections.emptyList(), Collections.emptyList(), maxNumRemoteConnections, 0,
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings), skipUnavailable);
listener.onResponse(remoteConnectionStats);
} else {
NodesInfoRequest request = new NodesInfoRequest();
request.clear();
request.http(true);

transportService.sendRequest(anyNode.get(), NodesInfoAction.NAME, request, new TransportResponseHandler<NodesInfoResponse>() {
@Override
public NodesInfoResponse newInstance() {
return new NodesInfoResponse();
}

@Override
public void handleResponse(NodesInfoResponse response) {
Collection<TransportAddress> httpAddresses = new HashSet<>();
for (NodeInfo info : response.getNodes()) {
if (connectedNodes.contains(info.getNode()) && info.getHttp() != null) {
httpAddresses.add(info.getHttp().getAddress().publishAddress());
}
}

if (httpAddresses.size() < maxNumRemoteConnections) {
// just in case non of the connected nodes have http enabled we get other http enabled nodes instead.
for (NodeInfo info : response.getNodes()) {
if (nodePredicate.test(info.getNode()) && info.getHttp() != null) {
httpAddresses.add(info.getHttp().getAddress().publishAddress());
}
if (httpAddresses.size() == maxNumRemoteConnections) {
break; // once we have enough return...
}
}
}
RemoteConnectionInfo remoteConnectionInfo = new RemoteConnectionInfo(clusterAlias,
seedNodes.stream().map(DiscoveryNode::getAddress).collect(Collectors.toList()), new ArrayList<>(httpAddresses),
maxNumRemoteConnections, connectedNodes.size(),
RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings), skipUnavailable);
listener.onResponse(remoteConnectionInfo);
}

@Override
public void handleException(TransportException exp) {
listener.onFailure(exp);
}

@Override
public String executor() {
return ThreadPool.Names.SAME;
}
});
}

public RemoteConnectionInfo getConnectionInfo() {
List<TransportAddress> seedNodeAddresses = seedNodes.stream().map(DiscoveryNode::getAddress).collect(Collectors.toList());
TimeValue initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
return new RemoteConnectionInfo(clusterAlias, seedNodeAddresses, maxNumRemoteConnections, connectedNodes.size(),
initialConnectionTimeout, skipUnavailable);
}

int getNumNodesConnected() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsRequest;
import org.elasticsearch.action.admin.cluster.shards.ClusterSearchShardsResponse;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -42,7 +41,6 @@
import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand All @@ -56,6 +54,7 @@
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.elasticsearch.common.settings.Setting.boolSetting;

Expand Down Expand Up @@ -348,17 +347,8 @@ public void close() throws IOException {
IOUtils.close(remoteClusters.values());
}

public void getRemoteConnectionInfos(ActionListener<Collection<RemoteConnectionInfo>> listener) {
final Map<String, RemoteClusterConnection> remoteClusters = this.remoteClusters;
if (remoteClusters.isEmpty()) {
listener.onResponse(Collections.emptyList());
} else {
final GroupedActionListener<RemoteConnectionInfo> actionListener = new GroupedActionListener<>(listener,
remoteClusters.size(), Collections.emptyList());
for (RemoteClusterConnection connection : remoteClusters.values()) {
connection.getConnectionInfo(actionListener);
}
}
public Stream<RemoteConnectionInfo> getRemoteConnectionInfos() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm returning a Stream here because I read https://stackoverflow.com/a/24679745/233833 yesterday. The caller always wants a List but since we have a stream here it seems sane to return the stream and let the caller do what they want with it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: what difference does it make given that the only caller calls toList on this Stream all the time?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

None. I wouldn't go out of my way to return a Stream if I didn't already have one. I just figured, since I do, I may as well return it.

return remoteClusters.values().stream().map(RemoteClusterConnection::getConnectionInfo);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,30 +27,29 @@
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;

import static java.util.Collections.emptyList;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

/**
* This class encapsulates all remote cluster information to be rendered on
* <tt>_remote/info</tt> requests.
* {@code _remote/info} requests.
*/
public final class RemoteConnectionInfo implements ToXContentFragment, Writeable {
final List<TransportAddress> seedNodes;
final List<TransportAddress> httpAddresses;
final int connectionsPerCluster;
final TimeValue initialConnectionTimeout;
final int numNodesConnected;
final String clusterAlias;
final boolean skipUnavailable;

RemoteConnectionInfo(String clusterAlias, List<TransportAddress> seedNodes,
List<TransportAddress> httpAddresses,
int connectionsPerCluster, int numNodesConnected,
TimeValue initialConnectionTimeout, boolean skipUnavailable) {
this.clusterAlias = clusterAlias;
this.seedNodes = seedNodes;
this.httpAddresses = httpAddresses;
this.connectionsPerCluster = connectionsPerCluster;
this.numNodesConnected = numNodesConnected;
this.initialConnectionTimeout = initialConnectionTimeout;
Expand All @@ -59,16 +58,45 @@ public final class RemoteConnectionInfo implements ToXContentFragment, Writeable

public RemoteConnectionInfo(StreamInput input) throws IOException {
seedNodes = input.readList(TransportAddress::new);
httpAddresses = input.readList(TransportAddress::new);
if (input.getVersion().before(Version.V_7_0_0_alpha1)) {
/*
* Versions before 7.0 sent the HTTP addresses of all nodes in the
* remote cluster here but it was expensive to fetch and we
* ultimately figured out how to do without it. So we removed it.
*
* We just throw any HTTP addresses received here on the floor
* because we don't need to do anything with them.
*/
input.readList(TransportAddress::new);
}
connectionsPerCluster = input.readVInt();
initialConnectionTimeout = input.readTimeValue();
numNodesConnected = input.readVInt();
clusterAlias = input.readString();
if (input.getVersion().onOrAfter(Version.V_6_1_0)) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I dropped this because I'm not backporting this change to 6.x because it is breaking so I figured this was a good time to remove the bwc for 6.0.x

skipUnavailable = input.readBoolean();
} else {
skipUnavailable = false;
skipUnavailable = input.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(seedNodes);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved this next to the ctor because I like to be able to read the read and write on a single screen if possible.

if (out.getVersion().before(Version.V_7_0_0_alpha1)) {
/*
* Versions before 7.0 sent the HTTP addresses of all nodes in the
* remote cluster here but it was expensive to fetch and we
* ultimately figured out how to do without it. So we removed it.
*
* When sending this request to a node that expects HTTP addresses
* here we pretend that we didn't find any. This *should* be fine
* because, after all, we haven't been using this information for
* a while.
*/
out.writeList(emptyList());
}
out.writeVInt(connectionsPerCluster);
out.writeTimeValue(initialConnectionTimeout);
out.writeVInt(numNodesConnected);
out.writeString(clusterAlias);
out.writeBoolean(skipUnavailable);
}

@Override
Expand All @@ -80,11 +108,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.value(addr.toString());
}
builder.endArray();
builder.startArray("http_addresses");
for (TransportAddress addr : httpAddresses) {
builder.value(addr.toString());
}
builder.endArray();
builder.field("connected", numNodesConnected > 0);
builder.field("num_nodes_connected", numNodesConnected);
builder.field("max_connections_per_cluster", connectionsPerCluster);
Expand All @@ -95,19 +118,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
return builder;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeList(seedNodes);
out.writeList(httpAddresses);
out.writeVInt(connectionsPerCluster);
out.writeTimeValue(initialConnectionTimeout);
out.writeVInt(numNodesConnected);
out.writeString(clusterAlias);
if (out.getVersion().onOrAfter(Version.V_6_1_0)) {
out.writeBoolean(skipUnavailable);
}
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand All @@ -116,15 +126,14 @@ public boolean equals(Object o) {
return connectionsPerCluster == that.connectionsPerCluster &&
numNodesConnected == that.numNodesConnected &&
Objects.equals(seedNodes, that.seedNodes) &&
Objects.equals(httpAddresses, that.httpAddresses) &&
Objects.equals(initialConnectionTimeout, that.initialConnectionTimeout) &&
Objects.equals(clusterAlias, that.clusterAlias) &&
skipUnavailable == that.skipUnavailable;
}

@Override
public int hashCode() {
return Objects.hash(seedNodes, httpAddresses, connectionsPerCluster, initialConnectionTimeout,
return Objects.hash(seedNodes, connectionsPerCluster, initialConnectionTimeout,
numNodesConnected, clusterAlias, skipUnavailable);
}
}
Loading