Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] SegRep with Remote: Update components of segrep backpressure to support remote store. (#8020) #8066

Merged
merged 4 commits into from
Jun 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement concurrent aggregations support without profile option ([#7514](https://github.com/opensearch-project/OpenSearch/pull/7514))
- Add dynamic index and cluster setting for concurrent segment search ([#7956](https://github.com/opensearch-project/OpenSearch/pull/7956))
- Add descending order search optimization through reverse segment read. ([#7967](https://github.com/opensearch-project/OpenSearch/pull/7967))
- Update components of segrep backpressure to support remote store. ([#8020](https://github.com/opensearch-project/OpenSearch/pull/8020))
- Make remote cluster connection setup in async ([#8038](https://github.com/opensearch-project/OpenSearch/pull/8038))

### Dependencies
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,9 @@ public void testWritesRejected() throws Exception {
assertEquals(perGroupStats.getRejectedRequestCount(), 2L);
}
refresh(INDEX_NAME);
// wait for the replicas to catch up after block is released.
waitForSearchableDocs(totalDocs.get(), replicaNodes.toArray(new String[] {}));

// wait for the replicas to catch up after block is released.
assertReplicaCheckpointUpdated(primaryShard);
// index another doc showing there is no pressure enforced.
indexDoc();
refresh(INDEX_NAME);
Expand Down Expand Up @@ -179,7 +179,7 @@ public void testAddReplicaWhileWritesBlocked() throws Exception {
}
refresh(INDEX_NAME);
// wait for the replicas to catch up after block is released.
waitForSearchableDocs(totalDocs.get(), replicaNodes.toArray(new String[] {}));
assertReplicaCheckpointUpdated(primaryShard);

// index another doc showing there is no pressure enforced.
indexDoc();
Expand Down Expand Up @@ -258,6 +258,10 @@ public void testFailStaleReplica() throws Exception {
}

public void testWithDocumentReplicationEnabledIndex() throws Exception {
assumeTrue(
"Can't create DocRep index with remote store enabled. Skipping.",
indexSettings().getAsBoolean(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, false) == false
);
Settings settings = Settings.builder().put(MAX_REPLICATION_TIME_SETTING.getKey(), TimeValue.timeValueMillis(500)).build();
// Starts a primary and replica node.
final String primaryNode = internalCluster().startNode(settings);
Expand Down Expand Up @@ -313,7 +317,7 @@ public void testBulkWritesRejected() throws Exception {
}
refresh(INDEX_NAME);
// wait for the replicas to catch up after block is released.
waitForSearchableDocs(totalDocs, replicaNodes.toArray(new String[] {}));
assertReplicaCheckpointUpdated(primaryShard);

// index another doc showing there is no pressure enforced.
executeBulkRequest(nodes, totalDocs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.opensearch.index.IndexModule;
import org.opensearch.index.IndexService;
import org.opensearch.index.SegmentReplicationPerGroupStats;
import org.opensearch.index.SegmentReplicationShardStats;
import org.opensearch.index.shard.IndexShard;
import org.opensearch.index.store.Store;
import org.opensearch.index.store.StoreFileMetadata;
Expand All @@ -38,6 +39,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -204,7 +206,7 @@ protected Releasable blockReplication(List<String> nodes, CountDownLatch latch)
node
));
mockTargetTransportService.addSendBehavior((connection, requestId, action, request, options) -> {
if (action.equals(SegmentReplicationSourceService.Actions.GET_SEGMENT_FILES)) {
if (action.equals(SegmentReplicationSourceService.Actions.UPDATE_VISIBLE_CHECKPOINT)) {
try {
latch.countDown();
pauseReplicationLatch.await();
Expand All @@ -222,4 +224,13 @@ protected Releasable blockReplication(List<String> nodes, CountDownLatch latch)
};
}

protected void assertReplicaCheckpointUpdated(IndexShard primaryShard) throws Exception {
assertBusy(() -> {
Set<SegmentReplicationShardStats> groupStats = primaryShard.getReplicationStats();
assertEquals(primaryShard.indexSettings().getNumberOfReplicas(), groupStats.size());
for (SegmentReplicationShardStats shardStat : groupStats) {
assertEquals(0, shardStat.getCheckpointsBehindCount());
}
}, 30, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -797,10 +797,6 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
}

public void testPressureServiceStats() throws Exception {
assumeFalse(
"Skipping the test as pressure service is not compatible with SegRep and Remote store yet.",
segmentReplicationWithRemoteEnabled()
);
final String primaryNode = internalCluster().startNode();
createIndex(INDEX_NAME);
final String replicaNode = internalCluster().startNode();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.remotestore;

import org.junit.After;
import org.junit.Before;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.SegmentReplicationPressureIT;
import org.opensearch.indices.replication.common.ReplicationType;
import org.opensearch.test.OpenSearchIntegTestCase;

import java.nio.file.Path;

import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;

/**
* This class executes the SegmentReplicationPressureIT suite with remote store integration enabled.
* Setup is similar to SegmentReplicationPressureIT but this also enables the segment replication using remote store which
* is behind SEGMENT_REPLICATION_EXPERIMENTAL flag.
*/
@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class SegmentReplicationWithRemoteStorePressureIT extends SegmentReplicationPressureIT {

private static final String REPOSITORY_NAME = "test-remote-store-repo";

@Override
public Settings indexSettings() {
return Settings.builder()
.put(super.indexSettings())
.put(IndexMetadata.SETTING_REMOTE_STORE_ENABLED, true)
.put(IndexMetadata.SETTING_REMOTE_STORE_REPOSITORY, REPOSITORY_NAME)
.put(IndexMetadata.SETTING_REMOTE_TRANSLOG_STORE_ENABLED, false)
.put(IndexMetadata.SETTING_REPLICATION_TYPE, ReplicationType.SEGMENT)
.build();
}

@Override
protected Settings featureFlagSettings() {
return Settings.builder()
.put(super.featureFlagSettings())
.put(FeatureFlags.REMOTE_STORE, "true")
.put(FeatureFlags.SEGMENT_REPLICATION_EXPERIMENTAL, "true")
.build();
}

@Before
public void setup() {
internalCluster().startClusterManagerOnlyNode();
Path absolutePath = randomRepoPath().toAbsolutePath();
assertAcked(
clusterAdmin().preparePutRepository(REPOSITORY_NAME).setType("fs").setSettings(Settings.builder().put("location", absolutePath))
);
}

@After
public void teardown() {
assertAcked(clusterAdmin().prepareDeleteRepository(REPOSITORY_NAME));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1176,13 +1176,18 @@ public synchronized void updateVisibleCheckpointForShard(final String allocation
assert handoffInProgress == false;
assert invariant();
final CheckpointState cps = checkpoints.get(allocationId);
assert !this.shardAllocationId.equals(allocationId) && cps != null;
assert !this.shardAllocationId.equals(allocationId);
// Ignore if the cps is null (i.e. replica shard not in active state).
if (cps == null) {
logger.warn("Ignoring the checkpoint update for allocation ID {} as its not being tracked by primary", allocationId);
return;
}
if (cps.checkpointTimers.isEmpty() == false) {
// stop any timers for checkpoints up to the received cp and remove from cps.checkpointTimers.
// Compute the max lag from the set of completed timers.
final AtomicLong lastFinished = new AtomicLong(0L);
cps.checkpointTimers.entrySet().removeIf((entry) -> {
boolean result = visibleCheckpoint.equals(entry.getKey()) || visibleCheckpoint.isAheadOf(entry.getKey());
boolean result = entry.getKey().isAheadOf(visibleCheckpoint) == false;
if (result) {
final ReplicationTimer timer = entry.getValue();
timer.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,6 @@ void startSegmentCopy(GetSegmentFilesRequest request, ActionListener<GetSegmentF
}
});
if (request.getFilesToFetch().isEmpty()) {
// before completion, alert the primary of the replica's state.
handler.getCopyState()
.getShard()
.updateVisibleCheckpointForShard(request.getTargetAllocationId(), handler.getCopyState().getCheckpoint());
wrappedListener.onResponse(new GetSegmentFilesResponse(Collections.emptyList()));
} else {
handler.sendFiles(request, wrappedListener);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ public synchronized void sendFiles(GetSegmentFilesRequest request, ActionListene

sendFileStep.whenComplete(r -> {
try {
shard.updateVisibleCheckpointForShard(allocationId, copyState.getCheckpoint());
future.onResponse(new GetSegmentFilesResponse(List.of(storeFileMetadata)));
} finally {
IOUtils.close(resources);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportRequestHandler;
import org.opensearch.transport.TransportResponse;
import org.opensearch.transport.TransportService;

import java.io.IOException;
Expand Down Expand Up @@ -63,6 +64,7 @@ public static class Actions {

public static final String GET_CHECKPOINT_INFO = "internal:index/shard/replication/get_checkpoint_info";
public static final String GET_SEGMENT_FILES = "internal:index/shard/replication/get_segment_files";
public static final String UPDATE_VISIBLE_CHECKPOINT = "internal:index/shard/replication/update_visible_checkpoint";
}

private final OngoingSegmentReplications ongoingSegmentReplications;
Expand All @@ -89,6 +91,12 @@ protected SegmentReplicationSourceService(
GetSegmentFilesRequest::new,
new GetSegmentFilesRequestHandler()
);
transportService.registerRequestHandler(
Actions.UPDATE_VISIBLE_CHECKPOINT,
ThreadPool.Names.GENERIC,
UpdateVisibleCheckpointRequest::new,
new UpdateVisibleCheckpointRequestHandler()
);
}

public SegmentReplicationSourceService(
Expand Down Expand Up @@ -142,6 +150,20 @@ public void messageReceived(GetSegmentFilesRequest request, TransportChannel cha
}
}

private class UpdateVisibleCheckpointRequestHandler implements TransportRequestHandler<UpdateVisibleCheckpointRequest> {
@Override
public void messageReceived(UpdateVisibleCheckpointRequest request, TransportChannel channel, Task task) throws Exception {
try {
IndexService indexService = indicesService.indexServiceSafe(request.getPrimaryShardId().getIndex());
IndexShard indexShard = indexService.getShard(request.getPrimaryShardId().id());
indexShard.updateVisibleCheckpointForShard(request.getTargetAllocationId(), request.getCheckpoint());
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (Exception e) {
channel.sendResponse(e);
}
}
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.nodesRemoved()) {
Expand Down
Loading