From 456038bf201bc8344952081e51b0812eb76aa0e9 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Mon, 15 Jan 2024 13:14:29 +0530 Subject: [PATCH] Add integ test Signed-off-by: Anshu Agarwal --- .../cluster/routing/WeightedRoutingIT.java | 142 ++++++++++++++++++ .../cluster/routing/WeightedRouting.java | 5 +- 2 files changed, 144 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java index 2e0dd579d6910..a64b50321d265 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java @@ -13,12 +13,14 @@ import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.common.settings.Settings; import org.opensearch.core.rest.RestStatus; import org.opensearch.discovery.ClusterManagerNotDiscoveredException; import org.opensearch.plugins.Plugin; import org.opensearch.snapshots.mockstore.MockRepository; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.disruption.NetworkDisruption; import org.opensearch.test.transport.MockTransportService; @@ -715,4 +717,144 @@ public void testClusterHealthResponseWithEnsureNodeWeighedInParam() throws Excep assertFalse(nodeLocalHealth.isTimedOut()); assertTrue(nodeLocalHealth.hasDiscoveredClusterManager()); } + + public void testReadWriteWeightedRoutingMetadataOnNodeRestart() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build()); + + logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'"); + List nodes_in_zone_a = internalCluster().startDataOnlyNodes( + 1, + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build() + ); + List nodes_in_zone_b = internalCluster().startDataOnlyNodes( + 1, + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build() + ); + List nodes_in_zone_c = internalCluster().startDataOnlyNodes( + 1, + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + ensureGreen(); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse response = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .setVersion(-1) + .get(); + assertEquals(response.isAcknowledged(), true); + + ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get(); + assertTrue(deleteResponse.isAcknowledged()); + + // check weighted routing metadata after node restart, ensure node comes healthy after restart + internalCluster().restartNode(nodes_in_zone_a.get(0), new InternalTestCluster.RestartCallback()); + ensureGreen(); + assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + + //make sure restarted node joins the cluster + assertEquals(3, internalCluster().clusterService().state().nodes().getDataNodes().size()); + assertNotNull( + internalCluster().client(nodes_in_zone_a.get(0)) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + assertNotNull( + internalCluster().client(nodes_in_zone_b.get(0)) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + assertNotNull( + internalCluster().client(nodes_in_zone_c.get(0)) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + assertNotNull( + internalCluster().client(internalCluster().getClusterManagerName()) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + + internalCluster().restartNode(internalCluster().getClusterManagerName(), new InternalTestCluster.RestartCallback()); + ensureGreen(); + assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + + // make sure restarted node joins the cluster + assertEquals(3, internalCluster().clusterService().state().nodes().getDataNodes().size()); + assertNotNull( + internalCluster().client(nodes_in_zone_a.get(0)) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + assertNotNull( + internalCluster().client(nodes_in_zone_b.get(0)) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + assertNotNull( + internalCluster().client(nodes_in_zone_c.get(0)) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + assertNotNull( + internalCluster().client(internalCluster().getClusterManagerName()) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java index aa8a99273b160..468fac08d2946 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java @@ -54,9 +54,8 @@ public boolean isSet() { @Override public void writeTo(StreamOutput out) throws IOException { - if (attributeName != null) { - out.writeString(attributeName); - } + + out.writeString(attributeName); out.writeGenericValue(weights); }