From 91582e7c25a6f76eeebfa3ab74287178e1986491 Mon Sep 17 00:00:00 2001 From: Ashish Date: Tue, 19 Mar 2024 12:23:00 +0530 Subject: [PATCH 1/8] Introduce remote store path type in customData in IndexMetadata (#12607) Signed-off-by: Ashish Singh --- .../cluster/metadata/IndexMetadata.java | 1 + .../metadata/MetadataCreateIndexService.java | 36 ++++++-- .../common/settings/ClusterSettings.java | 2 + .../index/remote/RemoteStorePathResolver.java | 30 ++++++ .../index/remote/RemoteStorePathType.java | 36 ++++++++ .../opensearch/indices/IndicesService.java | 13 +++ .../MetadataCreateIndexServiceTests.java | 92 ++++++++++++++++++- 7 files changed, 201 insertions(+), 9 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/remote/RemoteStorePathResolver.java create mode 100644 server/src/main/java/org/opensearch/index/remote/RemoteStorePathType.java diff --git a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java index 03784df509ed6..80b78cfe154f1 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java @@ -635,6 +635,7 @@ public static APIBlock readFrom(StreamInput input) throws IOException { static final String KEY_ROLLOVER_INFOS = "rollover_info"; static final String KEY_SYSTEM = "system"; public static final String KEY_PRIMARY_TERMS = "primary_terms"; + public static final String REMOTE_STORE_CUSTOM_KEY = "remote_store"; public static final String INDEX_STATE_FILE_PREFIX = "state-"; diff --git a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java index acc2f3a294745..f117d1a4a11a2 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/MetadataCreateIndexService.java @@ -88,6 +88,8 @@ import org.opensearch.index.mapper.MapperService; import org.opensearch.index.mapper.MapperService.MergeReason; import org.opensearch.index.query.QueryShardContext; +import org.opensearch.index.remote.RemoteStorePathResolver; +import org.opensearch.index.remote.RemoteStorePathType; import org.opensearch.index.shard.IndexSettingProvider; import org.opensearch.index.translog.Translog; import org.opensearch.indices.IndexCreationException; @@ -167,6 +169,9 @@ public class MetadataCreateIndexService { private final ClusterManagerTaskThrottler.ThrottlingKey createIndexTaskKey; private AwarenessReplicaBalance awarenessReplicaBalance; + @Nullable + private final RemoteStorePathResolver remoteStorePathResolver; + public MetadataCreateIndexService( final Settings settings, final ClusterService clusterService, @@ -198,6 +203,9 @@ public MetadataCreateIndexService( // Task is onboarded for throttling, it will get retried from associated TransportClusterManagerNodeAction. createIndexTaskKey = clusterService.registerClusterManagerTask(ClusterManagerTaskKeys.CREATE_INDEX_KEY, true); + remoteStorePathResolver = isRemoteDataAttributePresent(settings) + ? new RemoteStorePathResolver(clusterService.getClusterSettings()) + : null; } /** @@ -498,7 +506,8 @@ private ClusterState applyCreateIndexWithTemporaryService( temporaryIndexMeta.getSettings(), temporaryIndexMeta.getRoutingNumShards(), sourceMetadata, - temporaryIndexMeta.isSystem() + temporaryIndexMeta.isSystem(), + temporaryIndexMeta.getCustomData() ); } catch (Exception e) { logger.info("failed to build index metadata [{}]", request.index()); @@ -522,10 +531,11 @@ private ClusterState applyCreateIndexWithTemporaryService( /** * Given a state and index settings calculated after applying templates, validate metadata for - * the new index, returning an {@link IndexMetadata} for the new index + * the new index, returning an {@link IndexMetadata} for the new index. + *

+ * The access level of the method changed to default level for visibility to test. */ - private IndexMetadata buildAndValidateTemporaryIndexMetadata( - final ClusterState currentState, + IndexMetadata buildAndValidateTemporaryIndexMetadata( final Settings aggregatedIndexSettings, final CreateIndexClusterStateUpdateRequest request, final int routingNumShards @@ -544,6 +554,11 @@ private IndexMetadata buildAndValidateTemporaryIndexMetadata( tmpImdBuilder.settings(indexSettings); tmpImdBuilder.system(isSystem); + if (remoteStorePathResolver != null) { + String pathType = remoteStorePathResolver.resolveType().toString(); + tmpImdBuilder.putCustom(IndexMetadata.REMOTE_STORE_CUSTOM_KEY, Map.of(RemoteStorePathType.NAME, pathType)); + } + // Set up everything, now locally create the index to see that things are ok, and apply IndexMetadata tempMetadata = tmpImdBuilder.build(); validateActiveShardCount(request.waitForActiveShards(), tempMetadata); @@ -582,7 +597,7 @@ private ClusterState applyCreateIndexRequestWithV1Templates( clusterService.getClusterSettings() ); int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, null); - IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(currentState, aggregatedIndexSettings, request, routingNumShards); + IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(aggregatedIndexSettings, request, routingNumShards); return applyCreateIndexWithTemporaryService( currentState, @@ -647,7 +662,7 @@ private ClusterState applyCreateIndexRequestWithV2Template( clusterService.getClusterSettings() ); int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, null); - IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(currentState, aggregatedIndexSettings, request, routingNumShards); + IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(aggregatedIndexSettings, request, routingNumShards); return applyCreateIndexWithTemporaryService( currentState, @@ -728,7 +743,7 @@ private ClusterState applyCreateIndexRequestWithExistingMetadata( clusterService.getClusterSettings() ); final int routingNumShards = getIndexNumberOfRoutingShards(aggregatedIndexSettings, sourceMetadata); - IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(currentState, aggregatedIndexSettings, request, routingNumShards); + IndexMetadata tmpImd = buildAndValidateTemporaryIndexMetadata(aggregatedIndexSettings, request, routingNumShards); return applyCreateIndexWithTemporaryService( currentState, @@ -1147,7 +1162,8 @@ static IndexMetadata buildIndexMetadata( Settings indexSettings, int routingNumShards, @Nullable IndexMetadata sourceMetadata, - boolean isSystem + boolean isSystem, + Map customData ) { IndexMetadata.Builder indexMetadataBuilder = createIndexMetadataBuilder(indexName, sourceMetadata, indexSettings, routingNumShards); indexMetadataBuilder.system(isSystem); @@ -1168,6 +1184,10 @@ static IndexMetadata buildIndexMetadata( indexMetadataBuilder.putAlias(aliases.get(i)); } + for (Map.Entry entry : customData.entrySet()) { + indexMetadataBuilder.putCustom(entry.getKey(), entry.getValue()); + } + indexMetadataBuilder.state(IndexMetadata.State.OPEN); return indexMetadataBuilder.build(); } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 6b4be45929553..730a330b6ca60 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -713,6 +713,8 @@ public void apply(Settings value, Settings current, Settings previous) { IoBasedAdmissionControllerSettings.SEARCH_IO_USAGE_LIMIT, IoBasedAdmissionControllerSettings.INDEXING_IO_USAGE_LIMIT, IndicesService.CLUSTER_INDEX_RESTRICT_REPLICATION_TYPE_SETTING, + IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING, + // Concurrent segment search settings SearchService.CLUSTER_CONCURRENT_SEGMENT_SEARCH_SETTING, SearchService.CONCURRENT_SEGMENT_SEARCH_TARGET_MAX_SLICE_COUNT_SETTING diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePathResolver.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePathResolver.java new file mode 100644 index 0000000000000..6e8126fcce0ca --- /dev/null +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePathResolver.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.indices.IndicesService; + +/** + * Determines the {@link RemoteStorePathType} at the time of index metadata creation. + * + * @opensearch.internal + */ +public class RemoteStorePathResolver { + + private final ClusterSettings clusterSettings; + + public RemoteStorePathResolver(ClusterSettings clusterSettings) { + this.clusterSettings = clusterSettings; + } + + public RemoteStorePathType resolveType() { + return clusterSettings.get(IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING); + } +} diff --git a/server/src/main/java/org/opensearch/index/remote/RemoteStorePathType.java b/server/src/main/java/org/opensearch/index/remote/RemoteStorePathType.java new file mode 100644 index 0000000000000..a64e07ab1f66f --- /dev/null +++ b/server/src/main/java/org/opensearch/index/remote/RemoteStorePathType.java @@ -0,0 +1,36 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.remote; + +import java.util.Locale; + +/** + * Enumerates the types of remote store paths resolution techniques supported by OpenSearch. + * For more information, see Github issue #12567. + * + * @opensearch.internal + */ +public enum RemoteStorePathType { + + FIXED, + HASHED_PREFIX; + + public static RemoteStorePathType parseString(String remoteStoreBlobPathType) { + try { + return RemoteStorePathType.valueOf(remoteStoreBlobPathType.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException("Could not parse RemoteStorePathType for [" + remoteStoreBlobPathType + "]"); + } + } + + /** + * This string is used as key for storing information in the custom data in index settings. + */ + public static final String NAME = "path_type"; +} diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 40c10e3a2fe96..0e64894e6f708 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -123,6 +123,7 @@ import org.opensearch.index.query.QueryRewriteContext; import org.opensearch.index.recovery.RecoveryStats; import org.opensearch.index.refresh.RefreshStats; +import org.opensearch.index.remote.RemoteStorePathType; import org.opensearch.index.remote.RemoteStoreStatsTrackerFactory; import org.opensearch.index.search.stats.SearchStats; import org.opensearch.index.seqno.RetentionLeaseStats; @@ -313,6 +314,18 @@ public class IndicesService extends AbstractLifecycleComponent Property.Final ); + /** + * This setting is used to set the remote store blob store path prefix strategy. This setting is effective only for + * remote store enabled cluster. + */ + public static final Setting CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING = new Setting<>( + "cluster.remote_store.index.path.prefix.type", + RemoteStorePathType.FIXED.toString(), + RemoteStorePathType::parseString, + Property.NodeScope, + Property.Dynamic + ); + /** * The node's settings. */ diff --git a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java index cc605878119a2..cf4de32890a2a 100644 --- a/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/metadata/MetadataCreateIndexServiceTests.java @@ -71,6 +71,7 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.mapper.MapperService; import org.opensearch.index.query.QueryShardContext; +import org.opensearch.index.remote.RemoteStorePathType; import org.opensearch.index.translog.Translog; import org.opensearch.indices.IndexCreationException; import org.opensearch.indices.IndicesService; @@ -1563,13 +1564,102 @@ public void testBuildIndexMetadata() { .put(SETTING_NUMBER_OF_SHARDS, 1) .build(); List aliases = singletonList(AliasMetadata.builder("alias1").build()); - IndexMetadata indexMetadata = buildIndexMetadata("test", aliases, () -> null, indexSettings, 4, sourceIndexMetadata, false); + IndexMetadata indexMetadata = buildIndexMetadata( + "test", + aliases, + () -> null, + indexSettings, + 4, + sourceIndexMetadata, + false, + new HashMap<>() + ); assertThat(indexMetadata.getAliases().size(), is(1)); assertThat(indexMetadata.getAliases().keySet().iterator().next(), is("alias1")); assertThat("The source index primary term must be used", indexMetadata.primaryTerm(0), is(3L)); } + /** + * This test checks if the cluster is a remote store cluster then we populate custom data for remote settings in + * index metadata of the underlying index. This captures information around the resolution pattern of the path for + * remote segments and translog. + */ + public void testRemoteCustomData() { + // Case 1 - Remote store is not enabled + IndexMetadata indexMetadata = testRemoteCustomData(false, randomFrom(RemoteStorePathType.values())); + assertNull(indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY)); + + // Case 2 - cluster.remote_store.index.path.prefix.optimised=fixed (default value) + indexMetadata = testRemoteCustomData(true, RemoteStorePathType.FIXED); + validateRemoteCustomData( + indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY), + RemoteStorePathType.NAME, + RemoteStorePathType.FIXED.toString() + ); + + // Case 3 - cluster.remote_store.index.path.prefix.optimised=hashed_prefix + indexMetadata = testRemoteCustomData(true, RemoteStorePathType.HASHED_PREFIX); + validateRemoteCustomData( + indexMetadata.getCustomData(IndexMetadata.REMOTE_STORE_CUSTOM_KEY), + RemoteStorePathType.NAME, + RemoteStorePathType.HASHED_PREFIX.toString() + ); + } + + private IndexMetadata testRemoteCustomData(boolean remoteStoreEnabled, RemoteStorePathType remoteStorePathType) { + Settings.Builder settingsBuilder = Settings.builder(); + if (remoteStoreEnabled) { + settingsBuilder.put(NODE_ATTRIBUTES.getKey() + REMOTE_STORE_SEGMENT_REPOSITORY_NAME_ATTRIBUTE_KEY, "test"); + } + settingsBuilder.put(IndicesService.CLUSTER_REMOTE_STORE_PATH_PREFIX_TYPE_SETTING.getKey(), remoteStorePathType.toString()); + Settings settings = settingsBuilder.build(); + + ClusterService clusterService = mock(ClusterService.class); + Metadata metadata = Metadata.builder() + .transientSettings(Settings.builder().put(Metadata.DEFAULT_REPLICA_COUNT_SETTING.getKey(), 1).build()) + .build(); + ClusterState clusterState = ClusterState.builder(org.opensearch.cluster.ClusterName.CLUSTER_NAME_SETTING.getDefault(Settings.EMPTY)) + .metadata(metadata) + .build(); + ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + when(clusterService.getSettings()).thenReturn(settings); + when(clusterService.getClusterSettings()).thenReturn(clusterSettings); + when(clusterService.state()).thenReturn(clusterState); + + ThreadPool threadPool = new TestThreadPool(getTestName()); + MetadataCreateIndexService metadataCreateIndexService = new MetadataCreateIndexService( + settings, + clusterService, + null, + null, + null, + createTestShardLimitService(randomIntBetween(1, 1000), false, clusterService), + new Environment(Settings.builder().put("path.home", "dummy").build(), null), + IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, + threadPool, + null, + new SystemIndices(Collections.emptyMap()), + true, + new AwarenessReplicaBalance(settings, clusterService.getClusterSettings()) + ); + CreateIndexClusterStateUpdateRequest request = new CreateIndexClusterStateUpdateRequest("create index", "test", "test"); + Settings indexSettings = Settings.builder() + .put("index.version.created", Version.CURRENT) + .put(INDEX_NUMBER_OF_SHARDS_SETTING.getKey(), 3) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1) + .build(); + + IndexMetadata indexMetadata = metadataCreateIndexService.buildAndValidateTemporaryIndexMetadata(indexSettings, request, 0); + threadPool.shutdown(); + return indexMetadata; + } + + private void validateRemoteCustomData(Map customData, String expectedKey, String expectedValue) { + assertTrue(customData.containsKey(expectedKey)); + assertEquals(expectedValue, customData.get(expectedKey)); + } + public void testGetIndexNumberOfRoutingShardsWithNullSourceIndex() { Settings indexSettings = Settings.builder() .put("index.version.created", Version.CURRENT) From 7a6f8b0eead5371fa18d49534ac76edf199e9c4c Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Tue, 19 Mar 2024 07:16:07 -0400 Subject: [PATCH 2/8] Bump mockito & bytebuddy dependencies (#12720) Signed-off-by: Andriy Redko --- buildSrc/version.properties | 4 ++-- .../org/opensearch/transport/RemoteClusterClientTests.java | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/buildSrc/version.properties b/buildSrc/version.properties index 6da095473b520..8705588babe97 100644 --- a/buildSrc/version.properties +++ b/buildSrc/version.properties @@ -55,9 +55,9 @@ bouncycastle=1.77 randomizedrunner = 2.7.1 junit = 4.13.2 hamcrest = 2.1 -mockito = 5.10.0 +mockito = 5.11.0 objenesis = 3.2 -bytebuddy = 1.14.7 +bytebuddy = 1.14.9 # benchmark dependencies jmh = 1.35 diff --git a/server/src/test/java/org/opensearch/transport/RemoteClusterClientTests.java b/server/src/test/java/org/opensearch/transport/RemoteClusterClientTests.java index f3b7f9916d460..59c0206a87fb3 100644 --- a/server/src/test/java/org/opensearch/transport/RemoteClusterClientTests.java +++ b/server/src/test/java/org/opensearch/transport/RemoteClusterClientTests.java @@ -63,6 +63,7 @@ public void tearDown() throws Exception { ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); } + @AwaitsFix(bugUrl = "https://github.com/opensearch-project/OpenSearch/issues/12338") public void testConnectAndExecuteRequest() throws Exception { Settings remoteSettings = Settings.builder().put(ClusterName.CLUSTER_NAME_SETTING.getKey(), "foo_bar_cluster").build(); try ( From d4e1ab135dfcb058b476fce09a669f4be5854702 Mon Sep 17 00:00:00 2001 From: Sooraj Sinha <81695996+soosinha@users.noreply.github.com> Date: Tue, 19 Mar 2024 21:52:43 +0530 Subject: [PATCH 3/8] Election scheduler should be cancelled after cluster state publication (#11699) Signed-off-by: Sooraj Sinha --- CHANGELOG.md | 1 + .../cluster/coordination/Coordinator.java | 20 +++++++++++++++---- .../coordination/CoordinatorTests.java | 4 ++-- .../AbstractCoordinatorTestCase.java | 7 +++++++ 4 files changed, 26 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c4558cf3fe251..3fb141e189bd3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -146,6 +146,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Allow composite aggregation to run under a parent filter aggregation ([#11499](https://github.com/opensearch-project/OpenSearch/pull/11499)) - Quickly compute terms aggregations when the top-level query is functionally match-all for a segment ([#11643](https://github.com/opensearch-project/OpenSearch/pull/11643)) - Mark fuzzy filter GA and remove experimental setting ([12631](https://github.com/opensearch-project/OpenSearch/pull/12631)) +- Keep the election scheduler open until cluster state has been applied ([#11699](https://github.com/opensearch-project/OpenSearch/pull/11699)) ### Deprecated diff --git a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java index 5a07f964f94a4..3d74feddfa261 100644 --- a/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java +++ b/server/src/main/java/org/opensearch/cluster/coordination/Coordinator.java @@ -386,6 +386,7 @@ public void onFailure(String source, Exception e) { @Override public void onSuccess(String source) { + closePrevotingAndElectionScheduler(); applyListener.onResponse(null); } }); @@ -472,17 +473,29 @@ private static Optional joinWithDestination(Optional lastJoin, Disco } private void closePrevotingAndElectionScheduler() { + closePrevoting(); + closeElectionScheduler(); + } + + private void closePrevoting() { if (prevotingRound != null) { prevotingRound.close(); prevotingRound = null; } + } + private void closeElectionScheduler() { if (electionScheduler != null) { electionScheduler.close(); electionScheduler = null; } } + // package-visible for testing + boolean isElectionSchedulerRunning() { + return electionScheduler != null; + } + private void updateMaxTermSeen(final long term) { synchronized (mutex) { maxTermSeen = Math.max(maxTermSeen, term); @@ -724,7 +737,7 @@ void becomeLeader(String method) { lastKnownLeader = Optional.of(getLocalNode()); peerFinder.deactivate(getLocalNode()); clusterFormationFailureHelper.stop(); - closePrevotingAndElectionScheduler(); + closePrevoting(); preVoteCollector.update(getPreVoteResponse(), getLocalNode()); assert leaderChecker.leader() == null : leaderChecker.leader(); @@ -761,7 +774,7 @@ void becomeFollower(String method, DiscoveryNode leaderNode) { lastKnownLeader = Optional.of(leaderNode); peerFinder.deactivate(leaderNode); clusterFormationFailureHelper.stop(); - closePrevotingAndElectionScheduler(); + closePrevoting(); cancelActivePublication("become follower: " + method); preVoteCollector.update(getPreVoteResponse(), leaderNode); @@ -927,7 +940,6 @@ public void invariant() { assert lastKnownLeader.isPresent() && lastKnownLeader.get().equals(getLocalNode()); assert joinAccumulator instanceof JoinHelper.LeaderJoinAccumulator; assert peerFinderLeader.equals(lastKnownLeader) : peerFinderLeader; - assert electionScheduler == null : electionScheduler; assert prevotingRound == null : prevotingRound; assert becomingClusterManager || getStateForClusterManagerService().nodes().getClusterManagerNodeId() != null : getStateForClusterManagerService(); @@ -972,7 +984,6 @@ assert getLocalNode().equals(applierState.nodes().getClusterManagerNode()) assert lastKnownLeader.isPresent() && (lastKnownLeader.get().equals(getLocalNode()) == false); assert joinAccumulator instanceof JoinHelper.FollowerJoinAccumulator; assert peerFinderLeader.equals(lastKnownLeader) : peerFinderLeader; - assert electionScheduler == null : electionScheduler; assert prevotingRound == null : prevotingRound; assert getStateForClusterManagerService().nodes().getClusterManagerNodeId() == null : getStateForClusterManagerService(); assert leaderChecker.currentNodeIsClusterManager() == false; @@ -1693,6 +1704,7 @@ public void onSuccess(String source) { updateMaxTermSeen(getCurrentTerm()); if (mode == Mode.LEADER) { + closePrevotingAndElectionScheduler(); // if necessary, abdicate to another node or improve the voting configuration boolean attemptReconfiguration = true; final ClusterState state = getLastAcceptedState(); // committed state diff --git a/server/src/test/java/org/opensearch/cluster/coordination/CoordinatorTests.java b/server/src/test/java/org/opensearch/cluster/coordination/CoordinatorTests.java index a3129655148ab..5eeebd2588416 100644 --- a/server/src/test/java/org/opensearch/cluster/coordination/CoordinatorTests.java +++ b/server/src/test/java/org/opensearch/cluster/coordination/CoordinatorTests.java @@ -270,7 +270,7 @@ public void testNodesJoinAfterStableCluster() { public void testExpandsConfigurationWhenGrowingFromOneNodeToThreeButDoesNotShrink() { try (Cluster cluster = new Cluster(1)) { cluster.runRandomly(); - cluster.stabilise(); + cluster.stabilise(DEFAULT_STABILISATION_TIME * 2); final ClusterNode leader = cluster.getAnyLeader(); @@ -1750,7 +1750,7 @@ public void testDoesNotPerformElectionWhenRestartingFollower() { public void testImproveConfigurationPerformsVotingConfigExclusionStateCheck() { try (Cluster cluster = new Cluster(1)) { cluster.runRandomly(); - cluster.stabilise(); + cluster.stabilise(DEFAULT_STABILISATION_TIME * 2); final Coordinator coordinator = cluster.getAnyLeader().coordinator; final ClusterState currentState = coordinator.getLastAcceptedState(); diff --git a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java index 28d7706fb1493..0754cc1793dc8 100644 --- a/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/opensearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -47,6 +47,7 @@ import org.opensearch.cluster.OpenSearchAllocationTestCase; import org.opensearch.cluster.coordination.AbstractCoordinatorTestCase.Cluster.ClusterNode; import org.opensearch.cluster.coordination.CoordinationMetadata.VotingConfiguration; +import org.opensearch.cluster.coordination.Coordinator.Mode; import org.opensearch.cluster.coordination.LinearizabilityChecker.History; import org.opensearch.cluster.coordination.LinearizabilityChecker.SequentialSpec; import org.opensearch.cluster.coordination.PersistedStateRegistry.PersistedStateType; @@ -653,6 +654,12 @@ void stabilise(long stabilisationDurationMillis) { leader.getLastAppliedClusterState().getNodes().nodeExists(nodeId) ); } + if (clusterNode.coordinator.getMode() == Mode.LEADER || clusterNode.coordinator.getMode() == Mode.FOLLOWER) { + assertFalse( + "Election scheduler should stop after cluster has stabilised", + clusterNode.coordinator.isElectionSchedulerRunning() + ); + } } final Set connectedNodeIds = clusterNodes.stream() From 479b65bfd4337ede96105739f5611ae9e8dbf971 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 19 Mar 2024 21:16:50 -0500 Subject: [PATCH 4/8] Bump peter-evans/create-pull-request from 5 to 6 (#12724) Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: dependabot[bot] --- .github/workflows/version.yml | 6 +++--- CHANGELOG.md | 1 + 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/version.yml b/.github/workflows/version.yml index be2a89ac931e9..7f120b65d7c2e 100644 --- a/.github/workflows/version.yml +++ b/.github/workflows/version.yml @@ -62,7 +62,7 @@ jobs: - name: Create PR for BASE id: base_pr - uses: peter-evans/create-pull-request@v5 + uses: peter-evans/create-pull-request@v6 with: base: ${{ env.BASE }} branch: 'create-pull-request/patch-${{ env.BASE }}' @@ -88,7 +88,7 @@ jobs: - name: Create PR for BASE_X id: base_x_pr - uses: peter-evans/create-pull-request@v5 + uses: peter-evans/create-pull-request@v6 with: base: ${{ env.BASE_X }} branch: 'create-pull-request/patch-${{ env.BASE_X }}' @@ -114,7 +114,7 @@ jobs: - name: Create PR for main id: main_pr - uses: peter-evans/create-pull-request@v5 + uses: peter-evans/create-pull-request@v6 with: base: main branch: 'create-pull-request/patch-main' diff --git a/CHANGELOG.md b/CHANGELOG.md index 3fb141e189bd3..4e5426797c69f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -141,6 +141,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Bump `aws-sdk-java` from 2.20.55 to 2.20.86 ([#12251](https://github.com/opensearch-project/OpenSearch/pull/12251)) - Bump `reactor-netty` from 1.1.15 to 1.1.17 ([#12633](https://github.com/opensearch-project/OpenSearch/pull/12633)) - Bump `reactor` from 3.5.14 to 3.5.15 ([#12633](https://github.com/opensearch-project/OpenSearch/pull/12633)) +- Bump `peter-evans/create-pull-request` from 5 to 6 ([#12724](https://github.com/opensearch-project/OpenSearch/pull/12724)) ### Changed - Allow composite aggregation to run under a parent filter aggregation ([#11499](https://github.com/opensearch-project/OpenSearch/pull/11499)) From f4a8d2b752fd2b444cc3ab35fc463300cf31f870 Mon Sep 17 00:00:00 2001 From: Sagar <99425694+sgup432@users.noreply.github.com> Date: Tue, 19 Mar 2024 19:41:59 -0700 Subject: [PATCH 5/8] Fixing ehcache flaky test (#12764) * Fixing ehcache flaky test Signed-off-by: Sagar Upadhyaya * Adding a ehcache issue reference for thread leak issue Signed-off-by: Sagar Upadhyaya * Updating comment Signed-off-by: Sagar Upadhyaya --------- Signed-off-by: Sagar Upadhyaya --- .../store/disk/EhCacheDiskCacheTests.java | 44 +++++++++++++++++++ .../store/disk/EhcacheThreadLeakFilter.java | 29 ++++++++++++ 2 files changed, 73 insertions(+) create mode 100644 plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhcacheThreadLeakFilter.java diff --git a/plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhCacheDiskCacheTests.java b/plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhCacheDiskCacheTests.java index 8c0ab62baee54..3a98ad2fef6b1 100644 --- a/plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhCacheDiskCacheTests.java +++ b/plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhCacheDiskCacheTests.java @@ -8,6 +8,8 @@ package org.opensearch.cache.store.disk; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; + import org.opensearch.cache.EhcacheDiskCacheSettings; import org.opensearch.common.Randomness; import org.opensearch.common.cache.CacheType; @@ -47,6 +49,7 @@ import static org.opensearch.cache.EhcacheDiskCacheSettings.DISK_STORAGE_PATH_KEY; import static org.hamcrest.CoreMatchers.instanceOf; +@ThreadLeakFilters(filters = { EhcacheThreadLeakFilter.class }) public class EhCacheDiskCacheTests extends OpenSearchSingleNodeTestCase { private static final int CACHE_SIZE_IN_BYTES = 1024 * 101; @@ -633,6 +636,47 @@ public void testBasicGetAndPutBytesReference() throws Exception { } } + public void testInvalidate() throws Exception { + Settings settings = Settings.builder().build(); + MockRemovalListener removalListener = new MockRemovalListener<>(); + try (NodeEnvironment env = newNodeEnvironment(settings)) { + ICache ehcacheTest = new EhcacheDiskCache.Builder().setThreadPoolAlias("ehcacheTest") + .setStoragePath(env.nodePaths()[0].indicesPath.toString() + "/request_cache") + .setIsEventListenerModeSync(true) + .setKeyType(String.class) + .setKeySerializer(new StringSerializer()) + .setValueSerializer(new StringSerializer()) + .setValueType(String.class) + .setCacheType(CacheType.INDICES_REQUEST_CACHE) + .setSettings(settings) + .setExpireAfterAccess(TimeValue.MAX_VALUE) + .setMaximumWeightInBytes(CACHE_SIZE_IN_BYTES) + .setRemovalListener(removalListener) + .build(); + int randomKeys = randomIntBetween(10, 100); + Map keyValueMap = new HashMap<>(); + for (int i = 0; i < randomKeys; i++) { + keyValueMap.put(UUID.randomUUID().toString(), UUID.randomUUID().toString()); + } + for (Map.Entry entry : keyValueMap.entrySet()) { + ehcacheTest.put(entry.getKey(), entry.getValue()); + } + assertEquals(keyValueMap.size(), ehcacheTest.count()); + List removedKeyList = new ArrayList<>(); + for (Map.Entry entry : keyValueMap.entrySet()) { + if (randomBoolean()) { + removedKeyList.add(entry.getKey()); + ehcacheTest.invalidate(entry.getKey()); + } + } + for (String removedKey : removedKeyList) { + assertNull(ehcacheTest.get(removedKey)); + } + assertEquals(keyValueMap.size() - removedKeyList.size(), ehcacheTest.count()); + ehcacheTest.close(); + } + } + private static String generateRandomString(int length) { String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; StringBuilder randomString = new StringBuilder(length); diff --git a/plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhcacheThreadLeakFilter.java b/plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhcacheThreadLeakFilter.java new file mode 100644 index 0000000000000..6b54c3be10466 --- /dev/null +++ b/plugins/cache-ehcache/src/test/java/org/opensearch/cache/store/disk/EhcacheThreadLeakFilter.java @@ -0,0 +1,29 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cache.store.disk; + +import com.carrotsearch.randomizedtesting.ThreadFilter; + +/** + * In Ehcache(as of 3.10.8), while calling remove/invalidate() on entries causes to start a daemon thread in the + * background to clean up the stale offheap memory associated with the disk cache. And this thread is not closed even + * after we try to close the cache or cache manager. Considering that it requires a node restart to switch between + * different cache plugins, this shouldn't be a problem for now. + * + * See: https://github.com/ehcache/ehcache3/issues/3204 + */ +public class EhcacheThreadLeakFilter implements ThreadFilter { + + private static final String OFFENDING_THREAD_NAME = "MappedByteBufferSource"; + + @Override + public boolean reject(Thread t) { + return t.getName().startsWith(OFFENDING_THREAD_NAME); + } +} From 1b4c76ddd9a4363312d5950655ecbc36c530bb96 Mon Sep 17 00:00:00 2001 From: gaobinlong Date: Wed, 20 Mar 2024 11:06:16 +0800 Subject: [PATCH 6/8] Update supported version for the primary_only parameter in force-merge API (#12657) * Update supported version for adding primary_only parameter to force-merge API Signed-off-by: Gao Binlong * Modify change log Signed-off-by: Gao Binlong * Remove some unused code Signed-off-by: Gao Binlong * Remove change log Signed-off-by: Gao Binlong * Fix test issue Signed-off-by: Gao Binlong --------- Signed-off-by: Gao Binlong --- .../20_wait_for_completion.yml | 33 ++++++++++-- .../indices/forcemerge/ForceMergeRequest.java | 4 +- .../forcemerge/ForceMergeRequestTests.java | 53 ++++++++++--------- 3 files changed, 60 insertions(+), 30 deletions(-) diff --git a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/20_wait_for_completion.yml b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/20_wait_for_completion.yml index efa239547e84a..a0bddd1cbd13f 100644 --- a/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/20_wait_for_completion.yml +++ b/rest-api-spec/src/main/resources/rest-api-spec/test/indices.forcemerge/20_wait_for_completion.yml @@ -4,15 +4,17 @@ # will return a task immediately and the merge process will run in background. - skip: - version: " - 2.99.99" - reason: "only available in 3.0+" - features: allowed_warnings + version: " - 2.6.99, 2.13.0 - " + reason: "wait_for_completion was introduced in 2.7.0 and task description was changed in 2.13.0" + features: allowed_warnings, node_selector - do: indices.create: index: test_index - do: + node_selector: + version: " 2.7.0 - 2.12.99" indices.forcemerge: index: test_index wait_for_completion: false @@ -25,8 +27,31 @@ wait_for_completion: true task_id: $taskId - match: { task.action: "indices:admin/forcemerge" } - - match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]" } + - match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true]" } + +--- +"Force merge index with wait_for_completion after task description changed": + - skip: + version: " - 2.12.99 " + reason: "task description was changed in 2.13.0" + features: allowed_warnings, node_selector + + - do: + node_selector: + version: " 2.13.0 - " + indices.forcemerge: + index: test_index + wait_for_completion: false + max_num_segments: 1 + - match: { task: /^.+$/ } + - set: { task: taskId } + - do: + tasks.get: + wait_for_completion: true + task_id: $taskId + - match: { task.action: "indices:admin/forcemerge" } + - match: { task.description: "Force-merge indices [test_index], maxSegments[1], onlyExpungeDeletes[false], flush[true], primaryOnly[false]" } # .tasks index is created when the force-merge operation completes, so we should delete .tasks index finally, # if not, the .tasks index may introduce unexpected warnings and then cause other test cases to fail. # Delete the .tasks index directly will also introduce warning, but currently we don't have such APIs which can delete one diff --git a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java index bf6ee9ca43755..3efc4db21afbc 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequest.java @@ -102,7 +102,7 @@ public ForceMergeRequest(StreamInput in) throws IOException { maxNumSegments = in.readInt(); onlyExpungeDeletes = in.readBoolean(); flush = in.readBoolean(); - if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + if (in.getVersion().onOrAfter(Version.V_2_13_0)) { primaryOnly = in.readBoolean(); } if (in.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) { @@ -219,7 +219,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeInt(maxNumSegments); out.writeBoolean(onlyExpungeDeletes); out.writeBoolean(flush); - if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + if (out.getVersion().onOrAfter(Version.V_2_13_0)) { out.writeBoolean(primaryOnly); } if (out.getVersion().onOrAfter(FORCE_MERGE_UUID_VERSION)) { diff --git a/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java b/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java index a80141c52b6b4..03cf38548a8cd 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/forcemerge/ForceMergeRequestTests.java @@ -32,8 +32,10 @@ package org.opensearch.action.admin.indices.forcemerge; import org.opensearch.Version; +import org.opensearch.action.support.IndicesOptions; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.tasks.TaskId; import org.opensearch.test.OpenSearchTestCase; import org.opensearch.test.VersionUtils; @@ -95,42 +97,48 @@ public void testSerialization() throws Exception { public void testBwcSerialization() throws Exception { { final ForceMergeRequest sample = randomRequest(); - final Version compatibleVersion = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT); + final Version version = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT); try (BytesStreamOutput out = new BytesStreamOutput()) { - out.setVersion(compatibleVersion); + out.setVersion(version); sample.writeTo(out); - final ForceMergeRequest deserializedRequest; try (StreamInput in = out.bytes().streamInput()) { - in.setVersion(Version.CURRENT); - deserializedRequest = new ForceMergeRequest(in); - } - - assertEquals(sample.maxNumSegments(), deserializedRequest.maxNumSegments()); - assertEquals(sample.onlyExpungeDeletes(), deserializedRequest.onlyExpungeDeletes()); - assertEquals(sample.flush(), deserializedRequest.flush()); - if (compatibleVersion.onOrAfter(Version.V_3_0_0)) { - assertEquals(sample.primaryOnly(), deserializedRequest.primaryOnly()); - assertEquals(sample.forceMergeUUID(), deserializedRequest.forceMergeUUID()); + in.setVersion(version); + TaskId.readFromStream(in); + in.readStringArray(); + IndicesOptions.readIndicesOptions(in); + int maxNumSegments = in.readInt(); + boolean onlyExpungeDeletes = in.readBoolean(); + boolean flush = in.readBoolean(); + boolean primaryOnly = in.readBoolean(); + String forceMergeUUID; + if (version.onOrAfter(Version.V_3_0_0)) { + forceMergeUUID = in.readString(); + } else { + forceMergeUUID = in.readOptionalString(); + } + assertEquals(sample.maxNumSegments(), maxNumSegments); + assertEquals(sample.onlyExpungeDeletes(), onlyExpungeDeletes); + assertEquals(sample.flush(), flush); + assertEquals(sample.primaryOnly(), primaryOnly); + assertEquals(sample.forceMergeUUID(), forceMergeUUID); } } } { final ForceMergeRequest sample = randomRequest(); - final Version compatibleVersion = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT); + final Version version = VersionUtils.randomCompatibleVersion(random(), Version.CURRENT); try (BytesStreamOutput out = new BytesStreamOutput()) { - out.setVersion(Version.CURRENT); + out.setVersion(version); sample.getParentTask().writeTo(out); out.writeStringArray(sample.indices()); sample.indicesOptions().writeIndicesOptions(out); out.writeInt(sample.maxNumSegments()); out.writeBoolean(sample.onlyExpungeDeletes()); out.writeBoolean(sample.flush()); - if (compatibleVersion.onOrAfter(Version.V_3_0_0)) { - out.writeBoolean(sample.primaryOnly()); - } - if (compatibleVersion.onOrAfter(Version.V_3_0_0)) { + out.writeBoolean(sample.primaryOnly()); + if (version.onOrAfter(Version.V_3_0_0)) { out.writeString(sample.forceMergeUUID()); } else { out.writeOptionalString(sample.forceMergeUUID()); @@ -138,18 +146,15 @@ public void testBwcSerialization() throws Exception { final ForceMergeRequest deserializedRequest; try (StreamInput in = out.bytes().streamInput()) { - in.setVersion(compatibleVersion); + in.setVersion(version); deserializedRequest = new ForceMergeRequest(in); } assertEquals(sample.maxNumSegments(), deserializedRequest.maxNumSegments()); assertEquals(sample.onlyExpungeDeletes(), deserializedRequest.onlyExpungeDeletes()); assertEquals(sample.flush(), deserializedRequest.flush()); - if (compatibleVersion.onOrAfter(Version.V_3_0_0)) { - assertEquals(sample.primaryOnly(), deserializedRequest.primaryOnly()); - } + assertEquals(sample.primaryOnly(), deserializedRequest.primaryOnly()); assertEquals(sample.forceMergeUUID(), deserializedRequest.forceMergeUUID()); - } } } From ad6b53834229da724312ad6a64faa2dc41a8d4c5 Mon Sep 17 00:00:00 2001 From: shwetathareja Date: Wed, 20 Mar 2024 16:28:42 +0530 Subject: [PATCH 7/8] Fixing flaky test GeoPointShapeQueryTests.testQueryLinearRing (#12783) Signed-off-by: Shweta Thareja Co-authored-by: Shweta Thareja --- .../java/org/opensearch/search/geo/GeoPointShapeQueryTests.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/server/src/test/java/org/opensearch/search/geo/GeoPointShapeQueryTests.java b/server/src/test/java/org/opensearch/search/geo/GeoPointShapeQueryTests.java index b6b2a86ac7549..b5d34a78ab5a4 100644 --- a/server/src/test/java/org/opensearch/search/geo/GeoPointShapeQueryTests.java +++ b/server/src/test/java/org/opensearch/search/geo/GeoPointShapeQueryTests.java @@ -143,6 +143,8 @@ public void testQueryLinearRing() throws Exception { e.getCause().getMessage(), containsString("Field [" + defaultGeoFieldName + "] does not support LINEARRING queries") ); + } catch (UnsupportedOperationException e) { + assertThat(e.getMessage(), containsString("line ring cannot be serialized using GeoJson")); } } From c369ec44e381ddb85db647d89389842952a1c06a Mon Sep 17 00:00:00 2001 From: Prabhat <20185657+CaptainDredge@users.noreply.github.com> Date: Wed, 20 Mar 2024 04:50:28 -0700 Subject: [PATCH 8/8] Added static setting for checkPendingFlushUpdate functionality of lucene writer (#12710) Signed-off-by: Prabhat Sharma Co-authored-by: Prabhat Sharma --- CHANGELOG.md | 1 + .../common/settings/IndexScopedSettings.java | 1 + .../org/opensearch/index/IndexSettings.java | 20 ++++++++++++++++++- .../index/engine/InternalEngine.java | 1 + 4 files changed, 22 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e5426797c69f..3cfdcd86f6bf0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -118,6 +118,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - [Tiered caching] Add Stale keys Management and CacheCleaner to IndicesRequestCache ([#12625](https://github.com/opensearch-project/OpenSearch/pull/12625)) - [Tiered caching] Add serializer integration to allow ehcache disk cache to use non-primitive values ([#12709](https://github.com/opensearch-project/OpenSearch/pull/12709)) - [Admission Control] Integrated IO Based AdmissionController to AdmissionControl Framework ([#12583](https://github.com/opensearch-project/OpenSearch/pull/12583)) +- Introduce a new setting `index.check_pending_flush.enabled` to expose the ability to disable the check for pending flushes by write threads ([#12710](https://github.com/opensearch-project/OpenSearch/pull/12710)) ### Dependencies - Bump `peter-evans/find-comment` from 2 to 3 ([#12288](https://github.com/opensearch-project/OpenSearch/pull/12288)) diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 49bb3abf1decd..c6c312d6b6eea 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -207,6 +207,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY, IndexSettings.INDEX_MERGE_POLICY, + IndexSettings.INDEX_CHECK_PENDING_FLUSH_ENABLED, LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING, LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MIN_MERGE_SETTING, LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_SETTING, diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index d750a13dece64..5aaea2c498701 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -624,6 +624,16 @@ public static IndexMergePolicy fromString(String text) { Property.IndexScope ); + /** + * Expert: Makes indexing threads check for pending flushes on update in order to help out + * flushing indexing buffers to disk. This is an experimental Apache Lucene feature. + */ + public static final Setting INDEX_CHECK_PENDING_FLUSH_ENABLED = Setting.boolSetting( + "index.check_pending_flush.enabled", + true, + Property.IndexScope + ); + public static final Setting TIME_SERIES_INDEX_MERGE_POLICY = Setting.simpleString( "indices.time_series_index.default_index_merge_policy", DEFAULT_POLICY, @@ -816,7 +826,10 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { * Specialized merge-on-flush policy if provided */ private volatile UnaryOperator mergeOnFlushPolicy; - + /** + * Is flush check by write threads enabled or not + */ + private final boolean checkPendingFlushEnabled; /** * Is fuzzy set enabled for doc id */ @@ -959,6 +972,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti maxFullFlushMergeWaitTime = scopedSettings.get(INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME); mergeOnFlushEnabled = scopedSettings.get(INDEX_MERGE_ON_FLUSH_ENABLED); setMergeOnFlushPolicy(scopedSettings.get(INDEX_MERGE_ON_FLUSH_POLICY)); + checkPendingFlushEnabled = scopedSettings.get(INDEX_CHECK_PENDING_FLUSH_ENABLED); defaultSearchPipeline = scopedSettings.get(DEFAULT_SEARCH_PIPELINE); /* There was unintentional breaking change got introduced with [OpenSearch-6424](https://github.com/opensearch-project/OpenSearch/pull/6424) (version 2.7). * For indices created prior version (prior to 2.7) which has IndexSort type, they used to type cast the SortField.Type @@ -1844,6 +1858,10 @@ private void setMergeOnFlushPolicy(String policy) { } } + public boolean isCheckPendingFlushEnabled() { + return checkPendingFlushEnabled; + } + public Optional> getMergeOnFlushPolicy() { return Optional.ofNullable(mergeOnFlushPolicy); } diff --git a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java index e204656d3f106..a25ec95f58e05 100644 --- a/server/src/main/java/org/opensearch/index/engine/InternalEngine.java +++ b/server/src/main/java/org/opensearch/index/engine/InternalEngine.java @@ -2336,6 +2336,7 @@ private IndexWriterConfig getIndexWriterConfig() { iwc.setMaxFullFlushMergeWaitMillis(0); } + iwc.setCheckPendingFlushUpdate(config().getIndexSettings().isCheckPendingFlushEnabled()); iwc.setMergePolicy(new OpenSearchMergePolicy(mergePolicy)); iwc.setSimilarity(engineConfig.getSimilarity()); iwc.setRAMBufferSizeMB(engineConfig.getIndexingBufferSize().getMbFrac());