Skip to content

Commit

Permalink
Un-assign persistent tasks as nodes exit the cluster (#37656)
Browse files Browse the repository at this point in the history
PersistentTasksClusterService decides if a task should be reassigned by 
checking there is a node in the cluster with the same Id. If a node is 
restarted PersistentTasksClusterService may not observe the change and 
decide the task still has a valid assignment because the node's 
ephemeral Id is not used in that decision. This change un-assigns tasks
as the nodes in the cluster change.
  • Loading branch information
davidkyle authored Jan 22, 2019
1 parent 2286118 commit 3fad1ee
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -187,6 +188,7 @@ protected ClusterState.Builder becomeMasterAndTrimConflictingNodes(ClusterState
.blocks(currentState.blocks())
.removeGlobalBlock(DiscoverySettings.NO_MASTER_BLOCK_ID)).build();
logger.trace("becomeMasterAndTrimConflictingNodes: {}", tmpState.nodes());
tmpState = PersistentTasksCustomMetaData.deassociateDeadNodes(tmpState);
return ClusterState.builder(allocationService.deassociateDeadNodes(tmpState, false, "removed dead nodes on election"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.allocation.AllocationService;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;

import java.util.List;

Expand Down Expand Up @@ -91,8 +92,9 @@ public ClusterTasksResult<Task> execute(final ClusterState currentState, final L

protected ClusterTasksResult<Task> getTaskClusterTasksResult(ClusterState currentState, List<Task> tasks,
ClusterState remainingNodesClusterState) {
ClusterState ptasksDeassociatedState = PersistentTasksCustomMetaData.deassociateDeadNodes(remainingNodesClusterState);
final ClusterTasksResult.Builder<Task> resultBuilder = ClusterTasksResult.<Task>builder().successes(tasks);
return resultBuilder.build(allocationService.deassociateDeadNodes(remainingNodesClusterState, true, describeTasks(tasks)));
return resultBuilder.build(allocationService.deassociateDeadNodes(ptasksDeassociatedState, true, describeTasks(tasks)));
}

// visible for testing
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ private void deassociateDeadNodes(RoutingAllocation allocation) {
for (ShardRouting shardRouting : node.copyShards()) {
final IndexMetaData indexMetaData = allocation.metaData().getIndexSafe(shardRouting.index());
boolean delayed = INDEX_DELAYED_NODE_LEFT_TIMEOUT_SETTING.get(indexMetaData.getSettings()).nanos() > 0;
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left[" + node.nodeId() + "]",
UnassignedInfo unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.NODE_LEFT, "node_left [" + node.nodeId() + "]",
null, 0, allocation.getCurrentNanoTime(), System.currentTimeMillis(), delayed, AllocationStatus.NO_ATTEMPT);
allocation.routingNodes().failShard(logger, shardRouting, unassignedInfo, indexMetaData, allocation.changes());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public final class PersistentTasksCustomMetaData extends AbstractNamedDiffable<M

public static final String TYPE = "persistent_tasks";
private static final String API_CONTEXT = MetaData.XContentContext.API.toString();
static final Assignment LOST_NODE_ASSIGNMENT = new Assignment(null, "awaiting reassignment after node loss");

// TODO: Implement custom Diff for tasks
private final Map<String, PersistentTask<?>> tasks;
Expand Down Expand Up @@ -119,6 +120,11 @@ public PersistentTasksCustomMetaData(long lastAllocationId, Map<String, Persiste
new ParseField("allocation_id_on_last_status_update"));
}


public static PersistentTasksCustomMetaData getPersistentTasksCustomMetaData(ClusterState clusterState) {
return clusterState.getMetaData().custom(PersistentTasksCustomMetaData.TYPE);
}

/**
* Private builder used in XContent parser to build task-specific portion (params and state)
*/
Expand Down Expand Up @@ -209,6 +215,39 @@ public static <Params extends PersistentTaskParams> PersistentTask<Params> getTa
return null;
}

/**
* Unassign any persistent tasks executing on nodes that are no longer in
* the cluster. If the task's assigment has a non-null executor node and that
* node is no longer in the cluster then the assignment is set to
* {@link #LOST_NODE_ASSIGNMENT}
*
* @param clusterState The clusterstate
* @return If no changes the argument {@code clusterState} is returned else
* a copy with the modified tasks
*/
public static ClusterState deassociateDeadNodes(ClusterState clusterState) {
PersistentTasksCustomMetaData tasks = getPersistentTasksCustomMetaData(clusterState);
if (tasks == null) {
return clusterState;
}

PersistentTasksCustomMetaData.Builder taskBuilder = PersistentTasksCustomMetaData.builder(tasks);
for (PersistentTask<?> task : tasks.tasks()) {
if (task.getAssignment().getExecutorNode() != null &&
clusterState.nodes().nodeExists(task.getAssignment().getExecutorNode()) == false) {
taskBuilder.reassignTask(task.getId(), LOST_NODE_ASSIGNMENT);
}
}

if (taskBuilder.isChanged() == false) {
return clusterState;
}

MetaData.Builder metaDataBuilder = MetaData.builder(clusterState.metaData());
metaDataBuilder.putCustom(TYPE, taskBuilder.build());
return ClusterState.builder(clusterState).metaData(metaDataBuilder).build();
}

public static class Assignment {
@Nullable
private final String executorNode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.cluster.NamedDiff;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.metadata.MetaData.Custom;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.UUIDs;
import org.elasticsearch.common.bytes.BytesReference;
Expand All @@ -33,9 +37,11 @@
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry.Entry;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
Expand Down Expand Up @@ -65,6 +71,8 @@
import static org.elasticsearch.test.VersionUtils.getPreviousVersion;
import static org.elasticsearch.test.VersionUtils.randomVersionBetween;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;

public class PersistentTasksCustomMetaDataTests extends AbstractDiffableSerializationTestCase<Custom> {

Expand Down Expand Up @@ -307,6 +315,91 @@ public void testFeatureSerialization() throws IOException {
assertThat(read.taskMap().keySet(), equalTo(Collections.singleton("test_compatible")));
}

public void testDisassociateDeadNodes_givenNoPersistentTasks() {
ClusterState originalState = ClusterState.builder(new ClusterName("persistent-tasks-tests")).build();
ClusterState returnedState = PersistentTasksCustomMetaData.deassociateDeadNodes(originalState);
assertThat(originalState, sameInstance(returnedState));
}

public void testDisassociateDeadNodes_givenAssignedPersistentTask() {
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT))
.localNodeId("node1")
.masterNodeId("node1")
.build();

String taskName = "test/task";
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder()
.addTask("task-id", taskName, emptyTaskParams(taskName),
new PersistentTasksCustomMetaData.Assignment("node1", "test assignment"));

ClusterState originalState = ClusterState.builder(new ClusterName("persistent-tasks-tests"))
.nodes(nodes)
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()))
.build();
ClusterState returnedState = PersistentTasksCustomMetaData.deassociateDeadNodes(originalState);
assertThat(originalState, sameInstance(returnedState));

PersistentTasksCustomMetaData originalTasks = PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(originalState);
PersistentTasksCustomMetaData returnedTasks = PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(returnedState);
assertEquals(originalTasks, returnedTasks);
}

public void testDisassociateDeadNodes() {
DiscoveryNodes nodes = DiscoveryNodes.builder()
.add(new DiscoveryNode("node1", buildNewFakeTransportAddress(), Version.CURRENT))
.localNodeId("node1")
.masterNodeId("node1")
.build();

String taskName = "test/task";
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder()
.addTask("assigned-task", taskName, emptyTaskParams(taskName),
new PersistentTasksCustomMetaData.Assignment("node1", "test assignment"))
.addTask("task-on-deceased-node", taskName, emptyTaskParams(taskName),
new PersistentTasksCustomMetaData.Assignment("left-the-cluster", "test assignment"));

ClusterState originalState = ClusterState.builder(new ClusterName("persistent-tasks-tests"))
.nodes(nodes)
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()))
.build();
ClusterState returnedState = PersistentTasksCustomMetaData.deassociateDeadNodes(originalState);
assertThat(originalState, not(sameInstance(returnedState)));

PersistentTasksCustomMetaData originalTasks = PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(originalState);
PersistentTasksCustomMetaData returnedTasks = PersistentTasksCustomMetaData.getPersistentTasksCustomMetaData(returnedState);
assertNotEquals(originalTasks, returnedTasks);

assertEquals(originalTasks.getTask("assigned-task"), returnedTasks.getTask("assigned-task"));
assertNotEquals(originalTasks.getTask("task-on-deceased-node"), returnedTasks.getTask("task-on-deceased-node"));
assertEquals(PersistentTasksCustomMetaData.LOST_NODE_ASSIGNMENT, returnedTasks.getTask("task-on-deceased-node").getAssignment());
}

private PersistentTaskParams emptyTaskParams(String taskName) {
return new PersistentTaskParams() {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) {
return builder;
}

@Override
public void writeTo(StreamOutput out) {

}

@Override
public String getWriteableName() {
return taskName;
}

@Override
public Version getMinimalSupportedVersion() {
return Version.CURRENT;
}
};
}

private Assignment randomAssignment() {
if (randomBoolean()) {
if (randomBoolean()) {
Expand Down

0 comments on commit 3fad1ee

Please sign in to comment.