Skip to content

Commit

Permalink
Remove InternalClient and InternalSecurityClient (#3054)
Browse files Browse the repository at this point in the history
This change removes the InternalClient and the InternalSecurityClient. These are replaced with
usage of the ThreadContext and a transient value, `action.origin`, to indicate which component the
request came from. The security code has been updated to look for this value and ensure the
request is executed as the proper user. This work comes from #2808 where @s1monw suggested
that we do this.

While working on this, I came across index template registries and rather than updating them to use
the new method, I replaced the ML one with the template upgrade framework so that we could
remove this template registry. The watcher template registry is still needed as the template must be
updated for rolling upgrades to work (see #2950).
  • Loading branch information
jaymode authored and martijnvg committed Feb 5, 2018
1 parent fb9dba6 commit 0d8c9da
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateObserver;
import org.elasticsearch.cluster.node.DiscoveryNode;
Expand All @@ -34,20 +35,22 @@
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.security.InternalClient;

import java.util.function.Predicate;

import static org.elasticsearch.ClientHelper.PERSISTENT_TASK_ORIGIN;
import static org.elasticsearch.ClientHelper.executeAsyncWithOrigin;

/**
* This service is used by persistent actions to propagate changes in the action state and notify about completion
*/
public class PersistentTasksService extends AbstractComponent {

private final InternalClient client;
private final Client client;
private final ClusterService clusterService;
private final ThreadPool threadPool;

public PersistentTasksService(Settings settings, ClusterService clusterService, ThreadPool threadPool, InternalClient client) {
public PersistentTasksService(Settings settings, ClusterService clusterService, ThreadPool threadPool, Client client) {
super(settings);
this.client = client;
this.clusterService = clusterService;
Expand All @@ -63,8 +66,8 @@ public <Params extends PersistentTaskParams> void startPersistentTask(String tas
StartPersistentTaskAction.Request createPersistentActionRequest =
new StartPersistentTaskAction.Request(taskId, taskName, params);
try {
client.execute(StartPersistentTaskAction.INSTANCE, createPersistentActionRequest, ActionListener.wrap(
o -> listener.onResponse((PersistentTask<Params>) o.getTask()), listener::onFailure));
executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, StartPersistentTaskAction.INSTANCE, createPersistentActionRequest,
ActionListener.wrap(o -> listener.onResponse((PersistentTask<Params>) o.getTask()), listener::onFailure));
} catch (Exception e) {
listener.onFailure(e);
}
Expand All @@ -77,7 +80,7 @@ public void sendCompletionNotification(String taskId, long allocationId, Excepti
ActionListener<PersistentTask<?>> listener) {
CompletionPersistentTaskAction.Request restartRequest = new CompletionPersistentTaskAction.Request(taskId, allocationId, failure);
try {
client.execute(CompletionPersistentTaskAction.INSTANCE, restartRequest,
executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, CompletionPersistentTaskAction.INSTANCE, restartRequest,
ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure));
} catch (Exception e) {
listener.onFailure(e);
Expand All @@ -93,7 +96,8 @@ void sendTaskManagerCancellation(long taskId, ActionListener<CancelTasksResponse
cancelTasksRequest.setTaskId(new TaskId(localNode.getId(), taskId));
cancelTasksRequest.setReason("persistent action was removed");
try {
client.admin().cluster().cancelTasks(cancelTasksRequest, listener);
executeAsyncWithOrigin(client.threadPool().getThreadContext(), PERSISTENT_TASK_ORIGIN, cancelTasksRequest, listener,
client.admin().cluster()::cancelTasks);
} catch (Exception e) {
listener.onFailure(e);
}
Expand All @@ -109,8 +113,8 @@ void updateStatus(String taskId, long allocationId, Task.Status status, ActionLi
UpdatePersistentTaskStatusAction.Request updateStatusRequest =
new UpdatePersistentTaskStatusAction.Request(taskId, allocationId, status);
try {
client.execute(UpdatePersistentTaskStatusAction.INSTANCE, updateStatusRequest, ActionListener.wrap(
o -> listener.onResponse(o.getTask()), listener::onFailure));
executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, UpdatePersistentTaskStatusAction.INSTANCE, updateStatusRequest,
ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure));
} catch (Exception e) {
listener.onFailure(e);
}
Expand All @@ -122,8 +126,8 @@ void updateStatus(String taskId, long allocationId, Task.Status status, ActionLi
public void cancelPersistentTask(String taskId, ActionListener<PersistentTask<?>> listener) {
RemovePersistentTaskAction.Request removeRequest = new RemovePersistentTaskAction.Request(taskId);
try {
client.execute(RemovePersistentTaskAction.INSTANCE, removeRequest, ActionListener.wrap(o -> listener.onResponse(o.getTask()),
listener::onFailure));
executeAsyncWithOrigin(client, PERSISTENT_TASK_ORIGIN, RemovePersistentTaskAction.INSTANCE, removeRequest,
ActionListener.wrap(o -> listener.onResponse(o.getTask()), listener::onFailure));
} catch (Exception e) {
listener.onFailure(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.security.InternalClient;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -105,8 +104,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, Environment environment,
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
InternalClient internalClient = new InternalClient(Settings.EMPTY, threadPool, client);
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, internalClient);
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, client);
TestPersistentTasksExecutor testPersistentAction = new TestPersistentTasksExecutor(Settings.EMPTY, clusterService);
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY,
Collections.singletonList(testPersistentAction));
Expand Down

0 comments on commit 0d8c9da

Please sign in to comment.