Skip to content

Commit

Permalink
[ML] Close job in index (elastic#34217)
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle committed Oct 29, 2018
1 parent c75a7d4 commit e3821be
Show file tree
Hide file tree
Showing 8 changed files with 341 additions and 340 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ && updatesDetectors(job) == false
&& (modelSnapshotMinVersion == null || Objects.equals(modelSnapshotMinVersion, job.getModelSnapshotMinVersion()))
&& (establishedModelMemory == null || Objects.equals(establishedModelMemory, job.getEstablishedModelMemory()))
&& (jobVersion == null || Objects.equals(jobVersion, job.getJobVersion()))
&& (clearJobFinishTime == false || job.getFinishedTime() == null);
&& ((clearJobFinishTime == null || clearJobFinishTime == false) || job.getFinishedTime() == null);
}

boolean updatesDetectors(Job job) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,7 @@
import org.elasticsearch.xpack.ml.action.TransportValidateJobConfigAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.UpdateJobProcessNotifier;
import org.elasticsearch.xpack.ml.job.categorization.MlClassicTokenizer;
Expand Down Expand Up @@ -367,6 +368,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
Auditor auditor = new Auditor(client, clusterService.getNodeName());
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
JobConfigProvider jobConfigProvider = new JobConfigProvider(client, settings);
DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client, settings, xContentRegistry);
UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, client, clusterService, threadPool);
JobManager jobManager = new JobManager(env, settings, jobResultsProvider, clusterService, auditor, threadPool, client, notifier);

Expand Down Expand Up @@ -423,6 +425,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
mlLifeCycleService,
jobResultsProvider,
jobConfigProvider,
datafeedConfigProvider,
jobManager,
autodetectProcessManager,
new MlInitializationService(settings, threadPool, clusterService, client),
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDatafeedAction.Request, PutDatafeedAction.Response> {
Expand Down Expand Up @@ -177,7 +178,7 @@ private ElasticsearchException checkConfigsAreNotDefinedInClusterState(String da
}

private void checkJobDoesNotHaveADatafeed(String jobId, ActionListener<Boolean> listener) {
datafeedConfigProvider.findDatafeedForJobId(jobId, ActionListener.wrap(
datafeedConfigProvider.findDatafeedsForJobIds(Collections.singletonList(jobId), ActionListener.wrap(
datafeedIds -> {
if (datafeedIds.isEmpty()) {
listener.onResponse(Boolean.TRUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;

import java.util.Collections;
import java.util.Map;

public class TransportUpdateDatafeedAction extends TransportMasterNodeAction<UpdateDatafeedAction.Request, PutDatafeedAction.Response> {
Expand Down Expand Up @@ -100,7 +101,7 @@ protected void masterOperation(UpdateDatafeedAction.Request request, ClusterStat
* if it does have a datafeed it must be the one we are updating
*/
private void checkJobDoesNotHaveADifferentDatafeed(String jobId, String datafeedId, ActionListener<Boolean> listener) {
datafeedConfigProvider.findDatafeedForJobId(jobId, ActionListener.wrap(
datafeedConfigProvider.findDatafeedsForJobIds(Collections.singletonList(jobId), ActionListener.wrap(
datafeedIds -> {
if (datafeedIds.isEmpty()) {
// Ok the job does not have a datafeed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -168,17 +169,17 @@ public void onFailure(Exception e) {
}

/**
* Find any datafeeds that are used by job {@code jobid} i.e. the
* datafeed that references job {@code jobid}.
* Find any datafeeds that are used by jobs {@code jobIds} i.e. the
* datafeeds that reference any of the jobs in {@code jobIds}.
*
* In theory there should never be more than one datafeed referencing a
* particular job.
*
* @param jobId The job to find
* @param jobIds The jobs to find the datafeeds of
* @param listener Datafeed Id listener
*/
public void findDatafeedForJobId(String jobId, ActionListener<Set<String>> listener) {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedJobIdQuery(jobId));
public void findDatafeedsForJobIds(Collection<String> jobIds, ActionListener<Set<String>> listener) {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedJobIdsQuery(jobIds));
sourceBuilder.fetchSource(false);
sourceBuilder.docValueField(DatafeedConfig.ID.getPreferredName());

Expand All @@ -191,8 +192,8 @@ public void findDatafeedForJobId(String jobId, ActionListener<Set<String>> liste
response -> {
Set<String> datafeedIds = new HashSet<>();
SearchHit[] hits = response.getHits().getHits();
// There should be 0 or 1 datafeeds referencing the same job
assert hits.length <= 1;
// There cannot be more than one datafeed per job
assert hits.length <= jobIds.size();

for (SearchHit hit : hits) {
datafeedIds.add(hit.field(DatafeedConfig.ID.getPreferredName()).getValue());
Expand Down Expand Up @@ -463,10 +464,10 @@ private QueryBuilder buildDatafeedIdQuery(String [] tokens) {
return boolQueryBuilder;
}

private QueryBuilder buildDatafeedJobIdQuery(String jobId) {
private QueryBuilder buildDatafeedJobIdsQuery(Collection<String> jobIds) {
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
boolQueryBuilder.filter(new TermQueryBuilder(DatafeedConfig.CONFIG_TYPE.getPreferredName(), DatafeedConfig.TYPE));
boolQueryBuilder.filter(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
boolQueryBuilder.filter(new TermsQueryBuilder(Job.ID.getPreferredName(), jobIds));
return boolQueryBuilder;
}

Expand Down
Loading

0 comments on commit e3821be

Please sign in to comment.