Skip to content

Commit

Permalink
Retry remote state download while bootstrap (#15950) (#16074)
Browse files Browse the repository at this point in the history
* Retry remote state download while bootstrap
(cherry picked from commit 12dadcf)

Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
opensearch-trigger-bot[bot] authored Sep 26, 2024
1 parent a1846cf commit a99fe30
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.opensearch.test.OpenSearchIntegTestCase;
import org.junit.Before;

import java.io.IOError;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
Expand Down Expand Up @@ -339,10 +340,11 @@ public void testFullClusterRestoreManifestFilePointsToInvalidIndexMetadataPathTh
for (UploadedIndexMetadata md : manifest.getIndices()) {
Files.move(segmentRepoPath.resolve(md.getUploadedFilename()), segmentRepoPath.resolve("cluster-state/"));
}
internalCluster().stopAllNodes();
} catch (IOException e) {
throw new RuntimeException(e);
}
assertThrows(IllegalStateException.class, () -> addNewNodes(dataNodeCount, clusterManagerNodeCount));
assertThrows(IOError.class, () -> internalCluster().client());
// Test is complete

// Starting a node without remote state to ensure test cleanup
Expand Down
60 changes: 51 additions & 9 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.model.RemoteClusterStateManifestInfo;
import org.opensearch.index.recovery.RemoteStoreRestoreService;
import org.opensearch.index.recovery.RemoteStoreRestoreService.RemoteRestoreResult;
import org.opensearch.node.Node;
import org.opensearch.plugins.MetadataUpgrader;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;

import java.io.Closeable;
import java.io.IOError;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
Expand Down Expand Up @@ -110,6 +110,8 @@ public class GatewayMetaState implements Closeable {
*/
public static final String STALE_STATE_CONFIG_NODE_ID = "STALE_STATE_CONFIG";

private final Logger logger = LogManager.getLogger(GatewayMetaState.class);

private PersistedStateRegistry persistedStateRegistry;

public PersistedState getPersistedState() {
Expand Down Expand Up @@ -176,15 +178,11 @@ public void start(
);
if (ClusterState.UNKNOWN_UUID.equals(lastKnownClusterUUID) == false) {
// Load state from remote
final RemoteRestoreResult remoteRestoreResult = remoteStoreRestoreService.restore(
// Remote Metadata should always override local disk Metadata
// if local disk Metadata's cluster uuid is UNKNOWN_UUID
ClusterState.builder(clusterState).metadata(Metadata.EMPTY_METADATA).build(),
lastKnownClusterUUID,
false,
new String[] {}
clusterState = restoreClusterStateWithRetries(
remoteStoreRestoreService,
clusterState,
lastKnownClusterUUID
);
clusterState = remoteRestoreResult.getClusterState();
}
}
remotePersistedState = new RemotePersistedState(remoteClusterStateService, lastKnownClusterUUID);
Expand Down Expand Up @@ -259,6 +257,50 @@ public void start(
}
}

private ClusterState restoreClusterStateWithRetries(
RemoteStoreRestoreService remoteStoreRestoreService,
ClusterState clusterState,
String lastKnownClusterUUID
) {
int maxAttempts = 5;
int delayInMills = 200;
for (int attempt = 1; attempt <= maxAttempts; attempt++) {
try {
logger.info("Attempt {} to restore cluster state", attempt);
return restoreClusterState(remoteStoreRestoreService, clusterState, lastKnownClusterUUID);
} catch (Exception e) {
if (attempt == maxAttempts) {
// Throw an Error so that the process is halted.
throw new IOError(e);
}
try {
TimeUnit.MILLISECONDS.sleep(delayInMills);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // Restore interrupted status
throw new RuntimeException(ie);
}
delayInMills = delayInMills * 2;
}
}
// This statement will never be reached.
return null;
}

ClusterState restoreClusterState(
RemoteStoreRestoreService remoteStoreRestoreService,
ClusterState clusterState,
String lastKnownClusterUUID
) {
return remoteStoreRestoreService.restore(
// Remote Metadata should always override local disk Metadata
// if local disk Metadata's cluster uuid is UNKNOWN_UUID
ClusterState.builder(clusterState).metadata(Metadata.EMPTY_METADATA).build(),
lastKnownClusterUUID,
false,
new String[] {}
).getClusterState();
}

// exposed so it can be overridden by tests
ClusterState prepareInitialClusterState(TransportService transportService, ClusterService clusterService, ClusterState clusterState) {
assert clusterState.nodes().getLocalNode() == null : "prepareInitialClusterState must only be called once";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1244,14 +1244,72 @@ public void testGatewayForRemoteStateForInitialBootstrapBlocksApplied() throws I
}
}

