Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Making sure zookeeper master monitor doesn't need a seed value for master description #187

Merged
merged 1 commit into from
Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ private static class ZkHighAvailabilityServices extends AbstractIdleService impl
private final AtomicInteger rmConnections = new AtomicInteger(0);

public ZkHighAvailabilityServices(CoreConfiguration configuration) {
curatorService = new CuratorService(configuration, null);
curatorService = new CuratorService(configuration);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(5);
StaticPropertiesConfigurationFactory configurationFactory = new StaticPropertiesConfigurationFactory(properties);
CoreConfiguration config = configurationFactory.getConfig();
final CuratorService curatorService = new CuratorService(config, null);
final CuratorService curatorService = new CuratorService(config);
MasterMonitor masterMonitor = curatorService.getMasterMonitor();
masterMonitor.getMasterObservable()
.filter(new Func1<MasterDescription, Boolean>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@

package io.mantisrx.server.core.master;

import io.mantisrx.server.core.json.DefaultObjectMapper;
import io.mantisrx.common.JsonSerializer;
import io.mantisrx.shaded.org.apache.curator.framework.CuratorFramework;
import io.mantisrx.shaded.org.apache.curator.framework.api.BackgroundCallback;
import io.mantisrx.shaded.org.apache.curator.framework.api.CuratorEvent;
import io.mantisrx.shaded.org.apache.curator.framework.recipes.cache.NodeCache;
import io.mantisrx.shaded.org.apache.curator.framework.recipes.cache.NodeCacheListener;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -42,16 +43,17 @@ public class ZookeeperMasterMonitor implements MasterMonitor {
private final BehaviorSubject<MasterDescription> masterSubject;
private final AtomicReference<MasterDescription> latestMaster = new AtomicReference<>();
private final NodeCache nodeMonitor;
private final JsonSerializer jsonSerializer;

public ZookeeperMasterMonitor(CuratorFramework curator, String masterPath, MasterDescription initValue) {
public ZookeeperMasterMonitor(CuratorFramework curator, String masterPath) {
this.curator = curator;
this.masterPath = masterPath;
this.masterSubject = BehaviorSubject.create(initValue);
this.masterSubject = BehaviorSubject.create();
this.nodeMonitor = new NodeCache(curator, masterPath);
this.latestMaster.set(initValue);
this.jsonSerializer = new JsonSerializer();
}

public void start() {
public void start() throws Exception {
nodeMonitor.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
Expand All @@ -60,12 +62,21 @@ public void nodeChanged() throws Exception {
});

try {
nodeMonitor.start();
nodeMonitor.start(true);
} catch (Exception e) {
throw new IllegalStateException("Failed to start master node monitor: " + e.getMessage(), e);
}

logger.info("The ZK master monitor is started");
byte[] initialValue = nodeMonitor.getCurrentData().getData();
onMasterNodeUpdated(initialValue);
logger.info("The ZK master monitor has started");
}

private void onMasterNodeUpdated(byte[] data) throws Exception {
MasterDescription description = jsonSerializer.fromJSON(Arrays.toString(data), MasterDescription.class);
logger.info("new master description = {}", description);
latestMaster.set(description);
masterSubject.onNext(description);
}

private void retrieveMaster() {
Expand All @@ -78,11 +89,7 @@ private void retrieveMaster() {
.inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
MasterDescription description = DefaultObjectMapper.getInstance().readValue(event.getData(), MasterDescription.class);
logger.info("New master retrieved: " + description);
latestMaster.set(description);
masterSubject.onNext(description);

onMasterNodeUpdated(event.getData());
}
})
.forPath(masterPath)
Expand Down Expand Up @@ -113,4 +120,4 @@ public void shutdown() {
throw new RuntimeException("Failed to close the ZK node monitor: " + e.getMessage(), e);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.mantisrx.server.core.BaseService;
import io.mantisrx.server.core.CoreConfiguration;
import io.mantisrx.server.core.Service;
import io.mantisrx.server.core.master.MasterDescription;
import io.mantisrx.server.core.master.MasterMonitor;
import io.mantisrx.server.core.master.ZookeeperMasterMonitor;
import io.mantisrx.shaded.com.google.common.util.concurrent.MoreExecutors;
Expand All @@ -38,7 +37,7 @@


/**
* This {@link Service} implementation is responsible for managing the lifecycle of a {@link org.apache.curator.framework.CuratorFramework}
* This {@link Service} implementation is responsible for managing the lifecycle of a {@link io.mantisrx.shaded.org.apache.curator.framework.CuratorFramework}
* instance.
*/
public class CuratorService extends BaseService {
Expand All @@ -48,12 +47,10 @@ public class CuratorService extends BaseService {

private final CuratorFramework curator;
private final ZookeeperMasterMonitor masterMonitor;
private final CoreConfiguration configs;
private final Gauge isConnectedGauge;

public CuratorService(CoreConfiguration configs, MasterDescription initialMasterDescription) {
public CuratorService(CoreConfiguration configs) {
super(false);
this.configs = configs;
Metrics m = new Metrics.Builder()
.name(CuratorService.class.getCanonicalName())
.addGauge(isConnectedGaugeName)
Expand All @@ -70,8 +67,7 @@ public CuratorService(CoreConfiguration configs, MasterDescription initialMaster

masterMonitor = new ZookeeperMasterMonitor(
curator,
ZKPaths.makePath(configs.getZkRoot(), configs.getLeaderAnnouncementPath()),
initialMasterDescription);
ZKPaths.makePath(configs.getZkRoot(), configs.getLeaderAnnouncementPath()));
}

private void setupCuratorListener() {
Expand All @@ -93,10 +89,14 @@ public void stateChanged(CuratorFramework client, ConnectionState newState) {

@Override
public void start() {
isConnectedGauge.set(0L);
setupCuratorListener();
curator.start();
masterMonitor.start();
try {
isConnectedGauge.set(0L);
setupCuratorListener();
curator.start();
masterMonitor.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public MasterMain(ConfigurationFactory configFactory, AuditEventSubscriber audit
mantisServices.addService(new MasterApiAkkaService(new LocalMasterMonitor(leadershipManager.getDescription()), leadershipManager.getDescription(), jobClusterManagerActor, statusEventBrokerActor,
resourceClusters, config.getApiPort(), storageProvider, schedulingService, lifecycleEventPublisher, leadershipManager, agentClusterOps));
} else {
curatorService = new CuratorService(this.config, leadershipManager.getDescription());
curatorService = new CuratorService(this.config);
curatorService.start();
mantisServices.addService(createLeaderElector(curatorService, leadershipManager));
mantisServices.addService(new MasterApiAkkaService(curatorService.getMasterMonitor(), leadershipManager.getDescription(), jobClusterManagerActor, statusEventBrokerActor,
Expand Down