Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

[ML] Close job defined in index #34217

Merged
merged 1 commit into from
Oct 2, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ && updatesDetectors(job) == false
&& (modelSnapshotId == null || Objects.equals(modelSnapshotId, job.getModelSnapshotId()))
&& (establishedModelMemory == null || Objects.equals(establishedModelMemory, job.getEstablishedModelMemory()))
&& (jobVersion == null || Objects.equals(jobVersion, job.getJobVersion()))
&& (clearJobFinishTime == false || job.getFinishedTime() == null);
&& ((clearJobFinishTime == null || clearJobFinishTime == false) || job.getFinishedTime() == null);
}

boolean updatesDetectors(Job job) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@
import org.elasticsearch.xpack.ml.action.TransportValidateJobConfigAction;
import org.elasticsearch.xpack.ml.datafeed.DatafeedJobBuilder;
import org.elasticsearch.xpack.ml.datafeed.DatafeedManager;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.UpdateJobProcessNotifier;
import org.elasticsearch.xpack.ml.job.categorization.MlClassicTokenizer;
Expand Down Expand Up @@ -369,6 +370,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
Auditor auditor = new Auditor(client, clusterService.nodeName());
JobResultsProvider jobResultsProvider = new JobResultsProvider(client, settings);
JobConfigProvider jobConfigProvider = new JobConfigProvider(client, settings);
DatafeedConfigProvider datafeedConfigProvider = new DatafeedConfigProvider(client, settings, xContentRegistry);
UpdateJobProcessNotifier notifier = new UpdateJobProcessNotifier(settings, client, clusterService, threadPool);
JobManager jobManager = new JobManager(env, settings, jobResultsProvider, clusterService, auditor, threadPool, client, notifier);

Expand Down Expand Up @@ -426,6 +428,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
mlLifeCycleService,
jobResultsProvider,
jobConfigProvider,
datafeedConfigProvider,
jobManager,
autodetectProcessManager,
new MlInitializationService(settings, threadPool, clusterService, client),
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;

import java.io.IOException;
import java.util.Collections;
import java.util.Map;

public class TransportPutDatafeedAction extends TransportMasterNodeAction<PutDatafeedAction.Request, PutDatafeedAction.Response> {
Expand Down Expand Up @@ -177,7 +178,7 @@ private ElasticsearchException checkConfigsAreNotDefinedInClusterState(String da
}

private void checkJobDoesNotHaveADatafeed(String jobId, ActionListener<Boolean> listener) {
datafeedConfigProvider.findDatafeedForJobId(jobId, ActionListener.wrap(
datafeedConfigProvider.findDatafeedsForJobIds(Collections.singletonList(jobId), ActionListener.wrap(
datafeedIds -> {
if (datafeedIds.isEmpty()) {
listener.onResponse(Boolean.TRUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;

import java.util.Collections;
import java.util.Map;

public class TransportUpdateDatafeedAction extends TransportMasterNodeAction<UpdateDatafeedAction.Request, PutDatafeedAction.Response> {
Expand Down Expand Up @@ -100,7 +101,7 @@ protected void masterOperation(UpdateDatafeedAction.Request request, ClusterStat
* if it does have a datafeed it must be the one we are updating
*/
private void checkJobDoesNotHaveADifferentDatafeed(String jobId, String datafeedId, ActionListener<Boolean> listener) {
datafeedConfigProvider.findDatafeedForJobId(jobId, ActionListener.wrap(
datafeedConfigProvider.findDatafeedsForJobIds(Collections.singletonList(jobId), ActionListener.wrap(
datafeedIds -> {
if (datafeedIds.isEmpty()) {
// Ok the job does not have a datafeed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -168,17 +169,17 @@ public void onFailure(Exception e) {
}

/**
* Find any datafeeds that are used by job {@code jobid} i.e. the
* datafeed that references job {@code jobid}.
* Find any datafeeds that are used by jobs {@code jobIds} i.e. the
* datafeeds that reference any of the jobs in {@code jobIds}.
*
* In theory there should never be more than one datafeed referencing a
* particular job.
*
* @param jobId The job to find
* @param jobIds The jobs whose datafeeds should be found
* @param listener Datafeed Id listener
*/
public void findDatafeedForJobId(String jobId, ActionListener<Set<String>> listener) {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedJobIdQuery(jobId));
public void findDatafeedsForJobIds(Collection<String> jobIds, ActionListener<Set<String>> listener) {
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder().query(buildDatafeedJobIdsQuery(jobIds));
sourceBuilder.fetchSource(false);
sourceBuilder.docValueField(DatafeedConfig.ID.getPreferredName());

Expand All @@ -191,8 +192,8 @@ public void findDatafeedForJobId(String jobId, ActionListener<Set<String>> liste
response -> {
Set<String> datafeedIds = new HashSet<>();
SearchHit[] hits = response.getHits().getHits();
// There should be 0 or 1 datafeeds referencing the same job
assert hits.length <= 1;
// There cannot be more than one datafeed per job
assert hits.length <= jobIds.size();

for (SearchHit hit : hits) {
datafeedIds.add(hit.field(DatafeedConfig.ID.getPreferredName()).getValue());
Expand Down Expand Up @@ -463,10 +464,10 @@ private QueryBuilder buildDatafeedIdQuery(String [] tokens) {
return boolQueryBuilder;
}

private QueryBuilder buildDatafeedJobIdQuery(String jobId) {
private QueryBuilder buildDatafeedJobIdsQuery(Collection<String> jobIds) {
BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();
boolQueryBuilder.filter(new TermQueryBuilder(DatafeedConfig.CONFIG_TYPE.getPreferredName(), DatafeedConfig.TYPE));
boolQueryBuilder.filter(new TermQueryBuilder(Job.ID.getPreferredName(), jobId));
boolQueryBuilder.filter(new TermsQueryBuilder(Job.ID.getPreferredName(), jobIds));
return boolQueryBuilder;
}

Expand Down
Loading