Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: Require all actions have a Task #31627

Merged
merged 4 commits into from
Jun 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,19 @@ public final Task execute(Request request, ActionListener<Response> listener) {
* this method.
*/
Task task = taskManager.register("transport", actionName, request);
if (task == null) {
execute(null, request, listener);
} else {
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
taskManager.unregister(task);
listener.onResponse(response);
}
execute(task, request, new ActionListener<Response>() {
@Override
public void onResponse(Response response) {
taskManager.unregister(task);
listener.onResponse(response);
}

@Override
public void onFailure(Exception e) {
taskManager.unregister(task);
listener.onFailure(e);
}
});
}
@Override
public void onFailure(Exception e) {
taskManager.unregister(task);
listener.onFailure(e);
}
});
return task;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,6 @@ default void setParentTask(String parentTaskNode, long parentTaskId) {

/**
* Returns the task object that should be used to keep track of the processing of the request.
*
* A request can override this method and return null to avoid being tracked by the task
* manager.
*/
default Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
return new Task(id, type, action, getDescription(), parentTaskId, headers);
Expand Down
7 changes: 2 additions & 5 deletions server/src/main/java/org/elasticsearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
Expand Down Expand Up @@ -91,8 +92,6 @@ public void setTaskResultsService(TaskResultsService taskResultsService) {

/**
* Registers a task without parent task
* <p>
* Returns the task manager tracked task or null if the task doesn't support the task manager
*/
public Task register(String type, String action, TaskAwareRequest request) {
Map<String, String> headers = new HashMap<>();
Expand All @@ -110,9 +109,7 @@ public Task register(String type, String action, TaskAwareRequest request) {
}
}
Task task = request.createTask(taskIdGenerator.incrementAndGet(), type, action, request.getParentTask(), headers);
if (task == null) {
return null;
}
Objects.requireNonNull(task);
assert task.getParentTaskId().equals(request.getParentTask()) : "Request [ " + request + "] didn't preserve it parentTaskId";
if (logger.isTraceEnabled()) {
logger.trace("register {} [{}] [{}] [{}]", task.getId(), type, action, task.getDescription());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,13 @@ public Request newRequest(StreamInput in) throws IOException {

public void processMessageReceived(Request request, TransportChannel channel) throws Exception {
final Task task = taskManager.register(channel.getChannelType(), action, request);
if (task == null) {
handler.messageReceived(request, channel, null);
} else {
boolean success = false;
try {
handler.messageReceived(request, new TaskTransportChannel(taskManager, task, channel), task);
success = true;
} finally {
if (success == false) {
taskManager.unregister(task);
}
boolean success = false;
try {
handler.messageReceived(request, new TaskTransportChannel(taskManager, task, channel), task);
success = true;
} finally {
if (success == false) {
taskManager.unregister(task);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ public class TransportTasksActionTests extends TaskManagerTestCase {

public static class NodeRequest extends BaseNodeRequest {
protected String requestName;
private boolean enableTaskManager;

public NodeRequest() {
super();
Expand All @@ -88,82 +87,63 @@ public NodeRequest() {
public NodeRequest(NodesRequest request, String nodeId) {
super(nodeId);
requestName = request.requestName;
enableTaskManager = request.enableTaskManager;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestName = in.readString();
enableTaskManager = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(requestName);
out.writeBoolean(enableTaskManager);
}

@Override
public String getDescription() {
return "CancellableNodeRequest[" + requestName + ", " + enableTaskManager + "]";
return "CancellableNodeRequest[" + requestName + "]";
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
if (enableTaskManager) {
return super.createTask(id, type, action, parentTaskId, headers);
} else {
return null;
}
return super.createTask(id, type, action, parentTaskId, headers);
}
}

public static class NodesRequest extends BaseNodesRequest<NodesRequest> {
private String requestName;
private boolean enableTaskManager;

NodesRequest() {
super();
}

public NodesRequest(String requestName, String... nodesIds) {
this(requestName, true, nodesIds);
}

public NodesRequest(String requestName, boolean enableTaskManager, String... nodesIds) {
super(nodesIds);
this.requestName = requestName;
this.enableTaskManager = enableTaskManager;
}

@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
requestName = in.readString();
enableTaskManager = in.readBoolean();
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeString(requestName);
out.writeBoolean(enableTaskManager);
}

@Override
public String getDescription() {
return "CancellableNodesRequest[" + requestName + ", " + enableTaskManager + "]";
return "CancellableNodesRequest[" + requestName + "]";
}

@Override
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
if (enableTaskManager) {
return super.createTask(id, type, action, parentTaskId, headers);
} else {
return null;
}
return super.createTask(id, type, action, parentTaskId, headers);
}
}

Expand Down Expand Up @@ -400,7 +380,7 @@ public void onFailure(Exception e) {
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
assertEquals("CancellableNodeRequest[Test Request, true]", entry.getValue().get(0).getDescription());
assertEquals("CancellableNodeRequest[Test Request]", entry.getValue().get(0).getDescription());
}

// Make sure that the main task on coordinating node is the task that was returned to us by execute()
Expand Down Expand Up @@ -455,27 +435,6 @@ public void testFindChildTasks() throws Exception {
assertEquals(0, responses.failureCount());
}

public void testTaskManagementOptOut() throws Exception {
setupTestNodes(Settings.EMPTY);
connectNodes(testNodes);
CountDownLatch checkLatch = new CountDownLatch(1);
// Starting actions that disable task manager
ActionFuture<NodesResponse> future = startBlockingTestNodesAction(checkLatch, new NodesRequest("Test Request", false));

TestNode testNode = testNodes[randomIntBetween(0, testNodes.length - 1)];

// Get the parent task
ListTasksRequest listTasksRequest = new ListTasksRequest();
listTasksRequest.setActions("testAction*");
ListTasksResponse response = ActionTestUtils.executeBlocking(testNode.transportListTasksAction, listTasksRequest);
assertEquals(0, response.getTasks().size());

// Release all tasks and wait for response
checkLatch.countDown();
NodesResponse responses = future.get();
assertEquals(0, responses.failureCount());
}

public void testTasksDescriptions() throws Exception {
long minimalStartTime = System.currentTimeMillis();
setupTestNodes(Settings.EMPTY);
Expand All @@ -502,7 +461,7 @@ public void testTasksDescriptions() throws Exception {
assertEquals(testNodes.length, response.getPerNodeTasks().size());
for (Map.Entry<String, List<TaskInfo>> entry : response.getPerNodeTasks().entrySet()) {
assertEquals(1, entry.getValue().size());
assertEquals("CancellableNodeRequest[Test Request, true]", entry.getValue().get(0).getDescription());
assertEquals("CancellableNodeRequest[Test Request]", entry.getValue().get(0).getDescription());
assertThat(entry.getValue().get(0).getStartTime(), greaterThanOrEqualTo(minimalStartTime));
assertThat(entry.getValue().get(0).getRunningTimeNanos(), greaterThanOrEqualTo(minimalDurationNanos));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,15 @@ public MockTaskManager(Settings settings, ThreadPool threadPool, Set<String> tas
@Override
public Task register(String type, String action, TaskAwareRequest request) {
Task task = super.register(type, action, request);
if (task != null) {
for (MockTaskManagerListener listener : listeners) {
try {
listener.onTaskRegistered(task);
} catch (Exception e) {
logger.warn(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to notify task manager listener about registering the task with id {}",
task.getId()),
e);
}
for (MockTaskManagerListener listener : listeners) {
try {
listener.onTaskRegistered(task);
} catch (Exception e) {
logger.warn(
(Supplier<?>) () -> new ParameterizedMessage(
"failed to notify task manager listener about registering the task with id {}",
task.getId()),
e);
}
}
return task;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.monitoring.action;

import java.util.concurrent.ExecutorService;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.support.ActionFilter;
Expand All @@ -28,6 +27,7 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.discovery.DiscoverySettings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskAwareRequest;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.test.ESTestCase;
Expand All @@ -51,6 +51,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutorService;

import static org.elasticsearch.Version.CURRENT;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -98,7 +99,7 @@ public void setUpMocks() {
filters = mock(ActionFilters.class);

when(transportService.getTaskManager()).thenReturn(taskManager);
when(taskManager.register(anyString(), eq(MonitoringBulkAction.NAME), any(TaskAwareRequest.class))).thenReturn(null);
when(taskManager.register(anyString(), eq(MonitoringBulkAction.NAME), any(TaskAwareRequest.class))).thenReturn(mock(Task.class));
when(filters.filters()).thenReturn(new ActionFilter[0]);
when(threadPool.executor(ThreadPool.Names.GENERIC)).thenReturn(executor);

Expand Down