Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introducing LocalMode to DistributedClusterStats #17817

Merged
merged 1 commit into from
Jun 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@
import javax.annotation.security.RolesAllowed;
import javax.inject.Inject;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.HeaderParam;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
Expand Down Expand Up @@ -98,9 +100,10 @@ public void getClusterStats(
@HeaderParam(X_FORWARDED_PROTO) String xForwardedProto,
@Context UriInfo uriInfo,
@Context HttpServletRequest servletRequest,
@Suspended AsyncResponse asyncResponse)
@Suspended AsyncResponse asyncResponse,
@QueryParam("includeLocalInfoOnly") @DefaultValue("false") boolean includeLocalInfoOnly)
{
if (resourceManagerEnabled) {
if (resourceManagerEnabled && !includeLocalInfoOnly) {
proxyClusterStats(servletRequest, asyncResponse, xForwardedProto, uriInfo);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,14 @@ public class TestDistributedClusterStatsResource
private TestingPrestoServer coordinator1;
private TestingPrestoServer resourceManager;
private TestingPrestoServer coordinator2;
private DistributedQueryRunner runner;

@BeforeClass
public void setup()
throws Exception
{
client = new JettyHttpClient();
DistributedQueryRunner runner = createQueryRunner(
runner = createQueryRunner(
ImmutableMap.of(
"resource-manager.query-expiration-timeout", "4m",
"resource-manager.completed-query-expiration-timeout", "4m"),
Expand Down Expand Up @@ -117,10 +118,10 @@ public void testClusterStatsRedirectToResourceManager()
runToFirstResult(client, coordinator2, "SELECT * from tpch.sf102.orders");
runToFirstResult(client, coordinator2, "SELECT * from tpch.sf100.orders");
runToFirstResult(client, coordinator2, "SELECT * from tpch.sf101.orders");
waitForGlobalQueryViewInCoordinator(3, RUNNING, coordinator1, SECONDS.toMillis(20));
waitForGlobalQueryViewInCoordinator(3, RUNNING, SECONDS.toMillis(20));
runToQueued(client, coordinator2, "SELECT * from tpch.sf100.orders");
waitForGlobalQueryViewInCoordinator(1, QUEUED, coordinator1, SECONDS.toMillis(20));
ClusterStatsResource.ClusterStats clusterStats = getClusterStats(true, coordinator1);
waitForGlobalQueryViewInCoordinator(1, QUEUED, SECONDS.toMillis(20));
ClusterStatsResource.ClusterStats clusterStats = getClusterStats(true, coordinator1, false);
assertNotNull(clusterStats);
assertTrue(clusterStats.getActiveWorkers() > 0);
assertEquals(clusterStats.getRunningQueries(), 3);
Expand All @@ -130,18 +131,22 @@ public void testClusterStatsRedirectToResourceManager()
assertEquals(clusterStats.getRunningTasks(), 12);
}

private void waitForGlobalQueryViewInCoordinator(int numberOfRunningQueries, QueryState state, TestingPrestoServer coordinator, long timeoutInMillis)
private void waitForGlobalQueryViewInCoordinator(int queryCount, QueryState state, long timeoutInMillis)
throws InterruptedException, TimeoutException
{
long deadline = System.currentTimeMillis() + timeoutInMillis;
int globalQueryCount = 0;
while (System.currentTimeMillis() < deadline) {
Optional<Integer> globalQueryCountFromCoordinator = getGlobalQueryCountIfAvailable(state, coordinator);
if (!globalQueryCountFromCoordinator.isPresent()) {
continue;
for (int i = 0; i < COORDINATOR_COUNT; i++) {
TestingPrestoServer currCoordinator = runner.getCoordinator(i);
Optional<Integer> globalQueryCountFromCoordinator = getGlobalQueryCountIfAvailable(state, currCoordinator);
if (!globalQueryCountFromCoordinator.isPresent()) {
continue;
}
globalQueryCount += globalQueryCountFromCoordinator.get();
}
globalQueryCount = globalQueryCountFromCoordinator.get();
if (globalQueryCount == numberOfRunningQueries) {

if (globalQueryCount == queryCount) {
return;
}
sleep(100);
Expand Down Expand Up @@ -184,14 +189,51 @@ private void waitUntilCoordinatorsDiscoveredHealthyInRM(long timeoutInMillis)
throw new TimeoutException(format("one of the nodes is still missing after: %s ms", timeoutInMillis));
}

/**
 * Issues a GET against {@code /v1/cluster} on the given coordinator as user "user" and
 * decodes the JSON response into {@link ClusterStatsResource.ClusterStats}.
 *
 * NOTE(review): this span comes from a rendered diff, so the old and the new form of the
 * signature and of the {@code setUri(...)} call appear on adjacent lines.
 *
 * @param followRedirects passed straight to {@code setFollowRedirects} on the request
 * @param coordinator server whose base URL the request is resolved against
 * @param includeLocalInfo when true, {@code ?includeLocalInfoOnly=true} is appended to the path
 * @return the decoded cluster stats
 */
private ClusterStatsResource.ClusterStats getClusterStats(boolean followRedirects, TestingPrestoServer coordinator)
private ClusterStatsResource.ClusterStats getClusterStats(boolean followRedirects, TestingPrestoServer coordinator, boolean includeLocalInfo)
{
// Empty suffix leaves the request URI as plain /v1/cluster.
String localInfo = "";
if (includeLocalInfo) {
localInfo = "?includeLocalInfoOnly=true";
}
Request request = prepareGet()
.setHeader(PRESTO_USER, "user")
.setUri(uriBuilderFrom(coordinator.getBaseUrl().resolve("/v1/cluster")).build())
.setUri(uriBuilderFrom(coordinator.getBaseUrl().resolve("/v1/cluster" + localInfo)).build())
.setFollowRedirects(followRedirects)
.build();

return client.execute(request, createJsonResponseHandler(jsonCodec(ClusterStatsResource.ClusterStats.class)));
}

kaur-h marked this conversation as resolved.
Show resolved Hide resolved
/**
 * Starts two running queries plus one queued query on coordinator2 and one running query on
 * coordinator1, then compares the stats each coordinator reports when asked with
 * {@code includeLocalInfoOnly=true} against the stats coordinator1 reports without it.
 *
 * Expected counts: coordinator1 local = 1 running / 0 queued, coordinator2 local =
 * 2 running / 1 queued, non-local request = 3 running / 1 queued. Blocked is 0 everywhere.
 */
@Test(timeOut = 120_000)
public void testClusterStatsLocalInfoReturn()
        throws Exception
{
    // Bring the cluster into the state described above.
    waitUntilCoordinatorsDiscoveredHealthyInRM(SECONDS.toMillis(15));
    runToFirstResult(client, coordinator2, "SELECT * from tpch.sf100.orders");
    runToFirstResult(client, coordinator2, "SELECT * from tpch.sf101.orders");
    waitForGlobalQueryViewInCoordinator(2, RUNNING, SECONDS.toMillis(30));
    runToFirstResult(client, coordinator1, "SELECT * from tpch.sf101.orders");
    runToQueued(client, coordinator2, "SELECT * from tpch.sf101.orders");
    waitForGlobalQueryViewInCoordinator(1, QUEUED, SECONDS.toMillis(20));

    // Local-only stats from coordinator1: just its own running query.
    ClusterStatsResource.ClusterStats firstLocal = getClusterStats(false, coordinator1, true);
    assertNotNull(firstLocal);
    assertTrue(firstLocal.getActiveWorkers() > 0);
    assertEquals(firstLocal.getRunningQueries(), 1);
    assertEquals(firstLocal.getQueuedQueries(), 0);
    assertEquals(firstLocal.getBlockedQueries(), 0);

    // Local-only stats from coordinator2: its two running queries and the queued one.
    ClusterStatsResource.ClusterStats secondLocal = getClusterStats(false, coordinator2, true);
    assertNotNull(secondLocal);
    assertTrue(secondLocal.getActiveWorkers() > 0);
    assertEquals(secondLocal.getRunningQueries(), 2);
    assertEquals(secondLocal.getQueuedQueries(), 1);
    assertEquals(secondLocal.getBlockedQueries(), 0);

    // Same endpoint on coordinator1 without the local-only flag: totals across both coordinators.
    ClusterStatsResource.ClusterStats overall = getClusterStats(false, coordinator1, false);
    assertNotNull(overall);
    assertTrue(overall.getActiveWorkers() > 0);
    assertEquals(overall.getRunningQueries(), 3);
    assertEquals(overall.getQueuedQueries(), 1);
    assertEquals(overall.getBlockedQueries(), 0);
}
}