diff --git a/CHANGELOG.md b/CHANGELOG.md index aec2c6f58c1f7..806ba4a2e78c7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -54,6 +54,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Bumps `jempbox` from 1.8.16 to 1.8.17 ([#4550](https://github.com/opensearch-project/OpenSearch/pull/4550)) - Bumps `hadoop-hdfs` from 3.3.3 to 3.3.4 ([#4644](https://github.com/opensearch-project/OpenSearch/pull/4644)) - Bumps `jna` from 5.11.0 to 5.12.1 ([#4656](https://github.com/opensearch-project/OpenSearch/pull/4656)) +- Update Jackson Databind to 2.13.4.2 (addressing CVE-2022-42003) ([#4779](https://github.com/opensearch-project/OpenSearch/pull/4779)) ### Changed - Dependency updates (httpcore, mockito, slf4j, httpasyncclient, commons-codec) ([#4308](https://github.com/opensearch-project/OpenSearch/pull/4308)) - Use RemoteSegmentStoreDirectory instead of RemoteDirectory ([#4240](https://github.com/opensearch-project/OpenSearch/pull/4240)) @@ -67,6 +68,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - GET api for weighted shard routing([#4275](https://github.com/opensearch-project/OpenSearch/pull/4275/)) - Unmute test RelocationIT.testRelocationWhileIndexingRandom ([#4580](https://github.com/opensearch-project/OpenSearch/pull/4580)) - Add DecommissionService and helper to execute awareness attribute decommissioning ([#4084](https://github.com/opensearch-project/OpenSearch/pull/4084)) +- Delete api for weighted shard routing([#4400](https://github.com/opensearch-project/OpenSearch/pull/4400/)) - Further simplification of the ZIP publication implementation ([#4360](https://github.com/opensearch-project/OpenSearch/pull/4360)) - Relax visibility of the HTTP_CHANNEL_KEY and HTTP_SERVER_CHANNEL_KEY to make it possible for the plugins to access associated Netty4HttpChannel / Netty4HttpServerChannel instance ([#4638](https://github.com/opensearch-project/OpenSearch/pull/4638)) - Load the deprecated master role in a dedicated method instead of in setAdditionalRoles() ([#4582](https://github.com/opensearch-project/OpenSearch/pull/4582)) @@ -76,6 +78,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Update to Apache Lucene 9.4.0 ([#4661](https://github.com/opensearch-project/OpenSearch/pull/4661)) - Controlling discovery for decommissioned nodes ([#4590](https://github.com/opensearch-project/OpenSearch/pull/4590)) - Backport Apache Lucene version change for 2.4.0 ([#4677](https://github.com/opensearch-project/OpenSearch/pull/4677)) +- Use ReplicationFailedException instead of OpensearchException in ReplicationTarget ([#4725](https://github.com/opensearch-project/OpenSearch/pull/4725)) - Fix weighted routing metadata deserialization error on process restart ([#4691](https://github.com/opensearch-project/OpenSearch/pull/4691)) - Refactor Base Action class javadocs to OpenSearch.API ([#4732](https://github.com/opensearch-project/OpenSearch/pull/4732)) - Migrate client transports to Apache HttpClient / Core 5.x ([#4459](https://github.com/opensearch-project/OpenSearch/pull/4459)) @@ -129,6 +132,8 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/) - Fix SearchStats (de)serialization (caused by https://github.com/opensearch-project/OpenSearch/pull/4616) ([#4697](https://github.com/opensearch-project/OpenSearch/pull/4697)) - Fixing Gradle warnings associated with publishPluginZipPublicationToXxx tasks ([#4696](https://github.com/opensearch-project/OpenSearch/pull/4696)) - [BUG]: Remove redundant field from GetDecommissionStateResponse ([#4751](https://github.com/opensearch-project/OpenSearch/pull/4751)) +- Fixed randomly failing test ([4774](https://github.com/opensearch-project/OpenSearch/pull/4774)) +- Update version check after backport ([4786](https://github.com/opensearch-project/OpenSearch/pull/4786)) ### Security - CVE-2022-25857 org.yaml:snakeyaml DOS vulnerability ([#4341](https://github.com/opensearch-project/OpenSearch/pull/4341)) diff --git a/buildSrc/version.properties b/buildSrc/version.properties index a779389b3ca82..08784c82a4cc4 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -10,7 +10,7 @@ bundled_jdk = 17.0.4+8 spatial4j = 0.7 jts = 1.15.0 jackson = 2.13.4 -jackson_databind = 2.13.4 +jackson_databind = 2.13.4.2 snakeyaml = 1.32 icu4j = 70.1 supercsv = 2.4.0 diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java index ab9982657a410..d522bf8c4d005 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/RestHighLevelClientTests.java @@ -889,6 +889,7 @@ public void testApiNamingConventions() throws Exception { "remote_store.restore", "cluster.put_weighted_routing", "cluster.get_weighted_routing", + "cluster.delete_weighted_routing", "cluster.put_decommission_awareness", "cluster.get_decommission_awareness", "cluster.delete_decommission_awareness", }; diff --git a/distribution/tools/upgrade-cli/licenses/jackson-databind-2.13.4.2.jar.sha1 b/distribution/tools/upgrade-cli/licenses/jackson-databind-2.13.4.2.jar.sha1 new file mode 100644 index 0000000000000..a7782e8aac18d --- /dev/null +++ b/distribution/tools/upgrade-cli/licenses/jackson-databind-2.13.4.2.jar.sha1 @@ -0,0 +1 @@ +325c06bdfeb628cfb80ebaaf1a26cc1eb558a585 \ No newline at end of file diff --git a/distribution/tools/upgrade-cli/licenses/jackson-databind-2.13.4.jar.sha1 b/distribution/tools/upgrade-cli/licenses/jackson-databind-2.13.4.jar.sha1 deleted file mode 100644 index fcc6491d1f78d..0000000000000 --- a/distribution/tools/upgrade-cli/licenses/jackson-databind-2.13.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -98b0edfa8e4084078f10b7b356c300ded4a71491 \ No newline at end of file diff --git a/modules/ingest-geoip/licenses/jackson-databind-2.13.4.2.jar.sha1 b/modules/ingest-geoip/licenses/jackson-databind-2.13.4.2.jar.sha1 new file mode 100644 index 0000000000000..a7782e8aac18d --- /dev/null +++ b/modules/ingest-geoip/licenses/jackson-databind-2.13.4.2.jar.sha1 @@ -0,0 +1 @@ +325c06bdfeb628cfb80ebaaf1a26cc1eb558a585 \ No newline at end of file diff --git a/modules/ingest-geoip/licenses/jackson-databind-2.13.4.jar.sha1 b/modules/ingest-geoip/licenses/jackson-databind-2.13.4.jar.sha1 deleted file mode 100644 index fcc6491d1f78d..0000000000000 --- a/modules/ingest-geoip/licenses/jackson-databind-2.13.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -98b0edfa8e4084078f10b7b356c300ded4a71491 \ No newline at end of file diff --git a/plugins/discovery-ec2/licenses/jackson-databind-2.13.4.2.jar.sha1 b/plugins/discovery-ec2/licenses/jackson-databind-2.13.4.2.jar.sha1 new file mode 100644 index 0000000000000..a7782e8aac18d --- /dev/null +++ b/plugins/discovery-ec2/licenses/jackson-databind-2.13.4.2.jar.sha1 @@ -0,0 +1 @@ +325c06bdfeb628cfb80ebaaf1a26cc1eb558a585 \ No newline at end of file diff --git a/plugins/discovery-ec2/licenses/jackson-databind-2.13.4.jar.sha1 b/plugins/discovery-ec2/licenses/jackson-databind-2.13.4.jar.sha1 deleted file mode 100644 index fcc6491d1f78d..0000000000000 --- a/plugins/discovery-ec2/licenses/jackson-databind-2.13.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -98b0edfa8e4084078f10b7b356c300ded4a71491 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/jackson-databind-2.13.4.2.jar.sha1 b/plugins/repository-azure/licenses/jackson-databind-2.13.4.2.jar.sha1 new file mode 100644 index 0000000000000..a7782e8aac18d --- /dev/null +++ b/plugins/repository-azure/licenses/jackson-databind-2.13.4.2.jar.sha1 @@ -0,0 +1 @@ +325c06bdfeb628cfb80ebaaf1a26cc1eb558a585 \ No newline at end of file diff --git a/plugins/repository-azure/licenses/jackson-databind-2.13.4.jar.sha1 b/plugins/repository-azure/licenses/jackson-databind-2.13.4.jar.sha1 deleted file mode 100644 index fcc6491d1f78d..0000000000000 --- a/plugins/repository-azure/licenses/jackson-databind-2.13.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -98b0edfa8e4084078f10b7b356c300ded4a71491 \ No newline at end of file diff --git a/plugins/repository-hdfs/licenses/jackson-databind-2.13.4.2.jar.sha1 b/plugins/repository-hdfs/licenses/jackson-databind-2.13.4.2.jar.sha1 new file mode 100644 index 0000000000000..a7782e8aac18d --- /dev/null +++ b/plugins/repository-hdfs/licenses/jackson-databind-2.13.4.2.jar.sha1 @@ -0,0 +1 @@ +325c06bdfeb628cfb80ebaaf1a26cc1eb558a585 \ No newline at end of file diff --git a/plugins/repository-hdfs/licenses/jackson-databind-2.13.4.jar.sha1 b/plugins/repository-hdfs/licenses/jackson-databind-2.13.4.jar.sha1 deleted file mode 100644 index fcc6491d1f78d..0000000000000 --- a/plugins/repository-hdfs/licenses/jackson-databind-2.13.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -98b0edfa8e4084078f10b7b356c300ded4a71491 \ No newline at end of file diff --git a/plugins/repository-s3/licenses/jackson-databind-2.13.4.2.jar.sha1 b/plugins/repository-s3/licenses/jackson-databind-2.13.4.2.jar.sha1 new file mode 100644 index 0000000000000..a7782e8aac18d --- /dev/null +++ b/plugins/repository-s3/licenses/jackson-databind-2.13.4.2.jar.sha1 @@ -0,0 +1 @@ +325c06bdfeb628cfb80ebaaf1a26cc1eb558a585 \ No newline at end of file diff --git a/plugins/repository-s3/licenses/jackson-databind-2.13.4.jar.sha1 b/plugins/repository-s3/licenses/jackson-databind-2.13.4.jar.sha1 deleted file mode 100644 index fcc6491d1f78d..0000000000000 --- a/plugins/repository-s3/licenses/jackson-databind-2.13.4.jar.sha1 +++ /dev/null @@ -1 +0,0 @@ -98b0edfa8e4084078f10b7b356c300ded4a71491 \ No newline at end of file diff --git a/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.delete_weighted_routing.json b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.delete_weighted_routing.json new file mode 100644 index 0000000000000..2cd4081b645e8 --- /dev/null +++ b/rest-api-spec/src/main/resources/rest-api-spec/api/cluster.delete_weighted_routing.json @@ -0,0 +1,19 @@ +{ + "cluster.delete_weighted_routing": { + "documentation": { + "url": "https://opensearch.org/docs/latest/opensearch/rest-api/weighted-routing/delete", + "description": "Delete weighted shard routing weights" + }, + "stability": "stable", + "url": { + "paths": [ + { + "path": "/_cluster/routing/awareness/weights", + "methods": [ + "DELETE" + ] + } + ] + } + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java index 61f82877bf12b..bba07d878a42c 100644 --- a/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/cluster/routing/WeightedRoutingIT.java @@ -9,6 +9,7 @@ package org.opensearch.cluster.routing; import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; import org.opensearch.common.settings.Settings; @@ -284,4 +285,67 @@ public void testWeightedRoutingMetadataOnOSProcessRestart() throws Exception { ensureGreen(); assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); } + + public void testDeleteWeightedRouting_WeightsNotSet() { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + internalCluster().startNodes( + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("3").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + ensureGreen(); + + assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + + // delete weighted routing metadata + ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get(); + assertTrue(deleteResponse.isAcknowledged()); + assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + } + + public void testDeleteWeightedRouting_WeightsAreSet() { + Settings commonSettings = Settings.builder() + .put("cluster.routing.allocation.awareness.attributes", "zone") + .put("cluster.routing.allocation.awareness.force.zone.values", "a,b,c") + .build(); + + internalCluster().startNodes( + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("3").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + ensureGreen(); + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 2.0, "c", 3.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + // put api call to set weights + ClusterPutWeightedRoutingResponse response = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .get(); + assertEquals(response.isAcknowledged(), true); + assertNotNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + + // delete weighted routing metadata + ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get(); + assertTrue(deleteResponse.isAcknowledged()); + assertNull(internalCluster().clusterService().state().metadata().weightedRoutingMetadata()); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java new file mode 100644 index 0000000000000..097775b7ab4ac --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/search/SearchWeightedRoutingIT.java @@ -0,0 +1,161 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search; + +import org.junit.Assert; +import org.opensearch.action.admin.cluster.health.ClusterHealthResponse; +import org.opensearch.action.admin.cluster.node.stats.NodeStats; +import org.opensearch.action.admin.cluster.node.stats.NodesStatsResponse; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse; +import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingResponse; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.WeightedRouting; +import org.opensearch.cluster.routing.allocation.decider.AwarenessAllocationDecider; +import org.opensearch.common.settings.Settings; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.index.search.stats.SearchStats; +import org.opensearch.test.OpenSearchIntegTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0, minNumDataNodes = 3) +public class SearchWeightedRoutingIT extends OpenSearchIntegTestCase { + @Override + protected int numberOfReplicas() { + return 2; + } + + public void testSearchWithWRRShardRouting() throws IOException { + Settings commonSettings = Settings.builder() + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_FORCE_GROUP_SETTING.getKey() + "zone" + ".values", "a,b,c") + .put(AwarenessAllocationDecider.CLUSTER_ROUTING_ALLOCATION_AWARENESS_ATTRIBUTE_SETTING.getKey(), "zone") + .build(); + + logger.info("--> starting 6 nodes on different zones"); + List nodes = internalCluster().startNodes( + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "b").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "a").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build(), + Settings.builder().put(commonSettings).put("node.attr.zone", "c").build() + ); + + String A_0 = nodes.get(0); + String B_0 = nodes.get(1); + String B_1 = nodes.get(2); + String A_1 = nodes.get(3); + String C_0 = nodes.get(4); + String C_1 = nodes.get(5); + + logger.info("--> waiting for nodes to form a cluster"); + ClusterHealthResponse health = client().admin().cluster().prepareHealth().setWaitForNodes("6").execute().actionGet(); + assertThat(health.isTimedOut(), equalTo(false)); + + assertAcked( + prepareCreate("test").setSettings(Settings.builder().put("index.number_of_shards", 10).put("index.number_of_replicas", 2)) + ); + ensureGreen(); + logger.info("--> creating indices for test"); + for (int i = 0; i < 100; i++) { + client().prepareIndex("test_" + i).setId("" + i).setSource("field_" + i, "value_" + i).get(); + } + + logger.info("--> setting shard routing weights for weighted round robin"); + Map weights = Map.of("a", 1.0, "b", 1.0, "c", 0.0); + WeightedRouting weightedRouting = new WeightedRouting("zone", weights); + + ClusterPutWeightedRoutingResponse response = client().admin() + .cluster() + .prepareWeightedRouting() + .setWeightedRouting(weightedRouting) + .get(); + assertEquals(response.isAcknowledged(), true); + + Set hitNodes = new HashSet<>(); + // making search requests + for (int i = 0; i < 50; i++) { + SearchResponse searchResponse = internalCluster().client(randomFrom(A_0, A_1, B_0, B_1)) + .prepareSearch() + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + assertEquals(searchResponse.getFailedShards(), 0); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } + // search should not go to nodes in zone c + assertThat(hitNodes.size(), lessThanOrEqualTo(4)); + DiscoveryNodes dataNodes = internalCluster().clusterService().state().nodes(); + List nodeIdsFromZoneWithWeightZero = new ArrayList<>(); + for (DiscoveryNode node : dataNodes) { + if (node.getAttributes().get("zone").equals("c")) { + nodeIdsFromZoneWithWeightZero.add(node.getId()); + } + } + for (String nodeId : nodeIdsFromZoneWithWeightZero) { + assertFalse(hitNodes.contains(nodeId)); + } + + NodesStatsResponse nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet(); + for (NodeStats stat : nodeStats.getNodes()) { + SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal(); + if (stat.getNode().getAttributes().get("zone").equals("c")) { + assertEquals(0, searchStats.getQueryCount()); + assertEquals(0, searchStats.getFetchCount()); + + } else { + Assert.assertTrue(searchStats.getQueryCount() > 0L); + Assert.assertTrue(searchStats.getFetchCount() > 0L); + } + } + + logger.info("--> deleted shard routing weights for weighted round robin"); + + ClusterDeleteWeightedRoutingResponse deleteResponse = client().admin().cluster().prepareDeleteWeightedRouting().get(); + assertEquals(deleteResponse.isAcknowledged(), true); + + hitNodes = new HashSet<>(); + // making search requests + for (int i = 0; i < 100; i++) { + SearchResponse searchResponse = internalCluster().client(randomFrom(A_0, A_1, B_0, B_1)) + .prepareSearch() + .setQuery(QueryBuilders.matchAllQuery()) + .get(); + assertEquals(searchResponse.getFailedShards(), 0); + for (int j = 0; j < searchResponse.getHits().getHits().length; j++) { + hitNodes.add(searchResponse.getHits().getAt(j).getShard().getNodeId()); + } + } + + // Check shard routing requests hit data nodes in zone c + for (String nodeId : nodeIdsFromZoneWithWeightZero) { + assertFalse(!hitNodes.contains(nodeId)); + } + nodeStats = client().admin().cluster().prepareNodesStats().execute().actionGet(); + + for (NodeStats stat : nodeStats.getNodes()) { + SearchStats.Stats searchStats = stat.getIndices().getSearch().getTotal(); + Assert.assertTrue(searchStats.getQueryCount() > 0L); + Assert.assertTrue(searchStats.getFetchCount() > 0L); + } + } + +} diff --git a/server/src/main/java/org/opensearch/action/ActionModule.java b/server/src/main/java/org/opensearch/action/ActionModule.java index c2c11d69f134c..84bc9b395c5dc 100644 --- a/server/src/main/java/org/opensearch/action/ActionModule.java +++ b/server/src/main/java/org/opensearch/action/ActionModule.java @@ -85,6 +85,8 @@ import org.opensearch.action.admin.cluster.settings.TransportClusterUpdateSettingsAction; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsAction; import org.opensearch.action.admin.cluster.shards.TransportClusterSearchShardsAction; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingAction; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.TransportDeleteWeightedRoutingAction; import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingAction; import org.opensearch.action.admin.cluster.shards.routing.weighted.get.TransportGetWeightedRoutingAction; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterAddWeightedRoutingAction; @@ -302,6 +304,7 @@ import org.opensearch.rest.action.admin.cluster.RestClearVotingConfigExclusionsAction; import org.opensearch.rest.action.admin.cluster.RestCloneSnapshotAction; import org.opensearch.rest.action.admin.cluster.RestClusterAllocationExplainAction; +import org.opensearch.rest.action.admin.cluster.RestClusterDeleteWeightedRoutingAction; import org.opensearch.rest.action.admin.cluster.RestClusterGetSettingsAction; import org.opensearch.rest.action.admin.cluster.RestClusterGetWeightedRoutingAction; import org.opensearch.rest.action.admin.cluster.RestClusterHealthAction; @@ -580,6 +583,7 @@ public void reg actions.register(ClusterAddWeightedRoutingAction.INSTANCE, TransportAddWeightedRoutingAction.class); actions.register(ClusterGetWeightedRoutingAction.INSTANCE, TransportGetWeightedRoutingAction.class); + actions.register(ClusterDeleteWeightedRoutingAction.INSTANCE, TransportDeleteWeightedRoutingAction.class); actions.register(IndicesStatsAction.INSTANCE, TransportIndicesStatsAction.class); actions.register(IndicesSegmentsAction.INSTANCE, TransportIndicesSegmentsAction.class); actions.register(IndicesShardStoresAction.INSTANCE, TransportIndicesShardStoresAction.class); @@ -766,8 +770,10 @@ public void initRestHandlers(Supplier nodesInCluster) { registerHandler.accept(new RestCloseIndexAction()); registerHandler.accept(new RestOpenIndexAction()); registerHandler.accept(new RestAddIndexBlockAction()); + registerHandler.accept(new RestClusterPutWeightedRoutingAction()); registerHandler.accept(new RestClusterGetWeightedRoutingAction()); + registerHandler.accept(new RestClusterDeleteWeightedRoutingAction()); registerHandler.accept(new RestUpdateSettingsAction()); registerHandler.accept(new RestGetSettingsAction()); diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingAction.java new file mode 100644 index 0000000000000..aa438cd31b934 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingAction.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.delete; + +import org.opensearch.action.ActionType; + +/** + * Action to delete weights for weighted round-robin shard routing policy. + * + * @opensearch.internal + */ +public class ClusterDeleteWeightedRoutingAction extends ActionType { + public static final ClusterDeleteWeightedRoutingAction INSTANCE = new ClusterDeleteWeightedRoutingAction(); + public static final String NAME = "cluster:admin/routing/awareness/weights/delete"; + + private ClusterDeleteWeightedRoutingAction() { + super(NAME, ClusterDeleteWeightedRoutingResponse::new); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java new file mode 100644 index 0000000000000..71eab8ff35a2d --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequest.java @@ -0,0 +1,44 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.delete; + +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.support.clustermanager.ClusterManagerNodeRequest; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Request to delete weights for weighted round-robin shard routing policy. + * + * @opensearch.internal + */ +public class ClusterDeleteWeightedRoutingRequest extends ClusterManagerNodeRequest { + public ClusterDeleteWeightedRoutingRequest() {} + + public ClusterDeleteWeightedRoutingRequest(StreamInput in) throws IOException { + super(in); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + } + + @Override + public String toString() { + return "ClusterDeleteWeightedRoutingRequest"; + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestBuilder.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestBuilder.java new file mode 100644 index 0000000000000..19976ac6b07aa --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingRequestBuilder.java @@ -0,0 +1,27 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.delete; + +import org.opensearch.action.support.clustermanager.ClusterManagerNodeOperationRequestBuilder; +import org.opensearch.client.OpenSearchClient; + +/** + * Request builder to delete weights for weighted round-robin shard routing policy. + * + * @opensearch.internal + */ +public class ClusterDeleteWeightedRoutingRequestBuilder extends ClusterManagerNodeOperationRequestBuilder< + ClusterDeleteWeightedRoutingRequest, + ClusterDeleteWeightedRoutingResponse, + ClusterDeleteWeightedRoutingRequestBuilder> { + + public ClusterDeleteWeightedRoutingRequestBuilder(OpenSearchClient client, ClusterDeleteWeightedRoutingAction action) { + super(client, action, new ClusterDeleteWeightedRoutingRequest()); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingResponse.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingResponse.java new file mode 100644 index 0000000000000..b98ac6c0c55be --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/ClusterDeleteWeightedRoutingResponse.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.delete; + +import org.opensearch.action.support.master.AcknowledgedResponse; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.common.io.stream.StreamOutput; + +import java.io.IOException; + +/** + * Response from deleting weights for weighted round-robin search routing policy. + * + * @opensearch.internal + */ +public class ClusterDeleteWeightedRoutingResponse extends AcknowledgedResponse { + + ClusterDeleteWeightedRoutingResponse(StreamInput in) throws IOException { + super(in); + } + + public ClusterDeleteWeightedRoutingResponse(boolean acknowledged) { + super(acknowledged); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/TransportDeleteWeightedRoutingAction.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/TransportDeleteWeightedRoutingAction.java new file mode 100644 index 0000000000000..8f88d8af71b70 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/TransportDeleteWeightedRoutingAction.java @@ -0,0 +1,86 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.admin.cluster.shards.routing.weighted.delete; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.ActionListener; +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlockException; +import org.opensearch.cluster.block.ClusterBlockLevel; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.routing.WeightedRoutingService; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.io.stream.StreamInput; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +import java.io.IOException; + +/** + * Transport action for deleting weights for weighted round-robin search routing policy + * + * @opensearch.internal + */ +public class TransportDeleteWeightedRoutingAction extends TransportClusterManagerNodeAction< + ClusterDeleteWeightedRoutingRequest, + ClusterDeleteWeightedRoutingResponse> { + + private static final Logger logger = LogManager.getLogger(TransportDeleteWeightedRoutingAction.class); + + private final WeightedRoutingService weightedRoutingService; + + @Inject + public TransportDeleteWeightedRoutingAction( + TransportService transportService, + ClusterService clusterService, + WeightedRoutingService weightedRoutingService, + ThreadPool threadPool, + ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super( + ClusterDeleteWeightedRoutingAction.NAME, + transportService, + clusterService, + threadPool, + actionFilters, + ClusterDeleteWeightedRoutingRequest::new, + indexNameExpressionResolver + ); + this.weightedRoutingService = weightedRoutingService; + } + + @Override + protected String executor() { + return ThreadPool.Names.SAME; + } + + @Override + protected ClusterDeleteWeightedRoutingResponse read(StreamInput in) throws IOException { + return new ClusterDeleteWeightedRoutingResponse(in); + } + + @Override + protected ClusterBlockException checkBlock(ClusterDeleteWeightedRoutingRequest request, ClusterState state) { + return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); + } + + @Override + protected void clusterManagerOperation( + ClusterDeleteWeightedRoutingRequest request, + ClusterState state, + ActionListener listener + ) throws Exception { + weightedRoutingService.deleteWeightedRoutingMetadata(request, listener); + } +} diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/package-info.java b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/package-info.java new file mode 100644 index 0000000000000..d24c88ec674f3 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/admin/cluster/shards/routing/weighted/delete/package-info.java @@ -0,0 +1,10 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** delete weighted-round robin shard routing weights. */ +package org.opensearch.action.admin.cluster.shards.routing.weighted.delete; diff --git a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequest.java b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequest.java index 3ecf5ab19c0e4..5fd83244f3dea 100644 --- a/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/cluster/snapshots/restore/RestoreSnapshotRequest.java @@ -152,7 +152,7 @@ public RestoreSnapshotRequest(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(LegacyESVersion.V_7_10_0)) { snapshotUuid = in.readOptionalString(); } - if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && in.getVersion().onOrAfter(Version.V_3_0_0)) { + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && in.getVersion().onOrAfter(Version.V_2_4_0)) { storageType = in.readEnum(StorageType.class); } } @@ -182,7 +182,7 @@ public void writeTo(StreamOutput out) throws IOException { "restricting the snapshot UUID is forbidden in a cluster with version [" + out.getVersion() + "] nodes" ); } - if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && out.getVersion().onOrAfter(Version.V_3_0_0)) { + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && out.getVersion().onOrAfter(Version.V_2_4_0)) { out.writeEnum(storageType); } } diff --git a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java index 5551c45ba9e57..77ddb5e17c742 100644 --- a/server/src/main/java/org/opensearch/client/ClusterAdminClient.java +++ b/server/src/main/java/org/opensearch/client/ClusterAdminClient.java @@ -95,6 +95,9 @@ import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequestBuilder; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingRequest; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingRequestBuilder; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingRequest; import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingRequestBuilder; import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingResponse; @@ -837,6 +840,21 @@ public interface ClusterAdminClient extends OpenSearchClient { */ ClusterGetWeightedRoutingRequestBuilder prepareGetWeightedRouting(); + /** + * Deletes weights for weighted round-robin search routing policy. + */ + ActionFuture deleteWeightedRouting(ClusterDeleteWeightedRoutingRequest request); + + /** + * Deletes weights for weighted round-robin search routing policy. + */ + void deleteWeightedRouting(ClusterDeleteWeightedRoutingRequest request, ActionListener listener); + + /** + * Deletes weights for weighted round-robin search routing policy. + */ + ClusterDeleteWeightedRoutingRequestBuilder prepareDeleteWeightedRouting(); + /** * Decommission awareness attribute */ diff --git a/server/src/main/java/org/opensearch/client/Requests.java b/server/src/main/java/org/opensearch/client/Requests.java index 87219dda49825..21f2a2d906602 100644 --- a/server/src/main/java/org/opensearch/client/Requests.java +++ b/server/src/main/java/org/opensearch/client/Requests.java @@ -52,6 +52,7 @@ import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingRequest; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequest; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingRequest; import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.delete.DeleteSnapshotRequest; import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest; @@ -572,6 +573,15 @@ public static ClusterGetWeightedRoutingRequest getWeightedRoutingRequest(String return new ClusterGetWeightedRoutingRequest(attributeName); } + /** + * Deletes weights for weighted round-robin search routing policy + * + * @return delete weight request + */ + public static ClusterDeleteWeightedRoutingRequest deleteWeightedRoutingRequest() { + return new ClusterDeleteWeightedRoutingRequest(); + } + /** * Creates a new decommission request. * diff --git a/server/src/main/java/org/opensearch/client/support/AbstractClient.java b/server/src/main/java/org/opensearch/client/support/AbstractClient.java index bd82299725435..b42010d4253d5 100644 --- a/server/src/main/java/org/opensearch/client/support/AbstractClient.java +++ b/server/src/main/java/org/opensearch/client/support/AbstractClient.java @@ -122,6 +122,10 @@ import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequest; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsRequestBuilder; import org.opensearch.action.admin.cluster.shards.ClusterSearchShardsResponse; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingAction; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingRequest; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingRequestBuilder; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingAction; import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingRequest; import org.opensearch.action.admin.cluster.shards.routing.weighted.get.ClusterGetWeightedRoutingRequestBuilder; @@ -1328,6 +1332,24 @@ public ClusterGetWeightedRoutingRequestBuilder prepareGetWeightedRouting() { return new ClusterGetWeightedRoutingRequestBuilder(this, ClusterGetWeightedRoutingAction.INSTANCE); } + @Override + public ActionFuture deleteWeightedRouting(ClusterDeleteWeightedRoutingRequest request) { + return execute(ClusterDeleteWeightedRoutingAction.INSTANCE, request); + } + + @Override + public void deleteWeightedRouting( + ClusterDeleteWeightedRoutingRequest request, + ActionListener listener + ) { + execute(ClusterDeleteWeightedRoutingAction.INSTANCE, request, listener); + } + + @Override + public ClusterDeleteWeightedRoutingRequestBuilder prepareDeleteWeightedRouting() { + return new ClusterDeleteWeightedRoutingRequestBuilder(this, ClusterDeleteWeightedRoutingAction.INSTANCE); + } + @Override public void deleteDanglingIndex(DeleteDanglingIndexRequest request, ActionListener listener) { execute(DeleteDanglingIndexAction.INSTANCE, request, listener); diff --git a/server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java b/server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java index 728bf9d1ae90e..3e4feb02686d6 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java +++ b/server/src/main/java/org/opensearch/cluster/routing/RecoverySource.java @@ -287,7 +287,7 @@ public SnapshotRecoverySource( } else { index = new IndexId(in.readString(), IndexMetadata.INDEX_UUID_NA_VALUE); } - if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && in.getVersion().onOrAfter(Version.V_3_0_0)) { + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && in.getVersion().onOrAfter(Version.V_2_4_0)) { isSearchableSnapshot = in.readBoolean(); } else { isSearchableSnapshot = false; @@ -330,7 +330,7 @@ protected void writeAdditionalFields(StreamOutput out) throws IOException { } else { out.writeString(index.getName()); } - if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && out.getVersion().onOrAfter(Version.V_3_0_0)) { + if (FeatureFlags.isEnabled(FeatureFlags.SEARCHABLE_SNAPSHOT) && out.getVersion().onOrAfter(Version.V_2_4_0)) { out.writeBoolean(isSearchableSnapshot); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java index 7ff2b23630a3c..54f2c81aea384 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/WeightedRoutingService.java @@ -12,7 +12,9 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingRequest; import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequest; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; @@ -106,6 +108,34 @@ private boolean checkIfSameWeightsInMetadata( return newWeightedRoutingMetadata.getWeightedRouting().equals(oldWeightedRoutingMetadata.getWeightedRouting()); } + public void deleteWeightedRoutingMetadata( + final ClusterDeleteWeightedRoutingRequest request, + final ActionListener listener + ) { + clusterService.submitStateUpdateTask("delete_weighted_routing", new ClusterStateUpdateTask(Priority.URGENT) { + @Override + public ClusterState execute(ClusterState currentState) { + logger.info("Deleting weighted routing metadata from the cluster state"); + Metadata.Builder mdBuilder = Metadata.builder(currentState.metadata()); + mdBuilder.removeCustom(WeightedRoutingMetadata.TYPE); + return ClusterState.builder(currentState).metadata(mdBuilder).build(); + } + + @Override + public void onFailure(String source, Exception e) { + logger.error("failed to remove weighted routing metadata from cluster state", e); + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + logger.debug("cluster weighted routing metadata change is processed by all the nodes"); + assert newState.metadata().weightedRoutingMetadata() == null; + listener.onResponse(new ClusterDeleteWeightedRoutingResponse(true)); + } + }); + } + List getAwarenessAttributes() { return awarenessAttributes; } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryFailedException.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryFailedException.java index f2665e8bbb1a7..12393ab12c95d 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryFailedException.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryFailedException.java @@ -32,11 +32,11 @@ package org.opensearch.indices.recovery; -import org.opensearch.OpenSearchException; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.common.Nullable; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.index.shard.ShardId; +import org.opensearch.indices.replication.common.ReplicationFailedException; import java.io.IOException; @@ -45,7 +45,7 @@ * * @opensearch.internal */ -public class RecoveryFailedException extends OpenSearchException { +public class RecoveryFailedException extends ReplicationFailedException { public RecoveryFailedException(StartRecoveryRequest request, Throwable cause) { this(request, null, cause); diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryListener.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryListener.java index b93c054ffa4bf..c8c2fbfc896eb 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryListener.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryListener.java @@ -8,9 +8,9 @@ package org.opensearch.indices.recovery; -import org.opensearch.OpenSearchException; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.indices.cluster.IndicesClusterStateService; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationState; @@ -49,7 +49,7 @@ public void onDone(ReplicationState state) { } @Override - public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { indicesClusterStateService.handleRecoveryFailure(shardRouting, sendShardFailure, e); } } diff --git a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java index 7acc6b8b54fdd..c1e29e0d866d8 100644 --- a/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java +++ b/server/src/main/java/org/opensearch/indices/recovery/RecoveryTarget.java @@ -37,7 +37,6 @@ import org.apache.lucene.index.IndexFormatTooOldException; import org.opensearch.Assertions; import org.opensearch.ExceptionsHelper; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.cluster.node.DiscoveryNode; @@ -56,10 +55,11 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.replication.common.ReplicationCollection; +import org.opensearch.indices.replication.common.ReplicationFailedException; +import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationLuceneIndex; import org.opensearch.indices.replication.common.ReplicationTarget; -import org.opensearch.indices.replication.common.ReplicationListener; -import org.opensearch.indices.replication.common.ReplicationCollection; import java.io.IOException; import java.nio.channels.FileChannel; @@ -135,7 +135,7 @@ public String description() { } @Override - public void notifyListener(OpenSearchException e, boolean sendShardFailure) { + public void notifyListener(ReplicationFailedException e, boolean sendShardFailure) { listener.onFailure(state(), new RecoveryFailedException(state(), e.getMessage(), e), sendShardFailure); } diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java index 26bec2203c599..c12d9b7165ae0 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java @@ -18,7 +18,6 @@ import org.apache.lucene.store.ByteBuffersIndexInput; import org.apache.lucene.store.ChecksumIndexInput; import org.opensearch.ExceptionsHelper; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; import org.opensearch.common.UUIDs; @@ -105,16 +104,14 @@ public String description() { } @Override - public void notifyListener(OpenSearchException e, boolean sendShardFailure) { + public void notifyListener(ReplicationFailedException e, boolean sendShardFailure) { // Cancellations still are passed to our SegmentReplicationListner as failures, if we have failed because of cancellation // update the stage. final Throwable cancelledException = ExceptionsHelper.unwrap(e, CancellableThreads.ExecutionCancelledException.class); if (cancelledException != null) { state.setStage(SegmentReplicationState.Stage.CANCELLED); - listener.onFailure(state(), (CancellableThreads.ExecutionCancelledException) cancelledException, sendShardFailure); - } else { - listener.onFailure(state(), e, sendShardFailure); } + listener.onFailure(state(), e, sendShardFailure); } @Override @@ -150,7 +147,7 @@ public void startReplication(ActionListener listener) { // SegmentReplicationSource does not share CancellableThreads. final CancellableThreads.ExecutionCancelledException executionCancelledException = new CancellableThreads.ExecutionCancelledException("replication was canceled reason [" + reason + "]"); - notifyListener(executionCancelledException, false); + notifyListener(new ReplicationFailedException("Segment replication failed", executionCancelledException), false); throw executionCancelledException; }); state.setStage(SegmentReplicationState.Stage.REPLICATING); diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java index 8fc53ccd3bc08..b633f0fa3b9a0 100644 --- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java +++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTargetService.java @@ -12,7 +12,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.ExceptionsHelper; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.cluster.routing.ShardRouting; import org.opensearch.common.Nullable; @@ -27,6 +26,7 @@ import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.ReplicationCollection; import org.opensearch.indices.replication.common.ReplicationCollection.ReplicationRef; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.tasks.Task; @@ -196,7 +196,7 @@ public void onReplicationDone(SegmentReplicationState state) { } @Override - public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { logger.trace( () -> new ParameterizedMessage( "[shardId {}] [replication id {}] Replication failed, timing data: {}", @@ -249,13 +249,13 @@ default void onDone(ReplicationState state) { } @Override - default void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + default void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { onReplicationFailure((SegmentReplicationState) state, e, sendShardFailure); } void onReplicationDone(SegmentReplicationState state); - void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure); + void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure); } /** @@ -293,13 +293,14 @@ public void onFailure(Exception e) { Throwable cause = ExceptionsHelper.unwrapCause(e); if (cause instanceof CancellableThreads.ExecutionCancelledException) { if (onGoingReplications.getTarget(replicationId) != null) { + IndexShard indexShard = onGoingReplications.getTarget(replicationId).indexShard(); // if the target still exists in our collection, the primary initiated the cancellation, fail the replication // but do not fail the shard. Cancellations initiated by this node from Index events will be removed with // onGoingReplications.cancel and not appear in the collection when this listener resolves. - onGoingReplications.fail(replicationId, (CancellableThreads.ExecutionCancelledException) cause, false); + onGoingReplications.fail(replicationId, new ReplicationFailedException(indexShard, cause), false); } } else { - onGoingReplications.fail(replicationId, new OpenSearchException("Segment Replication failed", e), true); + onGoingReplications.fail(replicationId, new ReplicationFailedException("Segment Replication failed", e), true); } } }); diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java index 20600856c9444..e918ac0a79691 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationCollection.java @@ -34,8 +34,6 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.OpenSearchException; -import org.opensearch.OpenSearchTimeoutException; import org.opensearch.common.concurrent.AutoCloseableRefCounted; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.AbstractRunnable; @@ -134,7 +132,7 @@ public T reset(final long id, final TimeValue activityTimeout) { } catch (Exception e) { // fail shard to be safe assert oldTarget != null; - oldTarget.notifyListener(new OpenSearchException("Unable to reset target", e), true); + oldTarget.notifyListener(new ReplicationFailedException("Unable to reset target", e), true); return null; } } @@ -187,7 +185,7 @@ public boolean cancel(long id, String reason) { * @param e exception with reason for the failure * @param sendShardFailure true a shard failed message should be sent to the master */ - public void fail(long id, OpenSearchException e, boolean sendShardFailure) { + public void fail(long id, ReplicationFailedException e, boolean sendShardFailure) { T removed = onGoingTargetEvents.remove(id); if (removed != null) { logger.trace("failing {}. Send shard failure: [{}]", removed.description(), sendShardFailure); @@ -299,7 +297,7 @@ protected void doRun() throws Exception { String message = "no activity after [" + checkInterval + "]"; fail( id, - new OpenSearchTimeoutException(message), + new ReplicationFailedException(message), true // to be safe, we don't know what go stuck ); return; diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java index afdd0ce466f9b..23ad4d0e096b5 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationFailedException.java @@ -38,4 +38,16 @@ public ReplicationFailedException(ShardId shardId, @Nullable String extraInfo, T public ReplicationFailedException(StreamInput in) throws IOException { super(in); } + + public ReplicationFailedException(Exception e) { + super(e); + } + + public ReplicationFailedException(String msg) { + super(msg); + } + + public ReplicationFailedException(String msg, Throwable cause) { + super(msg, cause); + } } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationListener.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationListener.java index 0666f475d496a..1d3c48d084f60 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationListener.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationListener.java @@ -8,8 +8,6 @@ package org.opensearch.indices.replication.common; -import org.opensearch.OpenSearchException; - /** * Interface for listeners that run when there's a change in {@link ReplicationState} * @@ -19,5 +17,5 @@ public interface ReplicationListener { void onDone(ReplicationState state); - void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure); + void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure); } diff --git a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java index 42f4572fef3e4..1e2dcbb46256f 100644 --- a/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java +++ b/server/src/main/java/org/opensearch/indices/replication/common/ReplicationTarget.java @@ -11,7 +11,6 @@ import org.apache.logging.log4j.Logger; import org.apache.lucene.store.RateLimiter; import org.opensearch.ExceptionsHelper; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.support.ChannelActionListener; import org.opensearch.common.CheckedFunction; @@ -78,7 +77,7 @@ public CancellableThreads cancellableThreads() { return cancellableThreads; } - public abstract void notifyListener(OpenSearchException e, boolean sendShardFailure); + public abstract void notifyListener(ReplicationFailedException e, boolean sendShardFailure); public ReplicationTarget(String name, IndexShard indexShard, ReplicationLuceneIndex stateIndex, ReplicationListener listener) { super(name); @@ -170,7 +169,7 @@ public void cancel(String reason) { * @param e exception that encapsulates the failure * @param sendShardFailure indicates whether to notify the master of the shard failure */ - public void fail(OpenSearchException e, boolean sendShardFailure) { + public void fail(ReplicationFailedException e, boolean sendShardFailure) { if (finished.compareAndSet(false, true)) { try { notifyListener(e, sendShardFailure); @@ -187,7 +186,7 @@ public void fail(OpenSearchException e, boolean sendShardFailure) { protected void ensureRefCount() { if (refCount() <= 0) { - throw new OpenSearchException( + throw new ReplicationFailedException( "ReplicationTarget is used but it's refcount is 0. Probably a mismatch between incRef/decRef calls" ); } diff --git a/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingAction.java b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingAction.java new file mode 100644 index 0000000000000..9742cc373d520 --- /dev/null +++ b/server/src/main/java/org/opensearch/rest/action/admin/cluster/RestClusterDeleteWeightedRoutingAction.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.rest.action.admin.cluster; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingRequest; +import org.opensearch.client.Requests; +import org.opensearch.client.node.NodeClient; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.action.RestToXContentListener; + +import java.io.IOException; +import java.util.List; + +import static java.util.Collections.singletonList; +import static org.opensearch.rest.RestRequest.Method.DELETE; + +/** + * Delete Weighted Round Robin based shard routing weights + * + * @opensearch.api + * + */ +public class RestClusterDeleteWeightedRoutingAction extends BaseRestHandler { + + private static final Logger logger = LogManager.getLogger(RestClusterDeleteWeightedRoutingAction.class); + + @Override + public List routes() { + return singletonList(new Route(DELETE, "/_cluster/routing/awareness/weights")); + } + + @Override + public String getName() { + return "delete_weighted_routing_action"; + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = Requests.deleteWeightedRoutingRequest(); + return channel -> client.admin() + .cluster() + .deleteWeightedRouting(clusterDeleteWeightedRoutingRequest, new RestToXContentListener<>(channel)); + } +} diff --git a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java index 557c5c7ac910d..fc5d46ef84c79 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/WeightedRoutingServiceTests.java @@ -12,7 +12,9 @@ import org.junit.Before; import org.opensearch.Version; import org.opensearch.action.ActionListener; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingRequest; import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.action.admin.cluster.shards.routing.weighted.delete.ClusterDeleteWeightedRoutingResponse; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterAddWeightedRoutingAction; import org.opensearch.action.admin.cluster.shards.routing.weighted.put.ClusterPutWeightedRoutingRequestBuilder; import org.opensearch.client.node.NodeClient; @@ -233,6 +235,32 @@ public void onFailure(Exception e) { assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); } + public void testDeleteWeightedRoutingMetadata() throws InterruptedException { + Map weights = Map.of("zone_A", 1.0, "zone_B", 1.0, "zone_C", 1.0); + ClusterState state = clusterService.state(); + state = setWeightedRoutingWeights(state, weights); + ClusterState.Builder builder = ClusterState.builder(state); + ClusterServiceUtils.setState(clusterService, builder); + + ClusterDeleteWeightedRoutingRequest clusterDeleteWeightedRoutingRequest = new ClusterDeleteWeightedRoutingRequest(); + final CountDownLatch countDownLatch = new CountDownLatch(1); + ActionListener listener = new ActionListener() { + @Override + public void onResponse(ClusterDeleteWeightedRoutingResponse clusterDeleteWeightedRoutingResponse) { + assertTrue(clusterDeleteWeightedRoutingResponse.isAcknowledged()); + assertNull(clusterService.state().metadata().weightedRoutingMetadata()); + countDownLatch.countDown(); + } + + @Override + public void onFailure(Exception e) { + fail("on failure shouldn't have been called"); + } + }; + weightedRoutingService.deleteWeightedRoutingMetadata(clusterDeleteWeightedRoutingRequest, listener); + assertTrue(countDownLatch.await(30, TimeUnit.SECONDS)); + } + public void testVerifyAwarenessAttribute_InvalidAttributeName() { assertThrows( "invalid awareness attribute %s requested for updating weighted routing", diff --git a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java index 470eeea771f2b..72b77bb706065 100644 --- a/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/IndexShardTests.java @@ -331,12 +331,26 @@ public void testShardStateMetaHashCodeEquals() { randomFrom(ShardStateMetadata.IndexDataLocation.values()) ); - assertEquals(meta, new ShardStateMetadata(meta.primary, meta.indexUUID, meta.allocationId)); - assertEquals(meta.hashCode(), new ShardStateMetadata(meta.primary, meta.indexUUID, meta.allocationId).hashCode()); + assertEquals(meta, new ShardStateMetadata(meta.primary, meta.indexUUID, meta.allocationId, meta.indexDataLocation)); + assertEquals( + meta.hashCode(), + new ShardStateMetadata(meta.primary, meta.indexUUID, meta.allocationId, meta.indexDataLocation).hashCode() + ); - assertFalse(meta.equals(new ShardStateMetadata(!meta.primary, meta.indexUUID, meta.allocationId))); - assertFalse(meta.equals(new ShardStateMetadata(!meta.primary, meta.indexUUID + "foo", meta.allocationId))); - assertFalse(meta.equals(new ShardStateMetadata(!meta.primary, meta.indexUUID + "foo", randomAllocationId()))); + assertNotEquals(meta, new ShardStateMetadata(!meta.primary, meta.indexUUID, meta.allocationId, meta.indexDataLocation)); + assertNotEquals(meta, new ShardStateMetadata(!meta.primary, meta.indexUUID + "foo", meta.allocationId, meta.indexDataLocation)); + assertNotEquals(meta, new ShardStateMetadata(!meta.primary, meta.indexUUID, randomAllocationId(), meta.indexDataLocation)); + assertNotEquals( + meta, + new ShardStateMetadata( + !meta.primary, + meta.indexUUID, + randomAllocationId(), + meta.indexDataLocation == ShardStateMetadata.IndexDataLocation.LOCAL + ? ShardStateMetadata.IndexDataLocation.REMOTE + : ShardStateMetadata.IndexDataLocation.LOCAL + ) + ); Set hashCodes = new HashSet<>(); for (int i = 0; i < 30; i++) { // just a sanity check that we impl hashcode allocationId = randomBoolean() ? null : randomAllocationId(); @@ -349,7 +363,6 @@ public void testShardStateMetaHashCodeEquals() { hashCodes.add(meta.hashCode()); } assertTrue("more than one unique hashcode expected but got: " + hashCodes.size(), hashCodes.size() > 1); - } public void testClosesPreventsNewOperations() throws Exception { diff --git a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java index da04ea1b9914b..b3cbd9b00f0b1 100644 --- a/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java +++ b/server/src/test/java/org/opensearch/index/shard/SegmentReplicationIndexShardTests.java @@ -11,7 +11,6 @@ import org.apache.lucene.index.IndexCommit; import org.apache.lucene.index.SegmentInfos; import org.junit.Assert; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.index.IndexRequest; @@ -44,6 +43,7 @@ import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; import org.opensearch.indices.replication.common.CopyState; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; @@ -670,8 +670,7 @@ public void onReplicationDone(SegmentReplicationState state) { } @Override - public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { - assertTrue(e instanceof CancellableThreads.ExecutionCancelledException); + public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { assertFalse(sendShardFailure); assertEquals(SegmentReplicationState.Stage.CANCELLED, state.getStage()); latch.countDown(); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java index cc5100fba9010..3d04f808bc30c 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java @@ -41,7 +41,6 @@ import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.store.AlreadyClosedException; import org.opensearch.ExceptionsHelper; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.flush.FlushRequest; import org.opensearch.action.bulk.BulkShardRequest; @@ -70,6 +69,7 @@ import org.opensearch.index.store.Store; import org.opensearch.index.translog.SnapshotMatchers; import org.opensearch.index.translog.Translog; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.indices.replication.common.ReplicationType; @@ -471,7 +471,7 @@ public void onDone(ReplicationState state) { } @Override - public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { assertThat(ExceptionsHelper.unwrap(e, IOException.class).getMessage(), equalTo("simulated")); } })) diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java index 7437cb22e44d1..9a9c5b989dea9 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetServiceTests.java @@ -22,6 +22,7 @@ import org.opensearch.index.store.Store; import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationType; import java.io.IOException; @@ -104,7 +105,7 @@ public void onReplicationDone(SegmentReplicationState state) { } @Override - public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { logger.error("Unexpected error", e); Assert.fail("Test should succeed"); } @@ -149,7 +150,7 @@ public void onReplicationDone(SegmentReplicationState state) { } @Override - public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onReplicationFailure(SegmentReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { // failures leave state object in last entered stage. assertEquals(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO, state.getStage()); assertEquals(expectedError, e.getCause()); diff --git a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java index 34ca94bba02f3..ffea4aaf6b7c4 100644 --- a/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java +++ b/server/src/test/java/org/opensearch/indices/replication/SegmentReplicationTargetTests.java @@ -27,7 +27,6 @@ import org.junit.Assert; import org.mockito.Mockito; import org.opensearch.ExceptionsHelper; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; @@ -40,6 +39,7 @@ import org.opensearch.index.store.StoreFileMetadata; import org.opensearch.index.store.StoreTests; import org.opensearch.indices.replication.checkpoint.ReplicationCheckpoint; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationType; import org.opensearch.test.DummyShardLock; import org.opensearch.test.IndexSettingsModule; @@ -199,7 +199,7 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { assertEquals(exception, e.getCause().getCause()); - segrepTarget.fail(new OpenSearchException(e), false); + segrepTarget.fail(new ReplicationFailedException(e), false); } }); } @@ -242,7 +242,7 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { assertEquals(exception, e.getCause().getCause()); - segrepTarget.fail(new OpenSearchException(e), false); + segrepTarget.fail(new ReplicationFailedException(e), false); } }); } @@ -287,7 +287,7 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { assertEquals(exception, e.getCause()); - segrepTarget.fail(new OpenSearchException(e), false); + segrepTarget.fail(new ReplicationFailedException(e), false); } }); } @@ -332,7 +332,7 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { assertEquals(exception, e.getCause()); - segrepTarget.fail(new OpenSearchException(e), false); + segrepTarget.fail(new ReplicationFailedException(e), false); } }); } @@ -374,7 +374,7 @@ public void onResponse(Void replicationResponse) { @Override public void onFailure(Exception e) { assert (e instanceof IllegalStateException); - segrepTarget.fail(new OpenSearchException(e), false); + segrepTarget.fail(new ReplicationFailedException(e), false); } }); } diff --git a/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java b/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java index 1789dd3b2a288..75ac1075e8ee0 100644 --- a/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java +++ b/server/src/test/java/org/opensearch/recovery/ReplicationCollectionTests.java @@ -39,6 +39,7 @@ import org.opensearch.index.shard.ShardId; import org.opensearch.index.store.Store; import org.opensearch.indices.replication.common.ReplicationCollection; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.indices.recovery.RecoveryState; @@ -59,7 +60,7 @@ public void onDone(ReplicationState state) { } @Override - public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { } }; @@ -93,7 +94,7 @@ public void onDone(ReplicationState state) { } @Override - public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { failed.set(true); latch.countDown(); } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index 1fcdfd79c544e..f520206e0f866 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -39,7 +39,6 @@ import org.apache.lucene.store.IndexInput; import org.junit.Assert; import org.opensearch.ExceptionsHelper; -import org.opensearch.OpenSearchException; import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.flush.FlushRequest; @@ -119,6 +118,7 @@ import org.opensearch.indices.replication.checkpoint.SegmentReplicationCheckpointPublisher; import org.opensearch.indices.replication.common.CopyState; import org.opensearch.indices.replication.common.ReplicationCollection; +import org.opensearch.indices.replication.common.ReplicationFailedException; import org.opensearch.indices.replication.common.ReplicationListener; import org.opensearch.indices.replication.common.ReplicationState; import org.opensearch.repositories.IndexId; @@ -181,7 +181,7 @@ public void onDone(ReplicationState state) { } @Override - public void onFailure(ReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onFailure(ReplicationState state, ReplicationFailedException e, boolean sendShardFailure) { throw new AssertionError(e); } }; @@ -1282,7 +1282,11 @@ public void onReplicationDone(SegmentReplicationState state) { } @Override - public void onReplicationFailure(SegmentReplicationState state, OpenSearchException e, boolean sendShardFailure) { + public void onReplicationFailure( + SegmentReplicationState state, + ReplicationFailedException e, + boolean sendShardFailure + ) { logger.error("Unexpected replication failure in test", e); Assert.fail("test replication should not fail: " + e); }