Skip to content

Commit

Permalink
Move IndexLifecycleMetadata installation to put-lifecycle-action (#31346
Browse files Browse the repository at this point in the history
)

There is a problematic scenario with x-pack-cluster master-nodes
attempting to install custom metadata into the cluster-state and
broadcasting that to non-x-pack-enabled nodes. Since those nodes
are not aware of this custom metadata, their cluster-state recovery
will be broken. This change ensures that newly-elected x-pack master
nodes bootstrap IndexLifecycleMetadata upon the first request to
leverage its features. This means that PutLifecycleAction is
now responsible for installing the metadata. Since this X-Pack API
can only be called once all nodes in the cluster have x-pack enabled,
it is safe to assume that the cluster will appropriately handle the
cluster-state recovery with the new set of index-lifecycle metadata.
  • Loading branch information
talevy authored Jun 15, 2018
1 parent 2af05e5 commit 80ab773
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
public class IndexLifecycleMetadata implements XPackMetaDataCustom {
public static final String TYPE = "index_lifecycle";
public static final ParseField POLICIES_FIELD = new ParseField("policies");
public static final IndexLifecycleMetadata EMPTY = new IndexLifecycleMetadata(Collections.emptySortedMap());

@SuppressWarnings("unchecked")
public static final ConstructingObjectParser<IndexLifecycleMetadata, Void> PARSER = new ConstructingObjectParser<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import org.elasticsearch.xpack.indexlifecycle.action.RestPutLifecycleAction;
import org.elasticsearch.xpack.indexlifecycle.action.RestRetryAction;
import org.elasticsearch.xpack.indexlifecycle.action.TransportSetPolicyForIndexAction;
import org.elasticsearch.xpack.indexlifecycle.action.TransportDeleteLifcycleAction;
import org.elasticsearch.xpack.indexlifecycle.action.TransportDeleteLifecycleAction;
import org.elasticsearch.xpack.indexlifecycle.action.TransportExplainLifecycleAction;
import org.elasticsearch.xpack.indexlifecycle.action.TransportGetLifecycleAction;
import org.elasticsearch.xpack.indexlifecycle.action.TransportMoveToStepAction;
Expand Down Expand Up @@ -124,7 +124,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
return emptyList();
}
indexLifecycleInitialisationService
.set(new IndexLifecycleService(settings, client, clusterService, getClock(), threadPool, System::currentTimeMillis));
.set(new IndexLifecycleService(settings, client, clusterService, getClock(), System::currentTimeMillis));
return Collections.singletonList(indexLifecycleInitialisationService.get());
}

