Skip to content

Commit

Permalink
[Segment Replication] Enable Segment Replication Backpressure setting…
Browse files Browse the repository at this point in the history
… by default. (#7183)

* Enalble Segment Replication Backpressure setting by default.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Apply spotless.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

* Add null check to guard against test failures.

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>

---------

Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
  • Loading branch information
Rishikesh1159 authored Apr 17, 2023
1 parent e2ced4c commit a953178
Show file tree
Hide file tree
Showing 3 changed files with 36 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_INDEXING_CHECKPOINTS;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_TIME_SETTING;
import static org.opensearch.index.SegmentReplicationPressureService.SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationPressureIT extends SegmentReplicationBaseIT {
Expand All @@ -49,7 +49,6 @@ public class SegmentReplicationPressureIT extends SegmentReplicationBaseIT {
protected Settings nodeSettings(int nodeOrdinal) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal))
.put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueSeconds(1))
.put(MAX_INDEXING_CHECKPOINTS.getKey(), MAX_CHECKPOINTS_BEHIND)
.build();
Expand Down Expand Up @@ -238,6 +237,30 @@ public void testFailStaleReplica() throws Exception {
assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId());
}

public void testWithDocumentReplicationEnabledIndex() throws Exception {
Settings settings = Settings.builder().put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(500)).build();
// Starts a primary and replica node.
final String primaryNode = internalCluster().startNode(settings);
createIndex(
INDEX_NAME,
Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.DOCUMENT).build()
);
ensureYellowAndNoInitializingShards(INDEX_NAME);
final String replicaNode = internalCluster().startNode(settings);
ensureGreen(INDEX_NAME);
final AtomicInteger totalDocs = new AtomicInteger(0);
// Index docs until replica stale limit is reached.
totalDocs.getAndSet(indexUntilCheckpointCount());
// index again after stale limit.
indexDoc();
refresh(INDEX_NAME);
totalDocs.incrementAndGet();
// verify total doc count is same and docs are not rejected.
assertHitCount(client(primaryNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs.get());
assertHitCount(client(replicaNode).prepareSearch(INDEX_NAME).setSize(0).setPreference("_only_local").get(), totalDocs.get());

}

public void testBulkWritesRejected() throws Exception {
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class SegmentReplicationPressureService implements Closeable {
*/
public static final Setting<Boolean> SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED = Setting.boolSetting(
"segrep.pressure.enabled",
false,
true,
Setting.Property.Dynamic,
Setting.Property.NodeScope
);
Expand Down Expand Up @@ -128,9 +128,11 @@ AsyncFailStaleReplicaTask getFailStaleReplicaTask() {

public void isSegrepLimitBreached(ShardId shardId) {
final IndexService indexService = indicesService.indexService(shardId.getIndex());
final IndexShard shard = indexService.getShard(shardId.id());
if (isSegmentReplicationBackpressureEnabled && shard.indexSettings().isSegRepEnabled() && shard.routingEntry().primary()) {
validateReplicationGroup(shard);
if (indexService != null) {
final IndexShard shard = indexService.getShard(shardId.id());
if (isSegmentReplicationBackpressureEnabled && shard.indexSettings().isSegRepEnabled() && shard.routingEntry().primary()) {
validateReplicationGroup(shard);
}
}
}

Expand Down Expand Up @@ -233,6 +235,9 @@ protected void runInternal() {
stats.getShardStats().get(shardId).getReplicaStats()
);
final IndexService indexService = pressureService.indicesService.indexService(shardId.getIndex());
if (indexService.getIndexSettings() != null && indexService.getIndexSettings().isSegRepEnabled() == false) {
return;
}
final IndexShard primaryShard = indexService.getShard(shardId.getId());
for (SegmentReplicationShardStats staleReplica : staleReplicas) {
if (staleReplica.getCurrentReplicationTimeMillis() > 2 * pressureService.maxReplicationTime.millis()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,12 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.times;
import static org.opensearch.index.SegmentReplicationPressureService.MAX_REPLICATION_TIME_SETTING;
import static org.opensearch.index.SegmentReplicationPressureService.SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED;

public class SegmentReplicationPressureServiceTests extends OpenSearchIndexLevelReplicationTestCase {

private static ShardStateAction shardStateAction = Mockito.mock(ShardStateAction.class);
private static final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueSeconds(5))
.build();

Expand Down Expand Up @@ -99,10 +97,7 @@ public void testIsSegrepLimitBreached() throws Exception {
}

public void testIsSegrepLimitBreached_onlyCheckpointLimitBreached() throws Exception {
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true)
.build();
final Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();

try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
shards.startAll();
Expand All @@ -126,10 +121,7 @@ public void testIsSegrepLimitBreached_onlyCheckpointLimitBreached() throws Excep
}

public void testIsSegrepLimitBreached_onlyTimeLimitBreached() throws Exception {
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true)
.build();
final Settings settings = Settings.builder().put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT).build();

try (ReplicationGroup shards = createGroup(1, settings, new NRTReplicationEngineFactory())) {
shards.startAll();
Expand Down Expand Up @@ -194,7 +186,6 @@ public void testIsSegrepLimitBreached_underStaleNodeLimit() throws Exception {
public void testFailStaleReplicaTask() throws Exception {
final Settings settings = Settings.builder()
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.put(SEGMENT_REPLICATION_INDEXING_PRESSURE_ENABLED.getKey(), true)
.put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(10))
.build();

Expand Down

0 comments on commit a953178

Please sign in to comment.