Skip to content

Commit

Permalink
NETWORKING: Make RemoteClusterConn. Lazy Resolve DNS (#32764)
Browse files Browse the repository at this point in the history
* Lazy resolve DNS (i.e. `String` to `DiscoveryNode`) to not run into indefinitely caching lookup issues (provided the JVM dns cache is configured correctly as explained in https://www.elastic.co/guide/en/elasticsearch/reference/6.3/networkaddress-cache-ttl.html)
   * Changed `InetAddress` type to `String` for that higher up the stack
   * Passed down `Supplier<DiscoveryNode>` instead of outright `DiscoveryNode` from `RemoteClusterAware#buildRemoteClustersSeeds` on to lazy resolve DNS when the `DiscoveryNode` is actually used (could've also passed down the value of `clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting)` together with the `List<String>` of hosts, but this route seemed to introduce less duplication and resulted in a significantly smaller changeset).
* Closes #28858
  • Loading branch information
original-brownbear committed Aug 18, 2018
1 parent 532d552 commit f82bb64
Show file tree
Hide file tree
Showing 6 changed files with 153 additions and 102 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport;

import java.util.function.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.metadata.ClusterNameExpressionResolver;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand Down Expand Up @@ -48,9 +49,20 @@ public abstract class RemoteClusterAware extends AbstractComponent {
/**
* A list of initial seed nodes to discover eligible nodes from the remote cluster
*/
public static final Setting.AffixSetting<List<InetSocketAddress>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting("search.remote.",
"seeds", (key) -> Setting.listSetting(key, Collections.emptyList(), RemoteClusterAware::parseSeedAddress,
Setting.Property.NodeScope, Setting.Property.Dynamic));
public static final Setting.AffixSetting<List<String>> REMOTE_CLUSTERS_SEEDS = Setting.affixKeySetting(
"search.remote.",
"seeds",
key -> Setting.listSetting(
key, Collections.emptyList(),
s -> {
// validate seed address
parsePort(s);
return s;
},
Setting.Property.NodeScope,
Setting.Property.Dynamic
)
);
public static final char REMOTE_CLUSTER_INDEX_SEPARATOR = ':';
public static final String LOCAL_CLUSTER_GROUP_KEY = "";

Expand All @@ -65,18 +77,20 @@ protected RemoteClusterAware(Settings settings) {
this.clusterNameResolver = new ClusterNameExpressionResolver(settings);
}

protected static Map<String, List<DiscoveryNode>> buildRemoteClustersSeeds(Settings settings) {
Stream<Setting<List<InetSocketAddress>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
protected static Map<String, List<Supplier<DiscoveryNode>>> buildRemoteClustersSeeds(Settings settings) {
Stream<Setting<List<String>>> allConcreteSettings = REMOTE_CLUSTERS_SEEDS.getAllConcreteSettings(settings);
return allConcreteSettings.collect(
Collectors.toMap(REMOTE_CLUSTERS_SEEDS::getNamespace, concreteSetting -> {
String clusterName = REMOTE_CLUSTERS_SEEDS.getNamespace(concreteSetting);
List<DiscoveryNode> nodes = new ArrayList<>();
for (InetSocketAddress address : concreteSetting.get(settings)) {
TransportAddress transportAddress = new TransportAddress(address);
DiscoveryNode node = new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
nodes.add(node);
List<String> addresses = concreteSetting.get(settings);
List<Supplier<DiscoveryNode>> nodes = new ArrayList<>(addresses.size());
for (String address : addresses) {
nodes.add(() -> {
TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
return new DiscoveryNode(clusterName + "#" + transportAddress.toString(),
transportAddress,
Version.CURRENT.minimumCompatibilityVersion());
});
}
return nodes;
}));
Expand Down Expand Up @@ -128,7 +142,7 @@ public Map<String, List<String>> groupClusterIndices(String[] requestIndices, Pr
* Subclasses must implement this to receive information about updated cluster aliases. If the given address list is
* empty the cluster alias is unregistered and should be removed.
*/
protected abstract void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses);
protected abstract void updateRemoteCluster(String clusterAlias, List<String> addresses);

/**
* Registers this instance to listen to updates on the cluster settings.
Expand All @@ -138,27 +152,35 @@ public void listenForUpdates(ClusterSettings clusterSettings) {
(namespace, value) -> {});
}

private static InetSocketAddress parseSeedAddress(String remoteHost) {
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead");
}
String host = remoteHost.substring(0, portSeparator);
protected static InetSocketAddress parseSeedAddress(String remoteHost) {
String host = remoteHost.substring(0, indexOfPortSeparator(remoteHost));
InetAddress hostAddress;
try {
hostAddress = InetAddress.getByName(host);
} catch (UnknownHostException e) {
throw new IllegalArgumentException("unknown host [" + host + "]", e);
}
return new InetSocketAddress(hostAddress, parsePort(remoteHost));
}

private static int parsePort(String remoteHost) {
try {
int port = Integer.valueOf(remoteHost.substring(portSeparator + 1));
int port = Integer.valueOf(remoteHost.substring(indexOfPortSeparator(remoteHost) + 1));
if (port <= 0) {
throw new IllegalArgumentException("port number must be > 0 but was: [" + port + "]");
}
return new InetSocketAddress(hostAddress, port);
return port;
} catch (NumberFormatException e) {
throw new IllegalArgumentException("port must be a number", e);
throw new IllegalArgumentException("failed to parse port", e);
}
}

private static int indexOfPortSeparator(String remoteHost) {
int portSeparator = remoteHost.lastIndexOf(':'); // in case we have a IPv6 address ie. [::1]:9300
if (portSeparator == -1 || portSeparator == remoteHost.length()) {
throw new IllegalArgumentException("remote hosts need to be configured as [host:port], found [" + remoteHost + "] instead");
}
return portSeparator;
}

public static String buildRemoteIndexName(String clusterAlias, String indexName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport;

import java.util.function.Supplier;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.util.SetOnce;
Expand Down Expand Up @@ -84,7 +85,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
private final String clusterAlias;
private final int maxNumRemoteConnections;
private final Predicate<DiscoveryNode> nodePredicate;
private volatile List<DiscoveryNode> seedNodes;
private volatile List<Supplier<DiscoveryNode>> seedNodes;
private volatile boolean skipUnavailable;
private final ConnectHandler connectHandler;
private SetOnce<ClusterName> remoteClusterName = new SetOnce<>();
Expand All @@ -99,7 +100,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
* @param maxNumRemoteConnections the maximum number of connections to the remote cluster
* @param nodePredicate a predicate to filter eligible remote nodes to connect to
*/
RemoteClusterConnection(Settings settings, String clusterAlias, List<DiscoveryNode> seedNodes,
RemoteClusterConnection(Settings settings, String clusterAlias, List<Supplier<DiscoveryNode>> seedNodes,
TransportService transportService, int maxNumRemoteConnections, Predicate<DiscoveryNode> nodePredicate) {
super(settings);
this.localClusterName = ClusterName.CLUSTER_NAME_SETTING.get(settings);
Expand Down Expand Up @@ -127,7 +128,7 @@ final class RemoteClusterConnection extends AbstractComponent implements Transpo
/**
* Updates the list of seed nodes for this cluster connection
*/
synchronized void updateSeedNodes(List<DiscoveryNode> seedNodes, ActionListener<Void> connectListener) {
synchronized void updateSeedNodes(List<Supplier<DiscoveryNode>> seedNodes, ActionListener<Void> connectListener) {
this.seedNodes = Collections.unmodifiableList(new ArrayList<>(seedNodes));
connectHandler.connect(connectListener);
}
Expand Down Expand Up @@ -456,15 +457,15 @@ protected void doRun() {
});
}

void collectRemoteNodes(Iterator<DiscoveryNode> seedNodes,
private void collectRemoteNodes(Iterator<Supplier<DiscoveryNode>> seedNodes,
final TransportService transportService, ActionListener<Void> listener) {
if (Thread.currentThread().isInterrupted()) {
listener.onFailure(new InterruptedException("remote connect thread got interrupted"));
}
try {
if (seedNodes.hasNext()) {
cancellableThreads.executeIO(() -> {
final DiscoveryNode seedNode = seedNodes.next();
final DiscoveryNode seedNode = seedNodes.next().get();
final TransportService.HandshakeResponse handshakeResponse;
Transport.Connection connection = transportService.openConnection(seedNode,
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, null, null));
Expand Down Expand Up @@ -554,11 +555,11 @@ private class SniffClusterStateResponseHandler implements TransportResponseHandl
private final TransportService transportService;
private final Transport.Connection connection;
private final ActionListener<Void> listener;
private final Iterator<DiscoveryNode> seedNodes;
private final Iterator<Supplier<DiscoveryNode>> seedNodes;
private final CancellableThreads cancellableThreads;

SniffClusterStateResponseHandler(TransportService transportService, Transport.Connection connection,
ActionListener<Void> listener, Iterator<DiscoveryNode> seedNodes,
ActionListener<Void> listener, Iterator<Supplier<DiscoveryNode>> seedNodes,
CancellableThreads cancellableThreads) {
this.transportService = transportService;
this.connection = connection;
Expand Down Expand Up @@ -651,7 +652,7 @@ void addConnectedNode(DiscoveryNode node) {
* Get the information about remote nodes to be rendered on {@code _remote/info} requests.
*/
public RemoteConnectionInfo getConnectionInfo() {
List<TransportAddress> seedNodeAddresses = seedNodes.stream().map(DiscoveryNode::getAddress).collect(Collectors.toList());
List<TransportAddress> seedNodeAddresses = seedNodes.stream().map(node -> node.get().getAddress()).collect(Collectors.toList());
TimeValue initialConnectionTimeout = RemoteClusterService.REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
return new RemoteConnectionInfo(clusterAlias, seedNodeAddresses, maxNumRemoteConnections, connectedNodes.size(),
initialConnectionTimeout, skipUnavailable);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.transport;

import java.util.function.Supplier;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
Expand All @@ -40,7 +41,6 @@

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -115,7 +115,8 @@ public final class RemoteClusterService extends RemoteClusterAware implements Cl
* @param seeds a cluster alias to discovery node mapping representing the remote clusters seeds nodes
* @param connectionListener a listener invoked once every configured cluster has been connected to
*/
private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>> seeds, ActionListener<Void> connectionListener) {
private synchronized void updateRemoteClusters(Map<String, List<Supplier<DiscoveryNode>>> seeds,
ActionListener<Void> connectionListener) {
if (seeds.containsKey(LOCAL_CLUSTER_GROUP_KEY)) {
throw new IllegalArgumentException("remote clusters must not have the empty string as its key");
}
Expand All @@ -125,7 +126,7 @@ private synchronized void updateRemoteClusters(Map<String, List<DiscoveryNode>>
} else {
CountDown countDown = new CountDown(seeds.size());
remoteClusters.putAll(this.remoteClusters);
for (Map.Entry<String, List<DiscoveryNode>> entry : seeds.entrySet()) {
for (Map.Entry<String, List<Supplier<DiscoveryNode>>> entry : seeds.entrySet()) {
RemoteClusterConnection remote = this.remoteClusters.get(entry.getKey());
if (entry.getValue().isEmpty()) { // with no seed nodes we just remove the connection
try {
Expand Down Expand Up @@ -310,16 +311,17 @@ synchronized void updateSkipUnavailable(String clusterAlias, Boolean skipUnavail
}
}

protected void updateRemoteCluster(String clusterAlias, List<InetSocketAddress> addresses) {
@Override
protected void updateRemoteCluster(String clusterAlias, List<String> addresses) {
updateRemoteCluster(clusterAlias, addresses, ActionListener.wrap((x) -> {}, (x) -> {}));
}

void updateRemoteCluster(
final String clusterAlias,
final List<InetSocketAddress> addresses,
final List<String> addresses,
final ActionListener<Void> connectionListener) {
final List<DiscoveryNode> nodes = addresses.stream().map(address -> {
final TransportAddress transportAddress = new TransportAddress(address);
final List<Supplier<DiscoveryNode>> nodes = addresses.stream().<Supplier<DiscoveryNode>>map(address -> () -> {
final TransportAddress transportAddress = new TransportAddress(RemoteClusterAware.parseSeedAddress(address));
final String id = clusterAlias + "#" + transportAddress.toString();
final Version version = Version.CURRENT.minimumCompatibilityVersion();
return new DiscoveryNode(id, transportAddress, version);
Expand All @@ -334,7 +336,7 @@ void updateRemoteCluster(
void initializeRemoteClusters() {
final TimeValue timeValue = REMOTE_INITIAL_CONNECTION_TIMEOUT_SETTING.get(settings);
final PlainActionFuture<Void> future = new PlainActionFuture<>();
Map<String, List<DiscoveryNode>> seeds = RemoteClusterAware.buildRemoteClustersSeeds(settings);
Map<String, List<Supplier<DiscoveryNode>>> seeds = RemoteClusterAware.buildRemoteClustersSeeds(settings);
updateRemoteClusters(seeds, future);
try {
future.get(timeValue.millis(), TimeUnit.MILLISECONDS);
Expand Down
Loading

0 comments on commit f82bb64

Please sign in to comment.