Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Routing Table] Initial commit for RemoteRoutingTableService setup #13304

Merged
merged 19 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
DiscoveryNode dn = remoteDN.orElseGet(() -> (currentNodes.getNodes().values()).stream().findFirst().get());
RepositoriesMetadata repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
dn,
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
currentState.getMetadata().custom(RepositoriesMetadata.TYPE),
currentState.getMetadata().settings()
);

assert nodesBuilder.isLocalNodeElectedClusterManager();
Expand Down Expand Up @@ -223,7 +224,8 @@ public ClusterTasksResult<Task> execute(ClusterState currentState, List<Task> jo
logger.info("Updating system repository now for remote store");
repositoriesMetadata = remoteStoreNodeService.updateRepositoriesMetadata(
node,
currentState.getMetadata().custom(RepositoriesMetadata.TYPE)
currentState.getMetadata().custom(RepositoriesMetadata.TYPE),
currentState.getMetadata().settings()
);
}

Expand Down Expand Up @@ -515,7 +517,7 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod

DiscoveryNode existingNode = existingNodes.get(0);
if (joiningNode.isRemoteStoreNode()) {
ensureRemoteStoreNodesCompatibility(joiningNode, existingNode);
ensureRemoteStoreNodesCompatibility(joiningNode, existingNode, metadata.settings());
} else {
if (existingNode.isRemoteStoreNode()) {
throw new IllegalStateException(
Expand All @@ -538,18 +540,20 @@ private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNod
}
if (joiningNode.isRemoteStoreNode()) {
Optional<DiscoveryNode> remoteDN = existingNodes.stream().filter(DiscoveryNode::isRemoteStoreNode).findFirst();
remoteDN.ifPresent(discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode));
remoteDN.ifPresent(
discoveryNode -> ensureRemoteStoreNodesCompatibility(joiningNode, discoveryNode, metadata.settings())
);
}
}
}
}

