Skip to content

Commit

Permalink
Update index mappings when ccr restore complete (#36879) (#36920)
Browse files Browse the repository at this point in the history
This is related to #35975. When the shard restore process is complete,
the index mappings need to be updated to ensure that the data in the
files restores is compatible with the follower mappings. This commit
implements a mapping update as the final step in a shard restore.
  • Loading branch information
Tim-Brooks authored Dec 21, 2018
1 parent 7d3d12f commit b1a520e
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 31 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.xcontent.XContentType;

public final class CcrRequests {

private CcrRequests() {}

public static ClusterStateRequest metaDataRequest(String leaderIndex) {
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex);
return clusterStateRequest;
}

public static PutMappingRequest putMappingRequest(String followerIndex, MappingMetaData mappingMetaData) {
PutMappingRequest putMappingRequest = new PutMappingRequest(followerIndex);
putMappingRequest.type(mappingMetaData.type());
putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON);
return putMappingRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.CommitStats;
Expand Down Expand Up @@ -123,10 +122,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> erro
Index leaderIndex = params.getLeaderShardId().getIndex();
Index followIndex = params.getFollowShardId().getIndex();

ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex.getName());
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());

remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
Expand All @@ -140,9 +136,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> erro
indexMetaData.getMappings().size() + "]";
MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value;

PutMappingRequest putMappingRequest = new PutMappingRequest(followIndex.getName());
putMappingRequest.type(mappingMetaData.type());
putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON);
PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followIndex.getName(), mappingMetaData);
followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(
putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()),
errorHandler));
Expand All @@ -154,10 +148,7 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum
final Index leaderIndex = params.getLeaderShardId().getIndex();
final Index followIndex = params.getFollowShardId().getIndex();

final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex.getName());
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());

CheckedConsumer<ClusterStateResponse, Exception> onResponse = clusterStateResponse -> {
final IndexMetaData leaderIMD = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@

import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -21,6 +24,7 @@
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardRecoveryException;
Expand All @@ -37,6 +41,7 @@
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.action.CcrRequests;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest;
import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction;
Expand Down Expand Up @@ -111,15 +116,10 @@ public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
ClusterStateResponse response = remoteClient
.admin()
.cluster()
.prepareState()
.clear()
.setMetaData(true)
.setIndices("dummy_index_name") // We set a single dummy index name to avoid fetching all the index data
.get();
return response.getState().metaData();
// We set a single dummy index name to avoid fetching all the index data
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest("dummy_index_name");
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
return clusterState.getState().metaData();
}

@Override
Expand All @@ -128,18 +128,12 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind
String leaderIndex = index.getName();
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);

ClusterStateResponse response = remoteClient
.admin()
.cluster()
.prepareState()
.clear()
.setMetaData(true)
.setIndices(leaderIndex)
.get();
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex);
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();

// Validates whether the leader cluster has been configured properly:
PlainActionFuture<String[]> future = PlainActionFuture.newFuture();
IndexMetaData leaderIndexMetaData = response.getState().metaData().index(leaderIndex);
IndexMetaData leaderIndexMetaData = clusterState.getState().metaData().index(leaderIndex);
ccrLicenseChecker.fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, future::onFailure, future::onResponse);
String[] leaderHistoryUUIDs = future.actionGet();

Expand Down Expand Up @@ -252,7 +246,8 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v

Map<String, String> ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
ShardId leaderShardId = new ShardId(shardId.getIndexName(), leaderUUID, shardId.getId());
Index leaderIndex = new Index(shardId.getIndexName(), leaderUUID);
ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId());

Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
String sessionUUID = UUIDs.randomBase64UUID();
Expand All @@ -261,13 +256,30 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v
String nodeId = response.getNodeId();
// TODO: Implement file restore
closeSession(remoteClient, nodeId, sessionUUID);
maybeUpdateMappings(client, remoteClient, leaderIndex, indexShard.indexSettings());
}

@Override
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId leaderShardId) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}

private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) {
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex);
long leaderMappingVersion = leaderIndexMetadata.getMappingVersion();

if (leaderMappingVersion > followerIndexSettings.getIndexMetaData().getMappingVersion()) {
Index followerIndex = followerIndexSettings.getIndex();
assert leaderIndexMetadata.getMappings().size() == 1 : "expected exactly one mapping, but got [" +
leaderIndexMetadata.getMappings().size() + "]";
MappingMetaData mappingMetaData = leaderIndexMetadata.getMappings().iterator().next().value;
PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData);
localClient.admin().indices().putMapping(putMappingRequest).actionGet();
}
}

private void closeSession(Client remoteClient, String nodeId, String sessionUUID) {
ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(nodeId,
new ClearCcrRestoreSessionRequest.Request(nodeId, sessionUUID));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,22 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.RepositoriesService;
Expand All @@ -35,13 +38,15 @@
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;

import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.singletonMap;
import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

// TODO: Fold this integration test into a more expansive integration test as more bootstrap from remote work
// TODO: is completed.
Expand Down Expand Up @@ -195,6 +200,60 @@ public void testThatSessionIsRegisteredWithPrimaryShard() throws IOException {
assertEquals(0, restoreInfo.failedShards());
}

public void testFollowerMappingIsUpdated() throws IOException {
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
String leaderIndex = "index1";
String followerIndex = "index2";

final int numberOfPrimaryShards = randomIntBetween(1, 3);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen(leaderIndex);

final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);

Settings.Builder settingsBuilder = Settings.builder()
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex)
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
RestoreService.RestoreRequest restoreRequest = new RestoreService.RestoreRequest(leaderClusterRepoName,
CcrRepository.LATEST, new String[]{leaderIndex}, indicesOptions,
"^(.*)$", followerIndex, Settings.EMPTY, new TimeValue(1, TimeUnit.HOURS), false,
false, true, settingsBuilder.build(), new String[0],
"restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]");

// TODO: Eventually when the file recovery work is complete, we should test updated mappings by
// indexing to the leader while the recovery is happening. However, into order to that test mappings
// are updated prior to that work, we index documents in the clear session callback. This will
// ensure a mapping change prior to the final mapping check on the follower side.
for (CcrRestoreSourceService restoreSourceService : getLeaderCluster().getDataNodeInstances(CcrRestoreSourceService.class)) {
restoreSourceService.addCloseSessionListener(s -> {
final String source = String.format(Locale.ROOT, "{\"k\":%d}", 1);
leaderClient().prepareIndex("index1", "doc", Long.toString(1)).setSource(source, XContentType.JSON).get();
});
}

PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
RestoreInfo restoreInfo = future.actionGet();

assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards());
assertEquals(0, restoreInfo.failedShards());

ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(followerIndex);
ClusterStateResponse clusterState = followerClient().admin().cluster().state(clusterStateRequest).actionGet();
IndexMetaData followerIndexMetadata = clusterState.getState().metaData().index(followerIndex);
assertEquals(2, followerIndexMetadata.getMappingVersion());

MappingMetaData mappingMetaData = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings()
.get("index2").get("doc");
assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long"));
}

private ActionListener<RestoreService.RestoreCompletionResponse> waitForRestore(ClusterService clusterService,
ActionListener<RestoreInfo> listener) {
return new ActionListener<RestoreService.RestoreCompletionResponse>() {
Expand Down

0 comments on commit b1a520e

Please sign in to comment.