Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Routing Table]Read remote index routing #13894

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,39 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.common.blobstore.BlobContainer;
import org.opensearch.common.lifecycle.AbstractLifecycleComponent;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.Index;
import org.opensearch.gateway.remote.ClusterMetadataManifest;
import org.opensearch.gateway.remote.RemoteClusterStateService;
import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable;
import org.opensearch.node.Node;
import org.opensearch.node.remotestore.RemoteStoreNodeAttribute;
import org.opensearch.repositories.RepositoriesService;
import org.opensearch.repositories.Repository;
import org.opensearch.repositories.blobstore.BlobStoreRepository;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled;

Expand All @@ -31,19 +53,114 @@
*/
public class RemoteRoutingTableService extends AbstractLifecycleComponent {

/**
* Cluster setting to specify if routing table should be published to remote store
*/
public static final Setting<Boolean> REMOTE_ROUTING_TABLE_ENABLED_SETTING = Setting.boolSetting(
"cluster.remote_store.routing.enabled",
true,
Setting.Property.NodeScope,
Setting.Property.Final
);
public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing";
public static final String ROUTING_TABLE = "routing-table";
public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing";
public static final String DELIMITER = "__";
public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--";
private static final Logger logger = LogManager.getLogger(RemoteRoutingTableService.class);
private final Settings settings;
private final Supplier<RepositoriesService> repositoriesService;
private BlobStoreRepository blobStoreRepository;
private final ThreadPool threadPool;

public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService, Settings settings) {
private static final DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = new DiffableUtils.NonDiffableValueSerializer<String, IndexRoutingTable>() {
@Override
public void write(IndexRoutingTable value, StreamOutput out) throws IOException {
value.writeTo(out);
}

@Override
public IndexRoutingTable read(StreamInput in, String key) throws IOException {
return IndexRoutingTable.readFrom(in);
}
};


public RemoteRoutingTableService(Supplier<RepositoriesService> repositoriesService,
Settings settings, ThreadPool threadPool) {
assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled";
this.repositoriesService = repositoriesService;
this.settings = settings;
this.threadPool = threadPool;
}


private String getIndexRoutingFileName() {
return String.join(
DELIMITER,
INDEX_ROUTING_FILE_PREFIX,
RemoteStoreUtils.invertLong(System.currentTimeMillis())
);

}
Comment on lines +98 to +105
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cc: @ashking94 for reviewing prefix changes

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function is not in use for read flow, got added by mistake. I think the following PR has changes related to the prefix hashing: https://github.com/opensearch-project/OpenSearch/pull/13870/files#diff-54b9eb2094b2f28930e8c235f57f00d8cb6547e790684cf54e533756e1251f85R216


public CheckedRunnable<IOException> getAsyncIndexMetadataReadAction(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where can I see the consumer of this function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

String uploadedFilename,
Index index,
LatchedActionListener<IndexRoutingTable> latchedActionListener) {
int idx = uploadedFilename.lastIndexOf("/");
String blobFileName = uploadedFilename.substring(idx+1);
BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer( BlobPath.cleanPath().add(uploadedFilename.substring(0,idx)));

return () -> readAsync(
blobContainer,
blobFileName,
index,
threadPool.executor(ThreadPool.Names.GENERIC),
ActionListener.wrap(response -> latchedActionListener.onResponse(response.getIndexRoutingTable()), latchedActionListener::onFailure)
);
}

public void readAsync(BlobContainer blobContainer, String name, Index index, ExecutorService executorService, ActionListener<RemoteIndexRoutingTable> listener) throws IOException {
executorService.execute(() -> {
try {
listener.onResponse(read(blobContainer, name, index));
} catch (Exception e) {
listener.onFailure(e);
}
});
}

public RemoteIndexRoutingTable read(BlobContainer blobContainer, String path, Index index) {
try {
return new RemoteIndexRoutingTable(blobContainer.readBlob(path), index);
} catch (IOException | AssertionError e) {
logger.info("RoutingTable read failed with error: {}", e.toString());
throw new RemoteClusterStateService.RemoteStateTransferException("Failed to read RemoteRoutingTable from Manifest with error ", e);
}
}

public List<ClusterMetadataManifest.UploadedIndexMetadata> getUpdatedIndexRoutingTableMetadata(List<String> updatedIndicesRouting, List<ClusterMetadataManifest.UploadedIndexMetadata> allIndicesRouting) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Used to create the Metadata from the list of updated indicesRouting from Manifest in read flow.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is this getting called from in RemoteClusterStateService?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added in the flow now.

return updatedIndicesRouting.stream().map(idx -> {
Optional<ClusterMetadataManifest.UploadedIndexMetadata> uploadedIndexMetadataOptional = allIndicesRouting.stream().filter(idx2 -> idx2.getIndexName().equals(idx)).findFirst();
assert uploadedIndexMetadataOptional.isPresent() == true;
return uploadedIndexMetadataOptional.get();
}).collect(Collectors.toList());
}