Expand Down Expand Up @@ -164,7 +164,7 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
return Arrays.asList(
new ActionHandler<>(PutLifecycleAction.INSTANCE, TransportPutLifecycleAction.class),
new ActionHandler<>(GetLifecycleAction.INSTANCE, TransportGetLifecycleAction.class),
new ActionHandler<>(DeleteLifecycleAction.INSTANCE, TransportDeleteLifcycleAction.class),
new ActionHandler<>(DeleteLifecycleAction.INSTANCE, TransportDeleteLifecycleAction.class),
new ActionHandler<>(ExplainLifecycleAction.INSTANCE, TransportExplainLifecycleAction.class),
new ActionHandler<>(SetPolicyForIndexAction.INSTANCE, TransportSetPolicyForIndexAction.class),
new ActionHandler<>(MoveToStepAction.INSTANCE, TransportMoveToStepAction.class),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,12 @@
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateApplier;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.logging.ESLoggerFactory;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.indexlifecycle.IndexLifecycleMetadata;
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleSettings;
Expand All @@ -29,7 +26,6 @@

import java.io.Closeable;
import java.time.Clock;
import java.util.Collections;
import java.util.function.LongSupplier;

/**
Expand All @@ -44,18 +40,15 @@ public class IndexLifecycleService extends AbstractComponent
private final PolicyStepsRegistry policyRegistry;
private Client client;
private ClusterService clusterService;
private ThreadPool threadPool;
private LongSupplier nowSupplier;
private SchedulerEngine.Job scheduledJob;
private IndexLifecycleRunner lifecycleRunner;

public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, Clock clock,
ThreadPool threadPool, LongSupplier nowSupplier) {
public IndexLifecycleService(Settings settings, Client client, ClusterService clusterService, Clock clock, LongSupplier nowSupplier) {
super(settings);
this.client = client;
this.clusterService = clusterService;
this.clock = clock;
this.threadPool = threadPool;
this.nowSupplier = nowSupplier;
this.scheduledJob = null;
this.policyRegistry = new PolicyStepsRegistry();
Expand Down Expand Up @@ -91,19 +84,17 @@ public PolicyStepsRegistry getPolicyRegistry() {

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (event.localNodeMaster()) { // only act if we are master, otherwise keep idle until elected
IndexLifecycleMetadata lifecycleMetadata = event.state().metaData().custom(IndexLifecycleMetadata.TYPE);
IndexLifecycleMetadata lifecycleMetadata = event.state().metaData().custom(IndexLifecycleMetadata.TYPE);
if (event.localNodeMaster() && lifecycleMetadata != null) {
TimeValue pollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING
.get(event.state().getMetaData().settings());
TimeValue previousPollInterval = LifecycleSettings.LIFECYCLE_POLL_INTERVAL_SETTING
.get(event.previousState().getMetaData().settings());

boolean pollIntervalSettingChanged = !pollInterval.equals(previousPollInterval);

if (lifecycleMetadata == null) { // no lifecycle metadata, install initial empty metadata state
lifecycleMetadata = new IndexLifecycleMetadata(Collections.emptySortedMap());
installMetadata(lifecycleMetadata);
} else if (scheduler.get() == null) { // metadata installed and scheduler should be kicked off. start your engines.

if (scheduler.get() == null) { // metadata installed and scheduler should be kicked off. start your engines.
scheduler.set(new SchedulerEngine(clock));
scheduler.get().register(this);
scheduleJob(pollInterval);
Expand Down Expand Up @@ -151,26 +142,7 @@ public void triggered(SchedulerEngine.Event event) {
}
}

private void installMetadata(IndexLifecycleMetadata lifecycleMetadata) {
threadPool.executor(ThreadPool.Names.GENERIC)
.execute(() -> clusterService.submitStateUpdateTask("install-index-lifecycle-metadata", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
ClusterState.Builder builder = new ClusterState.Builder(currentState);
MetaData.Builder metadataBuilder = MetaData.builder(currentState.metaData());
metadataBuilder.putCustom(IndexLifecycleMetadata.TYPE, lifecycleMetadata);
builder.metaData(metadataBuilder.build());
return builder.build();
}

@Override
public void onFailure(String source, Exception e) {
logger.error("unable to install index lifecycle metadata", e);
}
}));
}

public void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) {
void triggerPolicies(ClusterState clusterState, boolean fromClusterStateChange) {
// loop through all indices in cluster state and filter for ones that are
// managed by the Index Lifecycle Service they have a index.lifecycle.name setting
// associated to a policy
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ Map<String, Map<Step.StepKey, Step>> getStepMap() {
@SuppressWarnings({ "unchecked", "rawtypes" })
public void update(ClusterState currentState, Client client, LongSupplier nowSupplier) {
IndexLifecycleMetadata meta = currentState.metaData().custom(IndexLifecycleMetadata.TYPE);
assert meta != null : "IndexLifecycleMetadata cannot be null when updating the policy steps registry";

Diff<Map<String, LifecyclePolicyMetadata>> diff = DiffableUtils.diff(lifecyclePolicyMap, meta.getPolicyMetadatas(),
DiffableUtils.getStringKeySerializer());
DiffableUtils.MapDiff<String, LifecyclePolicyMetadata, DiffableUtils.KeySerializer<String>> mapDiff = (DiffableUtils.MapDiff) diff;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@
import java.util.SortedMap;
import java.util.TreeMap;

public class TransportDeleteLifcycleAction extends TransportMasterNodeAction<Request, Response> {
public class TransportDeleteLifecycleAction extends TransportMasterNodeAction<Request, Response> {

@Inject
public TransportDeleteLifcycleAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
public TransportDeleteLifecycleAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, DeleteLifecycleAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
}
Expand Down Expand Up @@ -74,7 +75,8 @@ public ClusterState execute(ClusterState currentState) {
}
ClusterState.Builder newState = ClusterState.builder(currentState);
IndexLifecycleMetadata currentMetadata = currentState.metaData().custom(IndexLifecycleMetadata.TYPE);
if (currentMetadata.getPolicyMetadatas().containsKey(request.getPolicyName()) == false) {
if (currentMetadata == null
|| currentMetadata.getPolicyMetadatas().containsKey(request.getPolicyName()) == false) {
throw new ResourceNotFoundException("Lifecycle policy not found: {}", request.getPolicyName());
}
SortedMap<String, LifecyclePolicyMetadata> newPolicies = new TreeMap<>(currentMetadata.getPolicyMetadatas());
Expand All @@ -91,4 +93,4 @@ public ClusterState execute(ClusterState currentState) {
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
import java.util.TreeMap;
import java.util.stream.Collectors;

/**
* This class is responsible for bootstrapping {@link IndexLifecycleMetadata} into the cluster-state, as well
* as adding the desired new policy to be inserted.
*/
public class TransportPutLifecycleAction extends TransportMasterNodeAction<Request, Response> {

@Inject
Expand Down Expand Up @@ -64,6 +68,9 @@ protected Response newResponse(boolean acknowledged) {
public ClusterState execute(ClusterState currentState) throws Exception {
ClusterState.Builder newState = ClusterState.builder(currentState);
IndexLifecycleMetadata currentMetadata = currentState.metaData().custom(IndexLifecycleMetadata.TYPE);
if (currentMetadata == null) { // first time using index-lifecycle feature, bootstrap metadata
currentMetadata = IndexLifecycleMetadata.EMPTY;
}
if (currentMetadata.getPolicyMetadatas().containsKey(request.getPolicy().getName())) {
throw new ResourceAlreadyExistsException("Lifecycle policy already exists: {}",
request.getPolicy().getName());
Expand All @@ -87,4 +94,4 @@ public ClusterState execute(ClusterState currentState) throws Exception {
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ protected void masterOperation(Request request, ClusterState state, ActionListen
public ClusterState execute(ClusterState currentState) throws Exception {
IndexLifecycleMetadata ilmMetadata = (IndexLifecycleMetadata) currentState.metaData()
.custom(IndexLifecycleMetadata.TYPE);

if (ilmMetadata == null) {
throw new ResourceNotFoundException("Policy does not exist [{}]", newPolicyName);
}

LifecyclePolicy newPolicy = ilmMetadata.getPolicies().get(newPolicyName);

if (newPolicy == null) {
Expand Down
Loading

0 comments on commit 80ab773

Please sign in to comment.