Skip to content

Commit

Permalink
Transport client: Don't validate node in handshake (#30737)
Browse files Browse the repository at this point in the history
This is related to #30141. Right now in the transport client we open a
temporary node connection and take the node information. This node
information is used to open a permanent connection that is used for the
client. However, we continue to use the configured transport address.
If the configured transport address is a load balancer, you might
connect to a different node for the permanent connection. This causes
the handshake validation to fail. This commit removes the handshake
validation for the transport client when it simple node sample mode.
  • Loading branch information
Tim-Brooks committed May 31, 2018
1 parent d9ab146 commit 4f66b9a
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.Version;
Expand Down Expand Up @@ -56,6 +56,7 @@
import org.elasticsearch.transport.TransportService;

import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -89,6 +90,7 @@ final class TransportClientNodesService extends AbstractComponent implements Clo
private final Object mutex = new Object();

private volatile List<DiscoveryNode> nodes = Collections.emptyList();
// Filtered nodes are nodes whose cluster name does not match the configured cluster name
private volatile List<DiscoveryNode> filteredNodes = Collections.emptyList();

private final AtomicInteger tempNodeIdGenerator = new AtomicInteger();
Expand Down Expand Up @@ -268,7 +270,7 @@ public static class RetryListener<Response> implements ActionListener<Response>
private volatile int i;

