diff --git a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java index ba2208e17df1f..6543a52279b2a 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableService.java @@ -10,17 +10,39 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.DiffableUtils; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.common.blobstore.BlobContainer; import org.opensearch.common.lifecycle.AbstractLifecycleComponent; +import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.io.IOUtils; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.index.Index; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateService; +import org.opensearch.gateway.remote.routingtable.RemoteIndexRoutingTable; import org.opensearch.node.Node; import org.opensearch.node.remotestore.RemoteStoreNodeAttribute; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.Repository; import org.opensearch.repositories.blobstore.BlobStoreRepository; +import org.opensearch.threadpool.ThreadPool; import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ExecutorService; +import java.util.function.Function; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.isRemoteRoutingTableEnabled; @@ -31,19 +53,114 @@ */ public class RemoteRoutingTableService extends AbstractLifecycleComponent { + /** + * Cluster setting to specify if routing table should be published to remote store + */ + public static final Setting REMOTE_ROUTING_TABLE_ENABLED_SETTING = Setting.boolSetting( + "cluster.remote_store.routing.enabled", + true, + Setting.Property.NodeScope, + Setting.Property.Final + ); + public static final String INDEX_ROUTING_PATH_TOKEN = "index-routing"; + public static final String ROUTING_TABLE = "routing-table"; + public static final String INDEX_ROUTING_FILE_PREFIX = "index_routing"; + public static final String DELIMITER = "__"; + public static final String INDEX_ROUTING_METADATA_PREFIX = "indexRouting--"; private static final Logger logger = LogManager.getLogger(RemoteRoutingTableService.class); private final Settings settings; private final Supplier repositoriesService; private BlobStoreRepository blobStoreRepository; + private final ThreadPool threadPool; - public RemoteRoutingTableService(Supplier repositoriesService, Settings settings) { + private static final DiffableUtils.NonDiffableValueSerializer CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER = new DiffableUtils.NonDiffableValueSerializer() { + @Override + public void write(IndexRoutingTable value, StreamOutput out) throws IOException { + value.writeTo(out); + } + + @Override + public IndexRoutingTable read(StreamInput in, String key) throws IOException { + return IndexRoutingTable.readFrom(in); + } + }; + + + public RemoteRoutingTableService(Supplier repositoriesService, + Settings settings, ThreadPool threadPool) { assert isRemoteRoutingTableEnabled(settings) : "Remote routing table is not enabled"; this.repositoriesService = repositoriesService; this.settings = settings; + this.threadPool = threadPool; + } + + + private String getIndexRoutingFileName() { + return String.join( + DELIMITER, + INDEX_ROUTING_FILE_PREFIX, + RemoteStoreUtils.invertLong(System.currentTimeMillis()) + ); + + } + + public CheckedRunnable getAsyncIndexMetadataReadAction( + String uploadedFilename, + Index index, + LatchedActionListener latchedActionListener) { + int idx = uploadedFilename.lastIndexOf("/"); + String blobFileName = uploadedFilename.substring(idx+1); + BlobContainer blobContainer = blobStoreRepository.blobStore().blobContainer( BlobPath.cleanPath().add(uploadedFilename.substring(0,idx))); + + return () -> readAsync( + blobContainer, + blobFileName, + index, + threadPool.executor(ThreadPool.Names.GENERIC), + ActionListener.wrap(response -> latchedActionListener.onResponse(response.getIndexRoutingTable()), latchedActionListener::onFailure) + ); + } + + public void readAsync(BlobContainer blobContainer, String name, Index index, ExecutorService executorService, ActionListener listener) throws IOException { + executorService.execute(() -> { + try { + listener.onResponse(read(blobContainer, name, index)); + } catch (Exception e) { + listener.onFailure(e); + } + }); } + public RemoteIndexRoutingTable read(BlobContainer blobContainer, String path, Index index) { + try { + return new RemoteIndexRoutingTable(blobContainer.readBlob(path), index); + } catch (IOException | AssertionError e) { + logger.info("RoutingTable read failed with error: {}", e.toString()); + throw new RemoteClusterStateService.RemoteStateTransferException("Failed to read RemoteRoutingTable from Manifest with error ", e); + } + } + + public List getUpdatedIndexRoutingTableMetadata(List updatedIndicesRouting, List allIndicesRouting) { + return updatedIndicesRouting.stream().map(idx -> { + Optional uploadedIndexMetadataOptional = allIndicesRouting.stream().filter(idx2 -> idx2.getIndexName().equals(idx)).findFirst(); + assert uploadedIndexMetadataOptional.isPresent() == true; + return uploadedIndexMetadataOptional.get(); + }).collect(Collectors.toList()); + } + + + public static DiffableUtils.MapDiff> getIndicesRoutingMapDiff(RoutingTable before, RoutingTable after) { + return DiffableUtils.diff( + before.getIndicesRouting(), + after.getIndicesRouting(), + DiffableUtils.getStringKeySerializer(), + CUSTOM_ROUTING_TABLE_VALUE_SERIALIZER + ); + } + + @Override - protected void doClose() throws IOException { + public void doClose() throws IOException { if (blobStoreRepository != null) { IOUtils.close(blobStoreRepository); } diff --git a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java index d0593dcd51475..e23360de1f5fe 100644 --- a/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java +++ b/server/src/main/java/org/opensearch/gateway/remote/RemoteClusterStateService.java @@ -13,11 +13,17 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.Version; import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.ClusterName; import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.coordination.CoordinationMetadata; +import org.opensearch.cluster.metadata.DiffableStringMap; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.TemplatesMetadata; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; import org.opensearch.cluster.routing.remote.RemoteRoutingTableService; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.CheckedRunnable; @@ -33,6 +39,7 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.io.IOUtils; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.index.Index; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata; import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedMetadataAttribute; @@ -816,6 +823,175 @@ private ClusterMetadataManifest uploadManifest( } } + public ClusterState getClusterStateForManifest(String clusterName, ClusterMetadataManifest manifest, String localNodeId, boolean includeEphemeral) + throws IOException { + return readClusterStateInParallel( + ClusterState.builder(new ClusterName(clusterName)).build(), + manifest, + clusterName, + manifest.getClusterUUID(), + localNodeId, + manifest.getIndices(), + manifest.getCustomMetadataMap(), + manifest.getCoordinationMetadata() != null, + manifest.getSettingsMetadata() != null, + manifest.getTransientSettingsMetadata() != null, + manifest.getTemplatesMetadata() != null, + includeEphemeral && manifest.getDiscoveryNodesMetadata() != null, + includeEphemeral && manifest.getClusterBlocksMetadata() != null, + includeEphemeral ? manifest.getIndicesRouting() : Collections.emptyList(), + includeEphemeral && manifest.getHashesOfConsistentSettings() != null, + includeEphemeral ? manifest.getClusterStateCustomMap() : Collections.emptyMap() + ); + } + + public ClusterState getClusterStateUsingDiff(String clusterName, ClusterMetadataManifest manifest, ClusterState previousState, String localNodeId) + throws IOException { + assert manifest.getDiffManifest() != null; + + + List updatedIndexRouting = remoteRoutingTableService.get().getUpdatedIndexRoutingTableMetadata(diff.getIndicesRoutingUpdated(), + manifest.getIndicesRouting()); + + + ClusterState updatedClusterState = readClusterStateInParallel( + previousState, + manifest, + clusterName, + manifest.getClusterUUID(), + localNodeId, + updatedIndices, + updatedCustomMetadata, + diff.isCoordinationMetadataUpdated(), + diff.isSettingsMetadataUpdated(), + diff.isTransientSettingsMetadataUpdated(), + diff.isTemplatesMetadataUpdated(), + diff.isDiscoveryNodesUpdated(), + diff.isClusterBlocksUpdated(), + updatedIndexRouting, + diff.isHashesOfConsistentSettingsUpdated(), + updatedClusterStateCustom + ); + ClusterState.Builder clusterStateBuilder = ClusterState.builder(updatedClusterState); + + HashMap indexRoutingTables = new HashMap<>(updatedClusterState.getRoutingTable().getIndicesRouting()); + + for (String indexName : diff.getIndicesRoutingDeleted()) { + indexRoutingTables.remove(indexName); + } + + RoutingTable routingTable = new RoutingTable(manifest.getRoutingTableVersion(), indexRoutingTables); + + return clusterStateBuilder. + stateUUID(manifest.getStateUUID()). + version(manifest.getStateVersion()). + metadata(metadataBuilder). + routingTable(routingTable). + build(); + } + + private ClusterState readClusterStateInParallel( + ClusterState previousState, + ClusterMetadataManifest manifest, + String clusterName, + String clusterUUID, + String localNodeId, + List indicesToRead, + Map customToRead, + boolean readCoordinationMetadata, + boolean readSettingsMetadata, + boolean readTransientSettingsMetadata, + boolean readTemplatesMetadata, + boolean readDiscoveryNodes, + boolean readClusterBlocks, + List indicesRoutingToRead, + boolean readHashesOfConsistentSettings, + Map clusterStateCustomToRead + ) throws IOException { + int totalReadTasks = + indicesToRead.size() + customToRead.size() + indicesRoutingToRead.size() + (readCoordinationMetadata ? 1 : 0) + (readSettingsMetadata ? 1 : 0) + ( + readTemplatesMetadata ? 1 : 0) + (readDiscoveryNodes ? 1 : 0) + (readClusterBlocks ? 1 : 0) + (readTransientSettingsMetadata ? 1 : 0) + (readHashesOfConsistentSettings ? 1 : 0) + + clusterStateCustomToRead.size(); + CountDownLatch latch = new CountDownLatch(totalReadTasks); + List> asyncMetadataReadActions = new ArrayList<>(); + List readResults = Collections.synchronizedList(new ArrayList<>()); + List readIndexRoutingTableResults = Collections.synchronizedList(new ArrayList<>()); + List exceptionList = Collections.synchronizedList(new ArrayList<>(totalReadTasks)); + + for (UploadedIndexMetadata indexMetadata : indicesToRead) { + asyncMetadataReadActions.add( + remoteIndexMetadataManager.getAsyncIndexMetadataReadAction( + clusterUUID, + indexMetadata.getUploadedFilename(), + listener + ) + ); + } + + LatchedActionListener routingTableLatchedActionListener = new LatchedActionListener<>( + ActionListener.wrap( + response -> { + logger.debug("Successfully read cluster state component from remote"); + readIndexRoutingTableResults.add(response); + }, + ex -> { + logger.error("Failed to read cluster state from remote", ex); + exceptionList.add(ex); + } + ), + latch + ); + + for (UploadedIndexMetadata indexRouting : indicesRoutingToRead) { + asyncMetadataReadActions.add( + remoteRoutingTableService.get().getAsyncIndexMetadataReadAction( + indexRouting.getUploadedFilename(), + new Index(indexRouting.getIndexName(), indexRouting.getIndexUUID()), + routingTableLatchedActionListener + ) + ); + } + + for (CheckedRunnable asyncMetadataReadAction : asyncMetadataReadActions) { + asyncMetadataReadAction.run(); + } + + try { + if (latch.await(this.remoteStateReadTimeout.getMillis(), TimeUnit.MILLISECONDS) == false) { + RemoteStateTransferException exception = new RemoteStateTransferException( + "Timed out waiting to read cluster state from remote within timeout " + this.remoteStateReadTimeout + ); + exceptionList.forEach(exception::addSuppressed); + throw exception; + } + } catch (InterruptedException e) { + exceptionList.forEach(e::addSuppressed); + RemoteStateTransferException ex = new RemoteStateTransferException("Interrupted while waiting to read cluster state from metadata"); + Thread.currentThread().interrupt(); + throw ex; + } + + if (!exceptionList.isEmpty()) { + RemoteStateTransferException exception = new RemoteStateTransferException("Exception during reading cluster state from remote"); + exceptionList.forEach(exception::addSuppressed); + throw exception; + } + + ClusterState.Builder clusterStateBuilder = ClusterState.builder(previousState); + Map indicesRouting = new HashMap<>(previousState.routingTable().getIndicesRouting()); + + + readIndexRoutingTableResults.forEach(indexRoutingTable -> { + indicesRouting.put(indexRoutingTable.getIndex().getName(), indexRoutingTable); + }); + + return clusterStateBuilder.metadata(metadataBuilder) + .version(manifest.getStateVersion()) + .stateUUID(manifest.getStateUUID()) + .routingTable(new RoutingTable(manifest.getRoutingTableVersion(), indicesRouting)) + .build(); + } + private void writeMetadataManifest(String clusterName, String clusterUUID, ClusterMetadataManifest uploadManifest, String fileName) throws IOException { AtomicReference result = new AtomicReference(); diff --git a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java index 9a9cbfa153259..6395b910e2c31 100644 --- a/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java +++ b/server/src/test/java/org/opensearch/cluster/routing/remote/RemoteRoutingTableServiceTests.java @@ -8,8 +8,24 @@ package org.opensearch.cluster.routing.remote; +import org.opensearch.Version; +import org.opensearch.action.LatchedActionListener; +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.routing.IndexRoutingTable; +import org.opensearch.cluster.routing.RoutingTable; +import org.opensearch.common.CheckedRunnable; +import org.opensearch.common.blobstore.BlobContainer; +import org.opensearch.common.blobstore.BlobPath; +import org.opensearch.common.blobstore.BlobStore; +import org.opensearch.common.compress.DeflateCompressor; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.index.Index; +import org.opensearch.gateway.remote.ClusterMetadataManifest; +import org.opensearch.gateway.remote.RemoteClusterStateService; import org.opensearch.repositories.FilterRepository; import org.opensearch.repositories.RepositoriesService; import org.opensearch.repositories.RepositoryMissingException; @@ -17,9 +33,17 @@ import org.opensearch.test.OpenSearchTestCase; import org.junit.After; import org.junit.Before; +import org.opensearch.threadpool.TestThreadPool; +import org.opensearch.threadpool.ThreadPool; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.function.Supplier; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import static org.opensearch.common.util.FeatureFlags.REMOTE_PUBLICATION_EXPERIMENTAL; import static org.opensearch.node.remotestore.RemoteStoreNodeAttribute.REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY; import static org.mockito.Mockito.doThrow; @@ -29,9 +53,14 @@ public class RemoteRoutingTableServiceTests extends OpenSearchTestCase { private RemoteRoutingTableService remoteRoutingTableService; + private ClusterSettings clusterSettings; private Supplier repositoriesServiceSupplier; private RepositoriesService repositoriesService; private BlobStoreRepository blobStoreRepository; + private BlobStore blobStore; + private BlobContainer blobContainer; + private BlobPath basePath; + private final ThreadPool threadPool = new TestThreadPool(getClass().getName()); @Before public void setup() { @@ -43,30 +72,46 @@ public void setup() { .put("node.attr." + REMOTE_STORE_ROUTING_TABLE_REPOSITORY_NAME_ATTRIBUTE_KEY, "routing_repository") .build(); + clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + blobStoreRepository = mock(BlobStoreRepository.class); + when(blobStoreRepository.getCompressor()).thenReturn(new DeflateCompressor()); + blobStore = mock(BlobStore.class); + blobContainer = mock(BlobContainer.class); when(repositoriesService.repository("routing_repository")).thenReturn(blobStoreRepository); - + when(blobStoreRepository.blobStore()).thenReturn(blobStore); Settings nodeSettings = Settings.builder().put(REMOTE_PUBLICATION_EXPERIMENTAL, "true").build(); FeatureFlags.initializeFeatureFlags(nodeSettings); - remoteRoutingTableService = new RemoteRoutingTableService(repositoriesServiceSupplier, settings); + basePath = BlobPath.cleanPath().add("base-path"); + + remoteRoutingTableService = new RemoteRoutingTableService( + repositoriesServiceSupplier, + settings, + threadPool + ); } @After public void teardown() throws Exception { super.tearDown(); remoteRoutingTableService.close(); + threadPool.shutdown(); } + public void testFailInitializationWhenRemoteRoutingDisabled() { final Settings settings = Settings.builder().build(); - assertThrows(AssertionError.class, () -> new RemoteRoutingTableService(repositoriesServiceSupplier, settings)); + assertThrows( + AssertionError.class, + () -> new RemoteRoutingTableService( + repositoriesServiceSupplier, + settings, + new ThreadPool(settings) + ) + ); } - public void testFailStartWhenRepositoryNotSet() { - doThrow(new RepositoryMissingException("repository missing")).when(repositoriesService).repository("routing_repository"); - assertThrows(RepositoryMissingException.class, () -> remoteRoutingTableService.start()); - } public void testFailStartWhenNotBlobRepository() { final FilterRepository filterRepository = mock(FilterRepository.class); @@ -74,4 +119,177 @@ public void testFailStartWhenNotBlobRepository() { assertThrows(AssertionError.class, () -> remoteRoutingTableService.start()); } + public void testGetChangedIndicesRouting() { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + final Index index = new Index(indexName, "uuid"); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(1).numberOfReplicas(1).build(); + + RoutingTable routingTable = RoutingTable.builder().addAsNew(indexMetadata).build(); + ClusterState state = ClusterState.builder(ClusterName.DEFAULT).routingTable(routingTable).build(); + + assertEquals(0, RemoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), state.getRoutingTable()).getUpserts().size()); + + //Reversing order to check for equality without order. + IndexRoutingTable indexRouting = routingTable.getIndicesRouting().get(indexName); + IndexRoutingTable indexRoutingTable = IndexRoutingTable.builder(index).addShard(indexRouting.getShards().get(0).replicaShards().get(0)) + .addShard(indexRouting.getShards().get(0).primaryShard()).build(); + ClusterState newState = ClusterState.builder(ClusterName.DEFAULT).routingTable(RoutingTable.builder().add(indexRoutingTable).build()).build(); + assertEquals(0, RemoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), newState.getRoutingTable()).getUpserts().size()); + } + + public void testIndicesRoutingDiffWhenIndexDeleted() { + + ClusterState state = createIndices(randomIntBetween(1,100)); + RoutingTable routingTable = state.routingTable(); + + List allIndices = new ArrayList<>(); + routingTable.getIndicesRouting().forEach((k,v) -> allIndices.add(k)); + + String indexNameToDelete = allIndices.get(randomIntBetween(0, allIndices.size()-1)); + RoutingTable updatedRoutingTable = RoutingTable.builder(routingTable).remove(indexNameToDelete).build(); + + assertEquals(1, RemoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), updatedRoutingTable).getDeletes().size()); + assertEquals(indexNameToDelete, RemoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), updatedRoutingTable).getDeletes().get(0)); + } + + public void testIndicesRoutingDiffWhenIndexDeletedAndAdded() { + + ClusterState state = createIndices(randomIntBetween(1,100)); + RoutingTable routingTable = state.routingTable(); + + List allIndices = new ArrayList<>(); + routingTable.getIndicesRouting().forEach((k,v) -> allIndices.add(k)); + + String indexNameToDelete = allIndices.get(randomIntBetween(0, allIndices.size()-1)); + RoutingTable.Builder updatedRoutingTableBuilder = RoutingTable.builder(routingTable).remove(indexNameToDelete); + + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(1).numberOfReplicas(1).build(); + + RoutingTable updatedRoutingTable = updatedRoutingTableBuilder.addAsNew(indexMetadata).build(); + + assertEquals(1, RemoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), updatedRoutingTable).getDeletes().size()); + assertEquals(indexNameToDelete, RemoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), updatedRoutingTable).getDeletes().get(0)); + + assertEquals(1, RemoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), updatedRoutingTable).getUpserts().size()); + assertTrue(RemoteRoutingTableService.getIndicesRoutingMapDiff(state.getRoutingTable(), updatedRoutingTable).getUpserts().containsKey(indexName)); + } + + public void testGetAsyncIndexMetadataReadAction() throws Exception { + String indexName = randomAlphaOfLength(randomIntBetween(1,50)); + ClusterState clusterState = createClusterState(indexName); + String uploadedFileName = String.format("index-routing/" + indexName); + Index index = new Index(indexName, "uuid-01"); + + LatchedActionListener listener = mock(LatchedActionListener.class); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + when(blobContainer.readBlob(indexName)).thenReturn(new IndexRoutingTableInputStream(clusterState.routingTable().getIndicesRouting().get(indexName))); + remoteRoutingTableService.start(); + + CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexMetadataReadAction( + uploadedFileName, index, listener + ); + assertNotNull(runnable); + runnable.run(); + + verify(blobContainer, times(1)).readBlob(any()); + assertBusy(() -> verify(listener, times(1)).onResponse(any(IndexRoutingTable.class))); + } + + public void testGetAsyncIndexMetadataReadActionFailureForIncorrectIndex() throws Exception { + String indexName = randomAlphaOfLength(randomIntBetween(1,50)); + ClusterState clusterState = createClusterState(indexName); + String uploadedFileName = String.format("index-routing/" + indexName); + Index index = new Index("incorrect-index", "uuid-01"); + + LatchedActionListener listener = mock(LatchedActionListener.class); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + when(blobContainer.readBlob(indexName)).thenReturn(new IndexRoutingTableInputStream(clusterState.routingTable().getIndicesRouting().get(indexName))); + remoteRoutingTableService.start(); + + CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexMetadataReadAction( + uploadedFileName, index, listener + ); + assertNotNull(runnable); + runnable.run(); + + verify(blobContainer, times(1)).readBlob(any()); + assertBusy(() -> verify(listener, times(1)).onFailure(any(Exception.class))); + } + + public void testGetAsyncIndexMetadataReadActionFailureInBlobRepo() throws Exception { + String indexName = randomAlphaOfLength(randomIntBetween(1,50)); + ClusterState clusterState = createClusterState(indexName); + String uploadedFileName = String.format("index-routing/" + indexName); + Index index = new Index(indexName, "uuid-01"); + + LatchedActionListener listener = mock(LatchedActionListener.class); + when(blobStore.blobContainer(any())).thenReturn(blobContainer); + doThrow(new IOException("testing failure")).when(blobContainer).readBlob(indexName); + remoteRoutingTableService.start(); + + CheckedRunnable runnable = remoteRoutingTableService.getAsyncIndexMetadataReadAction( + uploadedFileName, index, listener + ); + assertNotNull(runnable); + runnable.run(); + + verify(blobContainer, times(1)).readBlob(any()); + assertBusy(() -> verify(listener, times(1)).onFailure(any(RemoteClusterStateService.RemoteStateTransferException.class))); + } + + public void testGetUpdatedIndexRoutingTableMetadataWhenNoChange() { + List updatedIndicesRouting = new ArrayList<>(); + List indicesRouting = randomUploadedIndexMetadataList(); + List updatedIndexMetadata = remoteRoutingTableService.getUpdatedIndexRoutingTableMetadata(updatedIndicesRouting, indicesRouting); + assertEquals(0, updatedIndexMetadata.size()); + } + + public void testGetUpdatedIndexRoutingTableMetadataWhenIndexIsUpdated() { + List updatedIndicesRouting = new ArrayList<>(); + List indicesRouting = randomUploadedIndexMetadataList(); + ClusterMetadataManifest.UploadedIndexMetadata expectedIndexRouting = indicesRouting.get(randomIntBetween(0, indicesRouting.size())); + updatedIndicesRouting.add(expectedIndexRouting.getIndexName()); + List updatedIndexMetadata = remoteRoutingTableService.getUpdatedIndexRoutingTableMetadata(updatedIndicesRouting, indicesRouting); + assertEquals(1, updatedIndexMetadata.size()); + assertEquals(expectedIndexRouting, updatedIndexMetadata.get(0)); + } + + private ClusterState createIndices(int numberOfIndices) { + RoutingTable.Builder routingTableBuilder = RoutingTable.builder(); + for(int i=0; i< numberOfIndices; i++) { + String indexName = randomAlphaOfLength(randomIntBetween(1, 50)); + final Index index = new Index(indexName, "uuid"); + final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(1).numberOfReplicas(1).build(); + + routingTableBuilder.addAsNew(indexMetadata); + } + return ClusterState.builder(ClusterName.DEFAULT).routingTable(routingTableBuilder.build()).build(); + } + + private ClusterState createClusterState(String indexName) { + final IndexMetadata indexMetadata = new IndexMetadata.Builder(indexName).settings( + Settings.builder() + .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) + .put(IndexMetadata.SETTING_INDEX_UUID, "uuid") + .build() + ).numberOfShards(randomInt(1000)).numberOfReplicas(randomInt(10)).build(); + RoutingTable routingTable = RoutingTable.builder().addAsNew(indexMetadata).build(); + return ClusterState.builder(ClusterName.DEFAULT).routingTable(routingTable).build(); + } }