Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move persistent tasks to core #28455

Merged
merged 50 commits into from
Feb 1, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
f136bfa
Adds support for persistent actions
imotov Jan 25, 2017
777b21f
Add a number of auxiliary methods to persistent tasks classes.
martijnvg Jan 30, 2017
ac67d02
Add support for task status on persistent tasks
imotov Feb 1, 2017
32e4061
Fix check style error after upgrade
davidkyle Feb 3, 2017
d340c19
Replace List with Map in PersistentTasksInProgress
imotov Feb 3, 2017
243b7e4
Moved job lifecycle over to persistent tasks
martijnvg Feb 8, 2017
16e661c
Make persistent task persist full cluster restart
imotov Feb 11, 2017
479429c
In order to keep track of restarted tasks, `allocationIdOnLastStatusU…
martijnvg Feb 17, 2017
5eeb480
Add persistent task assignment explanations.
imotov Feb 22, 2017
b33fc05
Request and Status in Persistent Tasks should be serialized using the…
imotov Feb 22, 2017
810d933
Simplify names of PersistentTasks-related classes
imotov Feb 27, 2017
9bd2441
Make PersistentAction independent from TransportActions (#742)
imotov Mar 22, 2017
19f39fd
Persistent Tasks: remove task restart on failure (#815)
imotov Mar 23, 2017
37fad04
Persistent Tasks: Merge NodePersistentTask and RunningPersistentTask …
imotov Mar 27, 2017
b142d7e
Persistent Tasks: Remove unused stopped and removeOnCompletion flags …
imotov Mar 29, 2017
78b844e
Check allocationIdOnLastStatusUpdate when trying to detect whether a …
martijnvg Mar 28, 2017
6ca0447
Persistent Tasks: Add waitForPersistentTaskStatus method (#901)
imotov Mar 31, 2017
1b0f5b9
Persistent Tasks: require correct allocation id for status updates (#…
imotov Apr 1, 2017
a5acb55
Use PersistentTasksService#waitForPersistentTaskStatus(...) to wait f…
martijnvg Apr 3, 2017
5b45b16
Persistent Tasks: check the current state in waitForPersistentTaskSta…
imotov Apr 4, 2017
97822db
Respond to rename random ASCII helper methods
jasontedor Apr 4, 2017
5a8512b
Persistent Tasks: refactor PersistentTasksService to use ActionListen…
imotov Apr 4, 2017
fab0dc4
Remove PersistentTask#isCurrentStatus() usages
martijnvg Apr 4, 2017
95c6005
Persistent Tasks: remove retries on notification failures (#977)
imotov Apr 10, 2017
0a1abd4
Persistent Tasks: remove listener from PersistentTasksExecutor#nodeOp…
imotov Apr 10, 2017
0a1f255
Added PersistentTasksService#waitForPersistentTasksStatus(...) method…
Apr 11, 2017
6bfea09
Persistent Tasks: switch from long task ids to string task ids (#1035)
imotov Apr 11, 2017
abd9ae3
Persistent Tasks: PersistentTaskRequest -> PersistTaskParams (#1057)
imotov Apr 12, 2017
4771965
Use task builder instead of creating persistent tasks directly.
martijnvg Apr 12, 2017
fc524bc
Persistent Tasks: force writeable name of params and status to be the…
imotov Apr 13, 2017
76cd7b1
Fixes compile errors in Eclipse due to generics
colings86 Apr 18, 2017
a08e2d9
Persistent tasks: require allocation id on task completion (#1107)
imotov Apr 19, 2017
44ea5d6
Separate publishing from applying cluster states
ywelsch Apr 3, 2017
e69317b
Don't call ClusterService.state() in a ClusterStateUpdateTask
ywelsch Apr 29, 2017
292e383
Fix static / version based BWC tests (#1456)
s1monw May 17, 2017
614aef2
Pass down the provided timeout.
May 22, 2017
1cef531
Always Accumulate Transport Exceptions (#1619)
pickypg Jun 2, 2017
0d50f9c
Call initialising constructor of BaseTasksRequest (#1771)
davidkyle Jun 19, 2017
ffdb05e
Persistent Tasks: remove unused isCurrentStatus method (#2076)
imotov Jul 26, 2017
b5f2813
Move tribe to a module (#2088)
ywelsch Jul 28, 2017
65ce227
Adapt to upstream changes made to AbstractStreamableXContentTestCase …
javanna Aug 2, 2017
b0de3c3
Moves more classes over to ToXContentObject/Fragment (#2283)
colings86 Aug 17, 2017
7313ad5
Make AllocatedPersistentTask members volatile (#2297)
droberts195 Aug 17, 2017
1c489ee
Refactor/to x content fragments2 (#2329)
colings86 Aug 23, 2017
4dd6995
Make the persistent task status available to PersistentTasksExecutor.…
martijnvg Oct 24, 2017
8521b2d
Remove InternalClient and InternalSecurityClient (#3054)
jaymode Nov 22, 2017
41071e4
Add adding ability to associate an ID with tasks.
imotov Jan 12, 2018
cc16f9d
Added AllocatedPersistentTask#waitForPersistentTaskStatus(...) that d…
droberts195 Jan 23, 2018
07e727c
Removed ClientHelper dependency from PersistentTasksService.
martijnvg Jan 25, 2018
592eedb
Make persistent tasks work.
martijnvg Jan 25, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion server/src/main/java/org/elasticsearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,12 @@
import org.elasticsearch.rest.action.search.RestMultiSearchAction;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.rest.action.search.RestSearchScrollAction;
import org.elasticsearch.tasks.TaskManager;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.usage.UsageService;
import org.elasticsearch.persistent.CompletionPersistentTaskAction;
import org.elasticsearch.persistent.RemovePersistentTaskAction;
import org.elasticsearch.persistent.StartPersistentTaskAction;
import org.elasticsearch.persistent.UpdatePersistentTaskStatusAction;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -507,6 +510,12 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg

actionPlugins.stream().flatMap(p -> p.getActions().stream()).forEach(actions::register);

// Persistent tasks:
actions.register(StartPersistentTaskAction.INSTANCE, StartPersistentTaskAction.TransportAction.class);
actions.register(UpdatePersistentTaskStatusAction.INSTANCE, UpdatePersistentTaskStatusAction.TransportAction.class);
actions.register(CompletionPersistentTaskAction.INSTANCE, CompletionPersistentTaskAction.TransportAction.class);
actions.register(RemovePersistentTaskAction.INSTANCE, RemovePersistentTaskAction.TransportAction.class);

return unmodifiableMap(actions.getRegistry());
}

Expand Down
19 changes: 19 additions & 0 deletions server/src/main/java/org/elasticsearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import org.elasticsearch.plugins.MapperPlugin;
import org.elasticsearch.plugins.MetaDataUpgrader;
import org.elasticsearch.plugins.NetworkPlugin;
import org.elasticsearch.plugins.PersistentTaskPlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.RepositoryPlugin;
Expand All @@ -139,6 +140,10 @@
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.usage.UsageService;
import org.elasticsearch.watcher.ResourceWatcherService;
import org.elasticsearch.persistent.PersistentTasksClusterService;
import org.elasticsearch.persistent.PersistentTasksExecutor;
import org.elasticsearch.persistent.PersistentTasksExecutorRegistry;
import org.elasticsearch.persistent.PersistentTasksService;

import java.io.BufferedWriter;
import java.io.Closeable;
Expand Down Expand Up @@ -461,6 +466,17 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
threadPool, scriptModule.getScriptService(), bigArrays, searchModule.getFetchPhase(),
responseCollectorService);

final List<PersistentTasksExecutor<?>> tasksExecutors = pluginsService
.filterPlugins(PersistentTaskPlugin.class).stream()
.map(p -> p.getPersistentTasksExecutor(clusterService))
.flatMap(List::stream)
.collect(toList());

final PersistentTasksExecutorRegistry registry = new PersistentTasksExecutorRegistry(settings, tasksExecutors);
final PersistentTasksClusterService persistentTasksClusterService =
new PersistentTasksClusterService(settings, registry, clusterService);
final PersistentTasksService persistentTasksService = new PersistentTasksService(settings, clusterService, threadPool, client);

modules.add(b -> {
b.bind(Node.class).toInstance(this);
b.bind(NodeService.class).toInstance(nodeService);
Expand Down Expand Up @@ -504,6 +520,9 @@ protected Node(final Environment environment, Collection<Class<? extends Plugin>
}
httpBind.accept(b);
pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
b.bind(PersistentTasksService.class).toInstance(persistentTasksService);
b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);
b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);
}
);
injector = modules.createInjector();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.persistent;

import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.cluster.node.tasks.cancel.CancelTasksRequest;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskId;
import org.elasticsearch.tasks.TaskManager;

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;

/**
* Represents a executor node operation that corresponds to a persistent task
*/
public class AllocatedPersistentTask extends CancellableTask {
private volatile String persistentTaskId;
private volatile long allocationId;

private final AtomicReference<State> state;
@Nullable
private volatile Exception failure;

private volatile PersistentTasksService persistentTasksService;
private volatile Logger logger;
private volatile TaskManager taskManager;


public AllocatedPersistentTask(long id, String type, String action, String description, TaskId parentTask,
Map<String, String> headers) {
super(id, type, action, description, parentTask, headers);
this.state = new AtomicReference<>(State.STARTED);
}

@Override
public boolean shouldCancelChildrenOnCancellation() {
return true;
}

// In case of persistent tasks we always need to return: `false`
// because in case of persistent task the parent task isn't a task in the task manager, but in cluster state.
// This instructs the task manager not to try to kill this persistent task when the task manager cannot find
// a fake parent node id "cluster" in the cluster state
@Override
public final boolean cancelOnParentLeaving() {
return false;
}

@Override
public Status getStatus() {
return new PersistentTasksNodeService.Status(state.get());
}

/**
* Updates the persistent state for the corresponding persistent task.
* <p>
* This doesn't affect the status of this allocated task.
*/
public void updatePersistentStatus(Task.Status status, ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>> listener) {
persistentTasksService.updateStatus(persistentTaskId, allocationId, status, listener);
}

public String getPersistentTaskId() {
return persistentTaskId;
}

void init(PersistentTasksService persistentTasksService, TaskManager taskManager, Logger logger, String persistentTaskId, long
allocationId) {
this.persistentTasksService = persistentTasksService;
this.logger = logger;
this.taskManager = taskManager;
this.persistentTaskId = persistentTaskId;
this.allocationId = allocationId;
}

public Exception getFailure() {
return failure;
}

boolean markAsCancelled() {
return state.compareAndSet(AllocatedPersistentTask.State.STARTED, AllocatedPersistentTask.State.PENDING_CANCEL);
}

public State getState() {
return state.get();
}

public long getAllocationId() {
return allocationId;
}

public enum State {
STARTED, // the task is currently running
PENDING_CANCEL, // the task is cancelled on master, cancelling it locally
COMPLETED // the task is done running and trying to notify caller
}

/**
* Waits for this persistent task to have the desired state.
*/
public void waitForPersistentTaskStatus(Predicate<PersistentTasksCustomMetaData.PersistentTask<?>> predicate,
@Nullable TimeValue timeout,
PersistentTasksService.WaitForPersistentTaskStatusListener<?> listener) {
persistentTasksService.waitForPersistentTaskStatus(persistentTaskId, predicate, timeout, listener);
}

public void markAsCompleted() {
completeAndNotifyIfNeeded(null);
}

public void markAsFailed(Exception e) {
if (CancelTasksRequest.DEFAULT_REASON.equals(getReasonCancelled())) {
completeAndNotifyIfNeeded(null);
} else {
completeAndNotifyIfNeeded(e);
}

}

private void completeAndNotifyIfNeeded(@Nullable Exception failure) {
State prevState = state.getAndSet(AllocatedPersistentTask.State.COMPLETED);
if (prevState == State.COMPLETED) {
logger.warn("attempt to complete task [{}] with id [{}] in the [{}] state", getAction(), getPersistentTaskId(), prevState);
} else {
if (failure != null) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage(
"task {} failed with an exception", getPersistentTaskId()), failure);
}
try {
this.failure = failure;
if (prevState == State.STARTED) {
logger.trace("sending notification for completed task [{}] with id [{}]", getAction(), getPersistentTaskId());
persistentTasksService.sendCompletionNotification(getPersistentTaskId(), getAllocationId(), failure, new
ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
@Override
public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
logger.trace("notification for task [{}] with id [{}] was successful", getAction(),
getPersistentTaskId());
}

@Override
public void onFailure(Exception e) {
logger.warn((Supplier<?>) () ->
new ParameterizedMessage("notification for task [{}] with id [{}] failed",
getAction(), getPersistentTaskId()), e);
}
});
}
} finally {
taskManager.unregister(this);
}
}
}
}
Loading