diff --git a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/HighAvailabilityServicesUtil.java b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/HighAvailabilityServicesUtil.java index 5f288a7d7..1107035f5 100644 --- a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/HighAvailabilityServicesUtil.java +++ b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/HighAvailabilityServicesUtil.java @@ -57,7 +57,7 @@ private static class ZkHighAvailabilityServices extends AbstractIdleService impl private final AtomicInteger rmConnections = new AtomicInteger(0); public ZkHighAvailabilityServices(CoreConfiguration configuration) { - curatorService = new CuratorService(configuration, null); + curatorService = new CuratorService(configuration); } @Override diff --git a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/TestGetMasterMonitor.java b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/TestGetMasterMonitor.java index c311dd302..f2a75e448 100644 --- a/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/TestGetMasterMonitor.java +++ b/mantis-control-plane/mantis-control-plane-client/src/main/java/io/mantisrx/server/master/client/TestGetMasterMonitor.java @@ -56,7 +56,7 @@ public static void main(String[] args) { final CountDownLatch latch = new CountDownLatch(5); StaticPropertiesConfigurationFactory configurationFactory = new StaticPropertiesConfigurationFactory(properties); CoreConfiguration config = configurationFactory.getConfig(); - final CuratorService curatorService = new CuratorService(config, null); + final CuratorService curatorService = new CuratorService(config); MasterMonitor masterMonitor = curatorService.getMasterMonitor(); masterMonitor.getMasterObservable() .filter(new Func1() { diff --git a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/master/ZookeeperMasterMonitor.java b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/master/ZookeeperMasterMonitor.java index 6c8d92a1b..e63de99a4 100644 --- a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/master/ZookeeperMasterMonitor.java +++ b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/master/ZookeeperMasterMonitor.java @@ -16,13 +16,14 @@ package io.mantisrx.server.core.master; -import io.mantisrx.server.core.json.DefaultObjectMapper; +import io.mantisrx.common.JsonSerializer; import io.mantisrx.shaded.org.apache.curator.framework.CuratorFramework; import io.mantisrx.shaded.org.apache.curator.framework.api.BackgroundCallback; import io.mantisrx.shaded.org.apache.curator.framework.api.CuratorEvent; import io.mantisrx.shaded.org.apache.curator.framework.recipes.cache.NodeCache; import io.mantisrx.shaded.org.apache.curator.framework.recipes.cache.NodeCacheListener; import java.io.IOException; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicReference; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,16 +43,17 @@ public class ZookeeperMasterMonitor implements MasterMonitor { private final BehaviorSubject masterSubject; private final AtomicReference latestMaster = new AtomicReference<>(); private final NodeCache nodeMonitor; + private final JsonSerializer jsonSerializer; - public ZookeeperMasterMonitor(CuratorFramework curator, String masterPath, MasterDescription initValue) { + public ZookeeperMasterMonitor(CuratorFramework curator, String masterPath) { this.curator = curator; this.masterPath = masterPath; - this.masterSubject = BehaviorSubject.create(initValue); + this.masterSubject = BehaviorSubject.create(); this.nodeMonitor = new NodeCache(curator, masterPath); - this.latestMaster.set(initValue); + this.jsonSerializer = new JsonSerializer(); } - public void start() { + public void start() throws Exception { nodeMonitor.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { @@ -60,12 +62,21 @@ public void nodeChanged() throws Exception { }); try { - nodeMonitor.start(); + nodeMonitor.start(true); } catch (Exception e) { throw new IllegalStateException("Failed to start master node monitor: " + e.getMessage(), e); } - logger.info("The ZK master monitor is started"); + byte[] initialValue = nodeMonitor.getCurrentData().getData(); + onMasterNodeUpdated(initialValue); + logger.info("The ZK master monitor has started"); + } + + private void onMasterNodeUpdated(byte[] data) throws Exception { + MasterDescription description = jsonSerializer.fromJSON(Arrays.toString(data), MasterDescription.class); + logger.info("new master description = {}", description); + latestMaster.set(description); + masterSubject.onNext(description); } private void retrieveMaster() { @@ -78,11 +89,7 @@ private void retrieveMaster() { .inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework client, CuratorEvent event) throws Exception { - MasterDescription description = DefaultObjectMapper.getInstance().readValue(event.getData(), MasterDescription.class); - logger.info("New master retrieved: " + description); - latestMaster.set(description); - masterSubject.onNext(description); - + onMasterNodeUpdated(event.getData()); } }) .forPath(masterPath) @@ -113,4 +120,4 @@ public void shutdown() { throw new RuntimeException("Failed to close the ZK node monitor: " + e.getMessage(), e); } } -} \ No newline at end of file +} diff --git a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/zookeeper/CuratorService.java b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/zookeeper/CuratorService.java index 52464aa4d..972649572 100644 --- a/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/zookeeper/CuratorService.java +++ b/mantis-control-plane/mantis-control-plane-core/src/main/java/io/mantisrx/server/core/zookeeper/CuratorService.java @@ -22,7 +22,6 @@ import io.mantisrx.server.core.BaseService; import io.mantisrx.server.core.CoreConfiguration; import io.mantisrx.server.core.Service; -import io.mantisrx.server.core.master.MasterDescription; import io.mantisrx.server.core.master.MasterMonitor; import io.mantisrx.server.core.master.ZookeeperMasterMonitor; import io.mantisrx.shaded.com.google.common.util.concurrent.MoreExecutors; @@ -38,7 +37,7 @@ /** - * This {@link Service} implementation is responsible for managing the lifecycle of a {@link org.apache.curator.framework.CuratorFramework} + * This {@link Service} implementation is responsible for managing the lifecycle of a {@link io.mantisrx.shaded.org.apache.curator.framework.CuratorFramework} * instance. */ public class CuratorService extends BaseService { @@ -48,12 +47,10 @@ public class CuratorService extends BaseService { private final CuratorFramework curator; private final ZookeeperMasterMonitor masterMonitor; - private final CoreConfiguration configs; private final Gauge isConnectedGauge; - public CuratorService(CoreConfiguration configs, MasterDescription initialMasterDescription) { + public CuratorService(CoreConfiguration configs) { super(false); - this.configs = configs; Metrics m = new Metrics.Builder() .name(CuratorService.class.getCanonicalName()) .addGauge(isConnectedGaugeName) @@ -70,8 +67,7 @@ public CuratorService(CoreConfiguration configs, MasterDescription initialMaster masterMonitor = new ZookeeperMasterMonitor( curator, - ZKPaths.makePath(configs.getZkRoot(), configs.getLeaderAnnouncementPath()), - initialMasterDescription); + ZKPaths.makePath(configs.getZkRoot(), configs.getLeaderAnnouncementPath())); } private void setupCuratorListener() { @@ -93,10 +89,14 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) { @Override public void start() { - isConnectedGauge.set(0L); - setupCuratorListener(); - curator.start(); - masterMonitor.start(); + try { + isConnectedGauge.set(0L); + setupCuratorListener(); + curator.start(); + masterMonitor.start(); + } catch (Exception e) { + throw new RuntimeException(e); + } } @Override diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java index 83a281a06..57c47088e 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/MasterMain.java @@ -213,7 +213,7 @@ public MasterMain(ConfigurationFactory configFactory, AuditEventSubscriber audit mantisServices.addService(new MasterApiAkkaService(new LocalMasterMonitor(leadershipManager.getDescription()), leadershipManager.getDescription(), jobClusterManagerActor, statusEventBrokerActor, resourceClusters, config.getApiPort(), storageProvider, schedulingService, lifecycleEventPublisher, leadershipManager, agentClusterOps)); } else { - curatorService = new CuratorService(this.config, leadershipManager.getDescription()); + curatorService = new CuratorService(this.config); curatorService.start(); mantisServices.addService(createLeaderElector(curatorService, leadershipManager)); mantisServices.addService(new MasterApiAkkaService(curatorService.getMasterMonitor(), leadershipManager.getDescription(), jobClusterManagerActor, statusEventBrokerActor,