Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Avoid caching presto worker nodes #465

Merged
merged 3 commits into from
Nov 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.net.URI;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.google.common.base.Preconditions.checkState;
Expand Down Expand Up @@ -401,7 +401,7 @@ public BlockLocation[] getFileBlockLocations(FileStatus file, long start, long l
return fs.getFileBlockLocations(file, start, len);
}

List<String> nodes = clusterManager.getNodes();
Set<String> nodes = clusterManager.getNodes();

if (nodes == null) {
return fs.getFileBlockLocations(file, start, len);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,25 @@
*/
package com.qubole.rubix.core.utils;

import com.qubole.rubix.spi.ClusterManager;
import com.qubole.rubix.spi.AsyncClusterManager;
import com.qubole.rubix.spi.ClusterType;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
* Created by Abhishek on 6/8/18.
*/
public class DummyClusterManager extends ClusterManager
public class DummyClusterManager extends AsyncClusterManager
{
@Override
public List<String> getNodesInternal()
public Set<String> getNodesInternal()
{
List<String> list = new ArrayList<String>();
Set<String> list = new HashSet<>();
String hostName = "";
try {
hostName = InetAddress.getLocalHost().getCanonicalHostName();
Expand All @@ -51,6 +53,6 @@ public ClusterType getClusterType()
@Override
public String getCurrentNodeName()
{
return getNodes().get(0);
return getNodes().iterator().next();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,32 +12,41 @@
*/
package com.qubole.rubix.core.utils;

import com.qubole.rubix.spi.ClusterManager;
import com.qubole.rubix.spi.AsyncClusterManager;
import com.qubole.rubix.spi.ClusterType;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DummyClusterManagerMultinode extends ClusterManager
public class DummyClusterManagerMultinode extends AsyncClusterManager
{
@Override
public List<String> getNodesInternal()
private final String currentNode;
private final String otherNode;
private final Set<String> nodes = new HashSet<>();

public DummyClusterManagerMultinode()
{
List<String> list = new ArrayList<String>();
String hostName = "";
String currentNode;
try {
hostName = InetAddress.getLocalHost().getCanonicalHostName();
currentNode = InetAddress.getLocalHost().getCanonicalHostName();
}
catch (UnknownHostException e) {
hostName = "localhost";
currentNode = "localhost";
}
this.currentNode = currentNode;
nodes.add(currentNode);
this.otherNode = currentNode + "_copy";
nodes.add(otherNode);
}

list.add(hostName);
list.add(hostName + "_copy");

return list;
@Override
public Set<String> getNodesInternal()
{
return nodes;
}

@Override
Expand All @@ -49,11 +58,11 @@ public ClusterType getClusterType()
@Override
public String getCurrentNodeName()
{
return getNodes().get(0);
return currentNode;
}

public String locateKey(String key)
{
return getNodes().get(1);
return otherNode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,18 @@
package com.qubole.rubix.core.utils;

import com.google.common.collect.Lists;
import com.qubole.rubix.spi.ClusterManager;
import com.google.common.collect.Sets;
import com.qubole.rubix.spi.AsyncClusterManager;
import com.qubole.rubix.spi.ClusterType;

import java.util.List;
import java.util.Set;

public class DockerTestClusterManager extends ClusterManager
public class DockerTestClusterManager extends AsyncClusterManager
{
@Override
public List<String> getNodesInternal()
public Set<String> getNodesInternal()
{
return Lists.newArrayList("172.18.8.1", "172.18.8.2");
return Sets.newHashSet("172.18.8.1", "172.18.8.2");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,12 @@
*/
package com.qubole.rubix.hadoop2;

import com.google.api.client.util.Sets;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.qubole.rubix.spi.ClusterManager;
import com.qubole.rubix.spi.AsyncClusterManager;
import com.qubole.rubix.spi.ClusterType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand All @@ -30,7 +32,7 @@
/**
* Created by sakshia on 28/7/16.
*/
public class Hadoop2ClusterManager extends ClusterManager
public class Hadoop2ClusterManager extends AsyncClusterManager
{
YarnConfiguration yconf;
private Log log = LogFactory.getLog(Hadoop2ClusterManager.class);
Expand All @@ -44,7 +46,7 @@ public void initialize(Configuration conf)
}

@Override
public List<String> getNodesInternal()
public Set<String> getNodesInternal()
{
try {
List<Hadoop2ClusterManagerUtil.Node> allNodes = Hadoop2ClusterManagerUtil.getAllNodes(yconf);
Expand All @@ -53,7 +55,7 @@ public List<String> getNodesInternal()
}

if (allNodes.isEmpty()) {
return ImmutableList.of();
return ImmutableSet.of();
}

Set<String> hosts = new HashSet<>();
Expand All @@ -70,8 +72,7 @@ public List<String> getNodesInternal()
throw new Exception("No healthy data nodes found.");
}

List<String> hostList = Lists.newArrayList(hosts.toArray(new String[0]));
return hostList;
return ImmutableSet.copyOf(hosts);
}
catch (Exception e) {
throw Throwables.propagate(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Set;

import static org.testng.Assert.assertTrue;

Expand Down Expand Up @@ -66,13 +66,13 @@ static ClusterManager buildClusterManager()
public void testGetNodes_multipleWorkers()
throws IOException
{
final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
worker.new MultipleRunningWorkers(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

assertTrue(nodeHostnames.size() == 2, "Should only have two nodes");
assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) &&
nodeHostnames.get(1).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) &&
nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
}

@Test
Expand All @@ -82,12 +82,12 @@ public void testGetNodes_multipleWorkers()
public void testGetNodes_oneWorker()
throws IOException
{
final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
worker.new OneRunningWorker(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

assertTrue(nodeHostnames.size() == 1, "Should only have one node");
assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1));
assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1));
}

@Test
Expand All @@ -97,13 +97,13 @@ public void testGetNodes_oneWorker()
public void testGetNodes_oneNewWorker()
throws IOException
{
final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
worker.new MultipleWorkersOneNew(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

assertTrue(nodeHostnames.size() == 2, "Should only have two nodes");
assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) &&
nodeHostnames.get(1).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) &&
nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
}

@Test
Expand All @@ -113,13 +113,13 @@ public void testGetNodes_oneNewWorker()
public void testGetNodes_oneRebootedWorker()
throws IOException
{
final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
worker.new MultipleWorkersOneRebooted(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

assertTrue(nodeHostnames.size() == 2, "Should only have two nodes");
assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) &&
nodeHostnames.get(1).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_1) &&
nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
}

@Test
Expand All @@ -129,12 +129,12 @@ public void testGetNodes_oneRebootedWorker()
public void testMasterOnlyCluster()
throws IOException
{
final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
worker.new NoWorkers(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

assertTrue(nodeHostnames.size() == 1, "Should have added localhost in list");
assertTrue(nodeHostnames.get(0).equals(InetAddress.getLocalHost().getHostAddress()), "Not added right hostname");
assertTrue(nodeHostnames.contains(InetAddress.getLocalHost().getHostAddress()), "Not added right hostname");
}

@Test
Expand All @@ -144,12 +144,12 @@ public void testMasterOnlyCluster()
public void testUnhealthyNodeCluster_decommissioned()
throws IOException
{
final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
worker.new MultipleWorkersOneDecommissioned(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

assertTrue(nodeHostnames.size() == 1, "Should only have one node");
assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
}

@Test
Expand All @@ -159,12 +159,12 @@ public void testUnhealthyNodeCluster_decommissioned()
public void testUnhealthyNodeCluster_decommissioning()
throws IOException
{
final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
worker.new MultipleWorkersOneDecommissioning(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

assertTrue(nodeHostnames.size() == 1, "Should only have one node");
assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
}

@Test
Expand All @@ -174,12 +174,12 @@ public void testUnhealthyNodeCluster_decommissioning()
public void testUnhealthyNodeCluster_lost()
throws IOException
{
final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
worker.new MultipleWorkersOneLost(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

assertTrue(nodeHostnames.size() == 1, "Should only have one node");
assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
}

@Test
Expand All @@ -189,12 +189,12 @@ public void testUnhealthyNodeCluster_lost()
public void testUnhealthyNodeCluster_unhealthy()
throws IOException
{
final List<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
final Set<String> nodeHostnames = TestHadoop2ClusterManagerUtil.getNodeHostnamesFromCluster(
TestHadoop2ClusterManagerUtil.CLUSTER_NODES_ENDPOINT,
worker.new MultipleWorkersOneUnhealthy(), conf, ClusterType.HADOOP2_CLUSTER_MANAGER);

assertTrue(nodeHostnames.size() == 1, "Should only have one node");
assertTrue(nodeHostnames.get(0).equals(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
assertTrue(nodeHostnames.contains(TestHadoop2ClusterManagerUtil.WORKER_HOSTNAME_2), "Wrong nodes data");
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ static HttpServer createServer(String endpoint, HttpHandler handler)
* @return A list of hostnames for the nodes in the cluster.
* @throws IOException if the cluster server could not be created.
*/
static List<String> getNodeHostnamesFromCluster(String endpoint, HttpHandler responseHandler,
static Set<String> getNodeHostnamesFromCluster(String endpoint, HttpHandler responseHandler,
Configuration conf, ClusterType clusterType)
throws IOException
{
Expand All @@ -93,7 +93,7 @@ static List<String> getNodeHostnamesFromCluster(String endpoint, HttpHandler res

ClusterManager clusterManager = getClusterManagerInstance(clusterType, conf);
clusterManager.initialize(conf);
final List<String> nodes = clusterManager.getNodes();
final Set<String> nodes = clusterManager.getNodes();
log.info("Got nodes: " + nodes);

server.stop(0);
Expand Down Expand Up @@ -157,10 +157,10 @@ static int matchMemberships(TestWorker prevWorker, TestWorker newWorker, Set<Str
Configuration conf, ClusterType clusterType)
throws IOException
{
final List<String> nodeHostnames1 = getNodeHostnamesFromCluster(CLUSTER_NODES_ENDPOINT, prevWorker, conf, clusterType);
final Set<String> nodeHostnames1 = getNodeHostnamesFromCluster(CLUSTER_NODES_ENDPOINT, prevWorker, conf, clusterType);
Map<String, String> keyMembership1 = getConsistentHashedMembership(prevWorker, keys, conf, clusterType);

final List<String> nodeHostnames2 = getNodeHostnamesFromCluster(CLUSTER_NODES_ENDPOINT, newWorker, conf, clusterType);
final Set<String> nodeHostnames2 = getNodeHostnamesFromCluster(CLUSTER_NODES_ENDPOINT, newWorker, conf, clusterType);
Map<String, String> keyMembership2 = getConsistentHashedMembership(newWorker, keys, conf, clusterType);

int match = 0;
Expand Down
Loading