From b950d0fdada930b5c9e64c463d3a9e54481cefc3 Mon Sep 17 00:00:00 2001 From: Dain Sundstrom Date: Fri, 21 Dec 2018 13:47:31 -0600 Subject: [PATCH] Add test for DiscoveryNodeManager listener --- .../metadata/TestDiscoveryNodeManager.java | 118 ++++++++++++++---- 1 file changed, 94 insertions(+), 24 deletions(-) diff --git a/presto-main/src/test/java/io/prestosql/metadata/TestDiscoveryNodeManager.java b/presto-main/src/test/java/io/prestosql/metadata/TestDiscoveryNodeManager.java index 5852b845788f..5bdaa1332ab2 100644 --- a/presto-main/src/test/java/io/prestosql/metadata/TestDiscoveryNodeManager.java +++ b/presto-main/src/test/java/io/prestosql/metadata/TestDiscoveryNodeManager.java @@ -17,9 +17,9 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.ListenableFuture; import io.airlift.discovery.client.ServiceDescriptor; import io.airlift.discovery.client.ServiceSelector; -import io.airlift.discovery.client.testing.StaticServiceSelector; import io.airlift.http.client.HttpClient; import io.airlift.http.client.testing.TestingHttpClient; import io.airlift.http.client.testing.TestingResponse; @@ -32,13 +32,17 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import javax.annotation.concurrent.GuardedBy; + import java.net.URI; -import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.UUID; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import static io.airlift.discovery.client.ServiceDescriptor.serviceDescriptor; +import static io.airlift.discovery.client.ServiceSelectorConfig.DEFAULT_POOL; import static io.airlift.http.client.HttpStatus.OK; import static io.airlift.testing.Assertions.assertEqualsIgnoreOrder; import static io.prestosql.spi.NodeState.ACTIVE; @@ -52,10 +56,11 @@ public class TestDiscoveryNodeManager private final NodeInfo nodeInfo = new NodeInfo("test"); private final InternalCommunicationConfig internalCommunicationConfig = new InternalCommunicationConfig(); private NodeVersion expectedVersion; - private List activeNodes; - private List inactiveNodes; + private Set activeNodes; + private Set inactiveNodes; private PrestoNode coordinator; - private ServiceSelector selector; + private PrestoNode currentNode; + private final PrestoNodeServiceSelector selector = new PrestoNodeServiceSelector(); private HttpClient testHttpClient; @BeforeMethod @@ -64,27 +69,19 @@ public void setup() testHttpClient = new TestingHttpClient(input -> new TestingResponse(OK, ArrayListMultimap.create(), ACTIVE.name().getBytes())); expectedVersion = new NodeVersion("1"); - coordinator = new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.2.8"), expectedVersion, false); - activeNodes = ImmutableList.of( - new PrestoNode(nodeInfo.getNodeId(), URI.create("http://192.0.1.1"), expectedVersion, false), + coordinator = new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.2.8"), expectedVersion, true); + currentNode = new PrestoNode(nodeInfo.getNodeId(), URI.create("http://192.0.1.1"), expectedVersion, false); + + activeNodes = ImmutableSet.of( + currentNode, new PrestoNode(UUID.randomUUID().toString(), URI.create("http://192.0.2.1:8080"), expectedVersion, false), new PrestoNode(UUID.randomUUID().toString(), URI.create("http://192.0.2.3"), expectedVersion, false), coordinator); - inactiveNodes = ImmutableList.of( + inactiveNodes = ImmutableSet.of( new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.3.9"), NodeVersion.UNKNOWN, false), new PrestoNode(UUID.randomUUID().toString(), URI.create("https://192.0.4.9"), new NodeVersion("2"), false)); - List descriptors = new ArrayList<>(); - for (PrestoNode node : Iterables.concat(activeNodes, inactiveNodes)) { - descriptors.add(serviceDescriptor("presto") - .setNodeId(node.getNodeIdentifier()) - .addProperty("http", node.getHttpUri().toString()) - .addProperty("node_version", node.getNodeVersion().toString()) - .addProperty("coordinator", String.valueOf(node.equals(coordinator))) - .build()); - } - - selector = new StaticServiceSelector(descriptors); + selector.announceNodes(activeNodes, inactiveNodes); } @Test @@ -124,15 +121,13 @@ public void testGetAllNodes() @Test public void testGetCurrentNode() { - Node expected = activeNodes.get(0); - NodeInfo nodeInfo = new NodeInfo(new NodeConfig() .setEnvironment("test") - .setNodeId(expected.getNodeIdentifier())); + .setNodeId(currentNode.getNodeIdentifier())); DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig); try { - assertEquals(manager.getCurrentNode(), expected); + assertEquals(manager.getCurrentNode(), currentNode); } finally { manager.stop(); @@ -157,4 +152,79 @@ public void testGetCurrentNodeRequired() { new DiscoveryNodeManager(selector, new NodeInfo("test"), new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig); } + + @Test(timeOut = 60000) + public void testNodeChangeListener() + throws Exception + { + DiscoveryNodeManager manager = new DiscoveryNodeManager(selector, nodeInfo, new NoOpFailureDetector(), expectedVersion, testHttpClient, internalCommunicationConfig); + try { + manager.startPollingNodeStates(); + + BlockingQueue notifications = new ArrayBlockingQueue<>(100); + manager.addNodeChangeListener(notifications::add); + AllNodes allNodes = notifications.take(); + assertEquals(allNodes.getActiveNodes(), activeNodes); + assertEquals(allNodes.getInactiveNodes(), inactiveNodes); + + selector.announceNodes(ImmutableSet.of(currentNode), ImmutableSet.of(coordinator)); + allNodes = notifications.take(); + assertEquals(allNodes.getActiveNodes(), ImmutableSet.of(currentNode, coordinator)); + assertEquals(allNodes.getActiveCoordinators(), ImmutableSet.of(coordinator)); + + selector.announceNodes(activeNodes, inactiveNodes); + allNodes = notifications.take(); + assertEquals(allNodes.getActiveNodes(), activeNodes); + assertEquals(allNodes.getInactiveNodes(), inactiveNodes); + } + finally { + manager.stop(); + } + } + + public static class PrestoNodeServiceSelector + implements ServiceSelector + { + @GuardedBy("this") + private List descriptors = ImmutableList.of(); + + private synchronized void announceNodes(Set activeNodes, Set inactiveNodes) + { + ImmutableList.Builder descriptors = ImmutableList.builder(); + for (Node node : Iterables.concat(activeNodes, inactiveNodes)) { + descriptors.add(serviceDescriptor("presto") + .setNodeId(node.getNodeIdentifier()) + .addProperty("http", node.getHttpUri().toString()) + .addProperty("node_version", ((PrestoNode) node).getNodeVersion().toString()) + .addProperty("coordinator", String.valueOf(node.isCoordinator())) + .build()); + } + + this.descriptors = descriptors.build(); + } + + @Override + public String getType() + { + return "presto"; + } + + @Override + public String getPool() + { + return DEFAULT_POOL; + } + + @Override + public synchronized List selectAllServices() + { + return descriptors; + } + + @Override + public ListenableFuture> refresh() + { + throw new UnsupportedOperationException(); + } + } }