Skip to content

Commit

Permalink
search replica recovery
Browse files Browse the repository at this point in the history
Signed-off-by: Marc Handalian <marc.handalian@gmail.com>
  • Loading branch information
mch2 committed Sep 13, 2024
1 parent e688d39 commit 6a0bf73
Show file tree
Hide file tree
Showing 17 changed files with 101 additions and 33 deletions.
1 change: 1 addition & 0 deletions gradle/run.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ testClusters {
plugin('plugins:'.concat(p))
}
}
systemProperty("opensearch.experimental.feature.read.write.split.enabled", "true")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -716,8 +716,8 @@ public static final IndexShard newIndexShard(
null,
DefaultRemoteStoreSettings.INSTANCE,
false,
IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting)
);
IndexShardTestUtils.getFakeDiscoveryNodes(initializingShardRouting),
(s, r) -> {});
}

private static ShardRouting getInitializingShardRouting(ShardRouting existingShardRouting) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class SearchReplicaReplicationIT extends SegmentReplicationBaseIT {

@Before
public void randomizeRemoteStoreEnabled() {
useRemoteStore = randomBoolean();
useRemoteStore = false;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ private Builder initializeEmpty(IndexMetadata indexMetadata, UnassignedInfo unas
shardId,
false,
true,
PeerRecoverySource.INSTANCE, // TODO: Update to remote store if enabled
EmptyStoreRecoverySource.INSTANCE,
unassignedInfo
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,21 +131,25 @@ public class IndexShardRoutingTable extends AbstractDiffable<IndexShardRoutingTa
if (shard.active()) {
activeShards.add(shard);
}
if (shard.initializing()) {
if (shard.initializing()) {
allInitializingShards.add(shard);
}
if (shard.relocating()) {
// create the target initializing shard routing on the node the shard is relocating to
allInitializingShards.add(shard.getTargetRelocatingShard());
allAllocationIds.add(shard.getTargetRelocatingShard().allocationId().getId());
if (shard.isSearchOnly() == false) {
allAllocationIds.add(shard.getTargetRelocatingShard().allocationId().getId());
}

assert shard.assignedToNode() : "relocating from unassigned " + shard;
assert shard.getTargetRelocatingShard().assignedToNode() : "relocating to unassigned " + shard.getTargetRelocatingShard();
assignedShards.add(shard.getTargetRelocatingShard());
}
if (shard.assignedToNode()) {
assignedShards.add(shard);
allAllocationIds.add(shard.allocationId().getId());
if (shard.isSearchOnly() == false) {
allAllocationIds.add(shard.allocationId().getId());
}
}
if (shard.state() != ShardRoutingState.STARTED) {
allShardsStarted = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,10 @@ protected ShardRouting(
assert !(state == ShardRoutingState.UNASSIGNED && unassignedInfo == null) : "unassigned shard must be created with meta";
assert (state == ShardRoutingState.UNASSIGNED || state == ShardRoutingState.INITIALIZING) == (recoverySource != null)
: "recovery source only available on unassigned or initializing shard but was " + state;
assert recoverySource == null || recoverySource == PeerRecoverySource.INSTANCE || primary
assert recoverySource == null || recoverySource == PeerRecoverySource.INSTANCE || primary || searchOnly
: "replica shards always recover from primary";
assert searchOnly == false || recoverySource == null || recoverySource == RecoverySource.EmptyStoreRecoverySource.INSTANCE
: "search replicas always recover as empty store";
assert (currentNodeId == null) == (state == ShardRoutingState.UNASSIGNED) : "unassigned shard must not be assigned to a node "
+ this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.AllocationId;
import org.opensearch.cluster.routing.IndexShardRoutingTable;
import org.opensearch.cluster.routing.RecoverySource;
import org.opensearch.cluster.routing.RoutingChangesObserver;
Expand Down Expand Up @@ -263,6 +264,10 @@ private IndexMetadata.Builder updateInSyncAllocations(
// the
// primary
IndexShardRoutingTable newShardRoutingTable = newRoutingTable.shardRoutingTable(shardId);
Set<String> sor = newShardRoutingTable.searchOnlyReplicas().stream()
.filter(r -> r.allocationId() != null)
.map(r -> r.allocationId().getId()).collect(Collectors.toSet());

assert newShardRoutingTable.assignedShards()
.stream()
.filter(ShardRouting::isRelocationTarget)
Expand All @@ -279,12 +284,20 @@ private IndexMetadata.Builder updateInSyncAllocations(
+ " than maximum possible active shards "
+ maxActiveShards;
Set<String> assignedAllocations = assignedShards.stream().map(s -> s.allocationId().getId()).collect(Collectors.toSet());
logger.info("Filtering out {}", sor);
inSyncAllocationIds = inSyncAllocationIds.stream()
.filter(id -> sor.contains(id) == false)
.sorted(Comparator.comparing(assignedAllocations::contains).reversed()) // values with routing entries first
.limit(maxActiveShards)
.collect(Collectors.toSet());
}

logger.info("WTF {} {}", inSyncAllocationIds, sor);
inSyncAllocationIds = inSyncAllocationIds.stream()
.filter(id -> sor.contains(id) == false)
.collect(Collectors.toSet());
logger.info("WTF {} {}", inSyncAllocationIds, sor);

// only remove allocation id of failed active primary if there is at least one active shard remaining. Assume for example that
// the primary fails but there is no new primary to fail over to. If we were to remove the allocation id of the primary from the
// in-sync set, this could create an empty primary on the next allocation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,8 +190,14 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
return allocation.decision(YES, NAME, "below primary recovery limit of [%d]", primariesInitialRecoveries);
}
} else {
if (shardRouting.isSearchOnly()) {
// search replicas recover from store and trigger a round of segRep before being marked active.
// the replication source can be either be another node for node-node replication or directly from the store.
// In a node-node case outgoing replication events are throttled by bytes/s transferred.
}
// Peer recovery
assert initializingShard(shardRouting, node.nodeId()).recoverySource().getType() == RecoverySource.Type.PEER;
assert initializingShard(shardRouting, node.nodeId()).recoverySource().getType() == RecoverySource.Type.PEER ||
initializingShard(shardRouting, node.nodeId()).recoverySource().getType() == RecoverySource.Type.EMPTY_STORE;

if (shardRouting.unassignedReasonIndexCreated()) {
return allocateInitialShardCopies(shardRouting, node, allocation);
Expand Down Expand Up @@ -274,6 +280,15 @@ private Decision allocateShardCopies(
);
}
} else {
if (shardRouting.isSearchOnly()) {
return allocation.decision(
YES,
NAME,
"below shard recovery limit of incoming: [%d < %d]",
currentInRecoveries,
inRecoveriesLimit
);
}
// search for corresponding recovery source (= primary shard) and check number of outgoing recoveries on that node
ShardRouting primaryShard = allocation.routingNodes().activePrimary(shardRouting.shardId());
if (primaryShard == null) {
Expand Down
5 changes: 3 additions & 2 deletions server/src/main/java/org/opensearch/index/IndexModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
Expand Down Expand Up @@ -653,7 +654,7 @@ public IndexService newIndexService(
clusterDefaultRefreshIntervalSupplier,
recoverySettings,
remoteStoreSettings,
(s) -> {}
(s, r) -> {}
);
}

Expand All @@ -679,7 +680,7 @@ public IndexService newIndexService(
Supplier<TimeValue> clusterDefaultRefreshIntervalSupplier,
RecoverySettings recoverySettings,
RemoteStoreSettings remoteStoreSettings,
Consumer<IndexShard> replicator
BiConsumer<IndexShard, Runnable> replicator
) throws IOException {
final IndexEventListener eventListener = freeze();
Function<IndexService, CheckedFunction<DirectoryReader, DirectoryReader, IOException>> readerWrapperFactory = indexReaderWrapper
Expand Down
13 changes: 8 additions & 5 deletions server/src/main/java/org/opensearch/index/IndexService.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.BooleanSupplier;
import java.util.function.Consumer;
Expand Down Expand Up @@ -196,7 +197,7 @@ public class IndexService extends AbstractIndexComponent implements IndicesClust
private final RemoteStoreSettings remoteStoreSettings;
private final FileCache fileCache;
private final CompositeIndexSettings compositeIndexSettings;
private final Consumer<IndexShard> replicator;
private final BiConsumer<IndexShard, Runnable> replicator;

public IndexService(
IndexSettings indexSettings,
Expand Down Expand Up @@ -235,7 +236,7 @@ public IndexService(
RemoteStoreSettings remoteStoreSettings,
FileCache fileCache,
CompositeIndexSettings compositeIndexSettings,
Consumer<IndexShard> replicator
BiConsumer<IndexShard, Runnable> replicator
) {
super(indexSettings);
this.allowExpensiveQueries = allowExpensiveQueries;
Expand Down Expand Up @@ -395,7 +396,7 @@ public IndexService(
remoteStoreSettings,
null,
null,
s -> {}
(s, r) -> {}
);
}

Expand Down Expand Up @@ -691,7 +692,8 @@ protected void closeInternal() {
recoverySettings,
remoteStoreSettings,
seedRemote,
discoveryNodes
discoveryNodes,
replicator
);
eventListener.indexShardStateChanged(indexShard, null, indexShard.state(), "shard created");
eventListener.afterIndexShardCreated(indexShard);
Expand Down Expand Up @@ -1408,8 +1410,9 @@ private void maybeSyncSegments(boolean force) {
if (getRefreshInterval().millis() > 0 || force) {
for (IndexShard shard : this.shards.values()) {
try {
logger.info("Is shard active {} {}", shard.routingEntry().isSearchOnly(), shard.routingEntry().active());
if (shard.routingEntry().isSearchOnly() && shard.routingEntry().active()) {
replicator.accept(shard);
shard.syncSegments();
}
} catch (IndexShardClosedException | AlreadyClosedException ex) {
// do nothing
Expand Down
40 changes: 33 additions & 7 deletions server/src/main/java/org/opensearch/index/shard/IndexShard.java
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,7 @@ public class IndexShard extends AbstractIndexShardComponent implements IndicesCl
private final PendingReplicationActions pendingReplicationActions;
private final ReplicationTracker replicationTracker;
private final SegmentReplicationCheckpointPublisher checkpointPublisher;
private final BiConsumer<IndexShard, Runnable> replicator;

protected volatile ShardRouting shardRouting;
protected volatile IndexShardState state;
Expand Down Expand Up @@ -391,8 +392,8 @@ public IndexShard(
final RecoverySettings recoverySettings,
final RemoteStoreSettings remoteStoreSettings,
boolean seedRemote,
final DiscoveryNodes discoveryNodes
) throws IOException {
final DiscoveryNodes discoveryNodes,
final BiConsumer<IndexShard, Runnable> replicator) throws IOException {
super(shardRouting.shardId(), indexSettings);
assert shardRouting.initializing();
this.shardRouting = shardRouting;
Expand Down Expand Up @@ -493,6 +494,7 @@ public boolean shouldCache(Query query) {
this.fileDownloader = new RemoteStoreFileDownloader(shardRouting.shardId(), threadPool, recoverySettings);
this.shardMigrationState = getShardMigrationState(indexSettings, seedRemote);
this.discoveryNodes = discoveryNodes;
this.replicator = replicator;
}

public ThreadPool getThreadPool() {
Expand Down Expand Up @@ -2010,6 +2012,10 @@ public void resetToWriteableEngine() throws IOException, InterruptedException, T
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> { resetEngineToGlobalCheckpoint(); });
}

public void syncSegments() {
replicator.accept(this, () -> {});
}

/**
* Wrapper for a non-closing reader
*
Expand Down Expand Up @@ -2514,8 +2520,10 @@ public void openEngineAndRecoverFromTranslog(boolean syncFromRemote) throws IOEx
translogConfig.setDownloadRemoteTranslogOnInit(true);
}

getEngine().translogManager()
.recoverFromTranslog(translogRecoveryRunner, getEngine().getProcessedLocalCheckpoint(), Long.MAX_VALUE);
if (routingEntry().isSearchOnly() == false) {
getEngine().translogManager()
.recoverFromTranslog(translogRecoveryRunner, getEngine().getProcessedLocalCheckpoint(), Long.MAX_VALUE);
}
}

/**
Expand Down Expand Up @@ -2889,10 +2897,28 @@ public void recoverFromLocalShards(
public void recoverFromStore(ActionListener<Boolean> listener) {
// we are the first primary, recover from the gateway
// if its post api allocation, the index should exists
assert shardRouting.primary() : "recover from store only makes sense if the shard is a primary shard";
assert shardRouting.primary() || shardRouting.isSearchOnly() : "recover from store only makes sense if the shard is a primary shard";
assert shardRouting.initializing() : "can only start recovery on initializing shard";
StoreRecovery storeRecovery = new StoreRecovery(shardId, logger);
storeRecovery.recoverFromStore(this, listener);
ActionListener<Boolean> wrappedListener = ActionListener.wrap(
success -> {
if (success) {
if (routingEntry().isSearchOnly()) {
replicator.accept(this, () -> {
logger.info("Finished replication as part of recovery");
listener.onResponse(true);
});
} else {
listener.onResponse(true);
}
} else {
listener.onResponse(false);
}
},
listener::onFailure
);

storeRecovery.recoverFromStore(this, wrappedListener);
}

public void restoreFromRemoteStore(ActionListener<Boolean> listener) {
Expand Down Expand Up @@ -3873,7 +3899,7 @@ private void executeRecovery(
markAsRecovering(reason, recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(ActionRunnable.wrap(ActionListener.wrap(r -> {
if (r) {
recoveryListener.onDone(recoveryState);
recoveryListener.onDone(recoveryState);
}
}, e -> recoveryListener.onFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true)), action));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,7 +544,7 @@ private boolean canRecover(IndexShard indexShard) {
// got closed on us, just ignore this recovery
return false;
}
if (indexShard.routingEntry().primary() == false) {
if (indexShard.routingEntry().primary() == false && indexShard.routingEntry().isSearchOnly() == false) {
throw new IndexShardRecoveryException(shardId, "Trying to recover when the shard is in backup state", null);
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,7 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
Expand Down Expand Up @@ -359,7 +360,7 @@ public class IndicesService extends AbstractLifecycleComponent
private final SearchRequestStats searchRequestStats;
private final FileCache fileCache;
private final CompositeIndexSettings compositeIndexSettings;
private final Consumer<IndexShard> replicator;
private final BiConsumer<IndexShard, Runnable> replicator;

@Override
protected void doStart() {
Expand Down Expand Up @@ -397,7 +398,7 @@ public IndicesService(
RemoteStoreSettings remoteStoreSettings,
FileCache fileCache,
CompositeIndexSettings compositeIndexSettings,
Consumer<IndexShard> replicator
BiConsumer<IndexShard, Runnable> replicator
) {
this.settings = settings;
this.threadPool = threadPool;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public SegmentReplicator(ThreadPool threadPool) {
* Starts a replication event for the given shard.
* @param shard - {@link IndexShard} replica shard
*/
public void startReplication(IndexShard shard) {
public void startReplication(IndexShard shard, Runnable runnable) {
if (sourceFactory.get() == null) return;
startReplication(
shard,
Expand All @@ -67,6 +67,7 @@ public void startReplication(IndexShard shard) {
@Override
public void onReplicationDone(SegmentReplicationState state) {
logger.trace("Completed replication for {}", shard.shardId());
runnable.run();
}

@Override
Expand All @@ -75,6 +76,7 @@ public void onReplicationFailure(SegmentReplicationState state, ReplicationFaile
if (sendShardFailure) {
shard.failShard("unrecoverable replication failure", e);
}
runnable.run();
}
}
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ private IndexService newIndexService(IndexModule module) throws IOException {
() -> IndexSettings.DEFAULT_REFRESH_INTERVAL,
DefaultRecoverySettings.INSTANCE,
DefaultRemoteStoreSettings.INSTANCE,
s -> {}
(s, r) -> {}
);
}

Expand Down
Loading

0 comments on commit 6a0bf73

Please sign in to comment.