Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[7.x] Expose master timeout for ILM actions (#51130) #51348

Merged
merged 1 commit into from
Jan 23, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.unit.TimeValue;

import java.util.Objects;

/**
* Performs an action which must be performed asynchronously because it may take time to complete.
Expand All @@ -26,6 +29,16 @@ protected Client getClient() {
return client;
}

//visible for testing
void setClient(Client client){
this.client = client;
}

public static TimeValue getMasterTimeout(ClusterState clusterState){
Objects.requireNonNull(clusterState, "cannot determine master timeout when cluster state is null");
return LifecycleSettings.LIFECYCLE_STEP_MASTER_TIMEOUT_SETTING.get(clusterState.metaData().settings());
}

public boolean indexSurvives() {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;

/**
Expand All @@ -28,11 +29,11 @@ protected Client getClient() {
return client;
}

public abstract void evaluateCondition(IndexMetaData indexMetaData, Listener listener);
public abstract void evaluateCondition(IndexMetaData indexMetaData, Listener listener, TimeValue masterTimeout);

public interface Listener {

void onResponse(boolean conditionMet, ToXContentObject infomationContext);
void onResponse(boolean conditionMet, ToXContentObject informationContext);

void onFailure(Exception e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ void performDuringNoSnapshot(IndexMetaData indexMetaData, ClusterState currentCl
}

if (indexMetaData.getState() == IndexMetaData.State.OPEN) {
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(followerIndex);
CloseIndexRequest closeIndexRequest = new CloseIndexRequest(followerIndex)
.masterNodeTimeout(getMasterTimeout(currentClusterState));
getClient().admin().indices().close(closeIndexRequest, ActionListener.wrap(
r -> {
assert r.isAcknowledged() : "close index response is not acknowledged";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public DeleteStep(StepKey key, StepKey nextStepKey, Client client) {
@Override
public void performDuringNoSnapshot(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) {
getClient().admin().indices()
.delete(new DeleteIndexRequest(indexMetaData.getIndex().getName()),
.delete(new DeleteIndexRequest(indexMetaData.getIndex().getName()).masterNodeTimeout(getMasterTimeout(currentState)),
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public FreezeStep(StepKey key, StepKey nextStepKey, Client client) {
@Override
public void performDuringNoSnapshot(IndexMetaData indexMetaData, ClusterState currentState, Listener listener) {
getClient().admin().indices().execute(FreezeIndexAction.INSTANCE,
new FreezeRequest(indexMetaData.getIndex().getName()),
new FreezeRequest(indexMetaData.getIndex().getName()).masterNodeTimeout(getMasterTimeout(currentState)),
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ public class LifecycleSettings {
public static final String LIFECYCLE_ORIGINATION_DATE = "index.lifecycle.origination_date";
public static final String LIFECYCLE_PARSE_ORIGINATION_DATE = "index.lifecycle.parse_origination_date";
public static final String LIFECYCLE_HISTORY_INDEX_ENABLED = "index.lifecycle.history_index_enabled";
public static final String LIFECYCLE_STEP_MASTER_TIMEOUT = "index.lifecycle.step.master_timeout";

public static final String SLM_HISTORY_INDEX_ENABLED = "slm.history_index_enabled";
public static final String SLM_RETENTION_SCHEDULE = "slm.retention_schedule";
Expand All @@ -38,7 +39,9 @@ public class LifecycleSettings {
false, Setting.Property.Dynamic, Setting.Property.IndexScope);
public static final Setting<Boolean> LIFECYCLE_HISTORY_INDEX_ENABLED_SETTING = Setting.boolSetting(LIFECYCLE_HISTORY_INDEX_ENABLED,
true, Setting.Property.NodeScope);

public static final Setting<TimeValue> LIFECYCLE_STEP_MASTER_TIMEOUT_SETTING =
Setting.positiveTimeSetting(LIFECYCLE_STEP_MASTER_TIMEOUT, TimeValue.timeValueSeconds(30), Setting.Property.Dynamic,
Setting.Property.NodeScope);

public static final Setting<Boolean> SLM_HISTORY_INDEX_ENABLED_SETTING = Setting.boolSetting(SLM_HISTORY_INDEX_ENABLED, true,
Setting.Property.NodeScope);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ final class OpenFollowerIndexStep extends AsyncActionStep {
public void performAction(IndexMetaData indexMetaData, ClusterState currentClusterState,
ClusterStateObserver observer, Listener listener) {
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
OpenIndexRequest request = new OpenIndexRequest(indexMetaData.getIndex().getName());
OpenIndexRequest request = new OpenIndexRequest(indexMetaData.getIndex().getName())
.masterNodeTimeout(getMasterTimeout(currentClusterState));
getClient().admin().indices().open(request, ActionListener.wrap(
r -> {
assert r.isAcknowledged() : "open index response is not acknowledged";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public void performAction(IndexMetaData indexMetaData, ClusterState currentClust
}

// Calling rollover with no conditions will always roll over the index
RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null);
RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null)
.masterNodeTimeout(getMasterTimeout(currentClusterState));
// We don't wait for active shards when we perform the rollover because the
// {@link org.elasticsearch.xpack.core.ilm.WaitForActiveShardsStep} step will do so
rolloverRequest.setWaitForActiveShards(ActiveShardCount.NONE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ConstructingObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand Down Expand Up @@ -48,7 +49,7 @@ public int getMaxNumSegments() {
}

@Override
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) {
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener, TimeValue masterTimeout) {
getClient().admin().indices().segments(new IndicesSegmentsRequest(indexMetaData.getIndex().getName()),
ActionListener.wrap(response -> {
IndexSegments idxSegments = response.getIndices().get(indexMetaData.getIndex().getName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void performAction(IndexMetaData indexMetaData, ClusterState clusterState
Settings settings = Settings.builder()
.put(IndexMetaData.INDEX_ROUTING_REQUIRE_GROUP_SETTING.getKey() + "_id", nodeId.get()).build();
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexMetaData.getIndex().getName())
.masterNodeTimeout(getMasterTimeout(clusterState))
.settings(settings);
getClient().admin().indices().updateSettings(updateSettingsRequest,
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ public void performDuringNoSnapshot(IndexMetaData indexMetaData, ClusterState cu
// get target shrink index
String targetIndexName = shrunkIndexPrefix + index;
IndicesAliasesRequest aliasesRequest = new IndicesAliasesRequest()
.masterNodeTimeout(getMasterTimeout(currentState))
.addAliasAction(IndicesAliasesRequest.AliasActions.removeIndex().index(index))
.addAliasAction(IndicesAliasesRequest.AliasActions.add().index(targetIndexName).alias(index));
// copy over other aliases from original index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ public void performAction(IndexMetaData indexMetaData, ClusterState currentState
.build();

String shrunkenIndexName = shrunkIndexPrefix + indexMetaData.getIndex().getName();
ResizeRequest resizeRequest = new ResizeRequest(shrunkenIndexName, indexMetaData.getIndex().getName());
ResizeRequest resizeRequest = new ResizeRequest(shrunkenIndexName, indexMetaData.getIndex().getName())
.masterNodeTimeout(getMasterTimeout(currentState));
resizeRequest.getTargetIndexRequest().settings(relevantTargetSettings);

getClient().admin().indices().resizeIndex(resizeRequest, ActionListener.wrap(response -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,9 @@ public boolean isRetryable() {

@Override
public void performAction(IndexMetaData indexMetaData, ClusterState currentState, ClusterStateObserver observer, Listener listener) {
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexMetaData.getIndex().getName()).settings(settings);
UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexMetaData.getIndex().getName())
.masterNodeTimeout(getMasterTimeout(currentState))
.settings(settings);
getClient().admin().indices().updateSettings(updateSettingsRequest,
ActionListener.wrap(response -> listener.onResponse(true), listener::onFailure));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ccr.ShardFollowNodeTaskStatus;
Expand All @@ -32,7 +33,7 @@ final class WaitForFollowShardTasksStep extends AsyncWaitStep {
}

@Override
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) {
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener, TimeValue masterTimeout) {
Map<String, String> customIndexMetadata = indexMetaData.getCustomData(CCR_METADATA_KEY);
if (customIndexMetadata == null) {
listener.onResponse(true, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;

Expand Down Expand Up @@ -42,7 +43,7 @@ public class WaitForNoFollowersStep extends AsyncWaitStep {
}

@Override
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) {
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener, TimeValue masterTimeout) {
IndicesStatsRequest request = new IndicesStatsRequest();
request.clear();
String indexName = indexMetaData.getIndex().getName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public boolean isRetryable() {
}

@Override
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) {
public void evaluateCondition(IndexMetaData indexMetaData, Listener listener, TimeValue masterTimeout) {
String rolloverAlias = RolloverAction.LIFECYCLE_ROLLOVER_ALIAS_SETTING.get(indexMetaData.getSettings());

if (Strings.isNullOrEmpty(rolloverAlias)) {
Expand Down Expand Up @@ -113,7 +113,7 @@ public void evaluateCondition(IndexMetaData indexMetaData, Listener listener) {
"index [%s] is not the write index for alias [%s]", indexMetaData.getIndex().getName(), rolloverAlias)));
}

RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null);
RolloverRequest rolloverRequest = new RolloverRequest(rolloverAlias, null).masterNodeTimeout(masterTimeout);
rolloverRequest.dryRun(true);
if (maxAge != null) {
rolloverRequest.addMaxIndexAgeCondition(maxAge);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.client.NoOpClient;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;

import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.LIFECYCLE_STEP_MASTER_TIMEOUT;
import static org.hamcrest.Matchers.equalTo;

public abstract class AbstractStepMasterTimeoutTestCase<T extends AsyncActionStep> extends AbstractStepTestCase<T> {

protected ThreadPool pool;

@Before
public void setupThreadPool() {
pool = new TestThreadPool("timeoutTestPool");
}

@After
public void shutdownThreadPool() {
pool.shutdownNow();
}

public void testMasterTimeout() {
checkMasterTimeout(TimeValue.timeValueSeconds(30),
ClusterState.builder(ClusterName.DEFAULT).metaData(MetaData.builder().build()).build());
checkMasterTimeout(TimeValue.timeValueSeconds(10),
ClusterState.builder(ClusterName.DEFAULT)
.metaData(MetaData.builder()
.persistentSettings(Settings.builder().put(LIFECYCLE_STEP_MASTER_TIMEOUT, "10s").build())
.build())
.build());
}

private void checkMasterTimeout(TimeValue timeValue, ClusterState currentClusterState) {
T instance = createRandomInstance();
instance.setClient(new NoOpClient(pool) {
@Override
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(ActionType<Response> action,
Request request,
ActionListener<Response> listener) {
if (request instanceof MasterNodeRequest) {
assertThat(((MasterNodeRequest<?>) request).masterNodeTimeout(), equalTo(timeValue));
}
}
});
instance.performAction(getIndexMetaData(), currentClusterState, null, new AsyncActionStep.Listener() {
@Override
public void onResponse(boolean complete) {

}

@Override
public void onFailure(Exception e) {

}
});
}

protected abstract IndexMetaData getIndexMetaData();

public static ClusterState emptyClusterState() {
return ClusterState.builder(ClusterName.DEFAULT).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
*/
package org.elasticsearch.xpack.core.ilm;

import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils;
import org.elasticsearch.xpack.core.ilm.Step.StepKey;

public abstract class AbstractStepTestCase<T extends Step> extends ESTestCase {

protected static final int NUMBER_OF_TEST_RUNS = 20;
protected static final TimeValue MASTER_TIMEOUT = TimeValue.timeValueSeconds(30);

protected abstract T createRandomInstance();
protected abstract T mutateInstance(T instance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,20 @@
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;

public class CloseFollowerIndexStepTests extends AbstractStepTestCase<CloseFollowerIndexStep> {
public class CloseFollowerIndexStepTests extends AbstractStepMasterTimeoutTestCase<CloseFollowerIndexStep> {

public void testCloseFollowingIndex() {
IndexMetaData indexMetadata = IndexMetaData.builder("follower-index")
@Override
protected IndexMetaData getIndexMetaData() {
return IndexMetaData.builder("follower-index")
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true"))
.putCustom(CCR_METADATA_KEY, Collections.emptyMap())
.numberOfShards(1)
.numberOfReplicas(0)
.build();
}

public void testCloseFollowingIndex() {
IndexMetaData indexMetadata = getIndexMetaData();

Client client = Mockito.mock(Client.class);
AdminClient adminClient = Mockito.mock(AdminClient.class);
Expand All @@ -51,7 +56,7 @@ public void testCloseFollowingIndex() {
Boolean[] completed = new Boolean[1];
Exception[] failure = new Exception[1];
CloseFollowerIndexStep step = new CloseFollowerIndexStep(randomStepKey(), randomStepKey(), client);
step.performAction(indexMetadata, null, null, new AsyncActionStep.Listener() {
step.performAction(indexMetadata, emptyClusterState(), null, new AsyncActionStep.Listener() {
@Override
public void onResponse(boolean complete) {
completed[0] = complete;
Expand All @@ -67,12 +72,7 @@ public void onFailure(Exception e) {
}

public void testCloseFollowingIndexFailed() {
IndexMetaData indexMetadata = IndexMetaData.builder("follower-index")
.settings(settings(Version.CURRENT).put(LifecycleSettings.LIFECYCLE_INDEXING_COMPLETE, "true"))
.putCustom(CCR_METADATA_KEY, Collections.emptyMap())
.numberOfShards(1)
.numberOfReplicas(0)
.build();
IndexMetaData indexMetadata = getIndexMetaData();

// Mock pause follow api call:
Client client = Mockito.mock(Client.class);
Expand All @@ -93,7 +93,7 @@ public void testCloseFollowingIndexFailed() {
Boolean[] completed = new Boolean[1];
Exception[] failure = new Exception[1];
CloseFollowerIndexStep step = new CloseFollowerIndexStep(randomStepKey(), randomStepKey(), client);
step.performAction(indexMetadata, null, null, new AsyncActionStep.Listener() {
step.performAction(indexMetadata, emptyClusterState(), null, new AsyncActionStep.Listener() {
@Override
public void onResponse(boolean complete) {
completed[0] = complete;
Expand Down
Loading