From 8ba2782810f3dda60e0c3656f0db71d77f295b3d Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Tue, 2 Jan 2024 10:04:28 +0530 Subject: [PATCH 1/5] Fix for deserialization bug in weighted round robin metadata Signed-off-by: Anshu Agarwal --- .../metadata/WeightedRoutingMetadata.java | 7 ++-- .../WeightedRoutingMetadataTests.java | 41 ++++++++++++++++--- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java index bc24dd22f5c6e..b303c3a2034d5 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/WeightedRoutingMetadata.java @@ -26,6 +26,7 @@ import java.util.EnumSet; import java.util.HashMap; import java.util.Map; +import java.util.Objects; /** * Contains metadata for weighted routing @@ -99,7 +100,7 @@ public static NamedDiff readDiffFrom(StreamInput in) throws IOE public static WeightedRoutingMetadata fromXContent(XContentParser parser) throws IOException { String attrKey = null; Double attrValue; - String attributeName = null; + String attributeName = ""; Map weights = new HashMap<>(); WeightedRouting weightedRouting; XContentParser.Token token; @@ -162,12 +163,12 @@ public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; WeightedRoutingMetadata that = (WeightedRoutingMetadata) o; - return weightedRouting.equals(that.weightedRouting); + return weightedRouting.equals(that.weightedRouting) && version == that.version; } @Override public int hashCode() { - return weightedRouting.hashCode(); + return Objects.hash(weightedRouting.hashCode(), version); } @Override diff --git a/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java b/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java index b78d1b56364eb..e19bde5d53d8a 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/WeightedRoutingMetadataTests.java @@ -8,29 +8,60 @@ package org.opensearch.cluster.metadata; +import org.opensearch.cluster.ClusterModule; +import org.opensearch.cluster.Diff; import org.opensearch.cluster.routing.WeightedRouting; +import org.opensearch.core.common.io.stream.NamedWriteableRegistry; +import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.core.xcontent.XContentParser; -import org.opensearch.test.AbstractXContentTestCase; +import org.opensearch.test.AbstractDiffableSerializationTestCase; import java.io.IOException; +import java.util.HashMap; import java.util.Map; -public class WeightedRoutingMetadataTests extends AbstractXContentTestCase { +public class WeightedRoutingMetadataTests extends AbstractDiffableSerializationTestCase { + + @Override + protected Writeable.Reader instanceReader() { + return WeightedRoutingMetadata::new; + } + @Override protected WeightedRoutingMetadata createTestInstance() { + String attributeName = "zone"; Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); - WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + if (randomBoolean()) { + weights = new HashMap<>(); + attributeName = ""; + } + WeightedRouting weightedRouting = new WeightedRouting(attributeName, weights); WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, -1); + return weightedRoutingMetadata; } + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry(ClusterModule.getNamedWriteables()); + } + @Override protected WeightedRoutingMetadata doParseInstance(XContentParser parser) throws IOException { return WeightedRoutingMetadata.fromXContent(parser); } @Override - protected boolean supportsUnknownFields() { - return false; + protected Metadata.Custom makeTestChanges(Metadata.Custom testInstance) { + + WeightedRouting weightedRouting = new WeightedRouting("", new HashMap<>()); + WeightedRoutingMetadata weightedRoutingMetadata = new WeightedRoutingMetadata(weightedRouting, -1); + return weightedRoutingMetadata; } + + @Override + protected Writeable.Reader> diffReader() { + return WeightedRoutingMetadata::readDiffFrom; + } + } From 3cde9b1c15e52796704d7fda89aaa2e4177be622 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Fri, 5 Jan 2024 11:34:46 +0530 Subject: [PATCH 2/5] Add changelog Signed-off-by: Anshu Agarwal --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 449d7d813f764..6811e670f6d8b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -183,6 +183,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Remove shadowJar from `lang-painless` module publication ([#11369](https://github.com/opensearch-project/OpenSearch/issues/11369)) - Fix remote shards balancer and remove unused variables ([#11167](https://github.com/opensearch-project/OpenSearch/pull/11167)) - Fix bug where replication lag grows post primary relocation ([#11238](https://github.com/opensearch-project/OpenSearch/pull/11238)) +- Fix for deserilization bug in weighted round-robin metadata ([#11679](https://github.com/opensearch-project/OpenSearch/pull/11679)) ### Security From dedaa30670872c443fdea6bd1590d9493cee3171 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Mon, 8 Jan 2024 21:35:41 +0530 Subject: [PATCH 3/5] Add null check Signed-off-by: Anshu Agarwal --- .../java/org/opensearch/cluster/routing/WeightedRouting.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java index 2b93a1483b801..aa8a99273b160 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java @@ -54,7 +54,9 @@ public boolean isSet() { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeString(attributeName); + if (attributeName != null) { + out.writeString(attributeName); + } out.writeGenericValue(weights); } From 456038bf201bc8344952081e51b0812eb76aa0e9 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Mon, 15 Jan 2024 13:14:29 +0530 Subject: [PATCH 4/5] Add integ test Signed-off-by: Anshu Agarwal --- .../cluster/routing/WeightedRoutingIT.java | 142 ++++++++++++++++++ .../cluster/routing/WeightedRouting.java | 5 +- 2 files changed, 144 insertions(+), 3 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java index 2e0dd579d6910..a64b50321d265 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java @@ -13,12 +13,14 @@ import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; +import org.opensearch.action.admin.cluster.state.ClusterStateRequest; import org.opensearch.cluster.health.ClusterHealthStatus; import org.opensearch.common.settings.Settings; import org.opensearch.core.rest.RestStatus; import org.opensearch.discovery.ClusterManagerNotDiscoveredException; import org.opensearch.plugins.Plugin; import org.opensearch.snapshots.mockstore.MockRepository; +import org.opensearch.test.InternalTestCluster; import org.opensearch.test.OpenSearchIntegTestCase; import org.opensearch.test.disruption.NetworkDisruption; import org.opensearch.test.transport.MockTransportService; @@ -715,4 +717,144 @@ public void testClusterHealthResponseWithEnsureNodeWeighedInParam() throws Excep assertFalse(nodeLocalHealth.isTimedOut()); assertTrue(nodeLocalHealth.hasDiscoveredClusterManager()); } + + public void testReadWriteWeightedRoutingMetadataOnNodeRestart() throws Exception { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + internalCluster().startClusterManagerOnlyNode(Settings.builder().put(commonSettings).build()); + + logger.info("--> starting 1 nodes on zones 'a' & 'b' & 'c'"); + List nodes_in_zone_a = internalCluster().startDataOnlyNodes( + 1, + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build() + ); + List nodes_in_zone_b = internalCluster().startDataOnlyNodes( + 1, + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build() + ); + List nodes_in_zone_c = internalCluster().startDataOnlyNodes( + 1, + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("4").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + ensureGreen(); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse response = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .setVersion(-1) + .get(); + assertEquals(response.isAcknowledged(), true); + + ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().setVersion(0).get(); + assertTrue(deleteResponse.isAcknowledged()); + + // check weighted routing metadata after node restart, ensure node comes healthy after restart + internalCluster().restartNode(nodes_in_zone_a.get(0), new InternalTestCluster.RestartCallback()); + ensureGreen(); + assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + + //make sure restarted node joins the cluster + assertEquals(3, internalCluster().clusterService().state().nodes().getDataNodes().size()); + assertNotNull( + internalCluster().client(nodes_in_zone_a.get(0)) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + assertNotNull( + internalCluster().client(nodes_in_zone_b.get(0)) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + assertNotNull( + internalCluster().client(nodes_in_zone_c.get(0)) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + assertNotNull( + internalCluster().client(internalCluster().getClusterManagerName()) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + + internalCluster().restartNode(internalCluster().getClusterManagerName(), new InternalTestCluster.RestartCallback()); + ensureGreen(); + assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + + // make sure restarted node joins the cluster + assertEquals(3, internalCluster().clusterService().state().nodes().getDataNodes().size()); + assertNotNull( + internalCluster().client(nodes_in_zone_a.get(0)) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + assertNotNull( + internalCluster().client(nodes_in_zone_b.get(0)) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + assertNotNull( + internalCluster().client(nodes_in_zone_c.get(0)) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + assertNotNull( + internalCluster().client(internalCluster().getClusterManagerName()) + .admin() + .cluster() + .state(new ClusterStateRequest().local(true)) + .get() + .getState() + .metadata() + .weightedRoutingMetadata() + ); + + } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java index aa8a99273b160..468fac08d2946 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRouting.java @@ -54,9 +54,8 @@ public boolean isSet() { @Override public void writeTo(StreamOutput out) throws IOException { - if (attributeName != null) { - out.writeString(attributeName); - } + + out.writeString(attributeName); out.writeGenericValue(weights); } From e1da3a01597dc4056674868816c8342f003df6b0 Mon Sep 17 00:00:00 2001 From: Anshu Agarwal Date: Mon, 15 Jan 2024 13:43:55 +0530 Subject: [PATCH 5/5] spotless fix Signed-off-by: Anshu Agarwal --- .../java/org/opensearch/cluster/routing/WeightedRoutingIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java index a64b50321d265..d6d22c95ee5a2 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java @@ -766,7 +766,7 @@ public void testReadWriteWeightedRoutingMetadataOnNodeRestart() throws Exception ensureGreen(); assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); - //make sure restarted node joins the cluster + // make sure restarted node joins the cluster assertEquals(3, internalCluster().clusterService().state().nodes().getDataNodes().size()); assertNotNull( internalCluster().client(nodes_in_zone_a.get(0))