private MockGatewayMetaState newGatewayForRemoteState(
public void testGatewayMetaStateRemoteStateDownloadRetries() throws IOException {
MockGatewayMetaState gateway = null;
MockGatewayMetaState gatewayMetaStateSpy = null;
try {
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);
when(remoteClusterStateService.getLastKnownUUIDFromRemote("test-cluster")).thenReturn("test-cluster-uuid");
RemoteStoreRestoreService remoteStoreRestoreService = mock(RemoteStoreRestoreService.class);
when(remoteStoreRestoreService.restore(any(), any(), anyBoolean(), any())).thenThrow(
new IllegalStateException("unable to download cluster state")
).thenReturn(RemoteRestoreResult.build("test-cluster-uuid", null, ClusterState.EMPTY_STATE));
final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry();
gateway = initializeGatewayForRemoteState(true);
gatewayMetaStateSpy = Mockito.spy(gateway);
startGatewayForRemoteState(
gatewayMetaStateSpy,
remoteClusterStateService,
remoteStoreRestoreService,
persistedStateRegistry,
ClusterState.EMPTY_STATE
);
verify(gatewayMetaStateSpy, times(2)).restoreClusterState(Mockito.any(), Mockito.any(), Mockito.any());
} finally {
IOUtils.close(gatewayMetaStateSpy);
}
}

public void testGatewayMetaStateRemoteStateDownloadFailure() throws IOException {
MockGatewayMetaState gateway = null;
final MockGatewayMetaState gatewayMetaStateSpy;
try {
RemoteClusterStateService remoteClusterStateService = mock(RemoteClusterStateService.class);
when(remoteClusterStateService.getLastKnownUUIDFromRemote("test-cluster")).thenReturn("test-cluster-uuid");
RemoteStoreRestoreService remoteStoreRestoreService = mock(RemoteStoreRestoreService.class);
when(remoteStoreRestoreService.restore(any(), any(), anyBoolean(), any())).thenThrow(
new IllegalStateException("unable to download cluster state")
);
final PersistedStateRegistry persistedStateRegistry = persistedStateRegistry();
gateway = initializeGatewayForRemoteState(true);
gatewayMetaStateSpy = Mockito.spy(gateway);
assertThrows(
Error.class,
() -> startGatewayForRemoteState(
gatewayMetaStateSpy,
remoteClusterStateService,
remoteStoreRestoreService,
persistedStateRegistry,
ClusterState.EMPTY_STATE
)
);
verify(gatewayMetaStateSpy, times(5)).restoreClusterState(Mockito.any(), Mockito.any(), Mockito.any());
} finally {
IOUtils.close(gateway);
}
}

private MockGatewayMetaState initializeGatewayForRemoteState(boolean prepareFullState) {
return new MockGatewayMetaState(localNode, bigArrays, prepareFullState);
}

private MockGatewayMetaState startGatewayForRemoteState(
MockGatewayMetaState gateway,
RemoteClusterStateService remoteClusterStateService,
RemoteStoreRestoreService remoteStoreRestoreService,
PersistedStateRegistry persistedStateRegistry,
ClusterState currentState,
boolean prepareFullState
ClusterState currentState
) throws IOException {
MockGatewayMetaState gateway = new MockGatewayMetaState(localNode, bigArrays, prepareFullState);
String randomRepoName = "randomRepoName";
String stateRepoTypeAttributeKey = String.format(
Locale.getDefault(),
Expand Down Expand Up @@ -1305,6 +1363,24 @@ private MockGatewayMetaState newGatewayForRemoteState(
return gateway;
}

private MockGatewayMetaState newGatewayForRemoteState(
RemoteClusterStateService remoteClusterStateService,
RemoteStoreRestoreService remoteStoreRestoreService,
PersistedStateRegistry persistedStateRegistry,
ClusterState currentState,
boolean prepareFullState
) throws IOException {
MockGatewayMetaState gatewayMetaState = initializeGatewayForRemoteState(prepareFullState);
startGatewayForRemoteState(
gatewayMetaState,
remoteClusterStateService,
remoteStoreRestoreService,
persistedStateRegistry,
currentState
);
return gatewayMetaState;
}

private static BigArrays getBigArrays() {
return usually()
? BigArrays.NON_RECYCLING_INSTANCE
Expand Down

0 comments on commit a99fe30

Please sign in to comment.