Skip to content

Commit

Permalink
Download cluster state from remote
Browse files Browse the repository at this point in the history
  • Loading branch information
soosinha committed May 14, 2024
1 parent a10b062 commit acdb97e
Show file tree
Hide file tree
Showing 14 changed files with 233 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import org.opensearch.discovery.PeerFinder;
import org.opensearch.discovery.SeedHostsProvider;
import org.opensearch.discovery.SeedHostsResolver;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.monitor.StatusInfo;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
Expand Down Expand Up @@ -207,7 +208,8 @@ public Coordinator(
ElectionStrategy electionStrategy,
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService
RemoteStoreNodeService remoteStoreNodeService,
RemoteClusterStateService remoteClusterStateService
) {
this.settings = settings;
this.transportService = transportService;
Expand Down Expand Up @@ -259,7 +261,8 @@ public Coordinator(
transportService,
namedWriteableRegistry,
this::handlePublishRequest,
this::handleApplyCommit
this::handleApplyCommit,
remoteClusterStateService
);
this.leaderChecker = new LeaderChecker(settings, clusterSettings, transportService, this::onLeaderFailure, nodeHealthService);
this.followersChecker = new FollowersChecker(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

package org.opensearch.cluster.coordination;

import java.util.Optional;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
Expand All @@ -47,6 +48,8 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.BytesTransportRequest;
import org.opensearch.transport.TransportChannel;
Expand Down Expand Up @@ -97,16 +100,19 @@ public class PublicationTransportHandler {
private final TransportRequestOptions stateRequestOptions = TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.STATE)
.build();
private final RemoteClusterStateService remoteClusterStateService;

public PublicationTransportHandler(
TransportService transportService,
NamedWriteableRegistry namedWriteableRegistry,
Function<PublishRequest, PublishWithJoinResponse> handlePublishRequest,
BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit
BiConsumer<ApplyCommitRequest, ActionListener<Void>> handleApplyCommit,
RemoteClusterStateService remoteClusterStateService
) {
this.transportService = transportService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.handlePublishRequest = handlePublishRequest;
this.remoteClusterStateService = remoteClusterStateService;

transportService.registerRequestHandler(
PUBLISH_STATE_ACTION_NAME,
Expand Down Expand Up @@ -211,6 +217,40 @@ private PublishWithJoinResponse handleIncomingPublishRequest(BytesTransportReque
}
}

/**
 * Handles a publish request whose cluster state has to be fetched from the remote store.
 * <p>
 * The manifest for the request's term and version is looked up first. If the manifest carries a
 * diff whose {@code fromStateUUID} equals the state UUID of the last cluster state seen by this
 * handler, the new state is built by applying that diff; in every other case the full state
 * referenced by the manifest is downloaded.
 *
 * @param request identifies the cluster (name and UUID) and the term/version being published
 * @return the response produced by {@link #acceptState(ClusterState)} for the resolved state
 * @throws IncompatibleClusterStateVersionException if no manifest exists for the term/version
 * @throws IOException declared for callers; propagated from the calls made here
 */
private PublishWithJoinResponse handleIncomingRemotePublishRequest(RemotePublishRequest request) throws IOException {
    final Optional<ClusterMetadataManifest> manifestOptional = remoteClusterStateService.getClusterMetadataManifestByTermVersion(
        request.getClusterName(),
        request.getClusterUUID(),
        request.term,
        request.version
    );
    if (manifestOptional.isPresent() == false) {
        // todo change exception
        throw new IncompatibleClusterStateVersionException("No remote state for term version");
    }
    final ClusterMetadataManifest manifest = manifestOptional.get();

    // Snapshot the reference once: the compatibility check, the diff application and the
    // compare-and-set below must all refer to the same base state.
    final ClusterState lastSeen = lastSeenClusterState.get();

    final boolean applyFullState;
    if (lastSeen == null) {
        logger.debug("Diff cannot be applied as there is not last cluster state");
        applyFullState = true;
    } else if (manifest.getDiffManifest() == null) {
        logger.debug("There is no diff in the manifest");
        applyFullState = true;
    } else if (manifest.getDiffManifest().getFromStateUUID().equals(lastSeen.stateUUID()) == false) {
        logger.debug("Last cluster state not compatible with the diff");
        applyFullState = true;
    } else {
        applyFullState = false;
    }

    if (applyFullState) {
        final ClusterState clusterState = remoteClusterStateService.getClusterStateForManifest(request.getClusterName(), manifest);
        logger.debug("Downloaded full cluster state version [{}]", clusterState.version());
        final PublishWithJoinResponse response = acceptState(clusterState);
        lastSeenClusterState.set(clusterState);
        return response;
    }

    // Diff path: apply against the validated snapshot rather than re-reading the atomic reference,
    // which may have moved on since the compatibility check.
    final ClusterState clusterState = remoteClusterStateService.getClusterStateUsingDiff(request.getClusterName(), manifest, lastSeen);
    final PublishWithJoinResponse response = acceptState(clusterState);
    // Only advance the reference if nobody replaced it in the meantime.
    lastSeenClusterState.compareAndSet(lastSeen, clusterState);
    return response;
}

private PublishWithJoinResponse acceptState(ClusterState incomingState) {
// if the state is coming from the current node, use original request instead (see currentPublishRequestToSelf for explanation)
if (transportService.getLocalNode().equals(incomingState.nodes().getClusterManagerNode())) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.coordination;

import java.io.IOException;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

/**
 * Publish request that carries only the coordinates of a cluster state (source node, term,
 * version, cluster name and cluster UUID) rather than the state itself.
 * <p>
 * On the wire the cluster name and cluster UUID follow the fields written by
 * {@link TermVersionRequest}, in that order.
 */
public class RemotePublishRequest extends TermVersionRequest {

    // todo Do we need cluster name and UUID ?
    private final String clusterName;
    private final String clusterUUID;

    /**
     * Creates a request from explicit values.
     *
     * @param sourceNode  node passed through to {@link TermVersionRequest}
     * @param term        term passed through to {@link TermVersionRequest}
     * @param version     version passed through to {@link TermVersionRequest}
     * @param clusterName name of the cluster
     * @param clusterUUID UUID of the cluster
     */
    public RemotePublishRequest(DiscoveryNode sourceNode, long term, long version, String clusterName, String clusterUUID) {
        super(sourceNode, term, version);
        this.clusterName = clusterName;
        this.clusterUUID = clusterUUID;
    }

    /**
     * Reads a request from the stream; the inverse of {@link #writeTo(StreamOutput)}.
     *
     * @param in stream positioned at the start of a serialized request
     * @throws IOException if reading from the stream fails
     */
    public RemotePublishRequest(StreamInput in) throws IOException {
        super(in);
        // Read order must mirror the write order in writeTo.
        final String name = in.readString();
        final String uuid = in.readString();
        this.clusterName = name;
        this.clusterUUID = uuid;
    }

    /** @return the cluster name carried by this request */
    public String getClusterName() {
        return this.clusterName;
    }

    /** @return the cluster UUID carried by this request */
    public String getClusterUUID() {
        return this.clusterUUID;
    }

    /**
     * Serializes the parent fields followed by the cluster name and the cluster UUID.
     *
     * @param out stream to write to
     * @throws IOException if writing to the stream fails
     */
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeString(this.clusterName);
        out.writeString(this.clusterUUID);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import org.opensearch.core.common.io.stream.NamedWriteableRegistry;
import org.opensearch.core.common.transport.TransportAddress;
import org.opensearch.gateway.GatewayMetaState;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.monitor.NodeHealthService;
import org.opensearch.node.remotestore.RemoteStoreNodeService;
import org.opensearch.plugins.DiscoveryPlugin;
Expand Down Expand Up @@ -133,7 +134,8 @@ public DiscoveryModule(
RerouteService rerouteService,
NodeHealthService nodeHealthService,
PersistedStateRegistry persistedStateRegistry,
RemoteStoreNodeService remoteStoreNodeService
RemoteStoreNodeService remoteStoreNodeService,
RemoteClusterStateService remoteClusterStateService
) {
final Collection<BiConsumer<DiscoveryNode, ClusterState>> joinValidators = new ArrayList<>();
final Map<String, Supplier<SeedHostsProvider>> hostProviders = new HashMap<>();
Expand Down Expand Up @@ -211,7 +213,8 @@ public DiscoveryModule(
electionStrategy,
nodeHealthService,
persistedStateRegistry,
remoteStoreNodeService
remoteStoreNodeService,
remoteClusterStateService
);
} else {
throw new IllegalArgumentException("Unknown discovery type [" + discoveryType + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1081,5 +1081,45 @@ public List<String> findUpdatedIndices(Map<String, IndexMetadata> indices, Map<S
}
return updatedIndices;
}

/**
 * Returns the {@code fromStateUUID} recorded in this diff manifest.
 * NOTE(review): presumably the state UUID of the cluster state the diff was computed from; confirm against the code that populates it.
 *
 * @return the value of {@code fromStateUUID}
 */
public String getFromStateUUID() {
return fromStateUUID;
}

/**
 * Returns the {@code toStateUUID} recorded in this diff manifest.
 * NOTE(review): presumably the state UUID of the cluster state the diff leads to; confirm against the code that populates it.
 *
 * @return the value of {@code toStateUUID}
 */
public String getToStateUUID() {
return toStateUUID;
}

/**
 * Returns the {@code coordinationMetadataUpdated} flag of this diff manifest.
 *
 * @return the value of {@code coordinationMetadataUpdated}
 */
public boolean isCoordinationMetadataUpdated() {
return coordinationMetadataUpdated;
}

/**
 * Returns the {@code settingsMetadataUpdated} flag of this diff manifest.
 *
 * @return the value of {@code settingsMetadataUpdated}
 */
public boolean isSettingsMetadataUpdated() {
return settingsMetadataUpdated;
}

/**
 * Returns the {@code templatesMetadataUpdated} flag of this diff manifest.
 *
 * @return the value of {@code templatesMetadataUpdated}
 */
public boolean isTemplatesMetadataUpdated() {
return templatesMetadataUpdated;
}

/**
 * Returns the {@code customMetadataUpdated} map of this diff manifest.
 * The field itself is returned, not a copy.
 * NOTE(review): keys look like custom metadata type names and values like "was updated" flags; confirm against the code that populates it.
 *
 * @return the {@code customMetadataUpdated} map
 */
public Map<String, Boolean> getCustomMetadataUpdated() {
return customMetadataUpdated;
}

/**
 * Returns the {@code indicesUpdated} list of this diff manifest.
 * The field itself is returned, not a copy.
 * NOTE(review): entries are presumably index names; confirm against the code that populates it.
 *
 * @return the {@code indicesUpdated} list
 */
public List<String> getIndicesUpdated() {
return indicesUpdated;
}

/**
 * Returns the {@code indicesDeleted} list of this diff manifest.
 * The field itself is returned, not a copy.
 * NOTE(review): entries are presumably index names; confirm against the code that populates it.
 *
 * @return the {@code indicesDeleted} list
 */
public List<String> getIndicesDeleted() {
return indicesDeleted;
}

/**
 * Returns the {@code clusterBlocksUpdated} flag of this diff manifest.
 *
 * @return the value of {@code clusterBlocksUpdated}
 */
public boolean isClusterBlocksUpdated() {
return clusterBlocksUpdated;
}

/**
 * Returns the {@code discoveryNodesUpdated} flag of this diff manifest.
 *
 * @return the value of {@code discoveryNodesUpdated}
 */
public boolean isDiscoveryNodesUpdated() {
return discoveryNodesUpdated;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.TemplatesMetadata;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.Nullable;
Expand All @@ -29,6 +31,7 @@
import org.opensearch.common.util.concurrent.AbstractAsyncTask;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.gateway.remote.ClusterMetadataManifest.ClusterDiffManifest;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
Expand Down Expand Up @@ -664,6 +667,10 @@ public Optional<ClusterMetadataManifest> getLatestClusterMetadataManifest(String
return remoteManifestManager.getLatestClusterMetadataManifest(clusterName, clusterUUID);
}

/**
 * Looks up the cluster metadata manifest for the given term and version by delegating to the
 * remote manifest manager.
 *
 * @param clusterName name of the cluster
 * @param clusterUUID UUID of the cluster
 * @param term        term of the requested cluster state
 * @param version     version of the requested cluster state
 * @return whatever the manifest manager returns for these coordinates
 */
public Optional<ClusterMetadataManifest> getClusterMetadataManifestByTermVersion(
    String clusterName,
    String clusterUUID,
    long term,
    long version
) {
    final Optional<ClusterMetadataManifest> manifestForTermVersion = remoteManifestManager.getClusterMetadataManifestByTermVersion(
        clusterName,
        clusterUUID,
        term,
        version
    );
    return manifestForTermVersion;
}

@Override
public void close() throws IOException {
if (staleFileDeletionTask != null) {
Expand Down Expand Up @@ -728,25 +735,65 @@ public ClusterState getLatestClusterState(String clusterName, String clusterUUID
);
}

return getClusterStateForManifest(clusterName, clusterMetadataManifest.get());
}

public ClusterState getClusterStateForManifest(String clusterName, ClusterMetadataManifest manifest) {
// todo make this async
// Fetch Global Metadata
Metadata globalMetadata = remoteGlobalMetadataManager.getGlobalMetadata(clusterName, clusterUUID, clusterMetadataManifest.get());
Metadata globalMetadata = remoteGlobalMetadataManager.getGlobalMetadata(clusterName, manifest.getClusterUUID(), manifest);

// Fetch Index Metadata
Map<String, IndexMetadata> indices = remoteIndexMetadataManager.getIndexMetadataMap(
clusterName,
clusterUUID,
clusterMetadataManifest.get()
manifest.getClusterUUID(),
manifest
);

Map<String, IndexMetadata> indexMetadataMap = new HashMap<>();
indices.values().forEach(indexMetadata -> { indexMetadataMap.put(indexMetadata.getIndex().getName(), indexMetadata); });

return ClusterState.builder(ClusterState.EMPTY_STATE)
.version(clusterMetadataManifest.get().getStateVersion())
.version(manifest.getStateVersion())
.metadata(Metadata.builder(globalMetadata).indices(indexMetadataMap).build())
.build();
}

/**
 * Builds the cluster state described by {@code manifest} by applying the manifest's diff on top
 * of {@code previousState}, downloading only the parts the diff marks as changed.
 * <p>
 * Updated indices are downloaded and put into the metadata, deleted indices are removed, and
 * coordination, settings and templates metadata are downloaded when their flag is set. Custom
 * metadata is downloaded for every key of the diff's custom-metadata map. The returned state
 * carries the manifest's state version and the diff's {@code toStateUUID}; without this it would
 * keep the version and UUID of {@code previousState}.
 *
 * @param clusterName   name of the cluster
 * @param manifest      manifest to apply; its diff manifest must not be {@code null}
 * @param previousState state the diff is applied to
 * @return the state obtained by applying the diff to {@code previousState}
 * @throws IllegalStateException if the diff lists an updated index with no uploaded entry in the manifest
 */
public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadataManifest manifest, ClusterState previousState) {
    assert manifest.getDiffManifest() != null;
    final ClusterDiffManifest diff = manifest.getDiffManifest();
    final String clusterUUID = manifest.getClusterUUID();
    final ClusterState.Builder clusterStateBuilder = ClusterState.builder(previousState);
    final Metadata.Builder metadataBuilder = Metadata.builder(previousState.metadata());

    // Index the uploaded entries by name once instead of scanning the list per updated index.
    // putIfAbsent keeps the first entry for a name, matching the earlier findFirst() lookup.
    final Map<String, UploadedIndexMetadata> uploadedByIndexName = new HashMap<>();
    for (UploadedIndexMetadata uploaded : manifest.getIndices()) {
        uploadedByIndexName.putIfAbsent(uploaded.getIndexName(), uploaded);
    }

    for (String index : diff.getIndicesUpdated()) {
        final UploadedIndexMetadata uploaded = uploadedByIndexName.get(index);
        if (uploaded == null) {
            // Fail with a descriptive error even when JVM assertions are disabled.
            throw new IllegalStateException("Manifest has no uploaded metadata for updated index [" + index + "]");
        }
        final IndexMetadata indexMetadata = remoteIndexMetadataManager.getIndexMetadata(clusterName, clusterUUID, uploaded);
        metadataBuilder.put(indexMetadata, false);
    }
    for (String index : diff.getIndicesDeleted()) {
        metadataBuilder.remove(index);
    }
    if (diff.isCoordinationMetadataUpdated()) {
        final CoordinationMetadata coordinationMetadata = remoteGlobalMetadataManager.getCoordinationMetadata(
            clusterName,
            clusterUUID,
            manifest.getCoordinationMetadata().getUploadedFilename()
        );
        metadataBuilder.coordinationMetadata(coordinationMetadata);
    }
    if (diff.isSettingsMetadataUpdated()) {
        final Settings settings = remoteGlobalMetadataManager.getSettingsMetadata(
            clusterName,
            clusterUUID,
            manifest.getSettingsMetadata().getUploadedFilename()
        );
        metadataBuilder.persistentSettings(settings);
    }
    if (diff.isTemplatesMetadataUpdated()) {
        final TemplatesMetadata templatesMetadata = remoteGlobalMetadataManager.getTemplatesMetadata(
            clusterName,
            clusterUUID,
            manifest.getTemplatesMetadata().getUploadedFilename()
        );
        metadataBuilder.templates(templatesMetadata);
    }
    // NOTE(review): every key is fetched regardless of its Boolean value; confirm that is intended.
    for (String customType : diff.getCustomMetadataUpdated().keySet()) {
        final Metadata.Custom custom = remoteGlobalMetadataManager.getCustomsMetadata(
            clusterName,
            clusterUUID,
            manifest.getCustomMetadataMap().get(customType).getUploadedFilename(),
            customType
        );
        metadataBuilder.putCustom(customType, custom);
    }

    // Stamp the result with the identity of the target state; otherwise the next diff's
    // fromStateUUID would be compared against the stale UUID inherited from previousState.
    return clusterStateBuilder.version(manifest.getStateVersion())
        .stateUUID(diff.getToStateUUID())
        .metadata(metadataBuilder)
        .build();
}

/**
* Fetch the previous cluster UUIDs from remote state store and return the most recent valid cluster UUID
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ Metadata getGlobalMetadata(String clusterName, String clusterUUID, ClusterMetada
}
}

private CoordinationMetadata getCoordinationMetadata(String clusterName, String clusterUUID, String coordinationMetadataFileName) {
public CoordinationMetadata getCoordinationMetadata(String clusterName, String clusterUUID, String coordinationMetadataFileName) {
try {
// Fetch Coordination metadata
if (coordinationMetadataFileName != null) {
Expand All @@ -200,7 +200,7 @@ private CoordinationMetadata getCoordinationMetadata(String clusterName, String
}
}

private Settings getSettingsMetadata(String clusterName, String clusterUUID, String settingsMetadataFileName) {
public Settings getSettingsMetadata(String clusterName, String clusterUUID, String settingsMetadataFileName) {
try {
// Fetch Settings metadata
if (settingsMetadataFileName != null) {
Expand All @@ -221,7 +221,7 @@ private Settings getSettingsMetadata(String clusterName, String clusterUUID, Str
}
}

private TemplatesMetadata getTemplatesMetadata(String clusterName, String clusterUUID, String templatesMetadataFileName) {
public TemplatesMetadata getTemplatesMetadata(String clusterName, String clusterUUID, String templatesMetadataFileName) {
try {
// Fetch Templates metadata
if (templatesMetadataFileName != null) {
Expand All @@ -242,7 +242,7 @@ private TemplatesMetadata getTemplatesMetadata(String clusterName, String cluste
}
}

private Metadata.Custom getCustomsMetadata(String clusterName, String clusterUUID, String customMetadataFileName, String custom) {
public Metadata.Custom getCustomsMetadata(String clusterName, String clusterUUID, String customMetadataFileName, String custom) {
requireNonNull(customMetadataFileName);
try {
// Fetch Custom metadata
Expand Down
Loading

0 comments on commit acdb97e

Please sign in to comment.