Skip to content

Commit

Permalink
Add integ test
Browse files Browse the repository at this point in the history
Signed-off-by: Anshu Agarwal <anshukag@amazon.com>
  • Loading branch information
Anshu Agarwal committed Jan 15, 2024
1 parent ce4bf71 commit 456038b
Show file tree
Hide file tree
Showing 2 changed files with 144 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,14 @@
import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.cluster.health.ClusterHealthStatus;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.discovery.ClusterManagerNotDiscoveredException;
import org.opensearch.plugins.Plugin;
import org.opensearch.snapshots.mockstore.MockRepository;
import org.opensearch.test.InternalTestCluster;
import org.opensearch.test.OpenSearchIntegTestCase;
import org.opensearch.test.disruption.NetworkDisruption;
import org.opensearch.test.transport.MockTransportService;
Expand Down Expand Up @@ -715,4 +717,144 @@ public void testClusterHealthResponseWithEnsureNodeWeighedInParam() throws Excep
assertFalse(nodeLocalHealth.isTimedOut());
assertTrue(nodeLocalHealth.hasDiscoveredClusterManager());
}

public void testReadWriteWeightedRoutingMetadataOnNodeRestart() throws Exception {
Settings commonSettings = Settings.builder()
.put("cluster.routing.allocation.awareness.attributes", "zone")
.put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c")
.build();

internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build());

logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'");
List<String> nodes_in_zone_a = internalCluster().startDataOnlyNodes(
1,
Settings.builder().put(commonSettings).put("node.attr.zone", "a").build()
);
List<String> nodes_in_zone_b = internalCluster().startDataOnlyNodes(
1,
Settings.builder().put(commonSettings).put("node.attr.zone", "b").build()
);
List<String> nodes_in_zone_c = internalCluster().startDataOnlyNodes(
1,
Settings.builder().put(commonSettings).put("node.attr.zone", "c").build()
);

logger.info("--> waiting for nodes to form a cluster");
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet();
assertThat(health.isTimedOut(), equalTo(false));

ensureGreen();

logger.info("--> setting shard routing weights for weighted round robin");
Map<String, Double> weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0);
WeightedRouting weightedRouting = new WeightedRouting("zone", weights);

ClusterPutWeightedRoutingResponse response = client().admin()
.cluster()
.prepareWeightedRouting()
.setWeightedRouting(weightedRouting)
.setVersion(-1)
.get();
assertEquals(response.isAcknowledged(), true);

ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get();
assertTrue(deleteResponse.isAcknowledged());

// check weighted routing metadata after node restart, ensure node comes healthy after restart
internalCluster().restartNode(nodes_in_zone_a.get(0), new InternalTestCluster.RestartCallback());
ensureGreen();
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

//make sure restarted node joins the cluster
assertEquals(3, internalCluster().clusterService().state().nodes().getDataNodes().size());
assertNotNull(
internalCluster().client(nodes_in_zone_a.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(nodes_in_zone_b.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(nodes_in_zone_c.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(internalCluster().getClusterManagerName())
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);

internalCluster().restartNode(internalCluster().getClusterManagerName(), new InternalTestCluster.RestartCallback());
ensureGreen();
assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata());

// make sure restarted node joins the cluster
assertEquals(3, internalCluster().clusterService().state().nodes().getDataNodes().size());
assertNotNull(
internalCluster().client(nodes_in_zone_a.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(nodes_in_zone_b.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(nodes_in_zone_c.get(0))
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);
assertNotNull(
internalCluster().client(internalCluster().getClusterManagerName())
.admin()
.cluster()
.state(new ClusterStateRequest().local(true))
.get()
.getState()
.metadata()
.weightedRoutingMetadata()
);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,8 @@ public boolean isSet() {

@Override
public void writeTo(StreamOutput out) throws IOException {
if (attributeName != null) {
out.writeString(attributeName);
}

out.writeString(attributeName);
out.writeGenericValue(weights);
}

Expand Down

0 comments on commit 456038b

Please sign in to comment.