Skip to content

Commit

Permalink
SegRep with Remote: Update components of segrep backpressure to suppo… (
Browse files Browse the repository at this point in the history
opensearch-project#8020)

* SegRep with Remote: Update components of segrep backpressure to support remote store.

Signed-off-by: Ankit Kala <ankikala@amazon.com>

* Adding tests

Signed-off-by: Ankit Kala <ankikala@amazon.com>

* Addressed comments

Signed-off-by: Ankit Kala <ankikala@amazon.com>

---------

Signed-off-by: Ankit Kala <ankikala@amazon.com>
Signed-off-by: Rishab Nahata <rnnahata@amazon.com>
  • Loading branch information
ankitkala authored and imRishN committed Jun 27, 2023
1 parent e39b9c6 commit 4c65364
Show file tree
Hide file tree
Showing 18 changed files with 464 additions and 27 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add TokenManager Interface ([#7452](https://github.com/opensearch-project/OpenSearch/pull/7452))
- Add Remote store as a segment replication source ([#7653](https://github.com/opensearch-project/OpenSearch/pull/7653))
- Add descending order search optimization through reverse segment read. ([#7967](https://github.com/opensearch-project/OpenSearch/pull/7967))

- Update components of segrep backpressure to support remote store. ([#8020](https://github.com/opensearch-project/OpenSearch/pull/8020))

### Dependencies
- Bump `jackson` from 2.15.1 to 2.15.2 ([#7897](https://github.com/opensearch-project/OpenSearch/pull/7897))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ public void testWritesRejected() throws Exception {
assertEquals(perGroupStats.getRejectedRequestCount(), 2L);
}
refresh(INDEX_NAME);
// wait for the replicas to catch up after block is released.
waitForSearchableDocs(totalDocs.get(), replicaNodes.toArray(new String[] {}));

// wait for the replicas to catch up after block is released.
assertReplicaCheckpointUpdated(primaryShard);
// index another doc showing there is no pressure enforced.
indexDoc();
refresh(INDEX_NAME);
Expand Down Expand Up @@ -179,7 +179,7 @@ public void testAddReplicaWhileWritesBlocked() throws Exception {
}
refresh(INDEX_NAME);
// wait for the replicas to catch up after block is released.
waitForSearchableDocs(totalDocs.get(), replicaNodes.toArray(new String[] {}));
assertReplicaCheckpointUpdated(primaryShard);

// index another doc showing there is no pressure enforced.
indexDoc();
Expand Down Expand Up @@ -258,6 +258,10 @@ public void testFailStaleReplica() throws Exception {
}

public void testWithDocumentReplicationEnabledIndex() throws Exception {
assumeTrue(
"Can't create DocRep index with remote store enabled. Skipping.",
indexSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false) == false
);
Settings settings = Settings.builder().put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(500)).build();
// Starts a primary and replica node.
final String primaryNode = internalCluster().startNode(settings);
Expand Down Expand Up @@ -313,7 +317,7 @@ public void testBulkWritesRejected() throws Exception {
}
refresh(INDEX_NAME);
// wait for the replicas to catch up after block is released.
waitForSearchableDocs(totalDocs, replicaNodes.toArray(new String[] {}));
assertReplicaCheckpointUpdated(primaryShard);

// index another doc showing there is no pressure enforced.
executeBulkRequest(nodes, totalDocs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
Expand All @@ -38,6 +39,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -204,7 +206,7 @@ protected Releasable blockReplication(List<String> nodes, CountDownLatch latch)
node
));
mockTargetTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES)) {
if (action.equals(SegmentReplicationSourceService.Actions.UPDATE_VISIBLE_CHECKPOINT)) {
try {
latch.countDown();
pauseReplicationLatch.await();
Expand All @@ -222,4 +224,13 @@ protected Releasable blockReplication(List<String> nodes, CountDownLatch latch)
};
}

protected void assertReplicaCheckpointUpdated(IndexShard primaryShard) throws Exception {
assertBusy(() -> {
Set<SegmentReplicationShardStats> groupStats = primaryShard.getReplicationStats();
assertEquals(primaryShard.indexSettings().getNumberOfReplicas(), groupStats.size());
for (SegmentReplicationShardStats shardStat : groupStats) {
assertEquals(0, shardStat.getCheckpointsBehindCount());
}
}, 30, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -796,10 +796,6 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
}

public void testPressureServiceStats() throws Exception {
assumeFalse(
"Skipping the test as pressure service is not compatible with SegRep and Remote store yet.",
segmentReplicationWithRemoteEnabled()
);
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.SegmentReplicationPressureIT;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
* This class executes the SegmentReplicationPressureIT suite with remote store integration enabled.
* Setup is similar to SegmentReplicationPressureIT but this also enables the segment replication using remote store which
* is behind SEGMENT_REPLICATION_EXPERIMENTAL flag.
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationWithRemoteStorePressureIT extends SegmentReplicationPressureIT {

private static final String REPOSITORY_NAME = "test-remote-store-repo";

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.REMOTE_STORE, "true")
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.build();
}

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Path absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);
}

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1174,13 +1174,18 @@ public synchronized void updateVisibleCheckpointForShard(final String allocation
assert handoffInProgress == false;
assert invariant();
final CheckpointState cps = checkpoints.get(allocationId);
assert !this.shardAllocationId.equals(allocationId) && cps != null;
assert !this.shardAllocationId.equals(allocationId);
// Ignore if the cps is null (i.e. replica shard not in active state).
if (cps == null) {
logger.warn("Ignoring the checkpoint update for allocation ID {} as its not being tracked by primary", allocationId);
return;
}
if (cps.checkpointTimers.isEmpty() == false) {
// stop any timers for checkpoints up to the received cp and remove from cps.checkpointTimers.
// Compute the max lag from the set of completed timers.
final AtomicLong lastFinished = new AtomicLong(0L);
cps.checkpointTimers.entrySet().removeIf((entry) -> {
boolean result = visibleCheckpoint.equals(entry.getKey()) || visibleCheckpoint.isAheadOf(entry.getKey());
boolean result = entry.getKey().isAheadOf(visibleCheckpoint) == false;
if (result) {
final ReplicationTimer timer = entry.getValue();
timer.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,6 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF
}
});
if (request.getFilesToFetch().isEmpty()) {
// before completion, alert the primary of the replica's state.
handler.getCopyState()
.getShard()
.updateVisibleCheckpointForShard(request.getTargetAllocationId(), handler.getCopyState().getCheckpoint());
wrappedListener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
} else {
handler.sendFiles(request, wrappedListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene

sendFileStep.whenComplete(r -> {
try {
shard.updateVisibleCheckpointForShard(allocationId, copyState.getCheckpoint());
future.onResponse(new GetSegmentFilesResponse(List.of(storeFileMetadata)));
} finally {
IOUtils.close(resources);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -63,6 +64,7 @@ public static class Actions {

public static final String GET_CHECKPOINT_INFO = "internal:index/shard/replication/get_checkpoint_info";
public static final String GET_SEGMENT_FILES = "internal:index/shard/replication/get_segment_files";
public static final String UPDATE_VISIBLE_CHECKPOINT = "internal:index/shard/replication/update_visible_checkpoint";
}

private final OngoingSegmentReplications ongoingSegmentReplications;
Expand All @@ -89,6 +91,12 @@ protected SegmentReplicationSourceService(
GetSegmentFilesRequest::new,
new GetSegmentFilesRequestHandler()
);
transportService.registerRequestHandler(
Actions.UPDATE_VISIBLE_CHECKPOINT,
ThreadPool.Names.GENERIC,
UpdateVisibleCheckpointRequest::new,
new UpdateVisibleCheckpointRequestHandler()
);
}

public SegmentReplicationSourceService(
Expand Down Expand Up @@ -142,6 +150,20 @@ public void messageReceived(GetSegmentFilesRequest request, TransportChannel cha
}
}

private class UpdateVisibleCheckpointRequestHandler implements TransportRequestHandler<UpdateVisibleCheckpointRequest> {
@Override
public void messageReceived(UpdateVisibleCheckpointRequest request, TransportChannel channel, Task task) throws Exception {
try {
IndexService indexService = indicesService.indexServiceSafe(request.getPrimaryShardId().getIndex());
IndexShard indexShard = indexService.getShard(request.getPrimaryShardId().id());
indexShard.updateVisibleCheckpointForShard(request.getTargetAllocationId(), request.getCheckpoint());
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (Exception e) {
channel.sendResponse(e);
}
}
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.nodesRemoved()) {
Expand Down
Loading

0 comments on commit 4c65364

Please sign in to comment.