RetryListener(NodeListenerCallback<Response> callback, ActionListener<Response> listener,
List<DiscoveryNode> nodes, int index, TransportClient.HostFailureListener hostFailureListener) {
List<DiscoveryNode> nodes, int index, TransportClient.HostFailureListener hostFailureListener) {
this.callback = callback;
this.listener = listener;
this.nodes = nodes;
Expand Down Expand Up @@ -361,10 +363,10 @@ public void sample() {
protected abstract void doSample();

/**
* validates a set of potentially newly discovered nodes and returns an immutable
* list of the nodes that has passed.
* Establishes the node connections. If validateInHandshake is set to true, the connection will fail if
* node returned in the handshake response is different than the discovery node.
*/
protected List<DiscoveryNode> validateNewNodes(Set<DiscoveryNode> nodes) {
List<DiscoveryNode> establishNodeConnections(Set<DiscoveryNode> nodes) {
for (Iterator<DiscoveryNode> it = nodes.iterator(); it.hasNext(); ) {
DiscoveryNode node = it.next();
if (!transportService.nodeConnected(node)) {
Expand All @@ -380,7 +382,6 @@ protected List<DiscoveryNode> validateNewNodes(Set<DiscoveryNode> nodes) {

return Collections.unmodifiableList(new ArrayList<>(nodes));
}

}

class ScheduledNodeSampler implements Runnable {
Expand All @@ -402,14 +403,16 @@ class SimpleNodeSampler extends NodeSampler {
@Override
protected void doSample() {
HashSet<DiscoveryNode> newNodes = new HashSet<>();
HashSet<DiscoveryNode> newFilteredNodes = new HashSet<>();
ArrayList<DiscoveryNode> newFilteredNodes = new ArrayList<>();
for (DiscoveryNode listedNode : listedNodes) {
try (Transport.Connection connection = transportService.openConnection(listedNode, LISTED_NODES_PROFILE)){
final PlainTransportFuture<LivenessResponse> handler = new PlainTransportFuture<>(
new FutureTransportResponseHandler<LivenessResponse>() {
@Override
public LivenessResponse newInstance() {
return new LivenessResponse();
public LivenessResponse read(StreamInput in) throws IOException {
LivenessResponse response = new LivenessResponse();
response.readFrom(in);
return response;
}
});
transportService.sendRequest(connection, TransportLivenessAction.NAME, new LivenessRequest(),
Expand All @@ -435,8 +438,8 @@ public LivenessResponse newInstance() {
}
}

nodes = validateNewNodes(newNodes);
filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
nodes = establishNodeConnections(newNodes);
filteredNodes = Collections.unmodifiableList(newFilteredNodes);
}
}

Expand Down Expand Up @@ -557,7 +560,7 @@ public void handleException(TransportException e) {
}
}

nodes = validateNewNodes(newNodes);
nodes = establishNodeConnections(newNodes);
filteredNodes = Collections.unmodifiableList(new ArrayList<>(newFilteredNodes));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.node.liveness.TransportLivenessAction;
Expand Down Expand Up @@ -124,6 +126,8 @@ protected boolean removeEldestEntry(Map.Entry eldest) {

private final RemoteClusterService remoteClusterService;

private final boolean validateConnections;

/** if set will call requests sent to this id to shortcut and executed locally */
volatile DiscoveryNode localNode = null;
private final Transport.Connection localNodeConnection = new Transport.Connection() {
Expand Down Expand Up @@ -153,6 +157,9 @@ public TransportService(Settings settings, Transport transport, ThreadPool threa
Function<BoundTransportAddress, DiscoveryNode> localNodeFactory, @Nullable ClusterSettings clusterSettings,
Set<String> taskHeaders) {
super(settings);
// The only time we do not want to validate node connections is when this is a transport client using the simple node sampler
this.validateConnections = TransportClient.CLIENT_TYPE.equals(settings.get(Client.CLIENT_TYPE_SETTING_S.getKey())) == false ||
TransportClient.CLIENT_TRANSPORT_SNIFF.get(settings);
this.transport = transport;
this.threadPool = threadPool;
this.localNodeFactory = localNodeFactory;
Expand Down Expand Up @@ -314,6 +321,11 @@ public boolean nodeConnected(DiscoveryNode node) {
return isLocalNode(node) || transport.nodeConnected(node);
}

/**
* Connect to the specified node with the default connection profile
*
* @param node the node to connect to
*/
public void connectToNode(DiscoveryNode node) throws ConnectTransportException {
connectToNode(node, null);
}
Expand All @@ -331,7 +343,7 @@ public void connectToNode(final DiscoveryNode node, ConnectionProfile connection
transport.connectToNode(node, connectionProfile, (newConnection, actualProfile) -> {
// We don't validate cluster names to allow for CCS connections.
final DiscoveryNode remote = handshake(newConnection, actualProfile.getHandshakeTimeout().millis(), cn -> true);
if (node.equals(remote) == false) {
if (validateConnections && node.equals(remote) == false) {
throw new ConnectTransportException(node, "handshake failed. unexpected remote node " + remote);
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
package org.elasticsearch.transport;

import org.elasticsearch.Version;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.network.NetworkService;
Expand Down Expand Up @@ -178,6 +180,42 @@ public void testNodeConnectWithDifferentNodeId() {
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
}

public void testNodeConnectWithDifferentNodeIdSucceedsIfThisIsTransportClientOfSimpleNodeSampler() {
Settings.Builder settings = Settings.builder().put("cluster.name", "test");
Settings transportClientSettings = settings.put(Client.CLIENT_TYPE_SETTING_S.getKey(), TransportClient.CLIENT_TYPE).build();
NetworkHandle handleA = startServices("TS_A", transportClientSettings, Version.CURRENT);
NetworkHandle handleB = startServices("TS_B", settings.build(), Version.CURRENT);
DiscoveryNode discoveryNode = new DiscoveryNode(
randomAlphaOfLength(10),
handleB.discoveryNode.getAddress(),
emptyMap(),
emptySet(),
handleB.discoveryNode.getVersion());

handleA.transportService.connectToNode(discoveryNode, MockTcpTransport.LIGHT_PROFILE);
assertTrue(handleA.transportService.nodeConnected(discoveryNode));
}

public void testNodeConnectWithDifferentNodeIdFailsWhenSnifferTransportClient() {
Settings.Builder settings = Settings.builder().put("cluster.name", "test");
Settings transportClientSettings = settings.put(Client.CLIENT_TYPE_SETTING_S.getKey(), TransportClient.CLIENT_TYPE)
.put(TransportClient.CLIENT_TRANSPORT_SNIFF.getKey(), true)
.build();
NetworkHandle handleA = startServices("TS_A", transportClientSettings, Version.CURRENT);
NetworkHandle handleB = startServices("TS_B", settings.build(), Version.CURRENT);
DiscoveryNode discoveryNode = new DiscoveryNode(
randomAlphaOfLength(10),
handleB.discoveryNode.getAddress(),
emptyMap(),
emptySet(),
handleB.discoveryNode.getVersion());
ConnectTransportException ex = expectThrows(ConnectTransportException.class, () -> {
handleA.transportService.connectToNode(discoveryNode, MockTcpTransport.LIGHT_PROFILE);
});
assertThat(ex.getMessage(), containsString("unexpected remote node"));
assertFalse(handleA.transportService.nodeConnected(discoveryNode));
}


private static class NetworkHandle {
private TransportService transportService;
Expand Down

0 comments on commit 4f66b9a

Please sign in to comment.