Skip to content

Commit

Permalink
Introduce cluster default remote translog buffer interval setting (#9584
Browse files Browse the repository at this point in the history
)

Signed-off-by: Ashish Singh <ssashish@amazon.com>
  • Loading branch information
ashking94 committed Sep 1, 2023
1 parent 04c90c7 commit f5d3fd2
Show file tree
Hide file tree
Showing 13 changed files with 252 additions and 11 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Decouple replication lag from logic to fail stale replicas ([#9507](https://github.com/opensearch-project/OpenSearch/pull/9507))
- Expose DelimitedTermFrequencyTokenFilter to allow providing term frequencies along with terms ([#9479](https://github.com/opensearch-project/OpenSearch/pull/9479))
- APIs for performing async blob reads and async downloads from the repository using multiple streams ([#9592](https://github.com/opensearch-project/OpenSearch/issues/9592))
- Introduce cluster default remote translog buffer interval setting ([#9584](https://github.com/opensearch-project/OpenSearch/pull/9584))

### Dependencies
- Bump `org.apache.logging.log4j:log4j-core` from 2.17.1 to 2.20.0 ([#8307](https://github.com/opensearch-project/OpenSearch/pull/8307))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,7 +701,8 @@ public static final IndexShard newIndexShard(
(indexSettings, shardRouting) -> new InternalTranslogFactory(),
SegmentReplicationCheckpointPublisher.EMPTY,
null,
null
null,
() -> IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,21 @@
package org.opensearch.remotestore;

import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexRequest;
import org.opensearch.action.admin.indices.get.GetIndexResponse;
import org.opensearch.action.admin.indices.recovery.RecoveryResponse;
import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.concurrent.BufferedAsyncIOProcessor;
import org.opensearch.core.index.Index;
import org.opensearch.index.IndexService;
import org.opensearch.index.IndexSettings;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.recovery.RecoveryState;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchIntegTestCase;
Expand All @@ -26,9 +36,11 @@
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import static org.opensearch.index.shard.RemoteStoreRefreshListener.LAST_N_METADATA_FILES_TO_KEEP;
import static org.opensearch.indices.IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.comparesEqualTo;
Expand Down Expand Up @@ -184,4 +196,161 @@ public void testStaleCommitDeletionWithoutInvokeFlush() throws Exception {
// We also allow (numberOfIterations + 1) as index creation also triggers refresh.
MatcherAssert.assertThat(actualFileCount, is(oneOf(numberOfIterations - 1, numberOfIterations, numberOfIterations + 1)));
}

/**
* Tests that when the index setting is not passed during index creation, the buffer interval picked up is the cluster
* default.
*/
public void testDefaultBufferInterval() throws ExecutionException, InterruptedException {
setupRepo();
String clusterManagerName = internalCluster().getClusterManagerName();
String dataNode = internalCluster().startDataOnlyNodes(1).get(0);
createIndex(INDEX_NAME);
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);
assertClusterRemoteBufferInterval(IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, dataNode);

IndexShard indexShard = getIndexShard(dataNode);
assertTrue(indexShard.getTranslogSyncProcessor() instanceof BufferedAsyncIOProcessor);
assertBufferInterval(IndexSettings.DEFAULT_REMOTE_TRANSLOG_BUFFER_INTERVAL, indexShard);

// Next, we change the default buffer interval and the same should reflect in the buffer interval of the index created
TimeValue clusterBufferInterval = TimeValue.timeValueSeconds(randomIntBetween(100, 200));
client(clusterManagerName).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), clusterBufferInterval))
.get();
assertBufferInterval(clusterBufferInterval, indexShard);
clearClusterBufferIntervalSetting(clusterManagerName);
}

/**
* This tests multiple cases where the index setting is passed during the index creation with multiple combinations
* with and without cluster default.
*/
public void testOverriddenBufferInterval() throws ExecutionException, InterruptedException {
setupRepo();
String clusterManagerName = internalCluster().getClusterManagerName();
String dataNode = internalCluster().startDataOnlyNodes(1).get(0);

TimeValue bufferInterval = TimeValue.timeValueSeconds(randomIntBetween(0, 100));
Settings indexSettings = Settings.builder()
.put(indexSettings())
.put(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), bufferInterval)
.build();
createIndex(INDEX_NAME, indexSettings);
ensureYellowAndNoInitializingShards(INDEX_NAME);
ensureGreen(INDEX_NAME);

IndexShard indexShard = getIndexShard(dataNode);
assertTrue(indexShard.getTranslogSyncProcessor() instanceof BufferedAsyncIOProcessor);
assertBufferInterval(bufferInterval, indexShard);

// Set the cluster default with a different value, validate that the buffer interval is still the overridden value
TimeValue clusterBufferInterval = TimeValue.timeValueSeconds(randomIntBetween(100, 200));
client(clusterManagerName).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), clusterBufferInterval))
.get();
assertBufferInterval(bufferInterval, indexShard);

// Set the index setting (index.remote_store.translog.buffer_interval) with a different value and validate that
// the buffer interval is updated
bufferInterval = TimeValue.timeValueSeconds(bufferInterval.seconds() + randomIntBetween(1, 100));
client(clusterManagerName).admin()
.indices()
.updateSettings(
new UpdateSettingsRequest(INDEX_NAME).settings(
Settings.builder().put(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), bufferInterval)
)
)
.get();
assertBufferInterval(bufferInterval, indexShard);

// Set the index setting (index.remote_store.translog.buffer_interval) with null and validate the buffer interval
// which will be the cluster default now.
client(clusterManagerName).admin()
.indices()
.updateSettings(
new UpdateSettingsRequest(INDEX_NAME).settings(
Settings.builder().putNull(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey())
)
)
.get();
assertBufferInterval(clusterBufferInterval, indexShard);
clearClusterBufferIntervalSetting(clusterManagerName);
}

/**
* This tests validation which kicks in during index creation failing creation if the value is less than minimum allowed value.
*/
public void testOverriddenBufferIntervalValidation() {
setupRepo();
TimeValue bufferInterval = TimeValue.timeValueSeconds(-1);
Settings indexSettings = Settings.builder()
.put(indexSettings())
.put(IndexSettings.INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), bufferInterval)
.build();
IllegalArgumentException exceptionDuringCreateIndex = assertThrows(
IllegalArgumentException.class,
() -> createIndex(INDEX_NAME, indexSettings)
);
assertEquals(
"failed to parse value [-1] for setting [index.remote_store.translog.buffer_interval], must be >= [0ms]",
exceptionDuringCreateIndex.getMessage()
);
}

