[ML] Fixes for stop datafeed edge cases
The following edge cases were fixed:

1. A request to force-stop a stopping datafeed is no longer
   ignored.  Force-stop is an important recovery mechanism
   if normal stop doesn't work for some reason, and needs
   to operate on a datafeed in any state other than stopped
   (see the sketch after this list).
2. If the node that a datafeed is running on is removed from
   the cluster during a normal stop, the stop request is
   retried (and will likely succeed on the retry, simply by
   cancelling the persistent task for the affected datafeed).
3. If there are multiple simultaneous force-stop requests for
   the same datafeed, we no longer fail the one that is
   processed second.  The previous behaviour was wrong:
   stopping a stopped datafeed is not an error, so stopping
   a datafeed twice simultaneously should not be an error
   either.
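
A minimal sketch of the recovery flow behind point 1, assuming the 7.x
high-level REST client (the fallback logic and naming here are
illustrative, not part of this commit):

    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.client.ml.StopDatafeedRequest;

    public final class StopDatafeedWithFallback {
        public static void stop(RestHighLevelClient client, String datafeedId) throws Exception {
            try {
                // First attempt a normal (graceful) stop.
                client.machineLearning().stopDatafeed(new StopDatafeedRequest(datafeedId), RequestOptions.DEFAULT);
            } catch (Exception e) {
                // Normal stop failed; force-stop is the recovery mechanism, and
                // after this commit it also works on a datafeed that is stuck in
                // the "stopping" state rather than being silently ignored.
                StopDatafeedRequest forceStop = new StopDatafeedRequest(datafeedId);
                forceStop.setForce(true);
                client.machineLearning().stopDatafeed(forceStop, RequestOptions.DEFAULT);
            }
        }
    }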

Fixes elastic#43670
Fixes elastic#48931
droberts195 committed Nov 15, 2019
1 parent 5cf112e commit 5114f0b
Showing 6 changed files with 256 additions and 30 deletions.

@@ -540,7 +540,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
         } else {
             mlController = new DummyController();
             autodetectProcessFactory = (job, autodetectParams, executorService, onProcessCrash) ->
-                    new BlackHoleAutodetectProcess(job.getId());
+                    new BlackHoleAutodetectProcess(job.getId(), onProcessCrash);
             // factor of 1.0 makes renormalization a no-op
             normalizerProcessFactory = (jobId, quantilesState, bucketSpan, executorService) -> new MultiplyingNormalizerProcess(1.0);
             analyticsProcessFactory = (jobId, analyticsProcessConfig, state, executorService, onProcessCrash) -> null;

@@ -21,6 +21,7 @@
 import org.elasticsearch.common.util.concurrent.AbstractRunnable;
 import org.elasticsearch.common.util.concurrent.AtomicArray;
 import org.elasticsearch.discovery.MasterNotDiscoveredException;
+import org.elasticsearch.persistent.PersistentTasksClusterService;
 import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
 import org.elasticsearch.persistent.PersistentTasksService;
 import org.elasticsearch.tasks.Task;

@@ -85,6 +86,7 @@ private static void addDatafeedTaskIdAccordingToState(String datafeedId,
                                                       List<String> startedDatafeedIds,
                                                       List<String> stoppingDatafeedIds) {
         switch (datafeedState) {
+            case STARTING:
             case STARTED:
                 startedDatafeedIds.add(datafeedId);
                 break;

@@ -94,6 +96,7 @@ private static void addDatafeedTaskIdAccordingToState(String datafeedId,
                 stoppingDatafeedIds.add(datafeedId);
                 break;
             default:
+                assert false : "Unexpected datafeed state " + datafeedState;
                 break;
         }
     }
Expand Down Expand Up @@ -126,9 +129,9 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi
request.setResolvedStartedDatafeedIds(startedDatafeeds.toArray(new String[startedDatafeeds.size()]));

if (request.isForce()) {
forceStopDatafeed(request, listener, tasks, startedDatafeeds);
forceStopDatafeed(request, listener, tasks, startedDatafeeds, stoppingDatafeeds);
} else {
normalStopDatafeed(task, request, listener, tasks, startedDatafeeds, stoppingDatafeeds);
normalStopDatafeed(task, request, listener, tasks, nodes, startedDatafeeds, stoppingDatafeeds);
}
},
listener::onFailure

@@ -137,20 +140,20 @@ protected void doExecute(Task task, StopDatafeedAction.Request request, ActionLi
     }

     private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, ActionListener<StopDatafeedAction.Response> listener,
-                                    PersistentTasksCustomMetaData tasks,
+                                    PersistentTasksCustomMetaData tasks, DiscoveryNodes nodes,
                                     List<String> startedDatafeeds, List<String> stoppingDatafeeds) {
-        Set<String> executorNodes = new HashSet<>();
+        final Set<String> executorNodes = new HashSet<>();
         for (String datafeedId : startedDatafeeds) {
             PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks);
             if (datafeedTask == null) {
                 // This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method
                 String msg = "Requested datafeed [" + datafeedId + "] be stopped, but datafeed's task could not be found.";
                 assert datafeedTask != null : msg;
                 logger.error(msg);
-            } else if (datafeedTask.isAssigned()) {
+            } else if (PersistentTasksClusterService.needsReassignment(datafeedTask.getAssignment(), nodes) == false) {
                 executorNodes.add(datafeedTask.getExecutorNode());
             } else {
-                // This is the easy case - the datafeed is not currently assigned to a node,
+                // This is the easy case - the datafeed is not currently assigned to a valid node,
                 // so can be gracefully stopped simply by removing its persistent task. (Usually
                 // a graceful stop cannot be achieved by simply removing the persistent task, but
                 // if the datafeed has no running code then graceful/forceful are the same.)

@@ -171,48 +174,64 @@ private void normalStopDatafeed(Task task, StopDatafeedAction.Request request, A

         ActionListener<StopDatafeedAction.Response> finalListener = ActionListener.wrap(
                 r -> waitForDatafeedStopped(allDataFeedsToWaitFor, request, r, listener),
-                listener::onFailure);
+                e -> {
+                    if (ExceptionsHelper.unwrapCause(e) instanceof FailedNodeException) {
+                        // A node has dropped out of the cluster since we started executing the requests.
+                        // Since stopping an already stopped datafeed is not an error we can try again.
+                        // The datafeeds that were running on the node that dropped out of the cluster
+                        // will just have their persistent tasks cancelled.  Datafeeds that were stopped
+                        // by the previous attempt will be noops in the subsequent attempt.
+                        doExecute(task, request, listener);
+                    } else {
+                        listener.onFailure(e);
+                    }
+                });

         super.doExecute(task, request, finalListener);
     }

     private void forceStopDatafeed(final StopDatafeedAction.Request request, final ActionListener<StopDatafeedAction.Response> listener,
-                                   PersistentTasksCustomMetaData tasks, final List<String> startedDatafeeds) {
+                                   PersistentTasksCustomMetaData tasks, final List<String> startedDatafeeds,
+                                   List<String> stoppingDatafeeds) {
+        final List<String> allDatafeeds = Stream.concat(startedDatafeeds.stream(), stoppingDatafeeds.stream()).collect(Collectors.toList());
         final AtomicInteger counter = new AtomicInteger();
-        final AtomicArray<Exception> failures = new AtomicArray<>(startedDatafeeds.size());
+        final AtomicArray<Exception> failures = new AtomicArray<>(allDatafeeds.size());

-        for (String datafeedId : startedDatafeeds) {
+        for (String datafeedId : allDatafeeds) {
             PersistentTasksCustomMetaData.PersistentTask<?> datafeedTask = MlTasks.getDatafeedTask(datafeedId, tasks);
             if (datafeedTask != null) {
                 persistentTasksService.sendRemoveRequest(datafeedTask.getId(),
                     new ActionListener<PersistentTasksCustomMetaData.PersistentTask<?>>() {
                         @Override
                         public void onResponse(PersistentTasksCustomMetaData.PersistentTask<?> persistentTask) {
-                            if (counter.incrementAndGet() == startedDatafeeds.size()) {
+                            if (counter.incrementAndGet() == allDatafeeds.size()) {
                                 sendResponseOrFailure(request.getDatafeedId(), listener, failures);
                             }
                         }

                         @Override
                         public void onFailure(Exception e) {
                             final int slot = counter.incrementAndGet();
-                            if ((ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException &&
-                                Strings.isAllOrWildcard(new String[]{request.getDatafeedId()})) == false) {
+                            // We validated that the datafeed names supplied in the request existed when we started processing the action.
+                            // If the related tasks don't exist at this point then they must have been stopped by a simultaneous stop request.
+                            // This is not an error.
+                            if (ExceptionsHelper.unwrapCause(e) instanceof ResourceNotFoundException == false) {
                                 failures.set(slot - 1, e);
                             }
-                            if (slot == startedDatafeeds.size()) {
+                            if (slot == allDatafeeds.size()) {
                                 sendResponseOrFailure(request.getDatafeedId(), listener, failures);
                             }
                         }
                     });
             } else {
-                // This should not happen, because startedDatafeeds was derived from the same tasks that is passed to this method
+                // This should not happen, because startedDatafeeds and stoppingDatafeeds
+                // were derived from the same tasks that were passed to this method
                 String msg = "Requested datafeed [" + datafeedId + "] be force-stopped, but datafeed's task could not be found.";
                 assert datafeedTask != null : msg;
                 logger.error(msg);
                 final int slot = counter.incrementAndGet();
                 failures.set(slot - 1, new RuntimeException(msg));
-                if (slot == startedDatafeeds.size()) {
+                if (slot == allDatafeeds.size()) {
                     sendResponseOrFailure(request.getDatafeedId(), listener, failures);
                 }
             }

@@ -313,7 +332,7 @@ protected StopDatafeedAction.Response newResponse(StopDatafeedAction.Request req
                 .convertToElastic(failedNodeExceptions.get(0));
         } else {
             // This can happen when the actual task in the node no longer exists,
-            // which means the datafeed(s) have already been closed.
+            // which means the datafeed(s) have already been stopped.
             return new StopDatafeedAction.Response(true);
         }
     }
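
For context on the assignment check above, a hedged sketch of why
datafeedTask.isAssigned() was not enough (semantics per the 7.x
persistent tasks framework; the wrapper class is illustrative):

    import org.elasticsearch.cluster.node.DiscoveryNodes;
    import org.elasticsearch.persistent.PersistentTasksClusterService;
    import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;

    final class AssignmentCheckSketch {
        // isAssigned() only checks that an executor node was recorded, and stays
        // true even after that node has left the cluster.  needsReassignment()
        // additionally verifies that the recorded node still exists, so a datafeed
        // whose node vanished mid-stop falls into the "easy case" branch and is
        // stopped by simply cancelling its persistent task.
        static boolean canStopViaExecutorNode(Assignment assignment, DiscoveryNodes nodes) {
            return PersistentTasksClusterService.needsReassignment(assignment, nodes) == false;
        }
    }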

@@ -21,12 +21,15 @@

 import java.io.IOException;
 import java.time.ZonedDateTime;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingDeque;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;

 /**
  * A placeholder class simulating the actions of the native Autodetect process.

@@ -37,16 +40,21 @@
  */
 public class BlackHoleAutodetectProcess implements AutodetectProcess {

+    public static final String MAGIC_FAILURE_VALUE = "253402300799";
+    public static final String MAGIC_FAILURE_VALUE_AS_DATE = "9999-12-31 23:59:59";
+
     private static final String FLUSH_ID = "flush-1";

     private final String jobId;
     private final ZonedDateTime startTime;
     private final BlockingQueue<AutodetectResult> results = new LinkedBlockingDeque<>();
+    private final Consumer<String> onProcessCrash;
     private volatile boolean open = true;

-    public BlackHoleAutodetectProcess(String jobId) {
+    public BlackHoleAutodetectProcess(String jobId, Consumer<String> onProcessCrash) {
         this.jobId = jobId;
         startTime = ZonedDateTime.now();
+        this.onProcessCrash = Objects.requireNonNull(onProcessCrash);
     }

     @Override

@@ -59,7 +67,13 @@ public boolean isReady() {
     }

     @Override
-    public void writeRecord(String[] record) throws IOException {
+    public void writeRecord(String[] record) {
+        if (Arrays.asList(record).contains(MAGIC_FAILURE_VALUE)) {
+            open = false;
+            onProcessCrash.accept("simulated failure");
+            AutodetectResult result = new AutodetectResult(null, null, null, null, null, null, null, null, null, null, null);
+            results.add(result);
+        }
     }

     @Override
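
MAGIC_FAILURE_VALUE is the epoch-seconds rendering of the date in
MAGIC_FAILURE_VALUE_AS_DATE, so a test can make this fake process simulate
a native process crash by feeding it a record containing that timestamp.
A hypothetical test-style usage fragment (the job id and consumer are
illustrative, not from this commit):

    BlackHoleAutodetectProcess process = new BlackHoleAutodetectProcess(
            "test-job", reason -> System.err.println("process crashed: " + reason));
    // Triggers the crash hook added above: open becomes false, the
    // onProcessCrash consumer fires, and one final empty result is queued.
    process.writeRecord(new String[] { BlackHoleAutodetectProcess.MAGIC_FAILURE_VALUE });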