Skip to content

Commit

Permalink
Creating and testing local info in coordinator stats
Browse files Browse the repository at this point in the history
  • Loading branch information
Harleen Kaur authored and tdcmeehan committed Jun 23, 2022
1 parent 3f5595e commit 439f31f
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
import javax.annotation.security.RolesAllowed;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
Expand Down Expand Up @@ -98,9 +100,10 @@ public void getClusterStats(
@HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
@Context UriInfo uriInfo,
@Context HttpServletRequest servletRequest,
@Suspended AsyncResponse asyncResponse)
@Suspended AsyncResponse asyncResponse,
@QueryParam("includeLocalInfoOnly") @DefaultValue("false") boolean includeLocalInfoOnly)
{
if (resourceManagerEnabled) {
if (resourceManagerEnabled && !includeLocalInfoOnly) {
proxyClusterStats(servletRequest, asyncResponse, xForwardedProto, uriInfo);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,14 @@ public class TestDistributedClusterStatsResource
private TestingPrestoServer coordinator1;
private TestingPrestoServer resourceManager;
private TestingPrestoServer coordinator2;
private DistributedQueryRunner runner;

@BeforeClass
public void setup()
throws Exception
{
client = new JettyHttpClient();
DistributedQueryRunner runner = createQueryRunner(
runner = createQueryRunner(
ImmutableMap.of(
"resource-manager.query-expiration-timeout", "4m",
"resource-manager.completed-query-expiration-timeout", "4m"),
Expand Down Expand Up @@ -117,10 +118,10 @@ public void testClusterStatsRedirectToResourceManager()
runToFirstResult(client, coordinator2, "SELECT * from tpch.sf102.orders");
runToFirstResult(client, coordinator2, "SELECT * from tpch.sf100.orders");
runToFirstResult(client, coordinator2, "SELECT * from tpch.sf101.orders");
waitForGlobalQueryViewInCoordinator(3, RUNNING, coordinator1, SECONDS.toMillis(20));
waitForGlobalQueryViewInCoordinator(3, RUNNING, SECONDS.toMillis(20));
runToQueued(client, coordinator2, "SELECT * from tpch.sf100.orders");
waitForGlobalQueryViewInCoordinator(1, QUEUED, coordinator1, SECONDS.toMillis(20));
ClusterStatsResource.ClusterStats clusterStats = getClusterStats(true, coordinator1);
waitForGlobalQueryViewInCoordinator(1, QUEUED, SECONDS.toMillis(20));
ClusterStatsResource.ClusterStats clusterStats = getClusterStats(true, coordinator1, false);
assertNotNull(clusterStats);
assertTrue(clusterStats.getActiveWorkers() > 0);
assertEquals(clusterStats.getRunningQueries(), 3);
Expand All @@ -130,18 +131,22 @@ public void testClusterStatsRedirectToResourceManager()
assertEquals(clusterStats.getRunningTasks(), 12);
}

private void waitForGlobalQueryViewInCoordinator(int numberOfRunningQueries, QueryState state, TestingPrestoServer coordinator, long timeoutInMillis)
private void waitForGlobalQueryViewInCoordinator(int queryCount, QueryState state, long timeoutInMillis)
throws InterruptedException, TimeoutException
{
long deadline = System.currentTimeMillis() + timeoutInMillis;
int globalQueryCount = 0;
while (System.currentTimeMillis() < deadline) {
Optional<Integer> globalQueryCountFromCoordinator = getGlobalQueryCountIfAvailable(state, coordinator);
if (!globalQueryCountFromCoordinator.isPresent()) {
continue;
for (int i = 0; i < COORDINATOR_COUNT; i++) {
TestingPrestoServer currCoordinator = runner.getCoordinator(i);
Optional<Integer> globalQueryCountFromCoordinator = getGlobalQueryCountIfAvailable(state, currCoordinator);
if (!globalQueryCountFromCoordinator.isPresent()) {
continue;
}
globalQueryCount += globalQueryCountFromCoordinator.get();
}
globalQueryCount = globalQueryCountFromCoordinator.get();
if (globalQueryCount == numberOfRunningQueries) {

if (globalQueryCount == queryCount) {
return;
}
sleep(100);
Expand Down Expand Up @@ -184,14 +189,51 @@ private void waitUntilCoordinatorsDiscoveredHealthyInRM(long timeoutInMillis)
throw new TimeoutException(format("one of the nodes is still missing after: %s ms", timeoutInMillis));
}

private ClusterStatsResource.ClusterStats getClusterStats(boolean followRedirects, TestingPrestoServer coordinator)
private ClusterStatsResource.ClusterStats getClusterStats(boolean followRedirects, TestingPrestoServer coordinator, boolean includeLocalInfo)
{
String localInfo = "";
if (includeLocalInfo) {
localInfo = "?includeLocalInfoOnly=true";
}
Request request = prepareGet()
.setHeader(PRESTO_USER, "user")
.setUri(uriBuilderFrom(coordinator.getBaseUrl().resolve("/v1/cluster")).build())
.setUri(uriBuilderFrom(coordinator.getBaseUrl().resolve("/v1/cluster" + localInfo)).build())
.setFollowRedirects(followRedirects)
.build();

return client.execute(request, createJsonResponseHandler(jsonCodec(ClusterStatsResource.ClusterStats.class)));
}

@Test(timeOut = 120_000)
public void testClusterStatsLocalInfoReturn() throws Exception
{
waitUntilCoordinatorsDiscoveredHealthyInRM(SECONDS.toMillis(15));
runToFirstResult(client, coordinator2, "SELECT * from tpch.sf100.orders");
runToFirstResult(client, coordinator2, "SELECT * from tpch.sf101.orders");
waitForGlobalQueryViewInCoordinator(2, RUNNING, SECONDS.toMillis(30));
runToFirstResult(client, coordinator1, "SELECT * from tpch.sf101.orders");
runToQueued(client, coordinator2, "SELECT * from tpch.sf101.orders");
waitForGlobalQueryViewInCoordinator(1, QUEUED, SECONDS.toMillis(20));

ClusterStatsResource.ClusterStats clusterLocalStatsCoord1 = getClusterStats(false, coordinator1, true);
assertNotNull(clusterLocalStatsCoord1);
assertTrue(clusterLocalStatsCoord1.getActiveWorkers() > 0);
assertEquals(clusterLocalStatsCoord1.getRunningQueries(), 1);
assertEquals(clusterLocalStatsCoord1.getQueuedQueries(), 0);
assertEquals(clusterLocalStatsCoord1.getBlockedQueries(), 0);

ClusterStatsResource.ClusterStats clusterLocalStatsCoord2 = getClusterStats(false, coordinator2, true);
assertNotNull(clusterLocalStatsCoord2);
assertTrue(clusterLocalStatsCoord2.getActiveWorkers() > 0);
assertEquals(clusterLocalStatsCoord2.getRunningQueries(), 2);
assertEquals(clusterLocalStatsCoord2.getQueuedQueries(), 1);
assertEquals(clusterLocalStatsCoord2.getBlockedQueries(), 0);

ClusterStatsResource.ClusterStats clusterStats = getClusterStats(false, coordinator1, false);
assertNotNull(clusterStats);
assertTrue(clusterStats.getActiveWorkers() > 0);
assertEquals(clusterStats.getRunningQueries(), 3);
assertEquals(clusterStats.getQueuedQueries(), 1);
assertEquals(clusterStats.getBlockedQueries(), 0);
}
}

0 comments on commit 439f31f

Please sign in to comment.