Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Curator framework improvements #2225

Merged
merged 6 commits into from
Aug 25, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.collect.Lists;
import com.hubspot.singularity.config.SingularityConfiguration;
import com.hubspot.singularity.config.ZooKeeperConfiguration;
import com.hubspot.singularity.data.LoggingCuratorFramework;
import com.hubspot.singularity.data.curator.SingularityReadOnlyCuratorFramework;
import com.hubspot.singularity.data.curator.ZkClientsLoadDistributor;
import java.util.List;
import javax.inject.Inject;
import javax.inject.Provider;
import org.apache.curator.framework.CuratorFramework;
Expand All @@ -27,10 +31,46 @@ public SingularityCuratorProvider(
final SingularityManagedScheduledExecutorServiceFactory executorServiceFactory
) {
checkNotNull(configuration, "configuration is null");
if (configuration.useLoggingCuratorFramework()) {
LOG.trace("Creating a logging curator framework");
this.curatorFramework =
new LoggingCuratorFramework(
getCuratorFrameworkForSingularityInstanceType(configuration),
executorServiceFactory
);
} else {
this.curatorFramework =
getCuratorFrameworkForSingularityInstanceType(configuration);
}
}

private CuratorFramework getCuratorFrameworkForSingularityInstanceType(
SingularityConfiguration configuration
) {
ZooKeeperConfiguration zookeeperConfig = configuration.getZooKeeperConfiguration();

CuratorFramework tempCuratorFramework = CuratorFrameworkFactory
if (configuration.isReadOnlyInstance()) {
LOG.trace("Creating multiple logging curator frameworks for read-only instance");
int numberOfCuratorFrameworks = zookeeperConfig.getCuratorFrameworkInstances();
List<CuratorFramework> curatorFrameworks = Lists.newArrayListWithExpectedSize(
numberOfCuratorFrameworks
);
for (int i = 0; i < numberOfCuratorFrameworks; i++) {
curatorFrameworks.add(buildCuratorFrameworkInstance(zookeeperConfig));
}
return new SingularityReadOnlyCuratorFramework(
new ZkClientsLoadDistributor(curatorFrameworks)
);
} else {
LOG.trace("Creating curator framework for leader instance");
return buildCuratorFrameworkInstance(zookeeperConfig);
}
}

