Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update index mappings when ccr restore complete #36879

Merged
merged 6 commits into from
Dec 20, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ccr.action;

import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.xcontent.XContentType;

public final class CcrRequests {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


private CcrRequests() {}

public static ClusterStateRequest metaDataRequest(String leaderIndex) {
ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex);
return clusterStateRequest;
}

public static PutMappingRequest putMappingRequest(String followerIndex, MappingMetaData mappingMetaData) {
PutMappingRequest putMappingRequest = new PutMappingRequest(followerIndex);
putMappingRequest.type(mappingMetaData.type());
putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON);
return putMappingRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.engine.CommitStats;
Expand Down Expand Up @@ -123,10 +122,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> erro
Index leaderIndex = params.getLeaderShardId().getIndex();
Index followIndex = params.getFollowShardId().getIndex();

ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex.getName());
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());

remoteClient.admin().cluster().state(clusterStateRequest, ActionListener.wrap(clusterStateResponse -> {
IndexMetaData indexMetaData = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
Expand All @@ -140,9 +136,7 @@ protected void innerUpdateMapping(LongConsumer handler, Consumer<Exception> erro
indexMetaData.getMappings().size() + "]";
MappingMetaData mappingMetaData = indexMetaData.getMappings().iterator().next().value;

PutMappingRequest putMappingRequest = new PutMappingRequest(followIndex.getName());
putMappingRequest.type(mappingMetaData.type());
putMappingRequest.source(mappingMetaData.source().string(), XContentType.JSON);
PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followIndex.getName(), mappingMetaData);
followerClient.admin().indices().putMapping(putMappingRequest, ActionListener.wrap(
putMappingResponse -> handler.accept(indexMetaData.getMappingVersion()),
errorHandler));
Expand All @@ -154,10 +148,7 @@ protected void innerUpdateSettings(final LongConsumer finalHandler, final Consum
final Index leaderIndex = params.getLeaderShardId().getIndex();
final Index followIndex = params.getFollowShardId().getIndex();

final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(leaderIndex.getName());
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());

CheckedConsumer<ClusterStateResponse, Exception> onResponse = clusterStateResponse -> {
final IndexMetaData leaderIMD = clusterStateResponse.getState().metaData().getIndexSafe(leaderIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@

import org.apache.lucene.index.IndexCommit;
import org.elasticsearch.Version;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.RepositoryMetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -21,6 +24,7 @@
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.engine.EngineException;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardRecoveryException;
Expand All @@ -37,6 +41,7 @@
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.xpack.ccr.Ccr;
import org.elasticsearch.xpack.ccr.CcrLicenseChecker;
import org.elasticsearch.xpack.ccr.action.CcrRequests;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionAction;
import org.elasticsearch.xpack.ccr.action.repositories.ClearCcrRestoreSessionRequest;
import org.elasticsearch.xpack.ccr.action.repositories.PutCcrRestoreSessionAction;
Expand Down Expand Up @@ -111,15 +116,10 @@ public SnapshotInfo getSnapshotInfo(SnapshotId snapshotId) {
public MetaData getSnapshotGlobalMetaData(SnapshotId snapshotId) {
assert SNAPSHOT_ID.equals(snapshotId) : "RemoteClusterRepository only supports " + SNAPSHOT_ID + " as the SnapshotId";
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
ClusterStateResponse response = remoteClient
.admin()
.cluster()
.prepareState()
.clear()
.setMetaData(true)
.setIndices("dummy_index_name") // We set a single dummy index name to avoid fetching all the index data
.get();
return response.getState().metaData();
// We set a single dummy index name to avoid fetching all the index data
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest("dummy_index_name");
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
return clusterState.getState().metaData();
}

@Override
Expand All @@ -128,18 +128,12 @@ public IndexMetaData getSnapshotIndexMetaData(SnapshotId snapshotId, IndexId ind
String leaderIndex = index.getName();
Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);

ClusterStateResponse response = remoteClient
.admin()
.cluster()
.prepareState()
.clear()
.setMetaData(true)
.setIndices(leaderIndex)
.get();
ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex);
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();

// Validates whether the leader cluster has been configured properly:
PlainActionFuture<String[]> future = PlainActionFuture.newFuture();
IndexMetaData leaderIndexMetaData = response.getState().metaData().index(leaderIndex);
IndexMetaData leaderIndexMetaData = clusterState.getState().metaData().index(leaderIndex);
ccrLicenseChecker.fetchLeaderHistoryUUIDs(remoteClient, leaderIndexMetaData, future::onFailure, future::onResponse);
String[] leaderHistoryUUIDs = future.actionGet();

Expand Down Expand Up @@ -252,7 +246,8 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v

Map<String, String> ccrMetaData = indexShard.indexSettings().getIndexMetaData().getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY);
String leaderUUID = ccrMetaData.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
ShardId leaderShardId = new ShardId(shardId.getIndexName(), leaderUUID, shardId.getId());
Index leaderIndex = new Index(shardId.getIndexName(), leaderUUID);
ShardId leaderShardId = new ShardId(leaderIndex, shardId.getId());

Client remoteClient = client.getRemoteClusterClient(remoteClusterAlias);
String sessionUUID = UUIDs.randomBase64UUID();
Expand All @@ -261,13 +256,28 @@ public void restoreShard(IndexShard indexShard, SnapshotId snapshotId, Version v
String nodeId = response.getNodeId();
// TODO: Implement file restore
closeSession(remoteClient, nodeId, sessionUUID);
maybeUpdateMappings(client, remoteClient, leaderIndex, indexShard.indexSettings());
}

@Override
public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, Version version, IndexId indexId, ShardId leaderShardId) {
throw new UnsupportedOperationException("Unsupported for repository of type: " + TYPE);
}

private void maybeUpdateMappings(Client localClient, Client remoteClient, Index leaderIndex, IndexSettings followerIndexSettings) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@martijnvg would it make sense to share this code with the one in ShardFollowTasksExecutor?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Conceptually this code is doing the same as the update mapping code in shard follow task, but the update mapping code is tightly coupled with ShardFollowTasksExecutor and ShardFollowNodeTask. Also the mapping update code is asynchronous over there and here it is synchronous.

What I think is possible, is that factory methods for both ClusterStateRequest and PutMappingRequest are introduced that is then used here and in ShardFollowTasksExecutor.

ClusterStateRequest clusterStateRequest = CcrRequests.metaDataRequest(leaderIndex.getName());
ClusterStateResponse clusterState = remoteClient.admin().cluster().state(clusterStateRequest).actionGet();
IndexMetaData leaderIndexMetadata = clusterState.getState().metaData().getIndexSafe(leaderIndex);
long leaderMappingVersion = leaderIndexMetadata.getMappingVersion();

if (leaderMappingVersion > followerIndexSettings.getIndexMetaData().getMappingVersion()) {
Index followerIndex = followerIndexSettings.getIndex();
MappingMetaData mappingMetaData = leaderIndexMetadata.mapping();
PutMappingRequest putMappingRequest = CcrRequests.putMappingRequest(followerIndex.getName(), mappingMetaData);
localClient.admin().indices().putMapping(putMappingRequest).actionGet();
}
}

private void closeSession(Client remoteClient, String nodeId, String sessionUUID) {
ClearCcrRestoreSessionRequest clearRequest = new ClearCcrRestoreSessionRequest(nodeId,
new ClearCcrRestoreSessionRequest.Request(nodeId, sessionUUID));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,22 @@

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.RestoreInProgress;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.support.XContentMapValues;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.repositories.RepositoriesService;
Expand All @@ -35,13 +38,15 @@
import org.elasticsearch.xpack.ccr.repository.CcrRestoreSourceService;

import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import static java.util.Collections.singletonMap;
import static org.elasticsearch.snapshots.RestoreService.restoreInProgress;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;

// TODO: Fold this integration test into a more expansive integration test as more bootstrap from remote work
// TODO: is completed.
Expand Down Expand Up @@ -195,6 +200,60 @@ public void testThatSessionIsRegisteredWithPrimaryShard() throws IOException {
assertEquals(0, restoreInfo.failedShards());
}

public void testFollowerMappingIsUpdated() throws IOException {
String leaderClusterRepoName = CcrRepository.NAME_PREFIX + "leader_cluster";
String leaderIndex = "index1";
String followerIndex = "index2";

final int numberOfPrimaryShards = randomIntBetween(1, 3);
final String leaderIndexSettings = getIndexSettings(numberOfPrimaryShards, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(leaderClient().admin().indices().prepareCreate(leaderIndex).setSource(leaderIndexSettings, XContentType.JSON));
ensureLeaderGreen(leaderIndex);

final RestoreService restoreService = getFollowerCluster().getCurrentMasterNodeInstance(RestoreService.class);
final ClusterService clusterService = getFollowerCluster().getCurrentMasterNodeInstance(ClusterService.class);

Settings.Builder settingsBuilder = Settings.builder()
.put(IndexMetaData.SETTING_INDEX_PROVIDED_NAME, followerIndex)
.put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true);
RestoreService.RestoreRequest restoreRequest = new RestoreService.RestoreRequest(leaderClusterRepoName,
CcrRepository.LATEST, new String[]{leaderIndex}, indicesOptions,
"^(.*)$", followerIndex, Settings.EMPTY, new TimeValue(1, TimeUnit.HOURS), false,
false, true, settingsBuilder.build(), new String[0],
"restore_snapshot[" + leaderClusterRepoName + ":" + leaderIndex + "]");

// TODO: Eventually when the file recovery work is complete, we should test updated mappings by
// indexing to the leader while the recovery is happening. However, into order to that test mappings
// are updated prior to that work, we index documents in the clear session callback. This will
// ensure a mapping change prior to the final mapping check on the follower side.
for (CcrRestoreSourceService restoreSourceService : getLeaderCluster().getDataNodeInstances(CcrRestoreSourceService.class)) {
restoreSourceService.addCloseSessionListener(s -> {
final String source = String.format(Locale.ROOT, "{\"k\":%d}", 1);
leaderClient().prepareIndex("index1", "doc", Long.toString(1)).setSource(source, XContentType.JSON).get();
});
}

PlainActionFuture<RestoreInfo> future = PlainActionFuture.newFuture();
restoreService.restoreSnapshot(restoreRequest, waitForRestore(clusterService, future));
RestoreInfo restoreInfo = future.actionGet();

assertEquals(restoreInfo.totalShards(), restoreInfo.successfulShards());
assertEquals(0, restoreInfo.failedShards());

ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.clear();
clusterStateRequest.metaData(true);
clusterStateRequest.indices(followerIndex);
ClusterStateResponse clusterState = followerClient().admin().cluster().state(clusterStateRequest).actionGet();
IndexMetaData followerIndexMetadata = clusterState.getState().metaData().index(followerIndex);
assertEquals(2, followerIndexMetadata.getMappingVersion());

MappingMetaData mappingMetaData = followerClient().admin().indices().prepareGetMappings("index2").get().getMappings()
.get("index2").get("doc");
assertThat(XContentMapValues.extractValue("properties.k.type", mappingMetaData.sourceAsMap()), equalTo("long"));
}

private ActionListener<RestoreService.RestoreCompletionResponse> waitForRestore(ClusterService clusterService,
ActionListener<RestoreInfo> listener) {
return new ActionListener<RestoreService.RestoreCompletionResponse>() {
Expand Down