Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Close job defined in index #34146

Closed
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.xpack.core.logstash.LogstashFeatureSetUsage;
import org.elasticsearch.xpack.core.ml.MachineLearningFeatureSetUsage;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.CloseJobAction;
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarAction;
import org.elasticsearch.xpack.core.ml.action.DeleteCalendarEventAction;
Expand Down Expand Up @@ -331,9 +332,9 @@ public List<NamedWriteableRegistry.Entry> getNamedWriteables() {
new NamedWriteableRegistry.Entry(MetaData.Custom.class, "ml", MlMetadata::new),
new NamedWriteableRegistry.Entry(NamedDiff.class, "ml", MlMetadata.MlMetadataDiff::new),
// ML - Persistent action requests
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, StartDatafeedAction.TASK_NAME,
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.DATAFEED_TASK_NAME,
StartDatafeedAction.DatafeedParams::new),
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, OpenJobAction.TASK_NAME,
new NamedWriteableRegistry.Entry(PersistentTaskParams.class, MlTasks.JOB_TASK_NAME,
OpenJobAction.JobParams::new),
// ML - Task states
new NamedWriteableRegistry.Entry(PersistentTaskState.class, JobTaskState.NAME, JobTaskState::new),
Expand Down Expand Up @@ -379,9 +380,9 @@ public List<NamedXContentRegistry.Entry> getNamedXContent() {
new NamedXContentRegistry.Entry(MetaData.Custom.class, new ParseField("ml"),
parser -> MlMetadata.LENIENT_PARSER.parse(parser, null).build()),
// ML - Persistent action requests
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(StartDatafeedAction.TASK_NAME),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(MlTasks.DATAFEED_TASK_NAME),
StartDatafeedAction.DatafeedParams::fromXContent),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(OpenJobAction.TASK_NAME),
new NamedXContentRegistry.Entry(PersistentTaskParams.class, new ParseField(MlTasks.JOB_TASK_NAME),
OpenJobAction.JobParams::fromXContent),
// ML - Task states
new NamedXContentRegistry.Entry(PersistentTaskState.class, new ParseField(DatafeedState.NAME), DatafeedState::fromXContent),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ public final class MlMetaIndex {
*/
public static final String INDEX_NAME = ".ml-meta";

public static final String INCLUDE_TYPE_KEY = "include_type";

public static final String TYPE = "doc";

private MlMetaIndex() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
*/
package org.elasticsearch.xpack.core.ml;

import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.ResourceNotFoundException;
import org.elasticsearch.Version;
import org.elasticsearch.cluster.AbstractDiffable;
Expand Down Expand Up @@ -167,7 +166,7 @@ private static <T extends Writeable> void writeMap(Map<String, T> map, StreamOut
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
DelegatingMapParams extendedParams =
new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_CLUSTER_STATE, "true"), params);
new DelegatingMapParams(Collections.singletonMap(ToXContentParams.FOR_INTERNAL_STORAGE, "true"), params);
mapValuesToXContent(JOBS_FIELD, jobs, builder, extendedParams);
mapValuesToXContent(DATAFEEDS_FIELD, datafeeds, builder, extendedParams);
return builder;
Expand Down Expand Up @@ -295,7 +294,7 @@ public Builder deleteJob(String jobId, PersistentTasksCustomMetaData tasks) {

public Builder putDatafeed(DatafeedConfig datafeedConfig, Map<String, String> headers) {
if (datafeeds.containsKey(datafeedConfig.getId())) {
throw new ResourceAlreadyExistsException("A datafeed with id [" + datafeedConfig.getId() + "] already exists");
throw ExceptionsHelper.datafeedAlreadyExists(datafeedConfig.getId());
}
String jobId = datafeedConfig.getJobId();
checkJobIsAvailableForDatafeed(jobId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,18 @@
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;

import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public final class MlTasks {

public static final String JOB_TASK_NAME = "xpack/ml/job";
public static final String DATAFEED_TASK_NAME = "xpack/ml/datafeed";

private static final String JOB_TASK_ID_PREFIX = "job-";
private static final String DATAFEED_TASK_ID_PREFIX = "datafeed-";

private MlTasks() {
}

Expand All @@ -22,15 +32,15 @@ private MlTasks() {
* A datafeed id can be used as a job id, because they are stored separately in cluster state.
*/
public static String jobTaskId(String jobId) {
return "job-" + jobId;
return JOB_TASK_ID_PREFIX + jobId;
}

/**
* Namespaces the task ids for datafeeds.
* A job id can be used as a datafeed id, because they are stored separately in cluster state.
*/
public static String datafeedTaskId(String datafeedId) {
return "datafeed-" + datafeedId;
return DATAFEED_TASK_ID_PREFIX + datafeedId;
}

@Nullable
Expand Down Expand Up @@ -67,4 +77,43 @@ public static DatafeedState getDatafeedState(String datafeedId, @Nullable Persis
return DatafeedState.STOPPED;
}
}

/**
* The job Ids of anomaly detector job tasks.
* All anomaly detector jobs are returned regardless of the status of the
* task (OPEN, CLOSED, FAILED etc).
*
* @param tasks Persistent tasks
* @return The job Ids of anomaly detector job tasks
*/
public static Set<String> openJobIds(PersistentTasksCustomMetaData tasks) {
return tasks.findTasks(JOB_TASK_NAME, task -> true)
.stream()
.map(t -> t.getId().substring(JOB_TASK_ID_PREFIX.length()))
.collect(Collectors.toSet());
}

/**
* Is there an ml anomaly detector job task for the job {@code jobId}?
* @param jobId The job id
* @param tasks Persistent tasks
* @return
*/
public static boolean taskExistsForJob(String jobId, PersistentTasksCustomMetaData tasks) {
return openJobIds(tasks).contains(jobId);
}

/**
* Read the active anomaly detector job tasks.
* Active tasks are not {@code JobState.CLOSED} or {@code JobState.FAILED}.
*
* @param tasks Persistent tasks
* @return The job tasks excluding closed and failed jobs
*/
public static List<PersistentTasksCustomMetaData.PersistentTask<?>> activeJobTasks(PersistentTasksCustomMetaData tasks) {
return tasks.findTasks(JOB_TASK_NAME, task -> true)
.stream()
.filter(task -> ((JobTaskState) task.getState()).getState().isAnyOf(JobState.CLOSED, JobState.FAILED) == false)
.collect(Collectors.toList());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.elasticsearch.action.support.master.MasterNodeRequest;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
Expand All @@ -25,6 +26,7 @@
import org.elasticsearch.tasks.Task;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ml.MachineLearningField;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

Expand All @@ -35,7 +37,7 @@ public class OpenJobAction extends Action<OpenJobAction.Request, AcknowledgedRes

public static final OpenJobAction INSTANCE = new OpenJobAction();
public static final String NAME = "cluster:admin/xpack/ml/job/open";
public static final String TASK_NAME = "xpack/ml/job";


private OpenJobAction() {
super(NAME);
Expand Down Expand Up @@ -136,10 +138,9 @@ public static class JobParams implements XPackPlugin.XPackPersistentTaskParams {

/** TODO Remove in 7.0.0 */
public static final ParseField IGNORE_DOWNTIME = new ParseField("ignore_downtime");

public static final ParseField TIMEOUT = new ParseField("timeout");
public static ObjectParser<JobParams, Void> PARSER = new ObjectParser<>(TASK_NAME, true, JobParams::new);

public static ObjectParser<JobParams, Void> PARSER = new ObjectParser<>(MlTasks.JOB_TASK_NAME, true, JobParams::new);
static {
PARSER.declareString(JobParams::setJobId, Job.ID);
PARSER.declareBoolean((p, v) -> {}, IGNORE_DOWNTIME);
Expand All @@ -163,6 +164,7 @@ public static JobParams parseRequest(String jobId, XContentParser parser) {
// A big state can take a while to restore. For symmetry with the _close endpoint any
// changes here should be reflected there too.
private TimeValue timeout = MachineLearningField.STATE_PERSIST_RESTORE_TIMEOUT;
private Job job;

JobParams() {
}
Expand All @@ -178,6 +180,9 @@ public JobParams(StreamInput in) throws IOException {
in.readBoolean();
}
timeout = TimeValue.timeValueMillis(in.readVLong());
if (in.getVersion().onOrAfter(Version.CURRENT)) {
job = in.readOptionalWriteable(Job::new);
}
}

public String getJobId() {
Expand All @@ -196,9 +201,18 @@ public void setTimeout(TimeValue timeout) {
this.timeout = timeout;
}

@Nullable
public Job getJob() {
return job;
}

public void setJob(Job job) {
this.job = job;
}

@Override
public String getWriteableName() {
return TASK_NAME;
return MlTasks.JOB_TASK_NAME;
}

@Override
Expand All @@ -209,6 +223,9 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeBoolean(true);
}
out.writeVLong(timeout.millis());
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeOptionalWriteable(job);
}
}

@Override
Expand All @@ -217,12 +234,13 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
builder.field(Job.ID.getPreferredName(), jobId);
builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
builder.endObject();
// The job field is streamed but not persisted
return builder;
}

@Override
public int hashCode() {
return Objects.hash(jobId, timeout);
return Objects.hash(jobId, timeout, job);
}

@Override
Expand All @@ -235,7 +253,8 @@ public boolean equals(Object obj) {
}
OpenJobAction.JobParams other = (OpenJobAction.JobParams) obj;
return Objects.equals(jobId, other.jobId) &&
Objects.equals(timeout, other.timeout);
Objects.equals(timeout, other.timeout) &&
Objects.equals(job, other.job);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,7 @@ public void writeTo(StreamOutput out) throws IOException {

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
datafeed.doXContentBody(builder, params);
builder.endObject();
datafeed.toXContent(builder, params);
return builder;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
Expand All @@ -43,7 +44,6 @@ public class StartDatafeedAction

public static final StartDatafeedAction INSTANCE = new StartDatafeedAction();
public static final String NAME = "cluster:admin/xpack/ml/datafeed/start";
public static final String TASK_NAME = "xpack/ml/datafeed";

private StartDatafeedAction() {
super(NAME);
Expand Down Expand Up @@ -147,8 +147,7 @@ public boolean equals(Object obj) {

public static class DatafeedParams implements XPackPlugin.XPackPersistentTaskParams {

public static ObjectParser<DatafeedParams, Void> PARSER = new ObjectParser<>(TASK_NAME, true, DatafeedParams::new);

public static ObjectParser<DatafeedParams, Void> PARSER = new ObjectParser<>(MlTasks.DATAFEED_TASK_NAME, true, DatafeedParams::new);
static {
PARSER.declareString((params, datafeedId) -> params.datafeedId = datafeedId, DatafeedConfig.ID);
PARSER.declareString((params, startTime) -> params.startTime = parseDateOrThrow(
Expand Down Expand Up @@ -235,7 +234,7 @@ public void setTimeout(TimeValue timeout) {

@Override
public String getWriteableName() {
return TASK_NAME;
return MlTasks.DATAFEED_TASK_NAME;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;

import java.io.IOException;
import java.util.Arrays;
Expand Down Expand Up @@ -111,7 +111,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (description != null) {
builder.field(DESCRIPTION.getPreferredName(), description);
}
if (params.paramAsBoolean(MlMetaIndex.INCLUDE_TYPE_KEY, false)) {
if (params.paramAsBoolean(ToXContentParams.INCLUDE_TYPE, false)) {
builder.field(TYPE.getPreferredName(), CALENDAR_TYPE);
}
builder.endObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.xpack.core.ml.MlMetaIndex;
import org.elasticsearch.xpack.core.ml.job.config.DetectionRule;
import org.elasticsearch.xpack.core.ml.job.config.Operator;
import org.elasticsearch.xpack.core.ml.job.config.RuleAction;
import org.elasticsearch.xpack.core.ml.job.config.RuleCondition;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.core.ml.utils.Intervals;
import org.elasticsearch.xpack.core.ml.utils.ToXContentParams;
import org.elasticsearch.xpack.core.ml.utils.time.TimeUtils;

import java.io.IOException;
Expand Down Expand Up @@ -170,7 +170,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
if (eventId != null) {
builder.field(EVENT_ID.getPreferredName(), eventId);
}
if (params.paramAsBoolean(MlMetaIndex.INCLUDE_TYPE_KEY, false)) {
if (params.paramAsBoolean(ToXContentParams.INCLUDE_TYPE, false)) {
builder.field(TYPE.getPreferredName(), SCHEDULED_EVENT_TYPE);
}
builder.endObject();
Expand Down
Loading