Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ML] Filter undefined job groups from update job calendar actions #30757

Merged
merged 1 commit into from
May 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,35 +28,28 @@

public class TransportUpdateCalendarJobAction extends HandledTransportAction<UpdateCalendarJobAction.Request, PutCalendarAction.Response> {

private final ClusterService clusterService;
private final JobProvider jobProvider;
private final JobManager jobManager;

@Inject
public TransportUpdateCalendarJobAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, JobProvider jobProvider, JobManager jobManager) {
JobProvider jobProvider, JobManager jobManager) {
super(settings, UpdateCalendarJobAction.NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, UpdateCalendarJobAction.Request::new);
this.clusterService = clusterService;
this.jobProvider = jobProvider;
this.jobManager = jobManager;
}

@Override
protected void doExecute(UpdateCalendarJobAction.Request request, ActionListener<PutCalendarAction.Response> listener) {
ClusterState clusterState = clusterService.state();
final MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);

Set<String> jobIdsToAdd = Strings.tokenizeByCommaToSet(request.getJobIdsToAddExpression());
Set<String> jobIdsToRemove = Strings.tokenizeByCommaToSet(request.getJobIdsToRemoveExpression());

jobProvider.updateCalendar(request.getCalendarId(), jobIdsToAdd, jobIdsToRemove, mlMetadata,
jobProvider.updateCalendar(request.getCalendarId(), jobIdsToAdd, jobIdsToRemove,
c -> {
List<String> existingJobsOrGroups =
c.getJobIds().stream().filter(mlMetadata::isGroupOrJob).collect(Collectors.toList());
jobManager.updateProcessOnCalendarChanged(existingJobsOrGroups);
jobManager.updateProcessOnCalendarChanged(c.getJobIds());
listener.onResponse(new PutCalendarAction.Response(c));
}, listener::onFailure);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* Allows interactions with jobs. The managed interactions include:
Expand Down Expand Up @@ -420,8 +421,13 @@ public void updateProcessOnFilterChanged(MlFilter filter) {

public void updateProcessOnCalendarChanged(List<String> calendarJobIds) {
ClusterState clusterState = clusterService.state();
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);

List<String> existingJobsOrGroups =
calendarJobIds.stream().filter(mlMetadata::isGroupOrJob).collect(Collectors.toList());

Set<String> expandedJobIds = new HashSet<>();
calendarJobIds.forEach(jobId -> expandedJobIds.addAll(expandJobIds(jobId, true, clusterState)));
existingJobsOrGroups.forEach(jobId -> expandedJobIds.addAll(expandJobIds(jobId, true, clusterState)));
for (String jobId : expandedJobIds) {
if (isJobOpen(clusterState, jobId)) {
updateJobProcessNotifier.submitJobUpdate(UpdateParams.scheduledEventsUpdate(jobId), ActionListener.wrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1114,7 +1114,7 @@ public void getForecastRequestStats(String jobId, String forecastId, Consumer<Fo
result -> handler.accept(result.result), errorHandler, () -> null);
}

public void updateCalendar(String calendarId, Set<String> jobIdsToAdd, Set<String> jobIdsToRemove, MlMetadata mlMetadata,
public void updateCalendar(String calendarId, Set<String> jobIdsToAdd, Set<String> jobIdsToRemove,
Consumer<Calendar> handler, Consumer<Exception> errorHandler) {

ActionListener<Calendar> getCalendarListener = ActionListener.wrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ private void updateCalendar(String calendarId, Set<String> idsToAdd, Set<String>
throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
jobProvider.updateCalendar(calendarId, idsToAdd, idsToRemove, mlMetadata,
jobProvider.updateCalendar(calendarId, idsToAdd, idsToRemove,
r -> latch.countDown(),
e -> {
exceptionHolder.set(e);
Expand Down
120 changes: 103 additions & 17 deletions x-pack/plugin/src/test/resources/rest-api-spec/test/ml/calendar_crud.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,6 @@
xpack.ml.get_calendars:
calendar_id: "dogs_of_the_year"

- do:
xpack.ml.put_calendar:
calendar_id: "new_cal_with_unknown_job_group"
body: >
{
"job_ids": ["cal-job", "unknown-job-group"]
}

- do:
xpack.ml.get_calendars:
calendar_id: "new_cal_with_unknown_job_group"
- match: { count: 1 }
- match:
calendars.0:
calendar_id: "new_cal_with_unknown_job_group"
job_ids: ["cal-job", "unknown-job-group"]

---
"Test get calendar given missing":
- do:
Expand Down Expand Up @@ -714,3 +697,106 @@
- match: { calendar_id: "expression" }
- length: { job_ids: 1 }
- match: { job_ids.0: "bar-a" }

---
"Test calendar actions with new job group":
- do:
xpack.ml.put_job:
job_id: calendar-job
body: >
{
"analysis_config" : {
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
}
}

- do:
xpack.ml.put_calendar:
calendar_id: "cal_with_new_job_group"
body: >
{
"job_ids": ["calendar-job", "new-job-group"]
}

- do:
xpack.ml.get_calendars:
calendar_id: "cal_with_new_job_group"
- match: { count: 1 }
- match:
calendars.0:
calendar_id: "cal_with_new_job_group"
job_ids: ["calendar-job", "new-job-group"]

- do:
xpack.ml.post_calendar_events:
calendar_id: "cal_with_new_job_group"
body: >
{
"events" : [{ "description": "beach", "start_time": "2018-05-01T00:00:00Z", "end_time": "2018-05-06T00:00:00Z" }]
}

- do:
xpack.ml.get_calendar_events:
calendar_id: cal_with_new_job_group
- length: { events: 1 }
- match: { events.0.description: beach }

- do:
xpack.ml.delete_calendar:
calendar_id: "cal_with_new_job_group"

- do:
xpack.ml.put_calendar:
calendar_id: "started_empty_calendar"

- do:
xpack.ml.put_calendar_job:
calendar_id: "started_empty_calendar"
job_id: "new-group"
- match: { calendar_id: "started_empty_calendar" }
- length: { job_ids: 1 }

- do:
xpack.ml.get_calendars:
calendar_id: "started_empty_calendar"
- match: { count: 1 }
- match:
calendars.0:
calendar_id: "started_empty_calendar"
job_ids: ["new-group"]

- do:
xpack.ml.post_calendar_events:
calendar_id: "started_empty_calendar"
body: >
{
"events" : [{ "description": "beach", "start_time": "2018-05-01T00:00:00Z", "end_time": "2018-05-06T00:00:00Z" }]
}

- do:
xpack.ml.get_calendar_events:
calendar_id: "started_empty_calendar"
- length: { events: 1 }
- match: { events.0.description: beach }
- set: { events.0.event_id: beach_event_id }

- do:
xpack.ml.delete_calendar_event:
calendar_id: "started_empty_calendar"
event_id: $beach_event_id

- do:
xpack.ml.get_calendar_events:
calendar_id: "started_empty_calendar"
- length: { events: 0 }

- do:
xpack.ml.delete_calendar:
calendar_id: "started_empty_calendar"

- do:
catch: missing
xpack.ml.get_calendars:
calendar_id: "started_empty_calendar"