Skip to content

Commit

Permalink
[ML] Filter undefined job groups from update calendar actions (#30757)
Browse files Browse the repository at this point in the history
The UI creates job groups in calendars ad hoc to ease calendar creation these must be filtered from the jobs list before applying updates.
  • Loading branch information
davidkyle authored May 22, 2018
1 parent bdb79d0 commit f76f95b
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,35 +28,28 @@

public class TransportUpdateCalendarJobAction extends HandledTransportAction<UpdateCalendarJobAction.Request, PutCalendarAction.Response> {

private final ClusterService clusterService;
private final JobProvider jobProvider;
private final JobManager jobManager;

@Inject
public TransportUpdateCalendarJobAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService, JobProvider jobProvider, JobManager jobManager) {
JobProvider jobProvider, JobManager jobManager) {
super(settings, UpdateCalendarJobAction.NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, UpdateCalendarJobAction.Request::new);
this.clusterService = clusterService;
this.jobProvider = jobProvider;
this.jobManager = jobManager;
}

@Override
protected void doExecute(UpdateCalendarJobAction.Request request, ActionListener<PutCalendarAction.Response> listener) {
ClusterState clusterState = clusterService.state();
final MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);

Set<String> jobIdsToAdd = Strings.tokenizeByCommaToSet(request.getJobIdsToAddExpression());
Set<String> jobIdsToRemove = Strings.tokenizeByCommaToSet(request.getJobIdsToRemoveExpression());

jobProvider.updateCalendar(request.getCalendarId(), jobIdsToAdd, jobIdsToRemove, mlMetadata,
jobProvider.updateCalendar(request.getCalendarId(), jobIdsToAdd, jobIdsToRemove,
c -> {
List<String> existingJobsOrGroups =
c.getJobIds().stream().filter(mlMetadata::isGroupOrJob).collect(Collectors.toList());
jobManager.updateProcessOnCalendarChanged(existingJobsOrGroups);
jobManager.updateProcessOnCalendarChanged(c.getJobIds());
listener.onResponse(new PutCalendarAction.Response(c));
}, listener::onFailure);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

/**
* Allows interactions with jobs. The managed interactions include:
Expand Down Expand Up @@ -420,8 +421,13 @@ public void updateProcessOnFilterChanged(MlFilter filter) {

public void updateProcessOnCalendarChanged(List<String> calendarJobIds) {
ClusterState clusterState = clusterService.state();
MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);

List<String> existingJobsOrGroups =
calendarJobIds.stream().filter(mlMetadata::isGroupOrJob).collect(Collectors.toList());

Set<String> expandedJobIds = new HashSet<>();
calendarJobIds.forEach(jobId -> expandedJobIds.addAll(expandJobIds(jobId, true, clusterState)));
existingJobsOrGroups.forEach(jobId -> expandedJobIds.addAll(expandJobIds(jobId, true, clusterState)));
for (String jobId : expandedJobIds) {
if (isJobOpen(clusterState, jobId)) {
updateJobProcessNotifier.submitJobUpdate(UpdateParams.scheduledEventsUpdate(jobId), ActionListener.wrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1114,7 +1114,7 @@ public void getForecastRequestStats(String jobId, String forecastId, Consumer<Fo
result -> handler.accept(result.result), errorHandler, () -> null);
}

public void updateCalendar(String calendarId, Set<String> jobIdsToAdd, Set<String> jobIdsToRemove, MlMetadata mlMetadata,
public void updateCalendar(String calendarId, Set<String> jobIdsToAdd, Set<String> jobIdsToRemove,
Consumer<Calendar> handler, Consumer<Exception> errorHandler) {

ActionListener<Calendar> getCalendarListener = ActionListener.wrap(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ private void updateCalendar(String calendarId, Set<String> idsToAdd, Set<String>
throws Exception {
CountDownLatch latch = new CountDownLatch(1);
AtomicReference<Exception> exceptionHolder = new AtomicReference<>();
jobProvider.updateCalendar(calendarId, idsToAdd, idsToRemove, mlMetadata,
jobProvider.updateCalendar(calendarId, idsToAdd, idsToRemove,
r -> latch.countDown(),
e -> {
exceptionHolder.set(e);
Expand Down
120 changes: 103 additions & 17 deletions x-pack/plugin/src/test/resources/rest-api-spec/test/ml/calendar_crud.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,23 +86,6 @@
xpack.ml.get_calendars:
calendar_id: "dogs_of_the_year"

- do:
xpack.ml.put_calendar:
calendar_id: "new_cal_with_unknown_job_group"
body: >
{
"job_ids": ["cal-job", "unknown-job-group"]
}
- do:
xpack.ml.get_calendars:
calendar_id: "new_cal_with_unknown_job_group"
- match: { count: 1 }
- match:
calendars.0:
calendar_id: "new_cal_with_unknown_job_group"
job_ids: ["cal-job", "unknown-job-group"]

---
"Test get calendar given missing":
- do:
Expand Down Expand Up @@ -714,3 +697,106 @@
- match: { calendar_id: "expression" }
- length: { job_ids: 1 }
- match: { job_ids.0: "bar-a" }

---
"Test calendar actions with new job group":
- do:
xpack.ml.put_job:
job_id: calendar-job
body: >
{
"analysis_config" : {
"detectors" :[{"function":"metric","field_name":"responsetime","by_field_name":"airline"}]
},
"data_description" : {
}
}
- do:
xpack.ml.put_calendar:
calendar_id: "cal_with_new_job_group"
body: >
{
"job_ids": ["calendar-job", "new-job-group"]
}
- do:
xpack.ml.get_calendars:
calendar_id: "cal_with_new_job_group"
- match: { count: 1 }
- match:
calendars.0:
calendar_id: "cal_with_new_job_group"
job_ids: ["calendar-job", "new-job-group"]

- do:
xpack.ml.post_calendar_events:
calendar_id: "cal_with_new_job_group"
body: >
{
"events" : [{ "description": "beach", "start_time": "2018-05-01T00:00:00Z", "end_time": "2018-05-06T00:00:00Z" }]
}
- do:
xpack.ml.get_calendar_events:
calendar_id: cal_with_new_job_group
- length: { events: 1 }
- match: { events.0.description: beach }

- do:
xpack.ml.delete_calendar:
calendar_id: "cal_with_new_job_group"

- do:
xpack.ml.put_calendar:
calendar_id: "started_empty_calendar"

- do:
xpack.ml.put_calendar_job:
calendar_id: "started_empty_calendar"
job_id: "new-group"
- match: { calendar_id: "started_empty_calendar" }
- length: { job_ids: 1 }

- do:
xpack.ml.get_calendars:
calendar_id: "started_empty_calendar"
- match: { count: 1 }
- match:
calendars.0:
calendar_id: "started_empty_calendar"
job_ids: ["new-group"]

- do:
xpack.ml.post_calendar_events:
calendar_id: "started_empty_calendar"
body: >
{
"events" : [{ "description": "beach", "start_time": "2018-05-01T00:00:00Z", "end_time": "2018-05-06T00:00:00Z" }]
}
- do:
xpack.ml.get_calendar_events:
calendar_id: "started_empty_calendar"
- length: { events: 1 }
- match: { events.0.description: beach }
- set: { events.0.event_id: beach_event_id }

- do:
xpack.ml.delete_calendar_event:
calendar_id: "started_empty_calendar"
event_id: $beach_event_id

- do:
xpack.ml.get_calendar_events:
calendar_id: "started_empty_calendar"
- length: { events: 0 }

- do:
xpack.ml.delete_calendar:
calendar_id: "started_empty_calendar"

- do:
catch: missing
xpack.ml.get_calendars:
calendar_id: "started_empty_calendar"

0 comments on commit f76f95b

Please sign in to comment.