Skip to content

Commit

Permalink
Persistent Tasks: Remove unused stopped and removeOnCompletion flags (e…
Browse files Browse the repository at this point in the history
…lastic#853)

The stopped and removeOnCompletion flags are not currently used, this commit removes them for now to simplify things.
  • Loading branch information
imotov authored and martijnvg committed Jan 31, 2018
1 parent 37fad04 commit b142d7e
Show file tree
Hide file tree
Showing 9 changed files with 84 additions and 311 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,37 +69,27 @@ public static class Request extends MasterNodeRequest<Request> {

private PersistentTaskRequest request;

private boolean stopped;

private boolean removeOnCompletion = true;

public Request() {

}

public Request(String action, PersistentTaskRequest request) {
this.action = action;
this.request = request;
this.stopped = false;
this.removeOnCompletion = true;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
action = in.readString();
request = in.readNamedWriteable(PersistentTaskRequest.class);
stopped = in.readBoolean();
removeOnCompletion = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(action);
out.writeNamedWriteable(request);
out.writeBoolean(stopped);
out.writeBoolean(removeOnCompletion);
}

@Override
Expand All @@ -120,14 +110,12 @@ public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
Request request1 = (Request) o;
return Objects.equals(action, request1.action) &&
Objects.equals(request, request1.request) &&
removeOnCompletion == request1.removeOnCompletion &&
stopped == request1.stopped;
Objects.equals(request, request1.request);
}

@Override
public int hashCode() {
return Objects.hash(action, request, removeOnCompletion, stopped);
return Objects.hash(action, request);
}

public String getAction() {
Expand All @@ -146,21 +134,6 @@ public void setRequest(PersistentTaskRequest request) {
this.request = request;
}

public boolean isStopped() {
return stopped;
}

public void setStopped(boolean stopped) {
this.stopped = stopped;
}

public boolean shouldRemoveOnCompletion() {
return removeOnCompletion;
}

public void setRemoveOnCompletion(boolean removeOnCompletion) {
this.removeOnCompletion = removeOnCompletion;
}
}

public static class RequestBuilder extends MasterNodeOperationRequestBuilder<CreatePersistentTaskAction.Request,
Expand All @@ -180,21 +153,6 @@ public RequestBuilder setRequest(PersistentTaskRequest persistentTaskRequest) {
return this;
}

/**
* Indicates if the persistent task should be created in the stopped state. Defaults to false.
*/
public RequestBuilder setStopped(boolean stopped) {
request.setStopped(stopped);
return this;
}

/**
* Indicates if the persistent task record should be removed upon the first successful completion of the task. Defaults to true.
*/
public RequestBuilder setRemoveOnCompletion(boolean removeOnCompletion) {
request.setRemoveOnCompletion(removeOnCompletion);
return this;
}
}