/**
* This tests validation of the cluster setting when being set.
*/
public void testClusterBufferIntervalValidation() {
String clusterManagerName = internalCluster().startClusterManagerOnlyNode();
setupRepo(false);
IllegalArgumentException exception = assertThrows(
IllegalArgumentException.class,
() -> client(clusterManagerName).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(
Settings.builder().put(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey(), TimeValue.timeValueSeconds(-1))
)
.get()
);
assertEquals(
"failed to parse value [-1] for setting [cluster.remote_store.translog.buffer_interval], must be >= [0ms]",
exception.getMessage()
);
}

private IndexShard getIndexShard(String dataNode) throws ExecutionException, InterruptedException {
String clusterManagerName = internalCluster().getClusterManagerName();
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode);
GetIndexResponse getIndexResponse = client(clusterManagerName).admin().indices().getIndex(new GetIndexRequest()).get();
String uuid = getIndexResponse.getSettings().get(INDEX_NAME).get(IndexMetadata.SETTING_INDEX_UUID);
IndexService indexService = indicesService.indexService(new Index(INDEX_NAME, uuid));
return indexService.getShard(0);
}

private void assertClusterRemoteBufferInterval(TimeValue expectedBufferInterval, String dataNode) {
IndicesService indicesService = internalCluster().getInstance(IndicesService.class, dataNode);
assertEquals(expectedBufferInterval, indicesService.getClusterRemoteTranslogBufferInterval());
}

private void assertBufferInterval(TimeValue expectedBufferInterval, IndexShard indexShard) {
assertEquals(
expectedBufferInterval,
((BufferedAsyncIOProcessor<?>) indexShard.getTranslogSyncProcessor()).getBufferIntervalSupplier().get()
);
}

