Skip to content

Commit

Permalink
Rework local node master
Browse files Browse the repository at this point in the history
  • Loading branch information
davidkyle committed Dec 17, 2018
1 parent af950f3 commit f7faf07
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.LocalNodeMasterListener;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings;
Expand All @@ -26,57 +25,39 @@
import org.elasticsearch.xpack.ml.notifications.Auditor;

import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

public class MlAssignmentNotifier implements ClusterStateListener, LocalNodeMasterListener {
public class MlAssignmentNotifier implements ClusterStateListener {
private static final Logger logger = LogManager.getLogger(MlAssignmentNotifier.class);

private final Auditor auditor;
private final ClusterService clusterService;
private final MlConfigMigrator mlConfigMigrator;
private final ThreadPool threadPool;
private final AtomicBoolean enabled = new AtomicBoolean(false);

MlAssignmentNotifier(Settings settings, Auditor auditor, ThreadPool threadPool, Client client, ClusterService clusterService) {
this.auditor = auditor;
this.clusterService = clusterService;
this.mlConfigMigrator = new MlConfigMigrator(settings, client, clusterService);
this.threadPool = threadPool;
clusterService.addLocalNodeMasterListener(this);
clusterService.addListener(this);
}

MlAssignmentNotifier(Auditor auditor, ThreadPool threadPool, MlConfigMigrator mlConfigMigrator, ClusterService clusterService) {
this.auditor = auditor;
this.clusterService = clusterService;
this.mlConfigMigrator = mlConfigMigrator;
this.threadPool = threadPool;
clusterService.addLocalNodeMasterListener(this);
clusterService.addListener(this);
}

@Override
public void onMaster() {
if (enabled.compareAndSet(false, true)) {
clusterService.addListener(this);
}
}

@Override
public void offMaster() {
if (enabled.compareAndSet(true, false)) {
clusterService.removeListener(this);
}
}

@Override
public String executorName() {
private String executorName() {
return ThreadPool.Names.GENERIC;
}

@Override
public void clusterChanged(ClusterChangedEvent event) {
if (enabled.get() == false) {

if (event.localNodeMaster() == false) {
return;
}

if (event.metaDataChanged() == false) {
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,34 +69,39 @@ private void setupMocks() {

public void testClusterChanged_info() {
MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService);
notifier.onMaster();

DiscoveryNode node =
new DiscoveryNode("node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT);
ClusterState previous = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
new PersistentTasksCustomMetaData(0L, Collections.emptyMap())))
.build();

PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", "node_id", null, tasksBuilder);
addJobTask("job_id", "_node_id", null, tasksBuilder);
MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build();
ClusterState state = ClusterState.builder(new ClusterName("_name"))
ClusterState newState = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData)
.nodes(DiscoveryNodes.builder().add(node))
// set local node master
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT))
.localNodeId("_node_id")
.masterNodeId("_node_id"))
.build();
notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous));
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verify(auditor, times(1)).info(eq("job_id"), any());
verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(state), any());
verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(newState), any());

notifier.offMaster();
notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous));
// no longer master
newState = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData)
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9300), Version.CURRENT)))
.build();
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verifyNoMoreInteractions(auditor);
}

public void testClusterChanged_warning() {
MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService);
notifier.onMaster();

ClusterState previous = ClusterState.builder(new ClusterName("_name"))
.metaData(MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE,
Expand All @@ -106,21 +111,31 @@ public void testClusterChanged_warning() {
PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", null, null, tasksBuilder);
MetaData metaData = MetaData.builder().putCustom(PersistentTasksCustomMetaData.TYPE, tasksBuilder.build()).build();
ClusterState state = ClusterState.builder(new ClusterName("_name"))
ClusterState newState = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData)
// set local node master
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))
.localNodeId("_node_id")
.masterNodeId("_node_id"))
.build();
notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous));
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verify(auditor, times(1)).warning(eq("job_id"), any());
verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(state), any());
verify(configMigrator, times(1)).migrateConfigsWithoutTasks(eq(newState), any());

notifier.offMaster();
notifier.clusterChanged(new ClusterChangedEvent("_test", state, previous));
// no longer master
newState = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData)
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)))
.build();

notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verifyNoMoreInteractions(auditor);
}

public void testClusterChanged_noPersistentTaskChanges() {
MlAssignmentNotifier notifier = new MlAssignmentNotifier(auditor, threadPool, configMigrator, clusterService);
notifier.onMaster();

PersistentTasksCustomMetaData.Builder tasksBuilder = PersistentTasksCustomMetaData.builder();
addJobTask("job_id", null, null, tasksBuilder);
Expand All @@ -129,14 +144,25 @@ public void testClusterChanged_noPersistentTaskChanges() {
.metaData(metaData)
.build();

ClusterState current = ClusterState.builder(new ClusterName("_name"))
ClusterState newState = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData)
// set local node master
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT))
.localNodeId("_node_id")
.masterNodeId("_node_id"))
.build();

notifier.clusterChanged(new ClusterChangedEvent("_test", current, previous));
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any());

notifier.offMaster();
// no longer master
newState = ClusterState.builder(new ClusterName("_name"))
.metaData(metaData)
.nodes(DiscoveryNodes.builder()
.add(new DiscoveryNode("_node_id", new TransportAddress(InetAddress.getLoopbackAddress(), 9200), Version.CURRENT)))
.build();
notifier.clusterChanged(new ClusterChangedEvent("_test", newState, previous));
verify(configMigrator, never()).migrateConfigsWithoutTasks(any(), any());
}
}

0 comments on commit f7faf07

Please sign in to comment.