public static class TransportAction extends TransportMasterNodeAction<Request, PersistentTaskResponse> {
Expand Down Expand Up @@ -235,7 +193,7 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)
@Override
protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.createPersistentTask(request.action, request.request, request.stopped, request.removeOnCompletion,
persistentTasksClusterService.createPersistentTask(request.action, request.request,
new ActionListener<Long>() {
@Override
public void onResponse(Long newTaskId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,15 @@ public PersistentTasksClusterService(Settings settings, PersistentTasksExecutorR
* @param request request
* @param listener the listener that will be called when task is started
*/
public <Request extends PersistentTaskRequest> void createPersistentTask(String action, Request request, boolean stopped,
boolean removeOnCompletion,
public <Request extends PersistentTaskRequest> void createPersistentTask(String action, Request request,
ActionListener<Long> listener) {
clusterService.submitStateUpdateTask("create persistent task", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
validate(action, clusterService.state(), request);
final Assignment assignment;
if (stopped) {
// the task is stopped no need to assign it anywhere
assignment = PersistentTasksCustomMetaData.FINISHED_TASK_ASSIGNMENT;
} else {
assignment = getAssignement(action, currentState, request);
}
return update(currentState, builder(currentState).addTask(action, request, stopped, removeOnCompletion, assignment));
assignment = getAssignement(action, currentState, request);
return update(currentState, builder(currentState).addTask(action, request, assignment));
}

@Override
Expand Down Expand Up @@ -319,11 +313,7 @@ static ClusterState reassignTasks(ClusterState currentState, Logger logger, Exec
logger.trace("ignoring task {} because assignment is the same {}", task.getId(), assignment);
}
} else {
if (task.isStopped()) {
logger.trace("ignoring task {} because it is stopped", task.getId());
} else {
logger.trace("ignoring task {} because it is still running", task.getId());
}
logger.trace("ignoring task {} because it is still running", task.getId());
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@ public PersistentTasksCustomMetaData(long currentId, Map<Long, PersistentTask<?>
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setId, new ParseField("id"));
PERSISTENT_TASK_PARSER.declareString(TaskBuilder::setTaskName, new ParseField("name"));
PERSISTENT_TASK_PARSER.declareLong(TaskBuilder::setAllocationId, new ParseField("allocation_id"));
PERSISTENT_TASK_PARSER.declareBoolean(TaskBuilder::setRemoveOnCompletion, new ParseField("remove_on_completion"));
PERSISTENT_TASK_PARSER.declareBoolean(TaskBuilder::setStopped, new ParseField("stopped"));
PERSISTENT_TASK_PARSER.declareNamedObjects(
(TaskBuilder<PersistentTaskRequest> taskBuilder, List<PersistentTaskRequest> objects) -> {
if (objects.size() != 1) {
Expand Down Expand Up @@ -240,38 +238,34 @@ public static class PersistentTask<Request extends PersistentTaskRequest> implem
private final long allocationId;
private final String taskName;
private final Request request;
private final boolean stopped;
private final boolean removeOnCompletion;
@Nullable
private final Status status;
private final Assignment assignment;
@Nullable
private final Long allocationIdOnLastStatusUpdate;


public PersistentTask(long id, String taskName, Request request, boolean stopped, boolean removeOnCompletion, Assignment assignment) {
this(id, 0L, taskName, request, stopped, removeOnCompletion, null, assignment, null);
public PersistentTask(long id, String taskName, Request request, Assignment assignment) {
this(id, 0L, taskName, request, null, assignment, null);
}

public PersistentTask(PersistentTask<Request> task, boolean stopped, Assignment assignment) {
this(task.id, task.allocationId + 1L, task.taskName, task.request, stopped, task.removeOnCompletion, task.status,
public PersistentTask(PersistentTask<Request> task, Assignment assignment) {
this(task.id, task.allocationId + 1L, task.taskName, task.request, task.status,
assignment, task.allocationId);
}

public PersistentTask(PersistentTask<Request> task, Status status) {
this(task.id, task.allocationId, task.taskName, task.request, task.stopped, task.removeOnCompletion, status,
this(task.id, task.allocationId, task.taskName, task.request, status,
task.assignment, task.allocationId);
}

private PersistentTask(long id, long allocationId, String taskName, Request request, boolean stopped, boolean removeOnCompletion,
private PersistentTask(long id, long allocationId, String taskName, Request request,
Status status, Assignment assignment, Long allocationIdOnLastStatusUpdate) {
this.id = id;
this.allocationId = allocationId;
this.taskName = taskName;
this.request = request;
this.status = status;
this.stopped = stopped;
this.removeOnCompletion = removeOnCompletion;
this.assignment = assignment;
this.allocationIdOnLastStatusUpdate = allocationIdOnLastStatusUpdate;
// Update parent request for starting tasks with correct parent task ID
Expand All @@ -284,8 +278,6 @@ private PersistentTask(StreamInput in) throws IOException {
allocationId = in.readLong();
taskName = in.readString();
request = (Request) in.readNamedWriteable(PersistentTaskRequest.class);
stopped = in.readBoolean();
removeOnCompletion = in.readBoolean();
status = in.readOptionalNamedWriteable(Task.Status.class);
assignment = new Assignment(in.readOptionalString(), in.readString());
allocationIdOnLastStatusUpdate = in.readOptionalLong();
Expand All @@ -297,8 +289,6 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeLong(allocationId);
out.writeString(taskName);
out.writeNamedWriteable(request);
out.writeBoolean(stopped);
out.writeBoolean(removeOnCompletion);
out.writeOptionalNamedWriteable(status);
out.writeOptionalString(assignment.executorNode);
out.writeString(assignment.explanation);
Expand All @@ -314,16 +304,14 @@ public boolean equals(Object o) {
allocationId == that.allocationId &&
Objects.equals(taskName, that.taskName) &&
Objects.equals(request, that.request) &&
stopped == that.stopped &&
removeOnCompletion == that.removeOnCompletion &&
Objects.equals(status, that.status) &&
Objects.equals(assignment, that.assignment) &&
Objects.equals(allocationIdOnLastStatusUpdate, that.allocationIdOnLastStatusUpdate);
}

@Override
public int hashCode() {
return Objects.hash(id, allocationId, taskName, request, stopped, removeOnCompletion, status, assignment,
return Objects.hash(id, allocationId, taskName, request, status, assignment,
allocationIdOnLastStatusUpdate);
}

Expand Down Expand Up @@ -365,22 +353,14 @@ public boolean isAssigned() {
* Returns true if the tasks is not stopped and unassigned or assigned to a non-existing node.
*/
public boolean needsReassignment(DiscoveryNodes nodes) {
return isStopped() == false && (assignment.isAssigned() == false || nodes.nodeExists(assignment.getExecutorNode()) == false);
return (assignment.isAssigned() == false || nodes.nodeExists(assignment.getExecutorNode()) == false);
}

@Nullable
public Status getStatus() {
return status;
}

public boolean isStopped() {
return stopped;
}

public boolean shouldRemoveOnCompletion() {
return removeOnCompletion;
}

/**
* @return Whether the task status isn't stale. When a task gets unassigned from the executor node or assigned
* to a new executor node and the status hasn't been updated then the task status is stale.
Expand Down Expand Up @@ -421,8 +401,6 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field("allocation_id_on_last_status_update", allocationIdOnLastStatusUpdate);
}
}
builder.field("stopped", stopped);
builder.field("remove_on_completion", removeOnCompletion);
}
builder.endObject();
return builder;
Expand All @@ -439,8 +417,6 @@ private static class TaskBuilder<Request extends PersistentTaskRequest> {
private long allocationId;
private String taskName;
private Request request;
private boolean stopped = true;
private boolean removeOnCompletion;
private Status status;
private Assignment assignment = INITIAL_ASSIGNMENT;
private Long allocationIdOnLastStatusUpdate;
Expand Down Expand Up @@ -471,16 +447,6 @@ public TaskBuilder<Request> setStatus(Status status) {
}


public TaskBuilder<Request> setStopped(boolean stopped) {
this.stopped = stopped;
return this;
}

public TaskBuilder<Request> setRemoveOnCompletion(boolean removeOnCompletion) {
this.removeOnCompletion = removeOnCompletion;
return this;
}

public TaskBuilder<Request> setAssignment(Assignment assignment) {
this.assignment = assignment;
return this;
Expand All @@ -492,7 +458,7 @@ public TaskBuilder<Request> setAllocationIdOnLastStatusUpdate(Long allocationIdO
}

public PersistentTask<Request> build() {
return new PersistentTask<>(id, allocationId, taskName, request, stopped, removeOnCompletion, status,
return new PersistentTask<>(id, allocationId, taskName, request, status,
assignment, allocationIdOnLastStatusUpdate);
}
}
Expand Down Expand Up @@ -578,11 +544,10 @@ private <Request extends PersistentTaskRequest> Builder setTasks(List<TaskBuilde
* <p>
* After the task is added its id can be found by calling {{@link #getCurrentId()}} method.
*/
public <Request extends PersistentTaskRequest> Builder addTask(String taskName, Request request, boolean stopped,
boolean removeOnCompletion, Assignment assignment) {
public <Request extends PersistentTaskRequest> Builder addTask(String taskName, Request request, Assignment assignment) {
changed = true;
currentId++;
tasks.put(currentId, new PersistentTask<>(currentId, taskName, request, stopped, removeOnCompletion, assignment));
tasks.put(currentId, new PersistentTask<>(currentId, taskName, request, assignment));
return this;
}

Expand All @@ -593,7 +558,7 @@ public Builder reassignTask(long taskId, Assignment assignment) {
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment));
tasks.put(taskId, new PersistentTask<>(taskInProgress, assignment));
}
return this;
}
Expand All @@ -609,9 +574,9 @@ public <Request extends PersistentTaskRequest> Builder assignTask(long taskId,
PersistentTask<Request> taskInProgress = (PersistentTask<Request>) tasks.get(taskId);
if (taskInProgress != null && taskInProgress.assignment.isAssigned() == false) { // only assign unassigned tasks
Assignment assignment = executorNodeFunc.apply(taskInProgress.taskName, taskInProgress.request);
if (assignment.isAssigned() || taskInProgress.isStopped()) {
if (assignment.isAssigned()) {
changed = true;
tasks.put(taskId, new PersistentTask<>(taskInProgress, false, assignment));
tasks.put(taskId, new PersistentTask<>(taskInProgress, assignment));
}
}
return this;
Expand Down Expand Up @@ -649,11 +614,7 @@ public Builder finishTask(long taskId) {
PersistentTask<?> taskInProgress = tasks.get(taskId);
if (taskInProgress != null) {
changed = true;
if (taskInProgress.removeOnCompletion) {
tasks.remove(taskId);
} else {
tasks.put(taskId, new PersistentTask<>(taskInProgress, true, FINISHED_TASK_ASSIGNMENT));
}
tasks.remove(taskId);
}
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,27 +42,14 @@ public PersistentTasksService(Settings settings, ClusterService clusterService,
this.clusterService = clusterService;
}

/**
* Creates the specified persistent action and tries to start it immediately, upon completion the task is
* removed from the cluster state
*/
public <Request extends PersistentTaskRequest> void createPersistentActionTask(String action, Request request,
PersistentTaskOperationListener listener) {
createPersistentActionTask(action, request, false, true, listener);
}

/**
* Creates the specified persistent action. The action is started unless the stopped parameter is equal to true.
* If removeOnCompletion parameter is equal to true, the task is removed from the cluster state upon completion.
* Otherwise it will remain there in the stopped state.
*/
public <Request extends PersistentTaskRequest> void createPersistentActionTask(String action, Request request,
boolean stopped,
boolean removeOnCompletion,
PersistentTaskOperationListener listener) {
CreatePersistentTaskAction.Request createPersistentActionRequest = new CreatePersistentTaskAction.Request(action, request);
createPersistentActionRequest.setStopped(stopped);
createPersistentActionRequest.setRemoveOnCompletion(removeOnCompletion);
try {
client.execute(CreatePersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap(
o -> listener.onResponse(o.getTaskId()), listener::onFailure));
Expand Down
Loading

0 comments on commit b142d7e

Please sign in to comment.