Skip to content

Commit

Permalink
Restore cluster metadata during bootstrap
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha committed Sep 6, 2023
1 parent 1b41fa0 commit 4bc570b
Show file tree
Hide file tree
Showing 6 changed files with 259 additions and 60 deletions.
38 changes: 22 additions & 16 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
import org.opensearch.env.NodeMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.index.recovery.RemoteStoreRestoreService;
import org.opensearch.index.recovery.RemoteStoreRestoreService.RemoteRestoreResult;
import org.opensearch.node.Node;
import org.opensearch.plugins.MetadataUpgrader;
import org.opensearch.repositories.RepositoryMissingException;
Expand Down Expand Up @@ -126,7 +128,8 @@ public void start(
MetadataUpgrader metadataUpgrader,
PersistedClusterStateService persistedClusterStateService,
RemoteClusterStateService remoteClusterStateService,
PersistedStateRegistry persistedStateRegistry
PersistedStateRegistry persistedStateRegistry,
RemoteStoreRestoreService remoteStoreRestoreService
) {
assert this.persistedStateRegistry == null : "Persisted state registry should only be set once";
this.persistedStateRegistry = persistedStateRegistry;
Expand Down Expand Up @@ -154,7 +157,7 @@ public void start(
PersistedState remotePersistedState = null;
boolean success = false;
try {
final ClusterState clusterState = prepareInitialClusterState(
ClusterState clusterState = prepareInitialClusterState(
transportService,
clusterService,
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
Expand All @@ -164,10 +167,24 @@ public void start(
);

if (DiscoveryNode.isClusterManagerNode(settings)) {
persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState);
if (REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true) {
// If there is valid cluster UUID in the state loaded from local, then fetch the previous cluster UUID from remote just to add
// in manifest.
// If the cluster UUID loaded from local is unknown (_na_) then fetch the best state from remote
// If there is no valid state on remote, continue with initial empty state
// If there is a valid state, then restore index metadata using this state
if (ClusterState.UNKNOWN_UUID.equals(clusterState.metadata().clusterUUID())) {
String previousClusterUUID = remoteClusterStateService.getValidPreviousClusterUUID(clusterState.getClusterName().value());
if (!ClusterState.UNKNOWN_UUID.equals(previousClusterUUID)) {
// Load state from remote
final RemoteRestoreResult remoteRestoreResult = remoteStoreRestoreService.restore(clusterState, previousClusterUUID, false,
new String[]{});
clusterState = remoteRestoreResult.getClusterState();
}
}
remotePersistedState = new RemotePersistedState(remoteClusterStateService);
}
persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState);
} else {
persistedState = new AsyncLucenePersistedState(
settings,
Expand Down Expand Up @@ -651,12 +668,6 @@ public void setCurrentTerm(long currentTerm) {
@Override
public void setLastAcceptedState(ClusterState clusterState) {
try {
if (lastAcceptedState == null || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
logger.info("Cluster is not yet ready to publish state to remote store");
lastAcceptedState = clusterState;
return;
}
final ClusterMetadataManifest manifest;
if (shouldWriteFullClusterState(clusterState)) {
manifest = remoteClusterStateService.writeFullMetadata(clusterState);
Expand Down Expand Up @@ -706,13 +717,8 @@ private boolean shouldWriteFullClusterState(ClusterState clusterState) {
@Override
public void markLastAcceptedStateAsCommitted() {
try {
if (lastAcceptedState == null
|| lastAcceptedManifest == null
|| lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
logger.trace("Cluster is not yet ready to publish state to remote store");
return;
}
assert lastAcceptedState != null : "Last accepted state is not present";
assert lastAcceptedManifest != null : "Last accepted manifest is not present";
final ClusterMetadataManifest committedManifest = remoteClusterStateService.markLastStateAsCommitted(
lastAcceptedState,
lastAcceptedManifest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@

package org.opensearch.gateway.remote;

import java.util.Collections;
import java.util.Optional;
import java.util.Set;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.Version;
Expand Down Expand Up @@ -123,12 +126,17 @@ public RemoteClusterStateService(
*/
@Nullable
public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState) throws IOException {
//should fetch the previous cluster UUID before writing full cluster state.
//Whenever a new election happens, a new master will be elected and that master might have stale previous UUID
final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) {
logger.error("Local node is not elected cluster manager. Exiting");
return null;
}
ensureRepositorySet();

final Optional<ClusterMetadataManifest> latestManifest = getLatestClusterMetadataManifest(clusterState.getClusterName().value(),
clusterState.getMetadata().clusterUUID());
final String previousClusterUUID = latestManifest.isPresent() ? latestManifest.get().getPreviousClusterUUID() : ClusterState.UNKNOWN_UUID;

final List<ClusterMetadataManifest.UploadedIndexMetadata> allUploadedIndexMetadata = new ArrayList<>();
// todo parallel upload
Expand All @@ -148,7 +156,7 @@ public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState) thro
);
allUploadedIndexMetadata.add(uploadedIndexMetadata);
}
final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, false);
final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, previousClusterUUID, false);
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
logger.warn(
Expand Down Expand Up @@ -231,6 +239,7 @@ public ClusterMetadataManifest writeIncrementalMetadata(
final ClusterMetadataManifest manifest = uploadManifest(
clusterState,
allUploadedIndexMetadata.values().stream().collect(Collectors.toList()),
previousManifest.getPreviousClusterUUID(),
false
);
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
Expand Down Expand Up @@ -263,7 +272,7 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat
}
assert clusterState != null : "Last accepted cluster state is not set";
assert previousManifest != null : "Last cluster metadata manifest is not set";
return uploadManifest(clusterState, previousManifest.getIndices(), true);
return uploadManifest(clusterState, previousManifest.getIndices(), previousManifest.getPreviousClusterUUID(), true);
}

public ClusterState getLatestClusterState(String clusterUUID) {
Expand All @@ -278,11 +287,7 @@ public void close() throws IOException {
}
}

// Visible for testing
void ensureRepositorySet() {
if (blobStoreRepository != null) {
return;
}
public void start() {
assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled";
final String remoteStoreRepo = REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.get(settings);
assert remoteStoreRepo != null : "Remote Cluster State repository is not configured";
Expand All @@ -294,6 +299,7 @@ void ensureRepositorySet() {
private ClusterMetadataManifest uploadManifest(
ClusterState clusterState,
List<UploadedIndexMetadata> uploadedIndexMetadata,
String previousClusterUUID,
boolean committed
) throws IOException {
synchronized (this) {
Expand All @@ -306,7 +312,8 @@ private ClusterMetadataManifest uploadManifest(
Version.CURRENT,
nodeId,
committed,
uploadedIndexMetadata
uploadedIndexMetadata,
previousClusterUUID
);
writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName);
return manifest;
Expand Down Expand Up @@ -379,10 +386,13 @@ private static String indexMetadataFileName(IndexMetadata indexMetadata) {
*/
public Map<String, IndexMetadata> getLatestIndexMetadata(String clusterName, String clusterUUID) throws IOException {
Map<String, IndexMetadata> remoteIndexMetadata = new HashMap<>();
ClusterMetadataManifest clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
assert Objects.equals(clusterUUID, clusterMetadataManifest.getClusterUUID())
Optional<ClusterMetadataManifest> clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
if (!clusterMetadataManifest.isPresent()) {
throw new IllegalStateException("Latest index metadata is not present for the provided clusterUUID");
}
assert Objects.equals(clusterUUID, clusterMetadataManifest.get().getClusterUUID())
: "Corrupt ClusterMetadataManifest found. Cluster UUID mismatch.";
for (UploadedIndexMetadata uploadedIndexMetadata : clusterMetadataManifest.getIndices()) {
for (UploadedIndexMetadata uploadedIndexMetadata : clusterMetadataManifest.get().getIndices()) {
IndexMetadata indexMetadata = getIndexMetadata(clusterName, clusterUUID, uploadedIndexMetadata);
remoteIndexMetadata.put(uploadedIndexMetadata.getIndexUUID(), indexMetadata);
}
Expand Down Expand Up @@ -417,9 +427,97 @@ private IndexMetadata getIndexMetadata(String clusterName, String clusterUUID, U
* @param clusterName name of the cluster
* @return ClusterMetadataManifest
*/
public ClusterMetadataManifest getLatestClusterMetadataManifest(String clusterName, String clusterUUID) {
String latestManifestFileName = getLatestManifestFileName(clusterName, clusterUUID);
return fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, latestManifestFileName);
public Optional<ClusterMetadataManifest> getLatestClusterMetadataManifest(String clusterName, String clusterUUID) {
Optional<String> latestManifestFileName = getLatestManifestFileName(clusterName, clusterUUID);
if (latestManifestFileName.isPresent()) {
return Optional.of(fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, latestManifestFileName.get()));
}
return Optional.empty();
}

/**
* Fetch the previous cluster UUIDs from remote state store and return the most recent valid cluster UUID
*
* @param clusterName The cluster name for which previous cluster UUID is to be fetched
* @return Last valid cluster UUID
*/
public String getValidPreviousClusterUUID(String clusterName) {
try {
Set<String> clusterUUIDs = getAllClusterUUIDs(clusterName);
Map<String, ClusterMetadataManifest> latestManifests = getLatestManifestForAllClusterUUIDs(clusterName, clusterUUIDs);
List<String> validChain = createClusterChain(latestManifests);
if (validChain.isEmpty()) {
return ClusterState.UNKNOWN_UUID;
}
return validChain.get(0);
} catch (IOException e) {
logger.error("Exception while fetching previous UUIDs for cluster name: {}", clusterName);
throw new IllegalStateException("Error while fetching previous UUIDs from remote store");
}
}

private Set<String> getAllClusterUUIDs(String clusterName) throws IOException {
Map<String, BlobContainer> clusterUUIDMetadata = clusterUUIDContainer(clusterName).children();
if (clusterUUIDMetadata == null) {
return Collections.emptySet();
}
return Collections.unmodifiableSet(clusterUUIDMetadata.keySet());
}

private Map<String, ClusterMetadataManifest> getLatestManifestForAllClusterUUIDs(String clusterName, Set<String> clusterUUIDs) {
Map<String, ClusterMetadataManifest> manifestsByClusterUUID = new HashMap<>();
for (String clusterUUID : clusterUUIDs) {
try {
Optional<ClusterMetadataManifest> manifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
manifestsByClusterUUID.put(clusterUUID, manifest.get());
} catch (Exception e) {
logger.error("Exception in fetching manifest for clusterUUID: {}", clusterUUID);
// Throwing the exception since we may not be able to construct the clusterUUID chain if some manifest file is missing.
throw new IllegalStateException(" Error while fetching manifest data from remote store");
}
}
return manifestsByClusterUUID;
}

/**
* This method creates a valid cluster UUID chain.
*
* @param manifestsByClusterUUID Map of latest ClusterMetadataManifest for every cluster UUID
* @return List of cluster UUIDs. The first element is the most recent cluster UUID in the chain
*/
private List<String> createClusterChain(final Map<String, ClusterMetadataManifest> manifestsByClusterUUID) {
final Map<String, String> clusterUUIDGraph = manifestsByClusterUUID.values()
.stream()
.collect(Collectors.toMap(ClusterMetadataManifest::getClusterUUID, ClusterMetadataManifest::getPreviousClusterUUID));
final List<String> validClusterUUIDs = manifestsByClusterUUID.values()
.stream()
.filter(m -> !isInvalidClusterUUID(m) && !clusterUUIDGraph.containsValue(m.getClusterUUID()))
.map(ClusterMetadataManifest::getClusterUUID)
.collect(Collectors.toList());
if (validClusterUUIDs.isEmpty()) {
logger.info("There is no valid previous cluster UUID");
return Collections.emptyList();
}
if (validClusterUUIDs.size() > 1) {
logger.error(
"There are more than 1 valid top level previous cluster UUIDs which are part of different chains. Cluster UUIDs: {} . "
+ "Cleanup the invalid cluster UUID data",
validClusterUUIDs
);
throw new IllegalStateException("More than 1 valid top level cluster UUIDs present in remote store");
}
final List<String> validChain = new ArrayList<>();
String currentUUID = validClusterUUIDs.get(0);
while (!ClusterState.UNKNOWN_UUID.equals(currentUUID)) {
validChain.add(currentUUID);
// Getting the previous cluster UUID of a cluster UUID from the clusterUUID Graph
currentUUID = clusterUUIDGraph.get(currentUUID);
}
return validChain;
}

private boolean isInvalidClusterUUID(ClusterMetadataManifest manifest) {
return !manifest.isCommitted() && manifest.getIndices().isEmpty();
}

/**
Expand All @@ -428,7 +526,7 @@ public ClusterMetadataManifest getLatestClusterMetadataManifest(String clusterNa
* @param clusterName name of the cluster
* @return latest ClusterMetadataManifest filename
*/
private String getLatestManifestFileName(String clusterName, String clusterUUID) throws IllegalStateException {
private Optional<String> getLatestManifestFileName(String clusterName, String clusterUUID) throws IllegalStateException {
try {
/**
* {@link BlobContainer#listBlobsByPrefixInSortedOrder} will get the latest manifest file
Expand All @@ -441,13 +539,13 @@ private String getLatestManifestFileName(String clusterName, String clusterUUID)
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
);
if (manifestFilesMetadata != null && !manifestFilesMetadata.isEmpty()) {
return manifestFilesMetadata.get(0).name();
return Optional.of(manifestFilesMetadata.get(0).name());
}
} catch (IOException e) {
throw new IllegalStateException("Error while fetching latest manifest file for remote cluster state", e);
}

throw new IllegalStateException(String.format(Locale.ROOT, "Remote Cluster State not found - %s", clusterUUID));
logger.info("No manifest file present in remote store for cluster name: {}, cluster UUID: {}", clusterName, clusterUUID);
return Optional.empty();
}

/**
Expand Down
5 changes: 4 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1324,6 +1324,8 @@ public Node start() throws NodeValidationException {
injector.getInstance(PeerRecoverySourceService.class).start();
injector.getInstance(SegmentReplicationSourceService.class).start();

final RemoteClusterStateService remoteClusterStateService = injector.getInstance(RemoteClusterStateService.class);
remoteClusterStateService.start();
// Load (and maybe upgrade) the metadata stored on disk
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
gatewayMetaState.start(
Expand All @@ -1335,7 +1337,8 @@ public Node start() throws NodeValidationException {
injector.getInstance(MetadataUpgrader.class),
injector.getInstance(PersistedClusterStateService.class),
injector.getInstance(RemoteClusterStateService.class),
injector.getInstance(PersistedStateRegistry.class)
injector.getInstance(PersistedStateRegistry.class),
injector.getInstance(RemoteStoreRestoreService.class)
);
if (Assertions.ENABLED) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -473,7 +473,8 @@ public void testDataOnlyNodePersistence() throws Exception {
null,
persistedClusterStateService,
remoteClusterStateServiceSupplier.get(),
new PersistedStateRegistry()
new PersistedStateRegistry(),
null
);
final CoordinationState.PersistedState persistedState = gateway.getPersistedState();
assertThat(persistedState, instanceOf(GatewayMetaState.AsyncLucenePersistedState.class));
Expand Down
Loading

0 comments on commit 4bc570b

Please sign in to comment.