Skip to content

Commit

Permalink
[CCR] Add metadata to keep track of the index uuid of the leader inde…
Browse files Browse the repository at this point in the history
…x in the follow index (#33367)

The follow index api checks if the recorded uuid in the follow index matches
with uuid of the leader index and fails otherwise. This validation will
prevent a follow index from following an incompatible leader index.

The create_and_follow api will automatically add this custom index metadata
when it creates the follow index.

Closes #31505
  • Loading branch information
martijnvg committed Sep 13, 2018
1 parent 7791c89 commit 9864c86
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public class Ccr extends Plugin implements ActionPlugin, PersistentTaskPlugin, E
public static final String CCR_THREAD_POOL_NAME = "ccr";
public static final String CCR_CUSTOM_METADATA_KEY = "ccr";
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS = "leader_index_shard_history_uuids";
public static final String CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY = "leader_index_uuid";

private final boolean enabled;
private final Settings settings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ public ClusterState execute(final ClusterState currentState) throws Exception {
// Adding the leader index uuid for each shard as custom metadata:
Map<String, String> metadata = new HashMap<>();
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_SHARD_HISTORY_UUIDS, String.join(",", historyUUIDs));
metadata.put(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY, leaderIndexMetaData.getIndexUUID());
imdBuilder.putCustom(Ccr.CCR_CUSTOM_METADATA_KEY, metadata);

// Copy all settings, but overwrite a few settings.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,14 @@ static void validate(
if (followIndex == null) {
throw new IllegalArgumentException("follow index [" + request.getFollowerIndex() + "] does not exist");
}
String leaderIndexUUID = leaderIndex.getIndex().getUUID();
String recordedLeaderIndexUUID = followIndex
.getCustomData(Ccr.CCR_CUSTOM_METADATA_KEY)
.get(Ccr.CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY);
if (leaderIndexUUID.equals(recordedLeaderIndexUUID) == false) {
throw new IllegalArgumentException("follow index [" + request.getFollowerIndex() + "] should reference [" + leaderIndexUUID +
"] as leader index but instead reference [" + recordedLeaderIndexUUID + "] as leader index");
}

String[] recordedHistoryUUIDs = extractIndexShardHistoryUUIDs(followIndex);
assert recordedHistoryUUIDs.length == leaderIndexHistoryUUID.length;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.analysis.common.CommonAnalysisPlugin;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.MappingMetaData;
Expand Down Expand Up @@ -313,10 +312,7 @@ public void testFollowIndexAndCloseNode() throws Exception {
internalCluster().ensureAtLeastNumDataNodes(3);
String leaderIndexSettings = getIndexSettings(3, 1, singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));

String followerIndexSettings = getIndexSettings(3, 1, singletonMap(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index2").setSource(followerIndexSettings, XContentType.JSON));
ensureGreen("index1", "index2");
ensureGreen("index1");

AtomicBoolean run = new AtomicBoolean(true);
Thread thread = new Thread(() -> {
Expand All @@ -338,7 +334,7 @@ public void testFollowIndexAndCloseNode() throws Exception {
final FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", randomIntBetween(32, 2048),
randomIntBetween(2, 10), Long.MAX_VALUE, randomIntBetween(2, 10),
FollowIndexAction.DEFAULT_MAX_WRITE_BUFFER_SIZE, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
client().execute(FollowIndexAction.INSTANCE, followRequest).get();
client().execute(CreateAndFollowIndexAction.INSTANCE, new CreateAndFollowIndexAction.Request(followRequest)).get();

long maxNumDocsReplicated = Math.min(1000, randomLongBetween(followRequest.getMaxBatchOperationCount(),
followRequest.getMaxBatchOperationCount() * 10));
Expand Down Expand Up @@ -415,33 +411,6 @@ public void testFollowNonExistentIndex() throws Exception {
expectThrows(IndexNotFoundException.class, () -> client().execute(FollowIndexAction.INSTANCE, followRequest3).actionGet());
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/33379")
public void testValidateFollowingIndexSettings() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test-leader")
.setSettings(Settings.builder().put(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), true)));
// TODO: indexing should be optional but the current mapping logic requires for now.
client().prepareIndex("test-leader", "doc", "id").setSource("{\"f\": \"v\"}", XContentType.JSON).get();
assertAcked(client().admin().indices().prepareCreate("test-follower").get());
IllegalArgumentException followError = expectThrows(IllegalArgumentException.class, () -> client().execute(
FollowIndexAction.INSTANCE, createFollowRequest("test-leader", "test-follower")).actionGet());
assertThat(followError.getMessage(), equalTo("the following index [test-follower] is not ready to follow;" +
" the setting [index.xpack.ccr.following_index] must be enabled."));
// updating the `following_index` with an open index must not be allowed.
IllegalArgumentException updateError = expectThrows(IllegalArgumentException.class, () -> {
client().admin().indices().prepareUpdateSettings("test-follower")
.setSettings(Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)).get();
});
assertThat(updateError.getMessage(), containsString("Can't update non dynamic settings " +
"[[index.xpack.ccr.following_index]] for open indices [[test-follower/"));
assertAcked(client().admin().indices().prepareClose("test-follower"));
assertAcked(client().admin().indices().prepareUpdateSettings("test-follower")
.setSettings(Settings.builder().put(CcrSettings.CCR_FOLLOWING_INDEX_SETTING.getKey(), true)));
assertAcked(client().admin().indices().prepareOpen("test-follower").setWaitForActiveShards(ActiveShardCount.DEFAULT));
assertAcked(client().execute(FollowIndexAction.INSTANCE,
createFollowRequest("test-leader", "test-follower")).actionGet());
unfollowIndex("test-follower");
}

public void testFollowIndex_lowMaxTranslogBytes() throws Exception {
final String leaderIndexSettings = getIndexSettings(1, between(0, 1),
singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
Expand Down Expand Up @@ -476,6 +445,37 @@ public void testFollowIndex_lowMaxTranslogBytes() throws Exception {
unfollowIndex("index2");
}

public void testDontFollowTheWrongIndex() throws Exception {
String leaderIndexSettings = getIndexSettings(1, 0,
Collections.singletonMap(IndexSettings.INDEX_SOFT_DELETES_SETTING.getKey(), "true"));
assertAcked(client().admin().indices().prepareCreate("index1").setSource(leaderIndexSettings, XContentType.JSON));
ensureGreen("index1");
assertAcked(client().admin().indices().prepareCreate("index3").setSource(leaderIndexSettings, XContentType.JSON));
ensureGreen("index3");

FollowIndexAction.Request followRequest = new FollowIndexAction.Request("index1", "index2", 1024, 1, 1024L,
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
CreateAndFollowIndexAction.Request createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();

followRequest = new FollowIndexAction.Request("index3", "index4", 1024, 1, 1024L,
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
createAndFollowRequest = new CreateAndFollowIndexAction.Request(followRequest);
client().execute(CreateAndFollowIndexAction.INSTANCE, createAndFollowRequest).get();
unfollowIndex("index2", "index4");

FollowIndexAction.Request wrongRequest1 = new FollowIndexAction.Request("index1", "index4", 1024, 1, 1024L,
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
Exception e = expectThrows(IllegalArgumentException.class,
() -> client().execute(FollowIndexAction.INSTANCE, wrongRequest1).actionGet());
assertThat(e.getMessage(), containsString("follow index [index4] should reference"));

FollowIndexAction.Request wrongRequest2 = new FollowIndexAction.Request("index3", "index2", 1024, 1, 1024L,
1, 10240, TimeValue.timeValueMillis(500), TimeValue.timeValueMillis(10));
e = expectThrows(IllegalArgumentException.class, () -> client().execute(FollowIndexAction.INSTANCE, wrongRequest2).actionGet());
assertThat(e.getMessage(), containsString("follow index [index2] should reference"));
}

private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, final Map<ShardId, Long> numDocsPerShard) {
return () -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
Expand Down Expand Up @@ -512,10 +512,12 @@ private CheckedRunnable<Exception> assertTask(final int numberOfPrimaryShards, f
};
}

private void unfollowIndex(String index) throws Exception {
final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request();
unfollowRequest.setFollowIndex(index);
client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get();
private void unfollowIndex(String... indices) throws Exception {
for (String index : indices) {
final UnfollowIndexAction.Request unfollowRequest = new UnfollowIndexAction.Request();
unfollowRequest.setFollowIndex(index);
client().execute(UnfollowIndexAction.INSTANCE, unfollowRequest).get();
}
assertBusy(() -> {
final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
final PersistentTasksCustomMetaData tasks = clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
Expand Down
Loading

0 comments on commit 9864c86

Please sign in to comment.