[ML] Job in index: Datafeed node selector (#34218)
davidkyle committed Oct 17, 2018
1 parent 551ddbd commit 7697e34
Showing 14 changed files with 437 additions and 311 deletions.
@@ -28,10 +28,13 @@
import org.elasticsearch.xpack.core.XPackPlugin;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.xpack.core.ml.job.config.Job;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.LongSupplier;

@@ -147,6 +150,8 @@ public boolean equals(Object obj) {

public static class DatafeedParams implements XPackPlugin.XPackPersistentTaskParams {

public static final ParseField INDICES = new ParseField("indices");

public static ObjectParser<DatafeedParams, Void> PARSER = new ObjectParser<>(MlTasks.DATAFEED_TASK_NAME, true, DatafeedParams::new);
static {
PARSER.declareString((params, datafeedId) -> params.datafeedId = datafeedId, DatafeedConfig.ID);
@@ -155,6 +160,8 @@ public static class DatafeedParams implements XPackPlugin.XPackPersistentTaskPar
PARSER.declareString(DatafeedParams::setEndTime, END_TIME);
PARSER.declareString((params, val) ->
params.setTimeout(TimeValue.parseTimeValue(val, TIMEOUT.getPreferredName())), TIMEOUT);
PARSER.declareString(DatafeedParams::setJobId, Job.ID);
PARSER.declareStringArray(DatafeedParams::setDatafeedIndices, INDICES);
}

static long parseDateOrThrow(String date, ParseField paramName, LongSupplier now) {
@@ -194,6 +201,10 @@ public DatafeedParams(StreamInput in) throws IOException {
startTime = in.readVLong();
endTime = in.readOptionalLong();
timeout = TimeValue.timeValueMillis(in.readVLong());
if (in.getVersion().onOrAfter(Version.CURRENT)) {
jobId = in.readOptionalString();
datafeedIndices = in.readList(StreamInput::readString);
}
}

DatafeedParams() {
@@ -203,6 +214,9 @@ public DatafeedParams(StreamInput in) throws IOException {
private long startTime;
private Long endTime;
private TimeValue timeout = TimeValue.timeValueSeconds(20);
private List<String> datafeedIndices = Collections.emptyList();
private String jobId;


public String getDatafeedId() {
return datafeedId;
@@ -232,6 +246,22 @@ public void setTimeout(TimeValue timeout) {
this.timeout = timeout;
}

public String getJobId() {
return jobId;
}

public void setJobId(String jobId) {
this.jobId = jobId;
}

public List<String> getDatafeedIndices() {
return datafeedIndices;
}

public void setDatafeedIndices(List<String> datafeedIndices) {
this.datafeedIndices = datafeedIndices;
}

@Override
public String getWriteableName() {
return MlTasks.DATAFEED_TASK_NAME;
@@ -248,6 +278,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeVLong(startTime);
out.writeOptionalLong(endTime);
out.writeVLong(timeout.millis());
if (out.getVersion().onOrAfter(Version.CURRENT)) {
out.writeOptionalString(jobId);
out.writeStringList(datafeedIndices);
}
}

@Override
@@ -259,13 +293,19 @@ public XContentBuilder toXContent(XContentBuilder builder, ToXContent.Params par
builder.field(END_TIME.getPreferredName(), String.valueOf(endTime));
}
builder.field(TIMEOUT.getPreferredName(), timeout.getStringRep());
if (jobId != null) {
builder.field(Job.ID.getPreferredName(), jobId);
}
if (datafeedIndices.isEmpty() == false) {
builder.field(INDICES.getPreferredName(), datafeedIndices);
}
builder.endObject();
return builder;
}

@Override
public int hashCode() {
return Objects.hash(datafeedId, startTime, endTime, timeout);
return Objects.hash(datafeedId, startTime, endTime, timeout, jobId, datafeedIndices);
}

@Override
Expand All @@ -280,7 +320,9 @@ public boolean equals(Object obj) {
return Objects.equals(datafeedId, other.datafeedId) &&
Objects.equals(startTime, other.startTime) &&
Objects.equals(endTime, other.endTime) &&
Objects.equals(timeout, other.timeout);
Objects.equals(timeout, other.timeout) &&
Objects.equals(jobId, other.jobId) &&
Objects.equals(datafeedIndices, other.datafeedIndices);
}
}

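The hunks above extend StartDatafeedAction.DatafeedParams so the persistent task params carry the datafeed's job id and indices alongside the datafeed id. Below is a minimal sketch, not part of the commit, of how such params could be parsed back from their XContent form; it assumes the public PARSER shown above and the usual field names ("datafeed_id" for DatafeedConfig.ID, "job_id" for Job.ID, plus the new "indices" field).

import org.elasticsearch.common.xcontent.DeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;

public class DatafeedParamsParseSketch {
    public static void main(String[] args) throws Exception {
        // The task params now carry the job id and the indices to search, so consumers
        // no longer need to resolve the DatafeedConfig from MlMetadata.
        String json = "{\"datafeed_id\":\"feed-1\",\"job_id\":\"job-1\","
                + "\"indices\":[\"logs-*\",\"metrics-*\"]}";
        try (XContentParser parser = XContentType.JSON.xContent().createParser(
                NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, json)) {
            StartDatafeedAction.DatafeedParams params =
                    StartDatafeedAction.DatafeedParams.PARSER.apply(parser, null);
            System.out.println(params.getJobId());           // job-1
            System.out.println(params.getDatafeedIndices()); // [logs-*, metrics-*]
        }
    }
}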
@@ -12,6 +12,7 @@
import org.elasticsearch.test.AbstractSerializingTestCase;

import java.io.IOException;
import java.util.Arrays;

public class DatafeedParamsTests extends AbstractSerializingTestCase<StartDatafeedAction.DatafeedParams> {
@Override
Expand All @@ -28,6 +29,13 @@ public static StartDatafeedAction.DatafeedParams createDatafeedParams() {
if (randomBoolean()) {
params.setTimeout(TimeValue.timeValueMillis(randomNonNegativeLong()));
}
if (randomBoolean()) {
params.setJobId(randomAlphaOfLength(10));
}
if (randomBoolean()) {
params.setDatafeedIndices(Arrays.asList(randomAlphaOfLength(10), randomAlphaOfLength(10)));
}

return params;
}

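The randomized test above exercises the new fields, while the version checks in writeTo and the StreamInput constructor mean the fields only travel between nodes that both understand them. A hedged sketch of that wire round trip follows; it assumes the existing (datafeedId, startTime) constructor of DatafeedParams, which is not shown in this diff.

import org.elasticsearch.Version;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction.DatafeedParams;

import java.util.Arrays;

public class DatafeedParamsWireSketch {
    public static void main(String[] args) throws Exception {
        // Assumes DatafeedParams(String datafeedId, long startTime) exists; it is not part of this diff.
        DatafeedParams params = new DatafeedParams("feed-1", 0L);
        params.setJobId("job-1");
        params.setDatafeedIndices(Arrays.asList("logs-*", "metrics-*"));

        // On a current-version stream the new fields are written and read back.
        BytesStreamOutput out = new BytesStreamOutput();
        out.setVersion(Version.CURRENT);
        params.writeTo(out);
        StreamInput in = out.bytes().streamInput();
        in.setVersion(Version.CURRENT);
        DatafeedParams roundTripped = new DatafeedParams(in);
        System.out.println(roundTripped.getJobId());           // job-1
        System.out.println(roundTripped.getDatafeedIndices()); // [logs-*, metrics-*]
        // On a stream older than Version.CURRENT neither side touches the new fields,
        // so after deserialization they fall back to null and an empty list.
    }
}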
@@ -411,7 +411,8 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
jobManager, jobResultsProvider, jobResultsPersister, jobDataCountsPersister, autodetectProcessFactory,
normalizerFactory, xContentRegistry, auditor);
this.autodetectProcessManager.set(autodetectProcessManager);
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, jobResultsProvider, auditor, System::currentTimeMillis);
DatafeedJobBuilder datafeedJobBuilder = new DatafeedJobBuilder(client, settings, xContentRegistry,
auditor, System::currentTimeMillis);
DatafeedManager datafeedManager = new DatafeedManager(threadPool, client, clusterService, datafeedJobBuilder,
System::currentTimeMillis, auditor);
this.datafeedManager.set(datafeedManager);
@@ -12,15 +12,13 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedConfig;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
import org.elasticsearch.xpack.ml.notifications.Auditor;

import java.util.Objects;
@@ -89,16 +87,20 @@ public void clusterChanged(ClusterChangedEvent event) {
auditor.info(jobId, "Opening job on node [" + node.toString() + "]");
}
} else if (MlTasks.DATAFEED_TASK_NAME.equals(currentTask.getTaskName())) {
String datafeedId = ((StartDatafeedAction.DatafeedParams) currentTask.getParams()).getDatafeedId();
DatafeedConfig datafeedConfig = MlMetadata.getMlMetadata(event.state()).getDatafeed(datafeedId);
StartDatafeedAction.DatafeedParams datafeedParams = (StartDatafeedAction.DatafeedParams) currentTask.getParams();
String jobId = datafeedParams.getJobId();
if (currentAssignment.getExecutorNode() == null) {
String msg = "No node found to start datafeed [" + datafeedId +"]. Reasons [" +
String msg = "No node found to start datafeed [" + datafeedParams.getDatafeedId() +"]. Reasons [" +
currentAssignment.getExplanation() + "]";
logger.warn("[{}] {}", datafeedConfig.getJobId(), msg);
auditor.warning(datafeedConfig.getJobId(), msg);
logger.warn("[{}] {}", jobId, msg);
if (jobId != null) {
auditor.warning(jobId, msg);
}
} else {
DiscoveryNode node = event.state().nodes().get(currentAssignment.getExecutorNode());
auditor.info(datafeedConfig.getJobId(), "Starting datafeed [" + datafeedId + "] on node [" + node + "]");
if (jobId != null) {
auditor.info(jobId, "Starting datafeed [" + datafeedParams.getDatafeedId() + "] on node [" + node + "]");
}
}
}
}
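The notifier hunk above stops looking the datafeed's config up in MlMetadata and instead reads the job id straight from the task params, guarding for null because tasks persisted before this change will not carry one. The following is a condensed, hypothetical restatement of that flow; the helper name and the surrounding logger and auditor fields are assumed from the enclosing class, not introduced by the commit.

// Hypothetical helper condensing the hunk above. jobId may be null for datafeed tasks
// created before this change: the warning is still logged, but no per-job audit message
// is written because there is no job to attach it to.
private void notifyDatafeedAssignment(PersistentTask<?> task, Assignment assignment, ClusterState state) {
    StartDatafeedAction.DatafeedParams params = (StartDatafeedAction.DatafeedParams) task.getParams();
    String jobId = params.getJobId();
    if (assignment.getExecutorNode() == null) {
        String msg = "No node found to start datafeed [" + params.getDatafeedId() + "]. Reasons ["
                + assignment.getExplanation() + "]";
        logger.warn("[{}] {}", jobId, msg);
        if (jobId != null) {
            auditor.warning(jobId, msg);
        }
    } else if (jobId != null) {
        DiscoveryNode node = state.nodes().get(assignment.getExecutorNode());
        auditor.info(jobId, "Starting datafeed [" + params.getDatafeedId() + "] on node [" + node + "]");
    }
}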
[Diffs for the remaining changed files not shown]