private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNode existingNode) {
private static void ensureRemoteStoreNodesCompatibility(DiscoveryNode joiningNode, DiscoveryNode existingNode, Settings settings) {
if (joiningNode.isRemoteStoreNode()) {
if (existingNode.isRemoteStoreNode()) {
RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode);
RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode);
if (existingRemoteStoreNodeAttribute.equals(joiningRemoteStoreNodeAttribute) == false) {
RemoteStoreNodeAttribute joiningRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(joiningNode, settings);
RemoteStoreNodeAttribute existingRemoteStoreNodeAttribute = new RemoteStoreNodeAttribute(existingNode, settings);
if (existingRemoteStoreNodeAttribute.equalsIgnoreOptionalRepo(joiningRemoteStoreNodeAttribute) == false) {
throw new IllegalStateException(
"a remote store node ["
+ joiningNode
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Collectors;

import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;

Expand Down Expand Up @@ -164,6 +165,34 @@ public boolean equalsIgnoreGenerations(@Nullable RepositoriesMetadata other) {
return true;
}

/**
* Checks if this instance and the give instance share the same repositories, with option to skip checking for a list of repos.
* This will support
* @param other other repositories metadata
* @param reposToSkip list of repos to skip check for equality
* @return {@code true} iff both instances contain the same repositories apart from differences in generations, not including repos provided in reposToSkip.
*/
public boolean equalsIgnoreGenerationsIgnoreOptionalRepos(@Nullable RepositoriesMetadata other, List<String> reposToSkip) {
if (other == null) {
return false;
}
List<RepositoryMetadata> currentRepositories = repositories.stream()
.filter(repo -> !reposToSkip.contains(repo.name()))
.collect(Collectors.toList());
List<RepositoryMetadata> otherRepositories = other.repositories.stream()
.filter(repo -> !reposToSkip.contains(repo.name()))
.collect(Collectors.toList());
if (otherRepositories.size() != currentRepositories.size()) {
return false;
}
for (int i = 0; i < currentRepositories.size(); i++) {
if (currentRepositories.get(i).equalsIgnoreGenerations(otherRepositories.get(i)) == false) {
return false;
}
}
return true;
}

@Override
public int hashCode() {
return repositories.hashCode();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cluster.routing.remote;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;

import java.io.Closeable;
import java.io.IOException;
import java.util.function.Supplier;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;

/**
* A Service which provides APIs to upload and download routing table from remote store.
*
* @opensearch.internal
*/
public class RemoteRoutingTableService implements Closeable {
himshikha marked this conversation as resolved.
Show resolved Hide resolved

/**
* Cluster setting to specify if routing table should be published to remote store
*/
public static final Setting<Boolean> REMOTE_ROUTING_TABLE_ENABLED_SETTING = Setting.boolSetting(
"cluster.remote_store.routing_table.enabled",
false,
Setting.Property.NodeScope,
Setting.Property.Final
himshikha marked this conversation as resolved.
Show resolved Hide resolved
);
private static final Logger logger = LogManager.getLogger(RemoteRoutingTableService.class);
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
private BlobStoreRepository blobStoreRepository;

public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService, Settings settings) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
}

@Override
public void close() throws IOException {
if (blobStoreRepository != null) {
IOUtils.close(blobStoreRepository);
}
}

public void start() {
assert isRemoteRoutingTableEnabled(settings) == true : "Remote routing table is not enabled";
final String remoteStoreRepo = settings.get(
Node.NODE_ATTRIBUTES.getKey() + RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY
);
assert remoteStoreRepo != null : "Remote routing table repository is not configured";
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
}
himshikha marked this conversation as resolved.
Show resolved Hide resolved

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

/** Package containing class to perform operations on remote routing table */
package org.opensearch.cluster.routing.remote;
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.opensearch.cluster.routing.allocation.decider.SameShardAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.ThrottlingAllocationDecider;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.cluster.service.ClusterApplierService;
import org.opensearch.cluster.service.ClusterManagerService;
import org.opensearch.cluster.service.ClusterManagerTaskThrottler;
Expand Down Expand Up @@ -741,7 +742,10 @@ public void apply(Settings value, Settings current, Settings previous) {
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_TYPE_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_PATH_HASH_ALGORITHM_SETTING,
RemoteStoreSettings.CLUSTER_REMOTE_MAX_TRANSLOG_READERS,
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA
RemoteStoreSettings.CLUSTER_REMOTE_STORE_TRANSLOG_METADATA,

// Remote Routing table settings
RemoteRoutingTableService.REMOTE_ROUTING_TABLE_ENABLED_SETTING
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ protected FeatureFlagSettings(
FeatureFlags.DATETIME_FORMATTER_CACHING_SETTING,
FeatureFlags.TIERED_REMOTE_INDEX_SETTING,
FeatureFlags.REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING,
FeatureFlags.PLUGGABLE_CACHE_SETTING
FeatureFlags.PLUGGABLE_CACHE_SETTING,
FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL_SETTING
);
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public class FeatureFlags {
*/
public static final String PLUGGABLE_CACHE = "opensearch.experimental.feature.pluggable.caching.enabled";

/**
* Gates the functionality of remote routing table.
*/
public static final String REMOTE_PUBLICATION_EXPERIMENTAL = "opensearch.experimental.feature.remote_store.publication.enabled";

public static final Setting<Boolean> REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING = Setting.boolSetting(
REMOTE_STORE_MIGRATION_EXPERIMENTAL,
false,
Expand All @@ -89,14 +94,21 @@ public class FeatureFlags {

public static final Setting<Boolean> PLUGGABLE_CACHE_SETTING = Setting.boolSetting(PLUGGABLE_CACHE, false, Property.NodeScope);

public static final Setting<Boolean> REMOTE_PUBLICATION_EXPERIMENTAL_SETTING = Setting.boolSetting(
REMOTE_PUBLICATION_EXPERIMENTAL,
false,
Property.NodeScope
);

private static final List<Setting<Boolean>> ALL_FEATURE_FLAG_SETTINGS = List.of(
REMOTE_STORE_MIGRATION_EXPERIMENTAL_SETTING,
EXTENSIONS_SETTING,
IDENTITY_SETTING,
TELEMETRY_SETTING,
DATETIME_FORMATTER_CACHING_SETTING,
TIERED_REMOTE_INDEX_SETTING,
PLUGGABLE_CACHE_SETTING
PLUGGABLE_CACHE_SETTING,
REMOTE_PUBLICATION_EXPERIMENTAL_SETTING
);
/**
* Should store the settings from opensearch.yml.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.TemplatesMetadata;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.common.CheckedRunnable;
import org.opensearch.common.Nullable;
import org.opensearch.common.blobstore.BlobContainer;
Expand Down Expand Up @@ -68,6 +69,7 @@

import static java.util.Objects.requireNonNull;
import static org.opensearch.gateway.PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;
import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteStoreClusterStateEnabled;

/**
Expand Down Expand Up @@ -202,6 +204,7 @@ public class RemoteClusterStateService implements Closeable {
private final List<IndexMetadataUploadListener> indexMetadataUploadListeners;
private BlobStoreRepository blobStoreRepository;
private BlobStoreTransferService blobStoreTransferService;
private Optional<RemoteRoutingTableService> remoteRoutingTableService;
private volatile TimeValue slowWriteLoggingThreshold;

private volatile TimeValue indexMetadataUploadTimeout;
Expand Down Expand Up @@ -253,6 +256,9 @@ public RemoteClusterStateService(
clusterSettings.addSettingsUpdateConsumer(METADATA_MANIFEST_UPLOAD_TIMEOUT_SETTING, this::setMetadataManifestUploadTimeout);
this.remoteStateStats = new RemotePersistenceStats();
this.indexMetadataUploadListeners = indexMetadataUploadListeners;
this.remoteRoutingTableService = isRemoteRoutingTableEnabled(settings)
? Optional.of(new RemoteRoutingTableService(repositoriesService, settings))
: Optional.empty();
}

private BlobStoreTransferService getBlobStoreTransferService() {
Expand Down Expand Up @@ -749,6 +755,9 @@ public void close() throws IOException {
if (blobStoreRepository != null) {
IOUtils.close(blobStoreRepository);
}
if (this.remoteRoutingTableService.isPresent()) {
this.remoteRoutingTableService.get().close();
}
}

public void start() {
Expand All @@ -760,6 +769,9 @@ public void start() {
final Repository repository = repositoriesService.get().repository(remoteStoreRepo);
assert repository instanceof BlobStoreRepository : "Repository should be instance of BlobStoreRepository";
blobStoreRepository = (BlobStoreRepository) repository;
if (this.remoteRoutingTableService.isPresent()) {
this.remoteRoutingTableService.get().start();
}
}

private ClusterMetadataManifest uploadManifest(
Expand Down Expand Up @@ -933,6 +945,11 @@ public TimeValue getMetadataManifestUploadTimeout() {
return this.metadataManifestUploadTimeout;
}

// Package private for unit test
Optional<RemoteRoutingTableService> getRemoteRoutingTableService() {
return this.remoteRoutingTableService;
}

static String getManifestFileName(long term, long version, boolean committed, int codecVersion) {
// 123456789012_test-cluster/cluster-state/dsgYj10Nkso7/manifest/manifest__<inverted_term>__<inverted_version>__C/P__<inverted__timestamp>__<codec_version>
return String.join(
Expand Down
2 changes: 1 addition & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -1976,7 +1976,7 @@ public DiscoveryNode apply(BoundTransportAddress boundTransportAddress) {
);

if (isRemoteStoreAttributePresent(settings)) {
remoteStoreNodeService.createAndVerifyRepositories(discoveryNode);
remoteStoreNodeService.createAndVerifyRepositories(discoveryNode, settings);
}

localNode.set(discoveryNode);
Expand Down
Loading
Loading