Skip to content

Commit

Permalink
[ML] Fix calendar and filter updates from non-master nodes (#31804)
Browse files Browse the repository at this point in the history
Job updates or changes to calendars or filters may
result into updating the job process if it has been
running. To preserve the order of updates, process
updates are queued through the UpdateJobProcessNotifier
which is only running on the master node. All actions
performing such updates must run on the master node.

However, the CRUD actions for calendars and filters
are not master node actions. They have been submitting
the updates to the UpdateJobProcessNotifier even though
it might have not been running (given the action was
run on a non-master node). When that happens, the update
never reaches the process.

This commit fixes this problem by ensuring the notifier
runs on all nodes and by ensuring the process update action
gets the resources again before updating the process
(instead of having those resources passed in the request).

This ensures that even if the order of the updates
gets messed up, the latest update will read the latest
state of those resource and the process will get back
in sync.

This leaves us with 2 types of updates:

  1. updates to the job config should happen on the master
  node. This is because we cannot refetch the entire job
  and update it. We need to know the parts that have been changed.

  2. updates to resources the job uses. Those can be handled
  on non-master nodes but they should be re-fetched by the
  update process action.

Closes #31803
  • Loading branch information
dimitris-athanasiou committed Jul 5, 2018
1 parent 70531ec commit 268c614
Show file tree
Hide file tree
Showing 7 changed files with 212 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,15 @@
*/
package org.elasticsearch.xpack.ml.job;

import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.component.LifecycleListener;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
Expand All @@ -31,9 +32,26 @@
import static org.elasticsearch.xpack.core.ml.action.UpdateProcessAction.Request;
import static org.elasticsearch.xpack.core.ml.action.UpdateProcessAction.Response;

public class UpdateJobProcessNotifier extends AbstractComponent implements LocalNodeMasterListener {
/**
* This class serves as a queue for updates to the job process.
* Queueing is important for 2 reasons: first, it throttles the updates
* to the process, and second and most important, it preserves the order of the updates
* for actions that run on the master node. For preserving the order of the updates
* to the job config, it's necessary to handle the whole update chain on the master
* node. However, for updates to resources the job uses (e.g. calendars, filters),
* they can be handled on non-master nodes as long as the update process action
* is fetching the latest version of those resources from the index instead of
* using the version that existed while the handling action was at work. This makes
* sure that even if the order of updates gets reversed, the final process update
* will fetch the valid state of those external resources ensuring the process is
* in sync.
*/
public class UpdateJobProcessNotifier extends AbstractComponent {

private static final Logger LOGGER = Loggers.getLogger(UpdateJobProcessNotifier.class);

private final Client client;
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final LinkedBlockingQueue<UpdateHolder> orderedJobUpdates = new LinkedBlockingQueue<>(1000);

Expand All @@ -42,9 +60,15 @@ public class UpdateJobProcessNotifier extends AbstractComponent implements Local
public UpdateJobProcessNotifier(Settings settings, Client client, ClusterService clusterService, ThreadPool threadPool) {
super(settings);
this.client = client;
this.clusterService = clusterService;
this.threadPool = threadPool;
clusterService.addLocalNodeMasterListener(this);
clusterService.addLifecycleListener(new LifecycleListener() {

@Override
public void beforeStart() {
start();
}

@Override
public void beforeStop() {
stop();
Expand All @@ -56,16 +80,6 @@ boolean submitJobUpdate(UpdateParams update, ActionListener<Boolean> listener) {
return orderedJobUpdates.offer(new UpdateHolder(update, listener));
}

@Override
public void onMaster() {
start();
}

@Override
public void offMaster() {
stop();
}

private void start() {
cancellable = threadPool.scheduleWithFixedDelay(this::processNextUpdate, TimeValue.timeValueSeconds(1), ThreadPool.Names.GENERIC);
}
Expand All @@ -79,12 +93,6 @@ private void stop() {
}
}

@Override
public String executorName() {
// SAME is ok here, because both start() and stop() are inexpensive:
return ThreadPool.Names.SAME;
}

private void processNextUpdate() {
List<UpdateHolder> updates = new ArrayList<>(orderedJobUpdates.size());
try {
Expand All @@ -101,6 +109,15 @@ void executeProcessUpdates(Iterator<UpdateHolder> updatesIterator) {
}
UpdateHolder updateHolder = updatesIterator.next();
UpdateParams update = updateHolder.update;

if (update.isJobUpdate() && clusterService.localNode().isMasterNode() == false) {
assert clusterService.localNode().isMasterNode();
LOGGER.error("Job update was submitted to non-master node [" + clusterService.nodeName() + "]; update for job ["
+ update.getJobId() + "] will be ignored");
executeProcessUpdates(updatesIterator);
return;
}

Request request = new Request(update.getJobId(), update.getModelPlotConfig(), update.getDetectorUpdates(), update.getFilter(),
update.isUpdateScheduledEvents());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,34 @@
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;
import org.elasticsearch.xpack.core.ml.job.config.AnalysisConfig;
import org.elasticsearch.xpack.core.ml.job.config.CategorizationAnalyzerConfig;
import org.elasticsearch.xpack.core.ml.job.config.DataDescription;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobUpdate;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.job.categorization.CategorizationAnalyzer;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.CountingInputStream;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriter;
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.DataToProcessWriterFactory;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Optional;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -205,30 +203,29 @@ public void killProcess(boolean awaitCompletion, boolean finish) throws IOExcept
}
}

public void writeUpdateProcessMessage(UpdateParams updateParams, List<ScheduledEvent> scheduledEvents,
BiConsumer<Void, Exception> handler) {
public void writeUpdateProcessMessage(UpdateProcessMessage update, BiConsumer<Void, Exception> handler) {
submitOperation(() -> {
if (updateParams.getModelPlotConfig() != null) {
autodetectProcess.writeUpdateModelPlotMessage(updateParams.getModelPlotConfig());
if (update.getModelPlotConfig() != null) {
autodetectProcess.writeUpdateModelPlotMessage(update.getModelPlotConfig());
}

// Filters have to be written before detectors
if (updateParams.getFilter() != null) {
autodetectProcess.writeUpdateFiltersMessage(Collections.singletonList(updateParams.getFilter()));
if (update.getFilter() != null) {
autodetectProcess.writeUpdateFiltersMessage(Collections.singletonList(update.getFilter()));
}

// Add detector rules
if (updateParams.getDetectorUpdates() != null) {
for (JobUpdate.DetectorUpdate update : updateParams.getDetectorUpdates()) {
if (update.getRules() != null) {
autodetectProcess.writeUpdateDetectorRulesMessage(update.getDetectorIndex(), update.getRules());
if (update.getDetectorUpdates() != null) {
for (JobUpdate.DetectorUpdate detectorUpdate : update.getDetectorUpdates()) {
if (detectorUpdate.getRules() != null) {
autodetectProcess.writeUpdateDetectorRulesMessage(detectorUpdate.getDetectorIndex(), detectorUpdate.getRules());
}
}
}

// Add scheduled events; null means there's no update but an empty list means we should clear any events in the process
if (scheduledEvents != null) {
autodetectProcess.writeUpdateScheduledEventsMessage(scheduledEvents, job.getAnalysisConfig().getBucketSpan());
if (update.getScheduledEvents() != null) {
autodetectProcess.writeUpdateScheduledEventsMessage(update.getScheduledEvents(), job.getAnalysisConfig().getBucketSpan());
}

return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@
*/
package org.elasticsearch.xpack.ml.job.process.autodetect;

import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
import org.elasticsearch.core.internal.io.IOUtils;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.action.ActionListener;
Expand All @@ -22,35 +20,39 @@
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentElasticsearchExtension;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.core.internal.io.IOUtils;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.action.GetFiltersAction;
import org.elasticsearch.xpack.core.ml.action.util.QueryPage;
import org.elasticsearch.xpack.core.ml.calendars.ScheduledEvent;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
import org.elasticsearch.xpack.core.ml.job.config.MlFilter;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.output.FlushAcknowledgement;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.DataCounts;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSizeStats;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.action.TransportOpenJobAction.JobTask;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobDataCountsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobRenormalizedResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.JobResultsPersister;
import org.elasticsearch.xpack.ml.job.persistence.ScheduledEventsQueryBuilder;
import org.elasticsearch.xpack.ml.job.persistence.StateStreamer;
import org.elasticsearch.xpack.ml.job.process.DataCountsReporter;
import org.elasticsearch.xpack.ml.job.process.NativeStorageProvider;
import org.elasticsearch.xpack.ml.job.process.autodetect.output.AutoDetectResultProcessor;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.AutodetectParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.DataLoadParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.FlushJobParams;
import org.elasticsearch.xpack.ml.job.process.autodetect.params.ForecastParams;
Expand Down Expand Up @@ -82,6 +84,8 @@
import java.util.function.Consumer;

import static org.elasticsearch.common.settings.Setting.Property;
import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
import static org.elasticsearch.xpack.core.ClientHelper.executeAsyncWithOrigin;

public class AutodetectProcessManager extends AbstractComponent {

Expand Down Expand Up @@ -156,7 +160,7 @@ public void onNodeStartup() {
}
}

public synchronized void closeAllJobsOnThisNode(String reason) throws IOException {
public synchronized void closeAllJobsOnThisNode(String reason) {
int numJobs = processByAllocation.size();
if (numJobs != 0) {
logger.info("Closing [{}] jobs, because [{}]", numJobs, reason);
Expand Down Expand Up @@ -322,8 +326,7 @@ public void forecastJob(JobTask jobTask, ForecastParams params, Consumer<Excepti
});
}

public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams,
Consumer<Exception> handler) {
public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams, Consumer<Exception> handler) {
AutodetectCommunicator communicator = getOpenAutodetectCommunicator(jobTask);
if (communicator == null) {
String message = "Cannot process update model debug config because job [" + jobTask.getJobId() + "] is not open";
Expand All @@ -332,25 +335,59 @@ public void writeUpdateProcessMessage(JobTask jobTask, UpdateParams updateParams
return;
}

UpdateProcessMessage.Builder updateProcessMessage = new UpdateProcessMessage.Builder();
updateProcessMessage.setModelPlotConfig(updateParams.getModelPlotConfig());
updateProcessMessage.setDetectorUpdates(updateParams.getDetectorUpdates());

// Step 3. Set scheduled events on message and write update process message
ActionListener<QueryPage<ScheduledEvent>> eventsListener = ActionListener.wrap(
events -> {
communicator.writeUpdateProcessMessage(updateParams, events == null ? null : events.results(), (aVoid, e) -> {
updateProcessMessage.setScheduledEvents(events == null ? null : events.results());
communicator.writeUpdateProcessMessage(updateProcessMessage.build(), (aVoid, e) -> {
if (e == null) {
handler.accept(null);
} else {
handler.accept(e);
}
});
},
handler::accept);

if (updateParams.isUpdateScheduledEvents()) {
Job job = jobManager.getJobOrThrowIfUnknown(jobTask.getJobId());
DataCounts dataCounts = getStatistics(jobTask).get().v1();
ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts));
jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener);
}, handler
);

// Step 2. Set the filter on the message and get scheduled events
ActionListener<MlFilter> filterListener = ActionListener.wrap(
filter -> {
updateProcessMessage.setFilter(filter);

if (updateParams.isUpdateScheduledEvents()) {
Job job = jobManager.getJobOrThrowIfUnknown(jobTask.getJobId());
DataCounts dataCounts = getStatistics(jobTask).get().v1();
ScheduledEventsQueryBuilder query = new ScheduledEventsQueryBuilder().start(job.earliestValidTimestamp(dataCounts));
jobProvider.scheduledEventsForJob(jobTask.getJobId(), job.getGroups(), query, eventsListener);
} else {
eventsListener.onResponse(null);
}
}, handler
);

// Step 1. Get the filter
if (updateParams.getFilter() == null) {
filterListener.onResponse(null);
} else {
eventsListener.onResponse(null);
GetFiltersAction.Request getFilterRequest = new GetFiltersAction.Request();
getFilterRequest.setFilterId(updateParams.getFilter().getId());
executeAsyncWithOrigin(client, ML_ORIGIN, GetFiltersAction.INSTANCE, getFilterRequest,
new ActionListener<GetFiltersAction.Response>() {

@Override
public void onResponse(GetFiltersAction.Response response) {
filterListener.onResponse(response.getFilters().results().get(0));
}

@Override
public void onFailure(Exception e) {
handler.accept(e);
}
});
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,15 @@ public MlFilter getFilter() {
return filter;
}

/**
* Returns true if the update params include a job update,
* ie an update to the job config directly rather than an
* update to external resources a job uses (e.g. calendars, filters).
*/
public boolean isJobUpdate() {
return modelPlotConfig != null || detectorUpdates != null;
}

public boolean isUpdateScheduledEvents() {
return updateScheduledEvents;
}
Expand Down
Loading

0 comments on commit 268c614

Please sign in to comment.