Skip to content

Commit

Permalink
[ML] Log warnings for jobs unassigned for a long time
Browse files Browse the repository at this point in the history
If a job is unassigned for a long time (say more than 15 minutes)
then that's a sign of a potential problem with the cluster. In
Cloud it may be an indication of a failure of autoscaling. In
self-managed it may be an indication of a failed node not being
replaced. Either way, warning that the situation exists in
periodic log messages should make it easier for operators to
detect the situation and attempt to remedy it.
  • Loading branch information
droberts195 committed Oct 2, 2023
1 parent 1369ff2 commit d7e89dd
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
Expand All @@ -22,27 +23,55 @@
import org.elasticsearch.xpack.core.ml.action.OpenJobAction;
import org.elasticsearch.xpack.core.ml.action.StartDataFrameAnalyticsAction;
import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
import org.elasticsearch.xpack.core.ml.utils.MlTaskParams;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class MlAssignmentNotifier implements ClusterStateListener {
private static final Logger logger = LogManager.getLogger(MlAssignmentNotifier.class);

static final Duration MIN_LOG_CHECK_INTERVAL = Duration.ofSeconds(30);
static final Duration LONG_TIME_UNASSIGNED_INTERVAL = Duration.ofMinutes(15);
static final Duration MIN_REPORT_INTERVAL = Duration.ofHours(6);

private final AnomalyDetectionAuditor anomalyDetectionAuditor;
private final DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor;
private final ThreadPool threadPool;
private final Clock clock;
private Map<TaskNameAndId, UnassignedTimeAndReportTime> unassignedInfoByTask = Map.of();
private volatile Instant lastLogCheck;

MlAssignmentNotifier(
AnomalyDetectionAuditor anomalyDetectionAuditor,
DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor,
ThreadPool threadPool,
ClusterService clusterService
) {
this(anomalyDetectionAuditor, dataFrameAnalyticsAuditor, threadPool, clusterService, Clock.systemUTC());
}

MlAssignmentNotifier(
AnomalyDetectionAuditor anomalyDetectionAuditor,
DataFrameAnalyticsAuditor dataFrameAnalyticsAuditor,
ThreadPool threadPool,
ClusterService clusterService,
Clock clock
) {
this.anomalyDetectionAuditor = anomalyDetectionAuditor;
this.dataFrameAnalyticsAuditor = dataFrameAnalyticsAuditor;
this.threadPool = threadPool;
this.clock = clock;
this.lastLogCheck = clock.instant();
clusterService.addListener(this);
}

Expand All @@ -54,9 +83,16 @@ private String executorName() {
public void clusterChanged(ClusterChangedEvent event) {

if (event.localNodeMaster() == false) {
unassignedInfoByTask = Map.of();
return;
}

Instant now = clock.instant();
if (lastLogCheck.plus(MIN_LOG_CHECK_INTERVAL).isBefore(now)) {
lastLogCheck = now;
threadPool.executor(executorName()).execute(() -> logLongTimeUnassigned(now, event.state()));
}

if (event.metadataChanged() == false) {
return;
}
Expand Down Expand Up @@ -223,4 +259,80 @@ static String nodeName(DiscoveryNodes nodes, String nodeId) {
}
return nodeId;
}

private void logLongTimeUnassigned(Instant now, ClusterState state) {
PersistentTasksCustomMetadata tasks = state.getMetadata().custom(PersistentTasksCustomMetadata.TYPE);
if (tasks == null) {
return;
}

List<String> itemsToReport = findLongTimeUnassignedTasks(now, tasks);

logger.warn("ML persistent tasks unassigned for a long time [{}]", String.join("|", itemsToReport));
}

/**
* Creates a list of items to be logged to report ML job tasks that:
* 1. Have been unassigned for a long time
* 2. Have not been logged recently (to avoid log spam)
* <p>
* Only report on jobs, not datafeeds, on the assumption that jobs and their corresponding
* datafeeds get assigned together. This may miss some obscure edge cases, but will avoid
* the verbose and confusing messages that the duplication between jobs and datafeeds would
* generally cause.
* <p>
* The time intervals used in this reporting reset each time the master node changes, as
* the data structure used to record the information is in memory on the current master node,
* not in cluster state.
*/
synchronized List<String> findLongTimeUnassignedTasks(Instant now, PersistentTasksCustomMetadata tasks) {

assert tasks != null;

final List<String> itemsToReport = new ArrayList<>();
final Map<TaskNameAndId, UnassignedTimeAndReportTime> oldUnassignedInfoByTask = unassignedInfoByTask;
final Map<TaskNameAndId, UnassignedTimeAndReportTime> newUnassignedInfoByTask = new HashMap<>();

for (PersistentTask<?> task : tasks.tasks()) {
if (task.getExecutorNode() == null) {
final String taskName = task.getTaskName();
if (MlTasks.JOB_TASK_NAME.equals(taskName) || MlTasks.DATA_FRAME_ANALYTICS_TASK_NAME.equals(taskName)) {
final String mlId = ((MlTaskParams) task.getParams()).getMlId();
final TaskNameAndId key = new TaskNameAndId(taskName, mlId);
final UnassignedTimeAndReportTime previousInfo = oldUnassignedInfoByTask.get(key);
final Instant firstUnassignedTime;
final Instant lastReportedTime;
if (previousInfo != null) {
firstUnassignedTime = previousInfo.unassignedTime();
if (firstUnassignedTime.plus(LONG_TIME_UNASSIGNED_INTERVAL).isBefore(now)
&& (previousInfo.reportTime() == null || previousInfo.reportTime().plus(MIN_REPORT_INTERVAL).isBefore(now))) {
lastReportedTime = now;
itemsToReport.add(
Strings.format(
"[%s]/[%s] unassigned for [%d] seconds",
taskName,
mlId,
ChronoUnit.SECONDS.between(firstUnassignedTime, now)
)
);
} else {
lastReportedTime = previousInfo.reportTime();
}
} else {
firstUnassignedTime = now;
lastReportedTime = null;
}
newUnassignedInfoByTask.put(key, new UnassignedTimeAndReportTime(firstUnassignedTime, lastReportedTime));
}
}
}

unassignedInfoByTask = newUnassignedInfoByTask;

return itemsToReport;
}

private record TaskNameAndId(String taskName, String mlId) {};

private record UnassignedTimeAndReportTime(Instant unassignedTime, Instant reportTime) {};
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,25 @@
import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.ml.job.config.JobState;
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
import org.junit.Before;

import java.net.InetAddress;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;

import static org.elasticsearch.xpack.ml.job.task.OpenJobPersistentTasksExecutorTests.addJobTask;
import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -47,10 +55,9 @@ public void setupMocks() {
dataFrameAnalyticsAuditor = mock(DataFrameAnalyticsAuditor.class);
clusterService = mock(ClusterService.class);
threadPool = mock(ThreadPool.class);
threadPool = mock(ThreadPool.class);

ExecutorService executorService = mock(ExecutorService.class);
org.mockito.Mockito.doAnswer(invocation -> {
doAnswer(invocation -> {
((Runnable) invocation.getArguments()[0]).run();
return null;
}).when(executorService).execute(any(Runnable.class));
Expand Down Expand Up @@ -233,4 +240,69 @@ public void testAuditUnassignedMlTasks() {
verify(anomalyDetectionAuditor, times(2)).includeNodeInfo();
}
}

public void testFindLongTimeUnassignedTasks() {
MlAssignmentNotifier notifier = new MlAssignmentNotifier(
anomalyDetectionAuditor,
dataFrameAnalyticsAuditor,
threadPool,
clusterService
);

Instant now = Instant.now();
Instant eightHoursAgo = now.minus(Duration.ofHours(8));
Instant sevenHoursAgo = eightHoursAgo.plus(Duration.ofHours(1));
Instant twoHoursAgo = sevenHoursAgo.plus(Duration.ofHours(5));

PersistentTasksCustomMetadata.Builder tasksBuilder = PersistentTasksCustomMetadata.builder();
addJobTask("job1", "node1", JobState.OPENED, tasksBuilder);
addJobTask("job2", "node1", JobState.OPENED, tasksBuilder);
addJobTask("job3", null, JobState.OPENED, tasksBuilder);
addJobTask("job4", null, JobState.OPENED, tasksBuilder);
addJobTask("job5", null, JobState.OPENED, tasksBuilder);
List<String> itemsToReport = notifier.findLongTimeUnassignedTasks(eightHoursAgo, tasksBuilder.build());
// Nothing reported because unassigned jobs only just detected
assertThat(itemsToReport, empty());

tasksBuilder = PersistentTasksCustomMetadata.builder();
addJobTask("job1", null, JobState.OPENED, tasksBuilder);
addJobTask("job2", "node1", JobState.OPENED, tasksBuilder);
addJobTask("job3", null, JobState.OPENED, tasksBuilder);
addJobTask("job4", "node2", JobState.OPENED, tasksBuilder);
addJobTask("job5", null, JobState.OPENED, tasksBuilder);
itemsToReport = notifier.findLongTimeUnassignedTasks(sevenHoursAgo, tasksBuilder.build());
// Jobs 3 and 5 still unassigned so should get reported, job 4 now assigned, job 1 only just detected unassigned
assertThat(
itemsToReport,
containsInAnyOrder("[xpack/ml/job]/[job3] unassigned for [3600] seconds", "[xpack/ml/job]/[job5] unassigned for [3600] seconds")
);

tasksBuilder = PersistentTasksCustomMetadata.builder();
addJobTask("job1", null, JobState.OPENED, tasksBuilder);
addJobTask("job2", null, JobState.OPENED, tasksBuilder);
addJobTask("job3", null, JobState.OPENED, tasksBuilder);
addJobTask("job4", "node2", JobState.OPENED, tasksBuilder);
addJobTask("job5", null, JobState.OPENED, tasksBuilder);
itemsToReport = notifier.findLongTimeUnassignedTasks(twoHoursAgo, tasksBuilder.build());
// Jobs 3 and 5 still unassigned but reported less than 6 hours ago, job 1 still unassigned so gets reported now,
// job 2 only just detected unassigned
assertThat(itemsToReport, contains("[xpack/ml/job]/[job1] unassigned for [18000] seconds"));

tasksBuilder = PersistentTasksCustomMetadata.builder();
addJobTask("job1", null, JobState.OPENED, tasksBuilder);
addJobTask("job2", null, JobState.OPENED, tasksBuilder);
addJobTask("job3", null, JobState.OPENED, tasksBuilder);
addJobTask("job4", null, JobState.OPENED, tasksBuilder);
addJobTask("job5", "node1", JobState.OPENED, tasksBuilder);
itemsToReport = notifier.findLongTimeUnassignedTasks(now, tasksBuilder.build());
// Job 3 still unassigned and reported more than 6 hours ago, job 1 still unassigned but reported less than 6 hours ago,
// job 2 still unassigned so gets reported now, job 4 only just detected unassigned, job 5 now assigned
assertThat(
itemsToReport,
containsInAnyOrder(
"[xpack/ml/job]/[job2] unassigned for [7200] seconds",
"[xpack/ml/job]/[job3] unassigned for [28800] seconds"
)
);
}
}

0 comments on commit d7e89dd

Please sign in to comment.