Skip to content

Commit

Permalink
Make persistent tasks work.
Browse files Browse the repository at this point in the history
Made persistent tasks executors pluggable.
  • Loading branch information
martijnvg committed Jan 31, 2018
1 parent 07e727c commit 592eedb
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 22 deletions.
11 changes: 10 additions & 1 deletion server/src/main/java/org/elasticsearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,12 @@
import org.elasticsearch.rest.action.search.RestMultiSearchAction;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.rest.action.search.RestSearchScrollAction;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.usage.UsageService;
import org.elasticsearch.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.persistent.RemovePersistentTaskAction;
import org.elasticsearch.persistent.StartPersistentTaskAction;
import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -507,6 +510,12 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg

actionPlugins.stream().flatMap(p -> p.getActions().stream()).forEach(actions::register);

// Persistent tasks:
actions.register(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class);
actions.register(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class);
actions.register(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class);
actions.register(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class);

return unmodifiableMap(actions.getRegistry());
}

Expand Down
19 changes: 19 additions & 0 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.MetaDataUpgrader;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.PersistentTaskPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.RepositoryPlugin;
Expand All @@ -139,6 +140,10 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.usage.UsageService;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.persistent.PersistentTasksClusterService;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.persistent.PersistentTasksExecutorRegistry;
import org.elasticsearch.persistent.PersistentTasksService;

import java.io.BufferedWriter;
import java.io.Closeable;
Expand Down Expand Up @@ -461,6 +466,17 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
responseCollectorService);

final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
.filterPlugins(PersistentTaskPlugin.class).stream()
.map(p -> p.getPersistentTasksExecutor(clusterService))
.flatMap(List::stream)
.collect(toList());

final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(settings, tasksExecutors);
final PersistentTasksClusterService persistentTasksClusterService =
new PersistentTasksClusterService(settings, registry, clusterService);
final PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, client);

modules.add(b -> {
b.bind(Node.class).toInstance(this);
b.bind(NodeService.class).toInstance(nodeService);
Expand Down Expand Up @@ -504,6 +520,9 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
}
httpBind.accept(b);
pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
b.bind(PersistentTasksService.class).toInstance(persistentTasksService);
b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);
b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);
}
);
injector = modules.createInjector();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.plugins;

import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.persistent.PersistentTasksExecutor;

import java.util.Collections;
import java.util.List;

/**
* Plugin for registering persistent tasks executors.
*/
public interface PersistentTaskPlugin {

/**
* Returns additional persistent tasks executors added by this plugin.
*/
default List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService) {
return Collections.emptyList();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.env.NodeEnvironment;
import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.PersistentTaskPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.tasks.Task;
Expand Down Expand Up @@ -87,33 +88,16 @@
/**
* A plugin that adds a test persistent task.
*/
public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin {
public class TestPersistentTasksPlugin extends Plugin implements ActionPlugin, PersistentTaskPlugin {

@Override
public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
return Arrays.asList(
new ActionHandler<>(TestTaskAction.INSTANCE, TransportTestTaskAction.class),
new ActionHandler<>(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class),
new ActionHandler<>(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class),
new ActionHandler<>(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class),
new ActionHandler<>(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class)
);
return Collections.singletonList(new ActionHandler<>(TestTaskAction.INSTANCE, TransportTestTaskAction.class));
}

@Override
public Collection<Object> createComponents(Client client, ClusterService clusterService, ThreadPool threadPool,
ResourceWatcherService resourceWatcherService, ScriptService scriptService,
NamedXContentRegistry xContentRegistry, Environment environment,
NodeEnvironment nodeEnvironment, NamedWriteableRegistry namedWriteableRegistry) {
PersistentTasksService persistentTasksService = new PersistentTasksService(Settings.EMPTY, clusterService, threadPool, client);
TestPersistentTasksExecutor testPersistentAction = new TestPersistentTasksExecutor(Settings.EMPTY, clusterService);
PersistentTasksExecutorRegistry persistentTasksExecutorRegistry = new PersistentTasksExecutorRegistry(Settings.EMPTY,
Collections.singletonList(testPersistentAction));
return Arrays.asList(
persistentTasksService,
persistentTasksExecutorRegistry,
new PersistentTasksClusterService(Settings.EMPTY, persistentTasksExecutorRegistry, clusterService)
);
public List<PersistentTasksExecutor<?>> getPersistentTasksExecutor(ClusterService clusterService) {
return Collections.singletonList(new TestPersistentTasksExecutor(Settings.EMPTY, clusterService));
}

@Override
Expand Down

0 comments on commit 592eedb

Please sign in to comment.