private CuratorFramework buildCuratorFrameworkInstance(
ZooKeeperConfiguration zookeeperConfig
) {
return CuratorFrameworkFactory
.builder()
.defaultData(null)
.connectionStateErrorPolicy(new SessionConnectionStateErrorPolicy())
Expand All @@ -45,13 +85,6 @@ public SingularityCuratorProvider(
)
.namespace(zookeeperConfig.getZkNamespace())
.build();

if (configuration.useLoggingCuratorFramework()) {
tempCuratorFramework =
new LoggingCuratorFramework(tempCuratorFramework, executorServiceFactory);
}

this.curatorFramework = tempCuratorFramework;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ public class ZooKeeperConfiguration {
@NotNull
private int retryMaxTries = 3;

@NotNull
private int curatorFrameworkInstances = 3;

@NotNull
private String zkNamespace;

Expand Down Expand Up @@ -80,4 +83,12 @@ public void setAbortAfterConnectionLostForMillis(
) {
this.abortAfterConnectionLostForMillis = abortAfterConnectionLostForMillis;
}

public int getCuratorFrameworkInstances() {
return curatorFrameworkInstances;
}

public void setCuratorFrameworkInstances(int curatorFrameworkInstances) {
this.curatorFrameworkInstances = curatorFrameworkInstances;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
package com.hubspot.singularity.data.curator;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.curator.CuratorZookeeperClient;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.WatcherRemoveCuratorFramework;
import org.apache.curator.framework.api.CreateBuilder;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.DeleteBuilder;
import org.apache.curator.framework.api.ExistsBuilder;
import org.apache.curator.framework.api.GetACLBuilder;
import org.apache.curator.framework.api.GetChildrenBuilder;
import org.apache.curator.framework.api.GetConfigBuilder;
import org.apache.curator.framework.api.GetDataBuilder;
import org.apache.curator.framework.api.ReconfigBuilder;
import org.apache.curator.framework.api.RemoveWatchesBuilder;
import org.apache.curator.framework.api.SetACLBuilder;
import org.apache.curator.framework.api.SetDataBuilder;
import org.apache.curator.framework.api.SyncBuilder;
import org.apache.curator.framework.api.UnhandledErrorListener;
import org.apache.curator.framework.api.transaction.CuratorMultiTransaction;
import org.apache.curator.framework.api.transaction.CuratorTransaction;
import org.apache.curator.framework.api.transaction.TransactionOp;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.curator.framework.listen.Listenable;
import org.apache.curator.framework.schema.SchemaSet;
import org.apache.curator.framework.state.ConnectionStateErrorPolicy;
import org.apache.curator.framework.state.ConnectionStateListener;
import org.apache.curator.utils.EnsurePath;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier;

public class SingularityReadOnlyCuratorFramework implements CuratorFramework {
private final ZkClientsLoadDistributor distributor;

public SingularityReadOnlyCuratorFramework(ZkClientsLoadDistributor distributor) {
this.distributor = distributor;
}

@Override
public void start() {
distributor.start();
}

@Override
public void close() {
distributor.close();
}

@Override
public CuratorFrameworkState getState() {
return distributor.getCuratorFramework().getState();
}

@Override
public boolean isStarted() {
return distributor.isStarted();
}

@Override
public CreateBuilder create() {
return distributor.getCuratorFramework().create();
}

@Override
public DeleteBuilder delete() {
return distributor.getCuratorFramework().delete();
}

@Override
public ExistsBuilder checkExists() {
return distributor.getCuratorFramework().checkExists();
}

@Override
public GetDataBuilder getData() {
return distributor.getCuratorFramework().getData();
}

@Override
public SetDataBuilder setData() {
return distributor.getCuratorFramework().setData();
}

@Override
public GetChildrenBuilder getChildren() {
return distributor.getCuratorFramework().getChildren();
}

@Override
public GetACLBuilder getACL() {
return distributor.getCuratorFramework().getACL();
}

@Override
public SetACLBuilder setACL() {
return distributor.getCuratorFramework().setACL();
}

@Override
public ReconfigBuilder reconfig() {
return distributor.getCuratorFramework().reconfig();
}

@Override
public GetConfigBuilder getConfig() {
return distributor.getCuratorFramework().getConfig();
}

@Override
public CuratorTransaction inTransaction() {
return distributor.getCuratorFramework().inTransaction();
}

@Override
public CuratorMultiTransaction transaction() {
return distributor.getCuratorFramework().transaction();
}

@Override
public TransactionOp transactionOp() {
return distributor.getCuratorFramework().transactionOp();
}

@Override
public void sync(String path, Object backgroundContextObject) {
distributor.getCuratorFramework().sync(path, backgroundContextObject);
}

@Override
public void createContainers(String path) throws Exception {
distributor.getCuratorFramework().createContainers(path);
}

@Override
public SyncBuilder sync() {
return distributor.getCuratorFramework().sync();
}

@Override
public RemoveWatchesBuilder watches() {
return distributor.getCuratorFramework().watches();
}

@Override
public Listenable<ConnectionStateListener> getConnectionStateListenable() {
return distributor.getCuratorFramework().getConnectionStateListenable();
}

@Override
public Listenable<CuratorListener> getCuratorListenable() {
return distributor.getCuratorFramework().getCuratorListenable();
}

@Override
public Listenable<UnhandledErrorListener> getUnhandledErrorListenable() {
return distributor.getCuratorFramework().getUnhandledErrorListenable();
}

@Override
public CuratorFramework nonNamespaceView() {
return distributor.getCuratorFramework().nonNamespaceView();
}

@Override
public CuratorFramework usingNamespace(String newNamespace) {
return distributor.getCuratorFramework().usingNamespace(newNamespace);
}

@Override
public String getNamespace() {
return distributor.getCuratorFramework().getNamespace();
}

@Override
public CuratorZookeeperClient getZookeeperClient() {
return distributor.getCuratorFramework().getZookeeperClient();
}

@Override
public EnsurePath newNamespaceAwareEnsurePath(String path) {
return distributor.getCuratorFramework().newNamespaceAwareEnsurePath(path);
}

@Override
public void clearWatcherReferences(Watcher watcher) {
distributor.getCuratorFramework().clearWatcherReferences(watcher);
}

@Override
public boolean blockUntilConnected(int maxWaitTime, TimeUnit units)
throws InterruptedException {
return false;
}

@Override
public void blockUntilConnected() throws InterruptedException {}

@Override
public WatcherRemoveCuratorFramework newWatcherRemoveCuratorFramework() {
return distributor.getCuratorFramework().newWatcherRemoveCuratorFramework();
}

@Override
public ConnectionStateErrorPolicy getConnectionStateErrorPolicy() {
return distributor.getCuratorFramework().getConnectionStateErrorPolicy();
}

@Override
public QuorumVerifier getCurrentConfig() {
return distributor.getCuratorFramework().getCurrentConfig();
}

@Override
public SchemaSet getSchemaSet() {
return distributor.getCuratorFramework().getSchemaSet();
}

@Override
public boolean isZk34CompatibilityMode() {
return distributor.getCuratorFramework().isZk34CompatibilityMode();
}

@Override
public CompletableFuture<Void> runSafe(Runnable runnable) {
return distributor.getCuratorFramework().runSafe(runnable);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package com.hubspot.singularity.data.curator;

import com.google.common.collect.Iterators;
import java.util.Iterator;
import java.util.List;
import org.apache.curator.framework.CuratorFramework;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZkClientsLoadDistributor {
private static final Logger LOG = LoggerFactory.getLogger(
ZkClientsLoadDistributor.class
);

private final List<CuratorFramework> curatorFrameworks;
private final Iterator<CuratorFramework> iterator;

public ZkClientsLoadDistributor(List<CuratorFramework> curatorFrameworks) {
this.curatorFrameworks = curatorFrameworks;
this.iterator = Iterators.cycle(curatorFrameworks);
}

public void start() {
for (CuratorFramework framework : curatorFrameworks) {
try {
framework.start();
} catch (Exception e) {
LOG.warn("Error starting framework: ");
}
}
}

public void close() {
for (CuratorFramework framework : curatorFrameworks) {
try {
framework.close();
} catch (Exception e) {
LOG.warn("Error starting framework: ");
}
}
}

public CuratorFramework getCuratorFramework() {
return iterator.next();
}

public List<CuratorFramework> getAll() {
return this.curatorFrameworks;
}

public boolean isStarted() {
boolean started = true;
for (CuratorFramework framework : curatorFrameworks) {
started = started & framework.isStarted();
}
return started;
}
}