Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FEATURE][ML] Add cluster setting to enable/disable config migration #36700

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ public class MachineLearning extends Plugin implements ActionPlugin, AnalysisPlu
public static final Setting<Integer> MAX_MACHINE_MEMORY_PERCENT =
Setting.intSetting("xpack.ml.max_machine_memory_percent", 30, 5, 90, Property.Dynamic, Property.NodeScope);
public static final Setting<Integer> MAX_LAZY_ML_NODES =
Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope);
Setting.intSetting("xpack.ml.max_lazy_ml_nodes", 0, 0, 3, Property.Dynamic, Property.NodeScope);

private static final Logger logger = LogManager.getLogger(XPackPlugin.class);

Expand Down Expand Up @@ -308,7 +308,8 @@ public List<Setting<?>> getSettings() {
AutodetectBuilder.MAX_ANOMALY_RECORDS_SETTING_DYNAMIC,
AutodetectProcessManager.MAX_RUNNING_JOBS_PER_NODE,
AutodetectProcessManager.MAX_OPEN_JOBS_PER_NODE,
AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP));
AutodetectProcessManager.MIN_DISK_SPACE_OFF_HEAP,
MlConfigMigrationEligibilityCheck.ENABLE_CONFIG_MIGRATION));
}

public Settings additionalSettings() {
Expand Down Expand Up @@ -444,7 +445,7 @@ public Collection<Object> createComponents(Client client, ClusterService cluster
jobDataCountsPersister,
datafeedManager,
auditor,
new MlAssignmentNotifier(auditor, threadPool, client, clusterService),
new MlAssignmentNotifier(settings, auditor, threadPool, client, clusterService),
memoryTracker
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.Assignment;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData.PersistentTask;
Expand All @@ -36,10 +37,10 @@ public class MlAssignmentNotifier implements ClusterStateListener, LocalNodeMast
private final ThreadPool threadPool;
private final AtomicBoolean enabled = new AtomicBoolean(false);

MlAssignmentNotifier(Auditor auditor, ThreadPool threadPool, Client client, ClusterService clusterService) {
MlAssignmentNotifier(Settings settings, Auditor auditor, ThreadPool threadPool, Client client, ClusterService clusterService) {
this.auditor = auditor;
this.clusterService = clusterService;
this.mlConfigMigrator = new MlConfigMigrator(client, clusterService);
this.mlConfigMigrator = new MlConfigMigrator(settings, client, clusterService);
this.threadPool = threadPool;
clusterService.addLocalNodeMasterListener(this);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.ml;

import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.xpack.core.ml.MlMetadata;
import org.elasticsearch.xpack.core.ml.MlTasks;
import org.elasticsearch.xpack.core.ml.job.config.Job;

/**
* Checks whether migration can start and whether ML resources (e.g. jobs, datafeeds)
* are eligible to be migrated from the cluster state into the config index
*/
public class MlConfigMigrationEligibilityCheck {

private static final Version MIN_NODE_VERSION = Version.V_6_6_0;

public static final Setting<Boolean> ENABLE_CONFIG_MIGRATION = Setting.boolSetting(
"xpack.ml.enable_config_migration", true, Setting.Property.Dynamic, Setting.Property.NodeScope);

private volatile boolean isConfigMigrationEnabled;

public MlConfigMigrationEligibilityCheck(Settings settings, ClusterService clusterService) {
isConfigMigrationEnabled = ENABLE_CONFIG_MIGRATION.get(settings);
clusterService.getClusterSettings().addSettingsUpdateConsumer(ENABLE_CONFIG_MIGRATION, this::setConfigMigrationEnabled);
}

private void setConfigMigrationEnabled(boolean configMigrationEnabled) {
this.isConfigMigrationEnabled = configMigrationEnabled;
}

/**
* Can migration start? Returns:
* False if config migration is disabled via the setting {@link #ENABLE_CONFIG_MIGRATION}
* False if the min node version of the cluster is before {@link #MIN_NODE_VERSION}
* True otherwise
* @param clusterState The cluster state
* @return A boolean that dictates if config migration can start
*/
public boolean canStartMigration(ClusterState clusterState) {
if (isConfigMigrationEnabled == false) {
return false;
}

Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
if (minNodeVersion.before(MIN_NODE_VERSION)) {
return false;
}
return true;
}

/**
* Is the job a eligible for migration? Returns:
* False if {@link #canStartMigration(ClusterState)} returns {@code false}
* False if the {@link Job#isDeleting()}
* False if the job has a persistent task
* True otherwise i.e. the job is present, not deleting
* and does not have a persistent task.
*
* @param jobId The job Id
* @param clusterState The cluster state
* @return A boolean depending on the conditions listed above
*/
public boolean jobIsEligibleForMigration(String jobId, ClusterState clusterState) {
if (canStartMigration(clusterState) == false) {
return false;
}

MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
Job job = mlMetadata.getJobs().get(jobId);

if (job == null || job.isDeleting()) {
return false;
}

PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
return MlTasks.openJobIds(persistentTasks).contains(jobId) == false;
}

/**
* Is the datafeed a eligible for migration? Returns:
* False if {@link #canStartMigration(ClusterState)} returns {@code false}
* False if the datafeed is not in the cluster state
* False if the datafeed has a persistent task
* True otherwise i.e. the datafeed is present and does not have a persistent task.
*
* @param datafeedId The datafeed Id
* @param clusterState The cluster state
* @return A boolean depending on the conditions listed above
*/
public boolean datafeedIsEligibleForMigration(String datafeedId, ClusterState clusterState) {
if (canStartMigration(clusterState) == false) {
return false;
}

MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
if (mlMetadata.getDatafeeds().containsKey(datafeedId) == false) {
return false;
}

PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.elasticsearch.cluster.ClusterStateUpdateTask;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.ToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
Expand All @@ -41,6 +42,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -80,18 +82,19 @@ public class MlConfigMigrator {
private static final Logger logger = LogManager.getLogger(MlConfigMigrator.class);

public static final String MIGRATED_FROM_VERSION = "migrated from version";
public static final Version MIN_NODE_VERSION = Version.V_6_6_0;

static final int MAX_BULK_WRITE_SIZE = 100;

private final Client client;
private final ClusterService clusterService;
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;

private final AtomicBoolean migrationInProgress;

public MlConfigMigrator(Client client, ClusterService clusterService) {
this.client = client;
this.clusterService = clusterService;
public MlConfigMigrator(Settings settings, Client client, ClusterService clusterService) {
this.client = Objects.requireNonNull(client);
this.clusterService = Objects.requireNonNull(clusterService);
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
this.migrationInProgress = new AtomicBoolean(false);
}

Expand All @@ -114,9 +117,8 @@ public MlConfigMigrator(Client client, ClusterService clusterService) {
*/
public void migrateConfigsWithoutTasks(ClusterState clusterState, ActionListener<Boolean> listener) {

Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
if (minNodeVersion.before(MIN_NODE_VERSION)) {
listener.onResponse(Boolean.FALSE);
if (migrationEligibilityCheck.canStartMigration(clusterState) == false) {
listener.onResponse(false);
return;
}

Expand Down Expand Up @@ -455,60 +457,4 @@ static List<String> filterFailedDatafeedConfigWrites(Set<String> failedDocumentI
.filter(id -> failedDocumentIds.contains(DatafeedConfig.documentId(id)) == false)
.collect(Collectors.toList());
}

/**
* Is the job a eligible for migration? Returns:
* False if the min node version of the cluster is before {@link #MIN_NODE_VERSION}
* False if the job is not in the cluster state
* False if the {@link Job#isDeleting()}
* False if the job has a persistent task
* True otherwise i.e. the job is present, not deleting
* and does not have a persistent task.
*
* @param jobId The job Id
* @param clusterState clusterstate
* @return A boolean depending on the conditions listed above
*/
public static boolean jobIsEligibleForMigration(String jobId, ClusterState clusterState) {
Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
if (minNodeVersion.before(MIN_NODE_VERSION)) {
return false;
}

MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
Job job = mlMetadata.getJobs().get(jobId);

if (job == null || job.isDeleting()) {
return false;
}

PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
return MlTasks.openJobIds(persistentTasks).contains(jobId) == false;
}

/**
* Is the datafeed a eligible for migration? Returns:
* False if the min node version of the cluster is before {@link #MIN_NODE_VERSION}
* False if the datafeed is not in the cluster state
* False if the datafeed has a persistent task
* True otherwise i.e. the datafeed is present and does not have a persistent task.
*
* @param datafeedId The datafeed Id
* @param clusterState clusterstate
* @return A boolean depending on the conditions listed above
*/
public static boolean datafeedIsEligibleForMigration(String datafeedId, ClusterState clusterState) {
Version minNodeVersion = clusterState.nodes().getMinNodeVersion();
if (minNodeVersion.before(MIN_NODE_VERSION)) {
return false;
}

MlMetadata mlMetadata = MlMetadata.getMlMetadata(clusterState);
if (mlMetadata.getDatafeeds().containsKey(datafeedId) == false) {
return false;
}

PersistentTasksCustomMetaData persistentTasks = clusterState.metaData().custom(PersistentTasksCustomMetaData.TYPE);
return MlTasks.startedDatafeedIds(persistentTasks).contains(datafeedId) == false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.persistent.PersistentTasksCustomMetaData;
import org.elasticsearch.persistent.PersistentTasksService;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.core.XPackPlugin;
Expand All @@ -33,7 +33,7 @@
import org.elasticsearch.xpack.core.ml.datafeed.DatafeedState;
import org.elasticsearch.xpack.core.ml.job.messages.Messages;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MlConfigMigrator;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;

import static org.elasticsearch.xpack.core.ClientHelper.ML_ORIGIN;
Expand All @@ -45,6 +45,7 @@ public class TransportDeleteDatafeedAction extends TransportMasterNodeAction<Del
private final DatafeedConfigProvider datafeedConfigProvider;
private final ClusterService clusterService;
private final PersistentTasksService persistentTasksService;
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;

@Inject
public TransportDeleteDatafeedAction(Settings settings, TransportService transportService, ClusterService clusterService,
Expand All @@ -58,6 +59,7 @@ public TransportDeleteDatafeedAction(Settings settings, TransportService transpo
this.datafeedConfigProvider = new DatafeedConfigProvider(client, xContentRegistry);
this.persistentTasksService = persistentTasksService;
this.clusterService = clusterService;
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
}

@Override
Expand All @@ -74,7 +76,7 @@ protected AcknowledgedResponse newResponse() {
protected void masterOperation(DeleteDatafeedAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {

if (MlConfigMigrator.datafeedIsEligibleForMigration(request.getDatafeedId(), state)) {
if (migrationEligibilityCheck.datafeedIsEligibleForMigration(request.getDatafeedId(), state)) {
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("delete datafeed", request.getDatafeedId()));
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.ModelSnapshot;
import org.elasticsearch.xpack.core.ml.job.process.autodetect.state.Quantiles;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MlConfigMigrator;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
import org.elasticsearch.xpack.ml.datafeed.persistence.DatafeedConfigProvider;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobDataDeleter;
Expand Down Expand Up @@ -97,6 +97,7 @@ public class TransportDeleteJobAction extends TransportMasterNodeAction<DeleteJo
private final JobManager jobManager;
private final DatafeedConfigProvider datafeedConfigProvider;
private final MlMemoryTracker memoryTracker;
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;

/**
* A map of task listeners by job_id.
Expand All @@ -122,6 +123,7 @@ public TransportDeleteJobAction(Settings settings, TransportService transportSer
this.jobManager = jobManager;
this.datafeedConfigProvider = datafeedConfigProvider;
this.memoryTracker = memoryTracker;
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
this.listenersByJobId = new HashMap<>();
}

Expand Down Expand Up @@ -149,7 +151,7 @@ protected void masterOperation(DeleteJobAction.Request request, ClusterState sta
protected void masterOperation(Task task, DeleteJobAction.Request request, ClusterState state,
ActionListener<AcknowledgedResponse> listener) {

if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobId(), state)) {
if (migrationEligibilityCheck.jobIsEligibleForMigration(request.getJobId(), state)) {
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("delete job", request.getJobId()));
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
import org.elasticsearch.xpack.core.ml.job.persistence.ElasticsearchMappings;
import org.elasticsearch.xpack.core.ml.utils.ExceptionsHelper;
import org.elasticsearch.xpack.ml.MachineLearning;
import org.elasticsearch.xpack.ml.MlConfigMigrator;
import org.elasticsearch.xpack.ml.MlConfigMigrationEligibilityCheck;
import org.elasticsearch.xpack.ml.job.ClusterStateJobUpdate;
import org.elasticsearch.xpack.ml.job.JobManager;
import org.elasticsearch.xpack.ml.job.persistence.JobConfigProvider;
Expand Down Expand Up @@ -113,6 +113,7 @@ public class TransportOpenJobAction extends TransportMasterNodeAction<OpenJobAct
private final JobResultsProvider jobResultsProvider;
private final JobManager jobManager;
private final MlMemoryTracker memoryTracker;
private final MlConfigMigrationEligibilityCheck migrationEligibilityCheck;

@Inject
public TransportOpenJobAction(Settings settings, TransportService transportService, ThreadPool threadPool,
Expand All @@ -130,6 +131,7 @@ public TransportOpenJobAction(Settings settings, TransportService transportServi
this.jobConfigProvider = jobConfigProvider;
this.jobManager = jobManager;
this.memoryTracker = memoryTracker;
this.migrationEligibilityCheck = new MlConfigMigrationEligibilityCheck(settings, clusterService);
}

/**
Expand Down Expand Up @@ -520,7 +522,7 @@ protected ClusterBlockException checkBlock(OpenJobAction.Request request, Cluste

@Override
protected void masterOperation(OpenJobAction.Request request, ClusterState state, ActionListener<AcknowledgedResponse> listener) {
if (MlConfigMigrator.jobIsEligibleForMigration(request.getJobParams().getJobId(), state)) {
if (migrationEligibilityCheck.jobIsEligibleForMigration(request.getJobParams().getJobId(), state)) {
listener.onFailure(ExceptionsHelper.configHasNotBeenMigrated("open job", request.getJobParams().getJobId()));
return;
}
Expand Down
Loading