public static DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> getIndicesRoutingMapDiff(RoutingTable before, RoutingTable after) {
return DiffableUtils.diff(
before.getIndicesRouting(),
after.getIndicesRouting(),
DiffableUtils.getStringKeySerializer(),
CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER
);
}


@Override
protected void doClose() throws IOException {
public void doClose() throws IOException {
if (blobStoreRepository != null) {
IOUtils.close(blobStoreRepository);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,17 @@
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.Version;
import org.opensearch.action.LatchedActionListener;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlocks;
import org.opensearch.cluster.coordination.CoordinationMetadata;
import org.opensearch.cluster.metadata.DiffableStringMap;
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.TemplatesMetadata;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.cluster.routing.IndexRoutingTable;
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.remote.RemoteRoutingTableService;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.CheckedRunnable;
Expand All @@ -33,6 +39,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.index.Index;
import org.opensearch.core.xcontent.ToXContent;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute;
Expand Down Expand Up @@ -816,6 +823,175 @@ private ClusterMetadataManifest uploadManifest(
}
}

public ClusterState getClusterStateForManifest(String clusterName, ClusterMetadataManifest manifest, String localNodeId, boolean includeEphemeral)
throws IOException {
return readClusterStateInParallel(
ClusterState.builder(new ClusterName(clusterName)).build(),
manifest,
clusterName,
manifest.getClusterUUID(),
localNodeId,
manifest.getIndices(),
manifest.getCustomMetadataMap(),
manifest.getCoordinationMetadata() != null,
manifest.getSettingsMetadata() != null,
manifest.getTransientSettingsMetadata() != null,
manifest.getTemplatesMetadata() != null,
includeEphemeral && manifest.getDiscoveryNodesMetadata() != null,
includeEphemeral && manifest.getClusterBlocksMetadata() != null,
includeEphemeral ? manifest.getIndicesRouting() : Collections.emptyList(),
includeEphemeral && manifest.getHashesOfConsistentSettings() != null,
includeEphemeral ? manifest.getClusterStateCustomMap() : Collections.emptyMap()
);
}

public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadataManifest manifest, ClusterState previousState, String localNodeId)
throws IOException {
assert manifest.getDiffManifest() != null;


List<UploadedIndexMetadata> updatedIndexRouting = remoteRoutingTableService.get().getUpdatedIndexRoutingTableMetadata(diff.getIndicesRoutingUpdated(),
manifest.getIndicesRouting());


ClusterState updatedClusterState = readClusterStateInParallel(
previousState,
manifest,
clusterName,
manifest.getClusterUUID(),
localNodeId,
updatedIndices,
updatedCustomMetadata,
diff.isCoordinationMetadataUpdated(),
diff.isSettingsMetadataUpdated(),
diff.isTransientSettingsMetadataUpdated(),
diff.isTemplatesMetadataUpdated(),
diff.isDiscoveryNodesUpdated(),
diff.isClusterBlocksUpdated(),
updatedIndexRouting,
diff.isHashesOfConsistentSettingsUpdated(),
updatedClusterStateCustom
);
ClusterState.Builder clusterStateBuilder = ClusterState.builder(updatedClusterState);

HashMap<String, IndexRoutingTable> indexRoutingTables = new HashMap<>(updatedClusterState.getRoutingTable().getIndicesRouting());

for (String indexName : diff.getIndicesRoutingDeleted()) {
indexRoutingTables.remove(indexName);
}

RoutingTable routingTable = new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables);

return clusterStateBuilder.
stateUUID(manifest.getStateUUID()).
version(manifest.getStateVersion()).
metadata(metadataBuilder).
routingTable(routingTable).
build();
}