private void clearClusterBufferIntervalSetting(String clusterManagerName) {
client(clusterManagerName).admin()
.cluster()
.prepareUpdateSettings()
.setTransientSettings(Settings.builder().putNull(CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.getKey()))
.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,8 @@ public void apply(Settings value, Settings current, Settings previous) {
List.of(
IndicesService.CLUSTER_REMOTE_STORE_ENABLED_SETTING,
IndicesService.CLUSTER_REMOTE_SEGMENT_STORE_REPOSITORY_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING
IndicesService.CLUSTER_REMOTE_TRANSLOG_REPOSITORY_SETTING,
IndicesService.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING
),
List.of(FeatureFlags.CONCURRENT_SEGMENT_SEARCH),
List.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,8 @@ private TimeValue getBufferInterval() {

protected abstract String getBufferProcessThreadPoolName();

// Exclusively for testing, please do not use it elsewhere.
public Supplier<TimeValue> getBufferIntervalSupplier() {
return bufferIntervalSupplier;
}
}
6 changes: 4 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -601,7 +601,8 @@ public IndexService newIndexService(
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.DirectoryFactory remoteDirectoryFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down Expand Up @@ -658,7 +659,8 @@ public IndexService newIndexService(
valuesSourceRegistry,
recoveryStateFactory,
translogFactorySupplier,
clusterDefaultRefreshIntervalSupplier
clusterDefaultRefreshIntervalSupplier,
clusterRemoteTranslogBufferIntervalSupplier
);
success = true;
return indexService;
Expand Down
8 changes: 6 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final ValuesSourceRegistry valuesSourceRegistry;
private final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier;
private final Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier;
private final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -210,7 +211,8 @@ public IndexService(
ValuesSourceRegistry valuesSourceRegistry,
IndexStorePlugin.RecoveryStateFactory recoveryStateFactory,
BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -284,6 +286,7 @@ public IndexService(
this.globalCheckpointTask = new AsyncGlobalCheckpointTask(this);
this.retentionLeaseSyncTask = new AsyncRetentionLeaseSyncTask(this);
this.translogFactorySupplier = translogFactorySupplier;
this.clusterRemoteTranslogBufferIntervalSupplier = clusterRemoteTranslogBufferIntervalSupplier;
updateFsyncTaskIfNecessary();
}

Expand Down Expand Up @@ -512,7 +515,8 @@ public synchronized IndexShard createShard(
translogFactorySupplier,
this.indexSettings.isSegRepEnabled() ? checkpointPublisher : null,
remoteStore,
remoteStoreStatsTrackerFactory
remoteStoreStatsTrackerFactory,
clusterRemoteTranslogBufferIntervalSupplier
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down
7 changes: 7 additions & 0 deletions server/src/main/java/org/opensearch/index/IndexSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -1197,6 +1197,13 @@ public TimeValue getRemoteTranslogUploadBufferInterval() {
return remoteTranslogUploadBufferInterval;
}

/**
* Returns true iff the remote translog buffer interval setting exists or in other words is explicitly set.
*/
public boolean isRemoteTranslogBufferIntervalExplicit() {
return INDEX_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING.exists(settings);
}

public void setRemoteTranslogUploadBufferInterval(TimeValue remoteTranslogUploadBufferInterval) {
this.remoteTranslogUploadBufferInterval = remoteTranslogUploadBufferInterval;
}
Expand Down
20 changes: 18 additions & 2 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,8 @@ public IndexShard(
final BiFunction<IndexSettings, ShardRouting, TranslogFactory> translogFactorySupplier,
@Nullable final SegmentReplicationCheckpointPublisher checkpointPublisher,
@Nullable final Store remoteStore,
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory
final RemoteStoreStatsTrackerFactory remoteStoreStatsTrackerFactory,
final Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier
) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
Expand All @@ -382,7 +383,7 @@ public IndexShard(
threadPool,
this::getEngine,
indexSettings.isRemoteTranslogStoreEnabled(),
indexSettings::getRemoteTranslogUploadBufferInterval
() -> getRemoteTranslogUploadBufferInterval(clusterRemoteTranslogBufferIntervalSupplier)
);
this.mapperService = mapperService;
this.indexCache = indexCache;
Expand Down Expand Up @@ -4117,6 +4118,8 @@ private static AsyncIOProcessor<Translog.Location> createTranslogSyncProcessor(
boolean bufferAsyncIoProcessor,
Supplier<TimeValue> bufferIntervalSupplier
) {
assert bufferAsyncIoProcessor == false || Objects.nonNull(bufferIntervalSupplier)
: "If bufferAsyncIoProcessor is true, then the bufferIntervalSupplier needs to be non null";
ThreadContext threadContext = threadPool.getThreadContext();
CheckedConsumer<List<Tuple<Translog.Location, Consumer<Exception>>>, IOException> writeConsumer = candidates -> {
try {
Expand Down Expand Up @@ -4911,4 +4914,17 @@ RetentionLeaseSyncer getRetentionLeaseSyncer() {
public GatedCloseable<SegmentInfos> getSegmentInfosSnapshot() {
return getEngine().getSegmentInfosSnapshot();
}

private TimeValue getRemoteTranslogUploadBufferInterval(Supplier<TimeValue> clusterRemoteTranslogBufferIntervalSupplier) {
assert Objects.nonNull(clusterRemoteTranslogBufferIntervalSupplier) : "remote translog buffer interval supplier is null";
if (indexSettings().isRemoteTranslogBufferIntervalExplicit()) {
return indexSettings().getRemoteTranslogUploadBufferInterval();
}
return clusterRemoteTranslogBufferIntervalSupplier.get();
}

// Exclusively for testing, please do not use it elsewhere.
public AsyncIOProcessor<Translog.Location> getTranslogSyncProcessor() {
return translogSyncProcessor;
}
}
Loading

0 comments on commit f5d3fd2

Please sign in to comment.