From f5d3fd2483f58cdbb0d77cf53a1e5439b3cc860a Mon Sep 17 00:00:00 2001 From: Ashish Date: Fri, 1 Sep 2023 17:25:39 +0530 Subject: [PATCH] Introduce cluster default remote translog buffer interval setting (#9584) Signed-off-by: Ashish Singh --- CHANGELOG.md | 1 + .../opensearch/index/shard/IndexShardIT.java | 3 +- .../opensearch/remotestore/RemoteStoreIT.java | 169 ++++++++++++++++++ .../common/settings/ClusterSettings.java | 3 +- .../concurrent/BufferedAsyncIOProcessor.java | 4 + .../org/opensearch/index/IndexModule.java | 6 +- .../org/opensearch/index/IndexService.java | 8 +- .../org/opensearch/index/IndexSettings.java | 7 + .../opensearch/index/shard/IndexShard.java | 20 ++- .../opensearch/indices/IndicesService.java | 31 +++- .../opensearch/index/IndexModuleTests.java | 3 +- .../indices/IndicesServiceTests.java | 5 + .../index/shard/IndexShardTestCase.java | 3 +- 13 files changed, 252 insertions(+), 11 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 80ab0fb87e609..ffefcd416da1c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -98,6 +98,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Decouple replication lag from logic to fail stale replicas ([#9507](https://github.com/opensearch-project/OpenSearch/pull/9507)) - Expose DelimitedTermFrequencyTokenFilter to allow providing term frequencies along with terms ([#9479](https://github.com/opensearch-project/OpenSearch/pull/9479)) - APIs for performing async blob reads and async downloads from the repository using multiple streams ([#9592](https://github.com/opensearch-project/OpenSearch/issues/9592)) +- Introduce cluster default remote translog buffer interval setting ([#9584](https://github.com/opensearch-project/OpenSearch/pull/9584)) ### Dependencies - Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307)) diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java index 98a1c71d8e2b3..bb08b19df765b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/IndexShardIT.java @@ -701,7 +701,8 @@ public static final IndexShard newIndexShard( (indexSettings, shardRouting) -> new InternalTranslogFactory(), SegmentReplicationCheckpointPublisher.EMPTY, null, - null + null, + () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL ); } diff --git a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java index 9a2948861e967..b87ccdb22d014 100644 --- a/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java @@ -9,11 +9,21 @@ package org.opensearch.remotestore; import org.opensearch.action.admin.indices.delete.DeleteIndexRequest; +import org.opensearch.action.admin.indices.get.GetIndexRequest; +import org.opensearch.action.admin.indices.get.GetIndexResponse; import org.opensearch.action.admin.indices.recovery.RecoveryResponse; +import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.opensearch.action.index.IndexResponse; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.routing.RecoverySource; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; +import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor; +import org.opensearch.core.index.Index; +import org.opensearch.index.IndexService; +import org.opensearch.index.IndexSettings; +import org.opensearch.index.shard.IndexShard; +import org.opensearch.indices.IndicesService; import org.opensearch.indices.recovery.RecoveryState; import org.opensearch.plugins.Plugin; import org.opensearch.test.OpenSearchIntegTestCase; @@ -26,9 +36,11 @@ import java.util.Collection; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static org.opensearch.index.shard.RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP; +import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount; import static org.hamcrest.Matchers.comparesEqualTo; @@ -184,4 +196,161 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception { // We also allow (numberOfIterations + 1) as index creation also triggers refresh. MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1))); } + + /** + * Tests that when the index setting is not passed during index creation, the buffer interval picked up is the cluster + * default. + */ + public void testDefaultBufferInterval() throws ExecutionException, InterruptedException { + setupRepo(); + String clusterManagerName = internalCluster().getClusterManagerName(); + String dataNode = internalCluster().startDataOnlyNodes(1).get(0); + createIndex(INDEX_NAME); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + assertClusterRemoteBufferInterval(IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, dataNode); + + IndexShard indexShard = getIndexShard(dataNode); + assertTrue(indexShard.getTranslogSyncProcessor() instanceof BufferedAsyncIOProcessor); + assertBufferInterval(IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, indexShard); + + // Next, we change the default buffer interval and the same should reflect in the buffer interval of the index created + TimeValue clusterBufferInterval = TimeValue.timeValueSeconds(randomIntBetween(100, 200)); + client(clusterManagerName).admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), clusterBufferInterval)) + .get(); + assertBufferInterval(clusterBufferInterval, indexShard); + clearClusterBufferIntervalSetting(clusterManagerName); + } + + /** + * This tests multiple cases where the index setting is passed during the index creation with multiple combinations + * with and without cluster default. + */ + public void testOverriddenBufferInterval() throws ExecutionException, InterruptedException { + setupRepo(); + String clusterManagerName = internalCluster().getClusterManagerName(); + String dataNode = internalCluster().startDataOnlyNodes(1).get(0); + + TimeValue bufferInterval = TimeValue.timeValueSeconds(randomIntBetween(0, 100)); + Settings indexSettings = Settings.builder() + .put(indexSettings()) + .put(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), bufferInterval) + .build(); + createIndex(INDEX_NAME, indexSettings); + ensureYellowAndNoInitializingShards(INDEX_NAME); + ensureGreen(INDEX_NAME); + + IndexShard indexShard = getIndexShard(dataNode); + assertTrue(indexShard.getTranslogSyncProcessor() instanceof BufferedAsyncIOProcessor); + assertBufferInterval(bufferInterval, indexShard); + + // Set the cluster default with a different value, validate that the buffer interval is still the overridden value + TimeValue clusterBufferInterval = TimeValue.timeValueSeconds(randomIntBetween(100, 200)); + client(clusterManagerName).admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), clusterBufferInterval)) + .get(); + assertBufferInterval(bufferInterval, indexShard); + + // Set the index setting (index.remote_store.translog.buffer_interval) with a different value and validate that + // the buffer interval is updated + bufferInterval = TimeValue.timeValueSeconds(bufferInterval.seconds() + randomIntBetween(1, 100)); + client(clusterManagerName).admin() + .indices() + .updateSettings( + new UpdateSettingsRequest(INDEX_NAME).settings( + Settings.builder().put(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), bufferInterval) + ) + ) + .get(); + assertBufferInterval(bufferInterval, indexShard); + + // Set the index setting (index.remote_store.translog.buffer_interval) with null and validate the buffer interval + // which will be the cluster default now. + client(clusterManagerName).admin() + .indices() + .updateSettings( + new UpdateSettingsRequest(INDEX_NAME).settings( + Settings.builder().putNull(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey()) + ) + ) + .get(); + assertBufferInterval(clusterBufferInterval, indexShard); + clearClusterBufferIntervalSetting(clusterManagerName); + } + + /** + * This tests validation which kicks in during index creation failing creation if the value is less than minimum allowed value. + */ + public void testOverriddenBufferIntervalValidation() { + setupRepo(); + TimeValue bufferInterval = TimeValue.timeValueSeconds(-1); + Settings indexSettings = Settings.builder() + .put(indexSettings()) + .put(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), bufferInterval) + .build(); + IllegalArgumentException exceptionDuringCreateIndex = assertThrows( + IllegalArgumentException.class, + () -> createIndex(INDEX_NAME, indexSettings) + ); + assertEquals( + "failed to parse value [-1] for setting [index.remote_store.translog.buffer_interval], must be >= [0ms]", + exceptionDuringCreateIndex.getMessage() + ); + } + + /** + * This tests validation of the cluster setting when being set. + */ + public void testClusterBufferIntervalValidation() { + String clusterManagerName = internalCluster().startClusterManagerOnlyNode(); + setupRepo(false); + IllegalArgumentException exception = assertThrows( + IllegalArgumentException.class, + () -> client(clusterManagerName).admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings( + Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(-1)) + ) + .get() + ); + assertEquals( + "failed to parse value [-1] for setting [cluster.remote_store.translog.buffer_interval], must be >= [0ms]", + exception.getMessage() + ); + } + + private IndexShard getIndexShard(String dataNode) throws ExecutionException, InterruptedException { + String clusterManagerName = internalCluster().getClusterManagerName(); + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode); + GetIndexResponse getIndexResponse = client(clusterManagerName).admin().indices().getIndex(new GetIndexRequest()).get(); + String uuid = getIndexResponse.getSettings().get(INDEX_NAME).get(IndexMetadata.SETTING_INDEX_UUID); + IndexService indexService = indicesService.indexService(new Index(INDEX_NAME, uuid)); + return indexService.getShard(0); + } + + private void assertClusterRemoteBufferInterval(TimeValue expectedBufferInterval, String dataNode) { + IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode); + assertEquals(expectedBufferInterval, indicesService.getClusterRemoteTranslogBufferInterval()); + } + + private void assertBufferInterval(TimeValue expectedBufferInterval, IndexShard indexShard) { + assertEquals( + expectedBufferInterval, + ((BufferedAsyncIOProcessor) indexShard.getTranslogSyncProcessor()).getBufferIntervalSupplier().get() + ); + } + + private void clearClusterBufferIntervalSetting(String clusterManagerName) { + client(clusterManagerName).admin() + .cluster() + .prepareUpdateSettings() + .setTransientSettings(Settings.builder().putNull(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey())) + .get(); + } } diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 8093eab696779..05938914b019f 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -686,7 +686,8 @@ public void apply(Settings value, Settings current, Settings previous) { List.of( IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING, IndicesService.CLUSTER_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING, - IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING + IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING, + IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING ), List.of(FeatureFlags.CONCURRENT_SEGMENT_SEARCH), List.of( diff --git a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java index 7079aa705d126..be2029b2e7c62 100644 --- a/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java +++ b/server/src/main/java/org/opensearch/common/util/concurrent/BufferedAsyncIOProcessor.java @@ -92,4 +92,8 @@ private TimeValue getBufferInterval() { protected abstract String getBufferProcessThreadPoolName(); + // Exclusively for testing, please do not use it elsewhere. + public Supplier getBufferIntervalSupplier() { + return bufferIntervalSupplier; + } } diff --git a/server/src/main/java/org/opensearch/index/IndexModule.java b/server/src/main/java/org/opensearch/index/IndexModule.java index d1e071eedb39e..8692876412ea9 100644 --- a/server/src/main/java/org/opensearch/index/IndexModule.java +++ b/server/src/main/java/org/opensearch/index/IndexModule.java @@ -601,7 +601,8 @@ public IndexService newIndexService( ValuesSourceRegistry valuesSourceRegistry, IndexStorePlugin.DirectoryFactory remoteDirectoryFactory, BiFunction translogFactorySupplier, - Supplier clusterDefaultRefreshIntervalSupplier + Supplier clusterDefaultRefreshIntervalSupplier, + Supplier clusterRemoteTranslogBufferIntervalSupplier ) throws IOException { final IndexEventListener eventListener = freeze(); Function> readerWrapperFactory = indexReaderWrapper @@ -658,7 +659,8 @@ public IndexService newIndexService( valuesSourceRegistry, recoveryStateFactory, translogFactorySupplier, - clusterDefaultRefreshIntervalSupplier + clusterDefaultRefreshIntervalSupplier, + clusterRemoteTranslogBufferIntervalSupplier ); success = true; return indexService; diff --git a/server/src/main/java/org/opensearch/index/IndexService.java b/server/src/main/java/org/opensearch/index/IndexService.java index e5028ff2ecff9..80ead0a333ba3 100644 --- a/server/src/main/java/org/opensearch/index/IndexService.java +++ b/server/src/main/java/org/opensearch/index/IndexService.java @@ -177,6 +177,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust private final ValuesSourceRegistry valuesSourceRegistry; private final BiFunction translogFactorySupplier; private final Supplier clusterDefaultRefreshIntervalSupplier; + private final Supplier clusterRemoteTranslogBufferIntervalSupplier; public IndexService( IndexSettings indexSettings, @@ -210,7 +211,8 @@ public IndexService( ValuesSourceRegistry valuesSourceRegistry, IndexStorePlugin.RecoveryStateFactory recoveryStateFactory, BiFunction translogFactorySupplier, - Supplier clusterDefaultRefreshIntervalSupplier + Supplier clusterDefaultRefreshIntervalSupplier, + Supplier clusterRemoteTranslogBufferIntervalSupplier ) { super(indexSettings); this.allowExpensiveQueries = allowExpensiveQueries; @@ -284,6 +286,7 @@ public IndexService( this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this); this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this); this.translogFactorySupplier = translogFactorySupplier; + this.clusterRemoteTranslogBufferIntervalSupplier = clusterRemoteTranslogBufferIntervalSupplier; updateFsyncTaskIfNecessary(); } @@ -512,7 +515,8 @@ public synchronized IndexShard createShard( translogFactorySupplier, this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null, remoteStore, - remoteStoreStatsTrackerFactory + remoteStoreStatsTrackerFactory, + clusterRemoteTranslogBufferIntervalSupplier ); eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created"); eventListener.afterIndexShardCreated(indexShard); diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 4609b1f994737..9a9e0d8b1f913 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -1197,6 +1197,13 @@ public TimeValue getRemoteTranslogUploadBufferInterval() { return remoteTranslogUploadBufferInterval; } + /** + * Returns true iff the remote translog buffer interval setting exists or in other words is explicitly set. + */ + public boolean isRemoteTranslogBufferIntervalExplicit() { + return INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.exists(settings); + } + public void setRemoteTranslogUploadBufferInterval(TimeValue remoteTranslogUploadBufferInterval) { this.remoteTranslogUploadBufferInterval = remoteTranslogUploadBufferInterval; } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index 0d51126ace8c7..e398cab23a085 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -361,7 +361,8 @@ public IndexShard( final BiFunction translogFactorySupplier, @Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher, @Nullable final Store remoteStore, - final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory + final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory, + final Supplier clusterRemoteTranslogBufferIntervalSupplier ) throws IOException { super(shardRouting.shardId(), indexSettings); assert shardRouting.initializing(); @@ -382,7 +383,7 @@ public IndexShard( threadPool, this::getEngine, indexSettings.isRemoteTranslogStoreEnabled(), - indexSettings::getRemoteTranslogUploadBufferInterval + () -> getRemoteTranslogUploadBufferInterval(clusterRemoteTranslogBufferIntervalSupplier) ); this.mapperService = mapperService; this.indexCache = indexCache; @@ -4117,6 +4118,8 @@ private static AsyncIOProcessor createTranslogSyncProcessor( boolean bufferAsyncIoProcessor, Supplier bufferIntervalSupplier ) { + assert bufferAsyncIoProcessor == false || Objects.nonNull(bufferIntervalSupplier) + : "If bufferAsyncIoProcessor is true, then the bufferIntervalSupplier needs to be non null"; ThreadContext threadContext = threadPool.getThreadContext(); CheckedConsumer>>, IOException> writeConsumer = candidates -> { try { @@ -4911,4 +4914,17 @@ RetentionLeaseSyncer getRetentionLeaseSyncer() { public GatedCloseable getSegmentInfosSnapshot() { return getEngine().getSegmentInfosSnapshot(); } + + private TimeValue getRemoteTranslogUploadBufferInterval(Supplier clusterRemoteTranslogBufferIntervalSupplier) { + assert Objects.nonNull(clusterRemoteTranslogBufferIntervalSupplier) : "remote translog buffer interval supplier is null"; + if (indexSettings().isRemoteTranslogBufferIntervalExplicit()) { + return indexSettings().getRemoteTranslogUploadBufferInterval(); + } + return clusterRemoteTranslogBufferIntervalSupplier.get(); + } + + // Exclusively for testing, please do not use it elsewhere. + public AsyncIOProcessor getTranslogSyncProcessor() { + return translogSyncProcessor; + } } diff --git a/server/src/main/java/org/opensearch/indices/IndicesService.java b/server/src/main/java/org/opensearch/indices/IndicesService.java index 13a82ce5bdf0d..c31acd5a0f966 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesService.java +++ b/server/src/main/java/org/opensearch/indices/IndicesService.java @@ -69,6 +69,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.BigArrays; +import org.opensearch.common.util.FeatureFlags; import org.opensearch.common.util.concurrent.AbstractRefCounted; import org.opensearch.common.util.concurrent.AbstractRunnable; import org.opensearch.common.util.concurrent.OpenSearchExecutors; @@ -272,6 +273,17 @@ public class IndicesService extends AbstractLifecycleComponent Property.Final ); + /** + * Used to specify the default translog buffer interval for remote store backed indexes. + */ + public static final Setting CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING = Setting.timeSetting( + "cluster.remote_store.translog.buffer_interval", + IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, + IndexSettings.MINIMUM_REMOTE_TRANSLOG_BUFFER_INTERVAL, + Property.NodeScope, + Property.Dynamic + ); + /** * This setting is used to set the refresh interval when the {@code index.refresh_interval} index setting is not * provided during index creation or when the existing {@code index.refresh_interval} index setting is set as null. @@ -349,6 +361,7 @@ public class IndicesService extends AbstractLifecycleComponent private final IndexStorePlugin.DirectoryFactory remoteDirectoryFactory; private final BiFunction translogFactorySupplier; private volatile TimeValue clusterDefaultRefreshInterval; + private volatile TimeValue clusterRemoteTranslogBufferInterval; private final FileCacheCleaner fileCacheCleaner; @Override @@ -473,6 +486,12 @@ protected void closeInternal() { this.clusterDefaultRefreshInterval = CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING.get(clusterService.getSettings()); clusterService.getClusterSettings() .addSettingsUpdateConsumer(CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING, this::onRefreshIntervalUpdate); + + if (FeatureFlags.isEnabled(FeatureFlags.REMOTE_STORE)) { + this.clusterRemoteTranslogBufferInterval = CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.get(clusterService.getSettings()); + clusterService.getClusterSettings() + .addSettingsUpdateConsumer(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING, this::setClusterRemoteTranslogBufferInterval); + } } /** @@ -867,7 +886,8 @@ private synchronized IndexService createIndexService( valuesSourceRegistry, remoteDirectoryFactory, translogFactorySupplier, - this::getClusterDefaultRefreshInterval + this::getClusterDefaultRefreshInterval, + this::getClusterRemoteTranslogBufferInterval ); } @@ -1981,4 +2001,13 @@ private static void validateRefreshIntervalSettings(TimeValue minimumRefreshInte private TimeValue getClusterDefaultRefreshInterval() { return this.clusterDefaultRefreshInterval; } + + // Exclusively for testing, please do not use it elsewhere. + public TimeValue getClusterRemoteTranslogBufferInterval() { + return clusterRemoteTranslogBufferInterval; + } + + private void setClusterRemoteTranslogBufferInterval(TimeValue clusterRemoteTranslogBufferInterval) { + this.clusterRemoteTranslogBufferInterval = clusterRemoteTranslogBufferInterval; + } } diff --git a/server/src/test/java/org/opensearch/index/IndexModuleTests.java b/server/src/test/java/org/opensearch/index/IndexModuleTests.java index 3b43ede3c0a6d..fadce99416cbc 100644 --- a/server/src/test/java/org/opensearch/index/IndexModuleTests.java +++ b/server/src/test/java/org/opensearch/index/IndexModuleTests.java @@ -255,7 +255,8 @@ private IndexService newIndexService(IndexModule module) throws IOException { null, new RemoteSegmentStoreDirectoryFactory(() -> repositoriesService, threadPool), translogFactorySupplier, - () -> IndexSettings.DEFAULT_REFRESH_INTERVAL + () -> IndexSettings.DEFAULT_REFRESH_INTERVAL, + () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL ); } diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java index 1b547ff702d98..d22ad06245687 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesServiceTests.java @@ -619,4 +619,9 @@ public void testConflictingEngineFactories() { ".*multiple engine factories provided for \\[foobar/.*\\]: \\[.*FooEngineFactory\\],\\[.*BarEngineFactory\\].*"; assertThat(e, hasToString(new RegexMatcher(pattern))); } + + public void testClusterRemoteTranslogBufferIntervalNull() { + IndicesService indicesService = getIndicesService(); + assertNull(indicesService.getClusterRemoteTranslogBufferInterval()); + } } diff --git a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java index b14a42e1e78ae..474acc764620d 100644 --- a/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java +++ b/test/framework/src/main/java/org/opensearch/index/shard/IndexShardTestCase.java @@ -695,7 +695,8 @@ protected IndexShard newShard( translogFactorySupplier, checkpointPublisher, remoteStore, - remoteStoreStatsTrackerFactory + remoteStoreStatsTrackerFactory, + () -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL ); indexShard.addShardFailureCallback(DEFAULT_SHARD_FAILURE_HANDLER); if (remoteStoreStatsTrackerFactory != null) {