Skip to content

Commit

Permalink
Make PersistentAction independent from TransportActions (#742)
Browse files Browse the repository at this point in the history
Removes the transport layer dependency from PersistentActions, makes PersistentActionRegistry immutable and rename actions into tasks in class and variable names.
  • Loading branch information
imotov authored and martijnvg committed Feb 5, 2018
1 parent af1108c commit 81e835c
Show file tree
Hide file tree
Showing 28 changed files with 820 additions and 825 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,36 @@ public int hashCode() {
}

public static class Response extends AcknowledgedResponse {
public Response() {
super();
}

public Response(boolean acknowledged) {
super(acknowledged);
}

@Override
public void readFrom(StreamInput in) throws IOException {
readAcknowledged(in);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
writeAcknowledged(out);
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
AcknowledgedResponse that = (AcknowledgedResponse) o;
return isAcknowledged() == that.isAcknowledged();
}

@Override
public int hashCode() {
return Objects.hash(isAcknowledged());
}

}

Expand All @@ -131,16 +161,16 @@ protected RequestBuilder(ElasticsearchClient client, CompletionPersistentTaskAct

public static class TransportAction extends TransportMasterNodeAction<Request, Response> {

private final PersistentTaskClusterService persistentTaskClusterService;
private final PersistentTasksClusterService persistentTasksClusterService;

@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
PersistentTaskClusterService persistentTaskClusterService,
PersistentTasksClusterService persistentTasksClusterService,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, CompletionPersistentTaskAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.persistentTaskClusterService = persistentTaskClusterService;
this.persistentTasksClusterService = persistentTasksClusterService;
}

@Override
Expand All @@ -161,7 +191,7 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)

@Override
protected final void masterOperation(final Request request, ClusterState state, final ActionListener<Response> listener) {
persistentTaskClusterService.completeOrRestartPersistentTask(request.taskId, request.exception, new ActionListener<Empty>() {
persistentTasksClusterService.completeOrRestartPersistentTask(request.taskId, request.exception, new ActionListener<Empty>() {
@Override
public void onResponse(Empty empty) {
listener.onResponse(newResponse());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
* This action can be used to add the record for the persistent action to the cluster state.
*/
public class CreatePersistentTaskAction extends Action<CreatePersistentTaskAction.Request,
PersistentActionResponse,
PersistentTaskResponse,
CreatePersistentTaskAction.RequestBuilder> {

public static final CreatePersistentTaskAction INSTANCE = new CreatePersistentTaskAction();
Expand All @@ -59,15 +59,15 @@ public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
}

@Override
public PersistentActionResponse newResponse() {
return new PersistentActionResponse();
public PersistentTaskResponse newResponse() {
return new PersistentTaskResponse();
}

public static class Request extends MasterNodeRequest<Request> {

private String action;

private PersistentActionRequest request;
private PersistentTaskRequest request;

private boolean stopped;

Expand All @@ -77,7 +77,7 @@ public Request() {

}

public Request(String action, PersistentActionRequest request) {
public Request(String action, PersistentTaskRequest request) {
this.action = action;
this.request = request;
this.stopped = false;
Expand All @@ -88,7 +88,7 @@ public Request(String action, PersistentActionRequest request) {
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
action = in.readString();
request = in.readNamedWriteable(PersistentActionRequest.class);
request = in.readNamedWriteable(PersistentTaskRequest.class);
stopped = in.readBoolean();
removeOnCompletion = in.readBoolean();
}
Expand Down Expand Up @@ -138,11 +138,11 @@ public void setAction(String action) {
this.action = action;
}

public PersistentActionRequest getRequest() {
public PersistentTaskRequest getRequest() {
return request;
}

public void setRequest(PersistentActionRequest request) {
public void setRequest(PersistentTaskRequest request) {
this.request = request;
}

Expand All @@ -164,7 +164,7 @@ public void setRemoveOnCompletion(boolean removeOnCompletion) {
}

public static class RequestBuilder extends MasterNodeOperationRequestBuilder<CreatePersistentTaskAction.Request,
PersistentActionResponse, CreatePersistentTaskAction.RequestBuilder> {
PersistentTaskResponse, CreatePersistentTaskAction.RequestBuilder> {

protected RequestBuilder(ElasticsearchClient client, CreatePersistentTaskAction action) {
super(client, action, new Request());
Expand All @@ -175,8 +175,8 @@ public RequestBuilder setAction(String action) {
return this;
}

public RequestBuilder setRequest(PersistentActionRequest persistentActionRequest) {
request.setRequest(persistentActionRequest);
public RequestBuilder setRequest(PersistentTaskRequest persistentTaskRequest) {
request.setRequest(persistentTaskRequest);
return this;
}

Expand All @@ -197,23 +197,23 @@ public RequestBuilder setRemoveOnCompletion(boolean removeOnCompletion) {
}
}

public static class TransportAction extends TransportMasterNodeAction<Request, PersistentActionResponse> {
public static class TransportAction extends TransportMasterNodeAction<Request, PersistentTaskResponse> {

private final PersistentTaskClusterService persistentTaskClusterService;
private final PersistentTasksClusterService persistentTasksClusterService;

@Inject
public TransportAction(Settings settings, TransportService transportService, ClusterService clusterService,
ThreadPool threadPool, ActionFilters actionFilters,
PersistentTaskClusterService persistentTaskClusterService,
PersistentActionRegistry persistentActionRegistry,
PersistentActionService persistentActionService,
PersistentTasksClusterService persistentTasksClusterService,
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry,
PersistentTasksService persistentTasksService,
IndexNameExpressionResolver indexNameExpressionResolver) {
super(settings, CreatePersistentTaskAction.NAME, transportService, clusterService, threadPool, actionFilters,
indexNameExpressionResolver, Request::new);
this.persistentTaskClusterService = persistentTaskClusterService;
PersistentActionExecutor executor = new PersistentActionExecutor(threadPool);
clusterService.addListener(new PersistentActionCoordinator(settings, persistentActionService, persistentActionRegistry,
transportService.getTaskManager(), executor));
this.persistentTasksClusterService = persistentTasksClusterService;
NodePersistentTasksExecutor executor = new NodePersistentTasksExecutor(threadPool);
clusterService.addListener(new PersistentTasksNodeService(settings, persistentTasksService, persistentTasksExecutorRegistry,
transportService.getTaskManager(), threadPool, executor));
}

@Override
Expand All @@ -222,8 +222,8 @@ protected String executor() {
}

@Override
protected PersistentActionResponse newResponse() {
return new PersistentActionResponse();
protected PersistentTaskResponse newResponse() {
return new PersistentTaskResponse();
}

@Override
Expand All @@ -234,12 +234,12 @@ protected ClusterBlockException checkBlock(Request request, ClusterState state)

@Override
protected final void masterOperation(final Request request, ClusterState state,
final ActionListener<PersistentActionResponse> listener) {
persistentTaskClusterService.createPersistentTask(request.action, request.request, request.stopped, request.removeOnCompletion,
final ActionListener<PersistentTaskResponse> listener) {
persistentTasksClusterService.createPersistentTask(request.action, request.request, request.stopped, request.removeOnCompletion,
new ActionListener<Long>() {
@Override
public void onResponse(Long newTaskId) {
listener.onResponse(new PersistentActionResponse(newTaskId));
listener.onResponse(new PersistentTaskResponse(newTaskId));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,20 @@
import org.elasticsearch.transport.TransportResponse.Empty;

/**
* This component is responsible for execution of persistent actions.
* This component is responsible for execution of persistent tasks.
*/
public class PersistentActionExecutor {
public class NodePersistentTasksExecutor {
private final ThreadPool threadPool;

public PersistentActionExecutor(ThreadPool threadPool) {
public NodePersistentTasksExecutor(ThreadPool threadPool) {
this.threadPool = threadPool;
}

public <Request extends PersistentActionRequest> void executeAction(Request request,
NodePersistentTask task,
PersistentActionRegistry.PersistentActionHolder<Request> holder,
ActionListener<Empty> listener) {
threadPool.executor(holder.getExecutor()).execute(new AbstractRunnable() {
public <Request extends PersistentTaskRequest> void executeTask(Request request,
NodePersistentTask task,
PersistentTasksExecutor<Request> action,
ActionListener<Empty> listener) {
threadPool.executor(action.getExecutor()).execute(new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
listener.onFailure(e);
Expand All @@ -47,7 +47,7 @@ public void onFailure(Exception e) {
@Override
protected void doRun() throws Exception {
try {
holder.getPersistentAction().nodeOperation(task, request, listener);
action.nodeOperation(task, request, listener);
} catch (Exception ex) {
listener.onFailure(ex);
}
Expand Down

This file was deleted.

Loading

0 comments on commit 81e835c

Please sign in to comment.