private ClusterState readClusterStateInParallel(
ClusterState previousState,
ClusterMetadataManifest manifest,
String clusterName,
String clusterUUID,
String localNodeId,
List<UploadedIndexMetadata> indicesToRead,
Map<String, UploadedMetadataAttribute> customToRead,
boolean readCoordinationMetadata,
boolean readSettingsMetadata,
boolean readTransientSettingsMetadata,
boolean readTemplatesMetadata,
boolean readDiscoveryNodes,
boolean readClusterBlocks,
Comment on lines +900 to +906
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Create an inner class instead?

List<UploadedIndexMetadata> indicesRoutingToRead,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How are we getting this list indicesRoutingToRead

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

boolean readHashesOfConsistentSettings,
Map<String, UploadedMetadataAttribute> clusterStateCustomToRead
) throws IOException {
int totalReadTasks =
indicesToRead.size() + customToRead.size() + indicesRoutingToRead.size() + (readCoordinationMetadata ? 1 : 0) + (readSettingsMetadata ? 1 : 0) + (
readTemplatesMetadata ? 1 : 0) + (readDiscoveryNodes ? 1 : 0) + (readClusterBlocks ? 1 : 0) + (readTransientSettingsMetadata ? 1 : 0) + (readHashesOfConsistentSettings ? 1 : 0)
+ clusterStateCustomToRead.size();
CountDownLatch latch = new CountDownLatch(totalReadTasks);
List<CheckedRunnable<IOException>> asyncMetadataReadActions = new ArrayList<>();
List<RemoteReadResult> readResults = Collections.synchronizedList(new ArrayList<>());
List<IndexRoutingTable> readIndexRoutingTableResults = Collections.synchronizedList(new ArrayList<>());
List<Exception> exceptionList = Collections.synchronizedList(new ArrayList<>(totalReadTasks));

for (UploadedIndexMetadata indexMetadata : indicesToRead) {
asyncMetadataReadActions.add(
remoteIndexMetadataManager.getAsyncIndexMetadataReadAction(
clusterUUID,
indexMetadata.getUploadedFilename(),
listener
)
);
}

LatchedActionListener<IndexRoutingTable> routingTableLatchedActionListener = new LatchedActionListener<>(
ActionListener.wrap(
response -> {
logger.debug("Successfully read cluster state component from remote");
readIndexRoutingTableResults.add(response);
},
ex -> {
logger.error("Failed to read cluster state from remote", ex);
exceptionList.add(ex);
}
),
latch
);

for (UploadedIndexMetadata indexRouting : indicesRoutingToRead) {
asyncMetadataReadActions.add(
remoteRoutingTableService.get().getAsyncIndexMetadataReadAction(
indexRouting.getUploadedFilename(),
new Index(indexRouting.getIndexName(), indexRouting.getIndexUUID()),
routingTableLatchedActionListener
)
);
}

for (CheckedRunnable<IOException> asyncMetadataReadAction : asyncMetadataReadActions) {
asyncMetadataReadAction.run();
}

try {
if (latch.await(this.remoteStateReadTimeout.getMillis(), TimeUnit.MILLISECONDS) == false) {
RemoteStateTransferException exception = new RemoteStateTransferException(
"Timed out waiting to read cluster state from remote within timeout " + this.remoteStateReadTimeout
);
exceptionList.forEach(exception::addSuppressed);
throw exception;
}
} catch (InterruptedException e) {
exceptionList.forEach(e::addSuppressed);
RemoteStateTransferException ex = new RemoteStateTransferException("Interrupted while waiting to read cluster state from metadata");
Thread.currentThread().interrupt();
throw ex;
}

if (!exceptionList.isEmpty()) {
RemoteStateTransferException exception = new RemoteStateTransferException("Exception during reading cluster state from remote");
exceptionList.forEach(exception::addSuppressed);
throw exception;
}

ClusterState.Builder clusterStateBuilder = ClusterState.builder(previousState);
Map<String, IndexRoutingTable> indicesRouting = new HashMap<>(previousState.routingTable().getIndicesRouting());


readIndexRoutingTableResults.forEach(indexRoutingTable -> {
indicesRouting.put(indexRoutingTable.getIndex().getName(), indexRoutingTable);
});
Comment on lines +980 to +986
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What if there are index in previousState which are now deleted?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Covered in the main call, added the flow in the PR.


return clusterStateBuilder.metadata(metadataBuilder)
.version(manifest.getStateVersion())
.stateUUID(manifest.getStateUUID())
.routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indicesRouting))
.build();
}

private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName)
throws IOException {
AtomicReference<String> result = new AtomicReference<String>();
Expand Down
Loading
Loading