diff --git a/presto-main/src/main/java/com/facebook/presto/server/ClusterStatsResource.java b/presto-main/src/main/java/com/facebook/presto/server/ClusterStatsResource.java index 1c95a0d53d44..fcadfc9da550 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ClusterStatsResource.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ClusterStatsResource.java @@ -32,10 +32,12 @@ import javax.annotation.security.RolesAllowed; import javax.inject.Inject; import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; import javax.ws.rs.HeaderParam; import javax.ws.rs.Path; import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; import javax.ws.rs.container.AsyncResponse; import javax.ws.rs.container.Suspended; import javax.ws.rs.core.Context; @@ -98,9 +100,10 @@ public void getClusterStats( @HeaderParam(X_FORWARDED_PROTO) String xForwardedProto, @Context UriInfo uriInfo, @Context HttpServletRequest servletRequest, - @Suspended AsyncResponse asyncResponse) + @Suspended AsyncResponse asyncResponse, + @QueryParam("includeLocalInfoOnly") @DefaultValue("false") boolean includeLocalInfoOnly) { - if (resourceManagerEnabled) { + if (resourceManagerEnabled && !includeLocalInfoOnly) { proxyClusterStats(servletRequest, asyncResponse, xForwardedProto, uriInfo); return; } diff --git a/presto-tests/src/test/java/com/facebook/presto/resourcemanager/TestDistributedClusterStatsResource.java b/presto-tests/src/test/java/com/facebook/presto/resourcemanager/TestDistributedClusterStatsResource.java index bdcf506e4853..495316791999 100644 --- a/presto-tests/src/test/java/com/facebook/presto/resourcemanager/TestDistributedClusterStatsResource.java +++ b/presto-tests/src/test/java/com/facebook/presto/resourcemanager/TestDistributedClusterStatsResource.java @@ -62,13 +62,14 @@ public class TestDistributedClusterStatsResource private TestingPrestoServer coordinator1; private TestingPrestoServer resourceManager; private TestingPrestoServer coordinator2; + private DistributedQueryRunner runner; @BeforeClass public void setup() throws Exception { client = new JettyHttpClient(); - DistributedQueryRunner runner = createQueryRunner( + runner = createQueryRunner( ImmutableMap.of( "resource-manager.query-expiration-timeout", "4m", "resource-manager.completed-query-expiration-timeout", "4m"), @@ -117,10 +118,10 @@ public void testClusterStatsRedirectToResourceManager() runToFirstResult(client, coordinator2, "SELECT * from tpch.sf102.orders"); runToFirstResult(client, coordinator2, "SELECT * from tpch.sf100.orders"); runToFirstResult(client, coordinator2, "SELECT * from tpch.sf101.orders"); - waitForGlobalQueryViewInCoordinator(3, RUNNING, coordinator1, SECONDS.toMillis(20)); + waitForGlobalQueryViewInCoordinator(3, RUNNING, SECONDS.toMillis(20)); runToQueued(client, coordinator2, "SELECT * from tpch.sf100.orders"); - waitForGlobalQueryViewInCoordinator(1, QUEUED, coordinator1, SECONDS.toMillis(20)); - ClusterStatsResource.ClusterStats clusterStats = getClusterStats(true, coordinator1); + waitForGlobalQueryViewInCoordinator(1, QUEUED, SECONDS.toMillis(20)); + ClusterStatsResource.ClusterStats clusterStats = getClusterStats(true, coordinator1, false); assertNotNull(clusterStats); assertTrue(clusterStats.getActiveWorkers() > 0); assertEquals(clusterStats.getRunningQueries(), 3); @@ -130,18 +131,22 @@ public void testClusterStatsRedirectToResourceManager() assertEquals(clusterStats.getRunningTasks(), 12); } - private void waitForGlobalQueryViewInCoordinator(int numberOfRunningQueries, QueryState state, TestingPrestoServer coordinator, long timeoutInMillis) + private void waitForGlobalQueryViewInCoordinator(int queryCount, QueryState state, long timeoutInMillis) throws InterruptedException, TimeoutException { long deadline = System.currentTimeMillis() + timeoutInMillis; int globalQueryCount = 0; while (System.currentTimeMillis() < deadline) { - Optional globalQueryCountFromCoordinator = getGlobalQueryCountIfAvailable(state, coordinator); - if (!globalQueryCountFromCoordinator.isPresent()) { - continue; + for (int i = 0; i < COORDINATOR_COUNT; i++) { + TestingPrestoServer currCoordinator = runner.getCoordinator(i); + Optional globalQueryCountFromCoordinator = getGlobalQueryCountIfAvailable(state, currCoordinator); + if (!globalQueryCountFromCoordinator.isPresent()) { + continue; + } + globalQueryCount += globalQueryCountFromCoordinator.get(); } - globalQueryCount = globalQueryCountFromCoordinator.get(); - if (globalQueryCount == numberOfRunningQueries) { + + if (globalQueryCount == queryCount) { return; } sleep(100); @@ -184,14 +189,51 @@ private void waitUntilCoordinatorsDiscoveredHealthyInRM(long timeoutInMillis) throw new TimeoutException(format("one of the nodes is still missing after: %s ms", timeoutInMillis)); } - private ClusterStatsResource.ClusterStats getClusterStats(boolean followRedirects, TestingPrestoServer coordinator) + private ClusterStatsResource.ClusterStats getClusterStats(boolean followRedirects, TestingPrestoServer coordinator, boolean includeLocalInfo) { + String localInfo = ""; + if (includeLocalInfo) { + localInfo = "?includeLocalInfoOnly=true"; + } Request request = prepareGet() .setHeader(PRESTO_USER, "user") - .setUri(uriBuilderFrom(coordinator.getBaseUrl().resolve("/v1/cluster")).build()) + .setUri(uriBuilderFrom(coordinator.getBaseUrl().resolve("/v1/cluster" + localInfo)).build()) .setFollowRedirects(followRedirects) .build(); return client.execute(request, createJsonResponseHandler(jsonCodec(ClusterStatsResource.ClusterStats.class))); } + + @Test(timeOut = 120_000) + public void testClusterStatsLocalInfoReturn() throws Exception + { + waitUntilCoordinatorsDiscoveredHealthyInRM(SECONDS.toMillis(15)); + runToFirstResult(client, coordinator2, "SELECT * from tpch.sf100.orders"); + runToFirstResult(client, coordinator2, "SELECT * from tpch.sf101.orders"); + waitForGlobalQueryViewInCoordinator(2, RUNNING, SECONDS.toMillis(30)); + runToFirstResult(client, coordinator1, "SELECT * from tpch.sf101.orders"); + runToQueued(client, coordinator2, "SELECT * from tpch.sf101.orders"); + waitForGlobalQueryViewInCoordinator(1, QUEUED, SECONDS.toMillis(20)); + + ClusterStatsResource.ClusterStats clusterLocalStatsCoord1 = getClusterStats(false, coordinator1, true); + assertNotNull(clusterLocalStatsCoord1); + assertTrue(clusterLocalStatsCoord1.getActiveWorkers() > 0); + assertEquals(clusterLocalStatsCoord1.getRunningQueries(), 1); + assertEquals(clusterLocalStatsCoord1.getQueuedQueries(), 0); + assertEquals(clusterLocalStatsCoord1.getBlockedQueries(), 0); + + ClusterStatsResource.ClusterStats clusterLocalStatsCoord2 = getClusterStats(false, coordinator2, true); + assertNotNull(clusterLocalStatsCoord2); + assertTrue(clusterLocalStatsCoord2.getActiveWorkers() > 0); + assertEquals(clusterLocalStatsCoord2.getRunningQueries(), 2); + assertEquals(clusterLocalStatsCoord2.getQueuedQueries(), 1); + assertEquals(clusterLocalStatsCoord2.getBlockedQueries(), 0); + + ClusterStatsResource.ClusterStats clusterStats = getClusterStats(false, coordinator1, false); + assertNotNull(clusterStats); + assertTrue(clusterStats.getActiveWorkers() > 0); + assertEquals(clusterStats.getRunningQueries(), 3); + assertEquals(clusterStats.getQueuedQueries(), 1); + assertEquals(clusterStats.getBlockedQueries(), 0); + } }