Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create Remote Object managers and use them in orchestration from RemoteClusterStateService #13924

Merged
merged 8 commits into from
Jun 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.opensearch.gateway.remote.RemoteClusterStateService.COORDINATION_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.CUSTOM_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateService.METADATA_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.SETTING_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateService.TEMPLATES_METADATA;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.METADATA_FILE_PREFIX;
import static org.opensearch.gateway.remote.model.RemoteCoordinationMetadata.COORDINATION_METADATA;
import static org.opensearch.gateway.remote.model.RemoteCustomMetadata.CUSTOM_METADATA;
import static org.opensearch.gateway.remote.model.RemotePersistentSettingsMetadata.SETTING_METADATA;
import static org.opensearch.gateway.remote.model.RemoteTemplatesMetadata.TEMPLATES_METADATA;

@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
public class RemoteClusterStateServiceIT extends RemoteStoreBaseIntegTestCase {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import static org.opensearch.cluster.metadata.Metadata.CLUSTER_READ_ONLY_BLOCK;
import static org.opensearch.cluster.metadata.Metadata.SETTING_READ_ONLY_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING;
import static org.opensearch.gateway.remote.RemoteClusterStateUtils.encodeString;
import static org.opensearch.indices.ShardLimitValidator.SETTING_CLUSTER_MAX_SHARDS_PER_NODE;
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand Down Expand Up @@ -326,9 +327,7 @@ public void testFullClusterRestoreManifestFilePointsToInvalidIndexMetadataPathTh
// Step - 3 Delete index metadata file in remote
try {
Files.move(
segmentRepoPath.resolve(
RemoteClusterStateService.encodeString(clusterName) + "/cluster-state/" + prevClusterUUID + "/index"
),
segmentRepoPath.resolve(encodeString(clusterName) + "/cluster-state/" + prevClusterUUID + "/index"),
segmentRepoPath.resolve("cluster-state/")
);
} catch (IOException e) {
Expand All @@ -354,10 +353,7 @@ public void testRemoteStateFullRestart() throws Exception {
try {
Files.move(
segmentRepoPath.resolve(
RemoteClusterStateService.encodeString(clusterService().state().getClusterName().value())
+ "/cluster-state/"
+ prevClusterUUID
+ "/manifest"
encodeString(clusterService().state().getClusterName().value()) + "/cluster-state/" + prevClusterUUID + "/manifest"
),
segmentRepoPath.resolve("cluster-state/")
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.bytes.BytesReference;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.RemoteStateTransferException;
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
import org.opensearch.index.remote.RemoteStoreEnums;
import org.opensearch.index.remote.RemoteStorePathStrategy;
Expand All @@ -52,6 +52,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.opensearch.gateway.remote.RemoteClusterStateUtils.DELIMITER;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;

/**
Expand Down Expand Up @@ -87,7 +88,6 @@ public class InternalRemoteRoutingTableService extends AbstractLifecycleComponen

public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing";
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
public static final String DELIMITER = "__";
public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";

private static final Logger logger = LogManager.getLogger(InternalRemoteRoutingTableService.class);
Expand Down Expand Up @@ -175,10 +175,7 @@ public CheckedRunnable<IOException> getIndexRoutingAsyncAction(
)
),
ex -> latchedActionListener.onFailure(
new RemoteClusterStateService.RemoteStateTransferException(
"Exception in writing index to remote store: " + indexRouting.getIndex().toString(),
ex
)
new RemoteStateTransferException("Exception in writing index to remote store: " + indexRouting.getIndex().toString(), ex)
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,10 @@
import java.util.Set;
import java.util.function.Predicate;

import static org.opensearch.gateway.remote.RemoteGlobalMetadataManager.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING;
import static org.opensearch.gateway.remote.RemoteIndexMetadataManager.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING;
import static org.opensearch.gateway.remote.RemoteManifestManager.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING;

/**
* Encapsulates all valid cluster level settings.
*
Expand Down Expand Up @@ -717,9 +721,9 @@ public void apply(Settings value, Settings current, Settings previous) {
// Remote cluster state settings
RemoteClusterStateCleanupManager.REMOTE_CLUSTER_STATE_CLEANUP_INTERVAL_SETTING,
RemoteClusterStateService.REMOTE_CLUSTER_STATE_ENABLED_SETTING,
RemoteClusterStateService.INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
RemoteClusterStateService.METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
INDEX_METADATA_UPLOAD_TIMEOUT_SETTING,
GLOBAL_METADATA_UPLOAD_TIMEOUT_SETTING,
METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING,
RemoteStoreNodeService.REMOTE_STORE_COMPATIBILITY_MODE_SETTING,
RemoteStoreNodeService.MIGRATION_DIRECTION_SETTING,
IndicesService.CLUSTER_REMOTE_INDEX_RESTRICT_ASYNC_DURABILITY_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1158,8 +1158,7 @@ public String getComponent() {
}

public String getUploadedFilename() {
String[] splitPath = uploadedFilename.split("/");
return splitPath[splitPath.length - 1];
return uploadedFilename;
}

public String getIndexName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,7 @@
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_FORMAT;
import static org.opensearch.gateway.remote.RemoteClusterStateService.GLOBAL_METADATA_PATH_TOKEN;
import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_METADATA_FORMAT;
import static org.opensearch.gateway.remote.RemoteClusterStateService.INDEX_PATH_TOKEN;
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_FILE_PREFIX;
import static org.opensearch.gateway.remote.RemoteClusterStateService.MANIFEST_PATH_TOKEN;
import static org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest.MANIFEST;

/**
* A Manager which provides APIs to clean up stale cluster state files and runs an async stale cleanup task
Expand Down Expand Up @@ -150,12 +145,7 @@ void cleanUpStaleFiles() {

private void addStaleGlobalMetadataPath(String fileName, Set<String> filesToKeep, Set<String> staleGlobalMetadataPaths) {
if (!filesToKeep.contains(fileName)) {
String[] splitPath = fileName.split("/");
staleGlobalMetadataPaths.add(
new BlobPath().add(GLOBAL_METADATA_PATH_TOKEN).buildAsString() + GLOBAL_METADATA_FORMAT.blobName(
splitPath[splitPath.length - 1]
)
);
staleGlobalMetadataPaths.add(fileName);
}
}

Expand All @@ -172,15 +162,24 @@ void deleteClusterMetadata(
Set<String> staleIndexMetadataPaths = new HashSet<>();
Set<String> staleGlobalMetadataPaths = new HashSet<>();
activeManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
clusterName,
clusterUUID,
blobMetadata.name()
);
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.getRemoteManifestManager()
shiv0408 marked this conversation as resolved.
Show resolved Hide resolved
.fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, blobMetadata.name());
clusterMetadataManifest.getIndices()
.forEach(uploadedIndexMetadata -> filesToKeep.add(uploadedIndexMetadata.getUploadedFilename()));
.forEach(
uploadedIndexMetadata -> filesToKeep.add(
RemoteClusterStateUtils.getFormattedFileName(
uploadedIndexMetadata.getUploadedFilename(),
clusterMetadataManifest.getCodecVersion()
)
)
);
if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) {
filesToKeep.add(clusterMetadataManifest.getGlobalMetadataFileName());
filesToKeep.add(
RemoteClusterStateUtils.getFormattedFileName(
clusterMetadataManifest.getGlobalMetadataFileName(),
clusterMetadataManifest.getCodecVersion()
)
);
} else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) {
filesToKeep.add(clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename());
filesToKeep.add(clusterMetadataManifest.getSettingsMetadata().getUploadedFilename());
Expand All @@ -191,14 +190,21 @@ void deleteClusterMetadata(
}
});
staleManifestBlobMetadata.forEach(blobMetadata -> {
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.fetchRemoteClusterMetadataManifest(
clusterName,
clusterUUID,
blobMetadata.name()
ClusterMetadataManifest clusterMetadataManifest = remoteClusterStateService.getRemoteManifestManager()
.fetchRemoteClusterMetadataManifest(clusterName, clusterUUID, blobMetadata.name());
staleManifestPaths.add(
remoteClusterStateService.getRemoteManifestManager().getManifestFolderPath(clusterName, clusterUUID).buildAsString()
+ blobMetadata.name()
);
staleManifestPaths.add(new BlobPath().add(MANIFEST_PATH_TOKEN).buildAsString() + blobMetadata.name());
if (clusterMetadataManifest.getCodecVersion() == ClusterMetadataManifest.CODEC_V1) {
addStaleGlobalMetadataPath(clusterMetadataManifest.getGlobalMetadataFileName(), filesToKeep, staleGlobalMetadataPaths);
addStaleGlobalMetadataPath(
RemoteClusterStateUtils.getFormattedFileName(
clusterMetadataManifest.getGlobalMetadataFileName(),
clusterMetadataManifest.getCodecVersion()
),
filesToKeep,
staleGlobalMetadataPaths
);
} else if (clusterMetadataManifest.getCodecVersion() >= ClusterMetadataManifest.CODEC_V2) {
addStaleGlobalMetadataPath(
clusterMetadataManifest.getCoordinationMetadata().getUploadedFilename(),
Expand All @@ -223,11 +229,12 @@ void deleteClusterMetadata(
}

clusterMetadataManifest.getIndices().forEach(uploadedIndexMetadata -> {
if (filesToKeep.contains(uploadedIndexMetadata.getUploadedFilename()) == false) {
staleIndexMetadataPaths.add(
new BlobPath().add(INDEX_PATH_TOKEN).add(uploadedIndexMetadata.getIndexUUID()).buildAsString()
+ INDEX_METADATA_FORMAT.blobName(uploadedIndexMetadata.getUploadedFilename())
);
String fileName = RemoteClusterStateUtils.getFormattedFileName(
uploadedIndexMetadata.getUploadedFilename(),
clusterMetadataManifest.getCodecVersion()
);
if (filesToKeep.contains(fileName) == false) {
staleIndexMetadataPaths.add(fileName);
}
});
});
Expand All @@ -237,9 +244,9 @@ void deleteClusterMetadata(
return;
}

deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleGlobalMetadataPaths));
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleIndexMetadataPaths));
deleteStalePaths(clusterName, clusterUUID, new ArrayList<>(staleManifestPaths));
deleteStalePaths(new ArrayList<>(staleGlobalMetadataPaths));
deleteStalePaths(new ArrayList<>(staleIndexMetadataPaths));
deleteStalePaths(new ArrayList<>(staleManifestPaths));
} catch (IllegalStateException e) {
logger.error("Error while fetching Remote Cluster Metadata manifests", e);
} catch (IOException e) {
Expand Down Expand Up @@ -267,8 +274,8 @@ void deleteStaleClusterMetadata(String clusterName, String clusterUUID, int mani
try {
getBlobStoreTransferService().listAllInSortedOrderAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteClusterStateService.getManifestFolderPath(clusterName, clusterUUID),
MANIFEST_FILE_PREFIX,
remoteClusterStateService.getRemoteManifestManager().getManifestFolderPath(clusterName, clusterUUID),
MANIFEST,
Integer.MAX_VALUE,
new ActionListener<>() {
@Override
Expand Down Expand Up @@ -312,7 +319,11 @@ void deleteStaleUUIDsClusterMetadata(String clusterName, List<String> clusterUUI
clusterUUIDs.forEach(
clusterUUID -> getBlobStoreTransferService().deleteAsync(
ThreadPool.Names.REMOTE_PURGE,
remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID),
RemoteClusterStateUtils.getClusterMetadataBasePath(
remoteClusterStateService.getBlobStoreRepository(),
clusterName,
clusterUUID
),
new ActionListener<>() {
@Override
public void onResponse(Void unused) {
Expand All @@ -336,12 +347,9 @@ public void onFailure(Exception e) {
}

// package private for testing
void deleteStalePaths(String clusterName, String clusterUUID, List<String> stalePaths) throws IOException {
void deleteStalePaths(List<String> stalePaths) throws IOException {
logger.debug(String.format(Locale.ROOT, "Deleting stale files from remote - %s", stalePaths));
getBlobStoreTransferService().deleteBlobs(
remoteClusterStateService.getCusterMetadataBasePath(clusterName, clusterUUID),
stalePaths
);
getBlobStoreTransferService().deleteBlobs(BlobPath.cleanPath(), stalePaths);
}

/**
Expand Down
Loading
Loading