Skip to content

Commit

Permalink
Restore cluster metadata during bootstrap
Browse files Browse the repository at this point in the history
Signed-off-by: Sooraj Sinha <soosinha@amazon.com>
  • Loading branch information
soosinha committed Sep 7, 2023
1 parent 3f18e0c commit d7052d5
Show file tree
Hide file tree
Showing 6 changed files with 380 additions and 97 deletions.
42 changes: 26 additions & 16 deletions server/src/main/java/org/opensearch/gateway/GatewayMetaState.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
import org.opensearch.env.NodeMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.index.recovery.RemoteStoreRestoreService;
import org.opensearch.index.recovery.RemoteStoreRestoreService.RemoteRestoreResult;
import org.opensearch.node.Node;
import org.opensearch.plugins.MetadataUpgrader;
import org.opensearch.repositories.RepositoryMissingException;
Expand Down Expand Up @@ -126,7 +128,8 @@ public void start(
MetadataUpgrader metadataUpgrader,
PersistedClusterStateService persistedClusterStateService,
RemoteClusterStateService remoteClusterStateService,
PersistedStateRegistry persistedStateRegistry
PersistedStateRegistry persistedStateRegistry,
RemoteStoreRestoreService remoteStoreRestoreService
) {
assert this.persistedStateRegistry == null : "Persisted state registry should only be set once";
this.persistedStateRegistry = persistedStateRegistry;
Expand Down Expand Up @@ -154,7 +157,7 @@ public void start(
PersistedState remotePersistedState = null;
boolean success = false;
try {
final ClusterState clusterState = prepareInitialClusterState(
ClusterState clusterState = prepareInitialClusterState(
transportService,
clusterService,
ClusterState.builder(ClusterName.CLUSTER_NAME_SETTING.get(settings))
Expand All @@ -164,10 +167,28 @@ public void start(
);

if (DiscoveryNode.isClusterManagerNode(settings)) {
persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState);
if (REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true) {
// If the cluster UUID loaded from local is unknown (_na_) then fetch the best state from remote
// If there is no valid state on remote, continue with initial empty state
// If there is a valid state, then restore index metadata using this state
if (ClusterState.UNKNOWN_UUID.equals(clusterState.metadata().clusterUUID())) {
String previousClusterUUID = remoteClusterStateService.getLatestClusterUUID(
clusterState.getClusterName().value()
);
if (!ClusterState.UNKNOWN_UUID.equals(previousClusterUUID)) {
// Load state from remote
final RemoteRestoreResult remoteRestoreResult = remoteStoreRestoreService.restore(
clusterState,
previousClusterUUID,
false,
new String[] {}
);
clusterState = remoteRestoreResult.getClusterState();
}
}
remotePersistedState = new RemotePersistedState(remoteClusterStateService);
}
persistedState = new LucenePersistedState(persistedClusterStateService, currentTerm, clusterState);
} else {
persistedState = new AsyncLucenePersistedState(
settings,
Expand Down Expand Up @@ -651,12 +672,6 @@ public void setCurrentTerm(long currentTerm) {
@Override
public void setLastAcceptedState(ClusterState clusterState) {
try {
if (lastAcceptedState == null || lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
logger.info("Cluster is not yet ready to publish state to remote store");
lastAcceptedState = clusterState;
return;
}
final ClusterMetadataManifest manifest;
if (shouldWriteFullClusterState(clusterState)) {
manifest = remoteClusterStateService.writeFullMetadata(clusterState);
Expand Down Expand Up @@ -706,13 +721,8 @@ private boolean shouldWriteFullClusterState(ClusterState clusterState) {
@Override
public void markLastAcceptedStateAsCommitted() {
try {
if (lastAcceptedState == null
|| lastAcceptedManifest == null
|| lastAcceptedState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
// On the initial bootstrap, repository will not be available. So we do not persist the cluster state and bail out.
logger.trace("Cluster is not yet ready to publish state to remote store");
return;
}
assert lastAcceptedState != null : "Last accepted state is not present";
assert lastAcceptedManifest != null : "Last accepted manifest is not present";
final ClusterMetadataManifest committedManifest = remoteClusterStateService.markLastStateAsCommitted(
lastAcceptedState,
lastAcceptedManifest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
Expand Down Expand Up @@ -132,19 +134,25 @@ public RemoteClusterStateService(
*/
@Nullable
public ClusterMetadataManifest writeFullMetadata(ClusterState clusterState) throws IOException {
// should fetch the previous cluster UUID before writing full cluster state.
// Whenever a new election happens, a new master will be elected and that master might have stale previous UUID
final long startTimeNanos = relativeTimeNanosSupplier.getAsLong();
if (clusterState.nodes().isLocalNodeElectedClusterManager() == false) {
logger.error("Local node is not elected cluster manager. Exiting");
return null;
}
ensureRepositorySet();

final String previousClusterUUID = fetchPreviousClusterUUID(
clusterState.getClusterName().value(),
clusterState.metadata().clusterUUID()
);

// any validations before/after upload ?
final List<UploadedIndexMetadata> allUploadedIndexMetadata = writeIndexMetadataParallel(
clusterState,
new ArrayList<>(clusterState.metadata().indices().values())
);
final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, false);
final ClusterMetadataManifest manifest = uploadManifest(clusterState, allUploadedIndexMetadata, previousClusterUUID, false);
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
logger.warn(
Expand Down Expand Up @@ -221,7 +229,12 @@ public ClusterMetadataManifest writeIncrementalMetadata(
for (String removedIndexName : previousStateIndexMetadataVersionByName.keySet()) {
allUploadedIndexMetadata.remove(removedIndexName);
}
final ClusterMetadataManifest manifest = uploadManifest(clusterState, new ArrayList<>(allUploadedIndexMetadata.values()), false);
final ClusterMetadataManifest manifest = uploadManifest(
clusterState,
new ArrayList<>(allUploadedIndexMetadata.values()),
previousManifest.getPreviousClusterUUID(),
false
);
final long durationMillis = TimeValue.nsecToMSec(relativeTimeNanosSupplier.getAsLong() - startTimeNanos);
if (durationMillis >= slowWriteLoggingThreshold.getMillis()) {
logger.warn(
Expand Down Expand Up @@ -364,7 +377,7 @@ public ClusterMetadataManifest markLastStateAsCommitted(ClusterState clusterStat
}
assert clusterState != null : "Last accepted cluster state is not set";
assert previousManifest != null : "Last cluster metadata manifest is not set";
return uploadManifest(clusterState, previousManifest.getIndices(), true);
return uploadManifest(clusterState, previousManifest.getIndices(), previousManifest.getPreviousClusterUUID(), true);
}

public ClusterState getLatestClusterState(String clusterUUID) {
Expand All @@ -379,11 +392,7 @@ public void close() throws IOException {
}
}

// Visible for testing
void ensureRepositorySet() {
if (blobStoreRepository != null) {
return;
}
public void start() {
assert REMOTE_CLUSTER_STATE_ENABLED_SETTING.get(settings) == true : "Remote cluster state is not enabled";
final String remoteStoreRepo = REMOTE_CLUSTER_STATE_REPOSITORY_SETTING.get(settings);
assert remoteStoreRepo != null : "Remote Cluster State repository is not configured";
Expand All @@ -395,6 +404,7 @@ void ensureRepositorySet() {
private ClusterMetadataManifest uploadManifest(
ClusterState clusterState,
List<UploadedIndexMetadata> uploadedIndexMetadata,
String previousClusterUUID,
boolean committed
) throws IOException {
synchronized (this) {
Expand All @@ -407,7 +417,8 @@ private ClusterMetadataManifest uploadManifest(
Version.CURRENT,
nodeId,
committed,
uploadedIndexMetadata
uploadedIndexMetadata,
previousClusterUUID
);
writeMetadataManifest(clusterState.getClusterName().value(), clusterState.metadata().clusterUUID(), manifest, manifestFileName);
return manifest;
Expand All @@ -420,6 +431,11 @@ private void writeMetadataManifest(String clusterName, String clusterUUID, Clust
CLUSTER_METADATA_MANIFEST_FORMAT.write(uploadManifest, metadataManifestContainer, fileName, blobStoreRepository.getCompressor());
}

private String fetchPreviousClusterUUID(String clusterName, String clusterUUID) {
final Optional<ClusterMetadataManifest> latestManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
return latestManifest.isPresent() ? latestManifest.get().getPreviousClusterUUID() : ClusterState.UNKNOWN_UUID;
}

private BlobContainer indexMetadataContainer(String clusterName, String clusterUUID, String indexUUID) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/index/ftqsCnn9TgOX
return blobStoreRepository.blobStore()
Expand Down Expand Up @@ -467,12 +483,15 @@ private static String indexMetadataFileName(IndexMetadata indexMetadata) {
* @return {@code Map<String, IndexMetadata>} latest IndexUUID to IndexMetadata map
*/
public Map<String, IndexMetadata> getLatestIndexMetadata(String clusterName, String clusterUUID) throws IOException {
ensureRepositorySet();
start();
Map<String, IndexMetadata> remoteIndexMetadata = new HashMap<>();
ClusterMetadataManifest clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
assert Objects.equals(clusterUUID, clusterMetadataManifest.getClusterUUID())
Optional<ClusterMetadataManifest> clusterMetadataManifest = getLatestClusterMetadataManifest(clusterName, clusterUUID);
if (!clusterMetadataManifest.isPresent()) {
throw new IllegalStateException("Latest index metadata is not present for the provided clusterUUID");
}
assert Objects.equals(clusterUUID, clusterMetadataManifest.get().getClusterUUID())
: "Corrupt ClusterMetadataManifest found. Cluster UUID mismatch.";
for (UploadedIndexMetadata uploadedIndexMetadata : clusterMetadataManifest.getIndices()) {
for (UploadedIndexMetadata uploadedIndexMetadata : clusterMetadataManifest.get().getIndices()) {
IndexMetadata indexMetadata = getIndexMetadata(clusterName, clusterUUID, uploadedIndexMetadata);
remoteIndexMetadata.put(uploadedIndexMetadata.getIndexUUID(), indexMetadata);
}
Expand Down Expand Up @@ -508,9 +527,12 @@ private IndexMetadata getIndexMetadata(String clusterName, String clusterUUID, U
* @param clusterName name of the cluster
* @return ClusterMetadataManifest
*/
public ClusterMetadataManifest getLatestClusterMetadataManifest(String clusterName, String clusterUUID) {
String latestManifestFileName = getLatestManifestFileName(clusterName, clusterUUID);
return fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, latestManifestFileName);
public Optional<ClusterMetadataManifest> getLatestClusterMetadataManifest(String clusterName, String clusterUUID) {
Optional<String> latestManifestFileName = getLatestManifestFileName(clusterName, clusterUUID);
if (latestManifestFileName.isPresent()) {
return Optional.of(fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, latestManifestFileName.get()));
}
return Optional.empty();
}

/**
Expand All @@ -519,7 +541,7 @@ public ClusterMetadataManifest getLatestClusterMetadataManifest(String clusterNa
* @param clusterName name of the cluster
* @return latest ClusterMetadataManifest filename
*/
private String getLatestManifestFileName(String clusterName, String clusterUUID) throws IllegalStateException {
private Optional<String> getLatestManifestFileName(String clusterName, String clusterUUID) throws IllegalStateException {
try {
/**
* {@link BlobContainer#listBlobsByPrefixInSortedOrder} will get the latest manifest file
Expand All @@ -532,13 +554,13 @@ private String getLatestManifestFileName(String clusterName, String clusterUUID)
BlobContainer.BlobNameSortOrder.LEXICOGRAPHIC
);
if (manifestFilesMetadata != null && !manifestFilesMetadata.isEmpty()) {
return manifestFilesMetadata.get(0).name();
return Optional.of(manifestFilesMetadata.get(0).name());
}
} catch (IOException e) {
throw new IllegalStateException("Error while fetching latest manifest file for remote cluster state", e);
}

throw new IllegalStateException(String.format(Locale.ROOT, "Remote Cluster State not found - %s", clusterUUID));
logger.info("No manifest file present in remote store for cluster name: {}, cluster UUID: {}", clusterName, clusterUUID);
return Optional.empty();
}

/**
Expand Down
5 changes: 4 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1334,6 +1334,8 @@ public Node start() throws NodeValidationException {
injector.getInstance(PeerRecoverySourceService.class).start();
injector.getInstance(SegmentReplicationSourceService.class).start();

final RemoteClusterStateService remoteClusterStateService = injector.getInstance(RemoteClusterStateService.class);
remoteClusterStateService.start();
// Load (and maybe upgrade) the metadata stored on disk
final GatewayMetaState gatewayMetaState = injector.getInstance(GatewayMetaState.class);
gatewayMetaState.start(
Expand All @@ -1345,7 +1347,8 @@ public Node start() throws NodeValidationException {
injector.getInstance(MetadataUpgrader.class),
injector.getInstance(PersistedClusterStateService.class),
injector.getInstance(RemoteClusterStateService.class),
injector.getInstance(PersistedStateRegistry.class)
injector.getInstance(PersistedStateRegistry.class),
injector.getInstance(RemoteStoreRestoreService.class)
);
if (Assertions.ENABLED) {
try {
Expand Down
Loading

0 comments on commit d7052d5

Please sign in to comment.