Skip to content

Commit

Permalink
Reduce the number of ZooSession objects created
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Feb 7, 2025
1 parent 31b43ab commit 1cac6d4
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 190 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ public class ClientContext implements AccumuloClient {

private final AtomicBoolean zooKeeperOpened = new AtomicBoolean(false);
private final Supplier<ZooSession> zooSession;
private final Supplier<InstanceId> instanceId;

private void ensureOpen() {
if (closed.get()) {
Expand Down Expand Up @@ -230,6 +231,9 @@ public ClientContext(SingletonReservation reservation, ClientInfo info,
return zk;
});

this.instanceId =
memoize(() -> ZooUtil.getInstanceId(this.zooSession.get(), getInstanceName()));

this.zooCache = memoize(() -> new ZooCache(getZooSession()));
this.accumuloConf = serverConf;
timeoutSupplier = memoizeWithExpiration(
Expand Down Expand Up @@ -550,7 +554,7 @@ public List<String> getManagerLocations() {
*/
public InstanceId getInstanceID() {
ensureOpen();
return info.getInstanceId();
return this.instanceId.get();
}

public String getZooKeeperRoot() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.hadoop.conf.Configuration;

Expand All @@ -42,11 +41,6 @@ public interface ClientInfo {
*/
String getInstanceName();

/**
* @return Accumulo instanceId
*/
InstanceId getInstanceId();

/**
* @return a Supplier for creating new ZooKeeper client instances based on the configuration
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.zookeeper.ZooSession;
import org.apache.hadoop.conf.Configuration;

Expand All @@ -51,7 +49,6 @@ public class ClientInfoImpl implements ClientInfo {
// suppliers for lazily loading
private final Supplier<AuthenticationToken> tokenSupplier;
private final Supplier<Configuration> hadoopConf;
private final Supplier<InstanceId> instanceId;
private final BiFunction<String,String,ZooSession> zooSessionForName;

public ClientInfoImpl(Properties properties, Optional<AuthenticationToken> tokenOpt) {
Expand All @@ -62,24 +59,13 @@ public ClientInfoImpl(Properties properties, Optional<AuthenticationToken> token
this.hadoopConf = memoize(Configuration::new);
this.zooSessionForName = (name, rootPath) -> new ZooSession(name, getZooKeepers() + rootPath,
getZooKeepersSessionTimeOut(), null);
this.instanceId = memoize(() -> {
try (var zk =
getZooKeeperSupplier(getClass().getSimpleName() + ".getInstanceId()", "").get()) {
return ZooUtil.getInstanceId(zk, getInstanceName());
}
});
}

@Override
public String getInstanceName() {
return getString(ClientProperty.INSTANCE_NAME);
}

@Override
public InstanceId getInstanceId() {
return instanceId.get();
}

@Override
public Supplier<ZooSession> getZooKeeperSupplier(String clientName, String rootPath) {
return () -> zooSessionForName.apply(requireNonNull(clientName), requireNonNull(rootPath));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,8 @@ public synchronized void stop(ServerType server, String hostname) throws IOExcep
try {
cluster.stopProcessWithTimeout(managerProcess, 30, TimeUnit.SECONDS);
try {
new ZooZap().zap(cluster.getServerContext().getSiteConfiguration(), "-manager");
new ZooZap().zap(cluster.getServerContext().getZooSession(),
cluster.getServerContext().getSiteConfiguration(), "-manager");
} catch (RuntimeException e) {
log.error("Error zapping Manager zookeeper lock", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -642,79 +642,76 @@ private void verifyUp() throws InterruptedException, IOException {
waitForProcessStart(tsp, "TabletServer" + tsExpectedCount);
}

String secret = getSiteConfiguration().get(Property.INSTANCE_SECRET);
String instanceNamePath = Constants.ZROOT + Constants.ZINSTANCES + "/" + getInstanceName();
try (var zk = new ZooSession(MiniAccumuloClusterImpl.class.getSimpleName() + ".verifyUp()",
getZooKeepers(), 60000, secret)) {
var rdr = zk.asReader();
InstanceId instanceId = null;
for (int i = 0; i < numTries; i++) {
try {
// make sure it's up enough we can perform operations successfully
rdr.sync("/");
// wait for the instance to be created
instanceId = InstanceId.of(new String(rdr.getData(instanceNamePath), UTF_8));
break;
} catch (KeeperException e) {
log.warn("Error trying to read instance id from zookeeper: {}", e.getMessage());
log.debug("Unable to read instance id from zookeeper.", e);
}
Thread.sleep(1000);
}

if (instanceId == null) {
try {
log.warn("******* COULD NOT FIND INSTANCE ID - DUMPING ZK ************");
log.warn("Connected to ZooKeeper: {}", getZooKeepers());
log.warn("Looking for instanceId at {}", instanceNamePath);
ZKUtil.visitSubTreeDFS(zk, Constants.ZROOT, false,
(rc, path, ctx, name) -> log.warn("{}", path));
log.warn("******* END ZK DUMP ************");
} catch (KeeperException e) {
log.error("Error dumping zk", e);
}
throw new IllegalStateException("Unable to find instance id from zookeeper.");
}

String rootPath = ZooUtil.getRoot(instanceId);
int tsActualCount = 0;
ZooSession zk = getServerContext().getZooSession();
var rdr = zk.asReader();
InstanceId instanceId = null;
for (int i = 0; i < numTries; i++) {
try {
while (tsActualCount < tsExpectedCount) {
tsActualCount = 0;
String tserverPath = rootPath + Constants.ZTSERVERS;
for (String child : rdr.getChildren(tserverPath)) {
if (rdr.getChildren(tserverPath + "/" + child).isEmpty()) {
log.info("TServer " + tsActualCount + " not yet present in ZooKeeper");
} else {
tsActualCount++;
log.info("TServer " + tsActualCount + " present in ZooKeeper");
}
}
Thread.sleep(500);
}
// make sure it's up enough we can perform operations successfully
rdr.sync("/");
// wait for the instance to be created
instanceId = InstanceId.of(new String(rdr.getData(instanceNamePath), UTF_8));
break;
} catch (KeeperException e) {
throw new IllegalStateException("Unable to read TServer information from zookeeper.", e);
log.warn("Error trying to read instance id from zookeeper: {}", e.getMessage());
log.debug("Unable to read instance id from zookeeper.", e);
}
Thread.sleep(1000);
}

if (instanceId == null) {
try {
while (rdr.getChildren(rootPath + Constants.ZMANAGER_LOCK).isEmpty()) {
log.info("Manager not yet present in ZooKeeper");
Thread.sleep(500);
}
log.warn("******* COULD NOT FIND INSTANCE ID - DUMPING ZK ************");
log.warn("Connected to ZooKeeper: {}", getZooKeepers());
log.warn("Looking for instanceId at {}", instanceNamePath);
ZKUtil.visitSubTreeDFS(zk, Constants.ZROOT, false,
(rc, path, ctx, name) -> log.warn("{}", path));
log.warn("******* END ZK DUMP ************");
} catch (KeeperException e) {
throw new IllegalStateException("Unable to read Manager information from zookeeper.", e);
log.error("Error dumping zk", e);
}
throw new IllegalStateException("Unable to find instance id from zookeeper.");
}

try {
while (rdr.getChildren(rootPath + Constants.ZGC_LOCK).isEmpty()) {
log.info("GC not yet present in ZooKeeper");
Thread.sleep(500);
String rootPath = ZooUtil.getRoot(instanceId);
int tsActualCount = 0;
try {
while (tsActualCount < tsExpectedCount) {
tsActualCount = 0;
String tserverPath = rootPath + Constants.ZTSERVERS;
for (String child : rdr.getChildren(tserverPath)) {
if (rdr.getChildren(tserverPath + "/" + child).isEmpty()) {
log.info("TServer " + tsActualCount + " not yet present in ZooKeeper");
} else {
tsActualCount++;
log.info("TServer " + tsActualCount + " present in ZooKeeper");
}
}
} catch (KeeperException e) {
throw new IllegalStateException("Unable to read GC information from zookeeper.", e);
Thread.sleep(500);
}
} catch (KeeperException e) {
throw new IllegalStateException("Unable to read TServer information from zookeeper.", e);
}

try {
while (rdr.getChildren(rootPath + Constants.ZMANAGER_LOCK).isEmpty()) {
log.info("Manager not yet present in ZooKeeper");
Thread.sleep(500);
}
} catch (KeeperException e) {
throw new IllegalStateException("Unable to read Manager information from zookeeper.", e);
}

try {
while (rdr.getChildren(rootPath + Constants.ZGC_LOCK).isEmpty()) {
log.info("GC not yet present in ZooKeeper");
Thread.sleep(500);
}
} catch (KeeperException e) {
throw new IllegalStateException("Unable to read GC information from zookeeper.", e);
}

}

private List<String> buildRemoteDebugParams(int port) {
Expand Down Expand Up @@ -795,18 +792,18 @@ public synchronized void stop() throws IOException, InterruptedException {
// is restarted, then the processes will start right away
// and not wait for the old locks to be cleaned up.
try {
new ZooZap().zap(getServerContext().getSiteConfiguration(), "-manager",
"-compaction-coordinators", "-tservers", "-compactors", "-sservers");
new ZooZap().zap(getServerContext().getZooSession(),
getServerContext().getSiteConfiguration(), "-manager", "-compaction-coordinators",
"-tservers", "-compactors", "-sservers");
} catch (RuntimeException e) {
log.error("Error zapping zookeeper locks", e);
}
control.stop(ServerType.ZOOKEEPER, null);

// Clear the location of the servers in ZooCache.
// When ZooKeeper was stopped in the previous method call,
// the local ZooKeeper watcher did not fire. If MAC is
// restarted, then ZooKeeper will start on the same port with
// the same data, but no Watchers will fire.
// If MAC is restarted, then ZooKeeper will start
// on the same port with the same data and the
// start of the server processes will wait until
// the pre-existing ephemeral locks time out.
boolean startCalled = true;
try {
getServerContext().getZooKeeperRoot();
Expand All @@ -815,6 +812,10 @@ public synchronized void stop() throws IOException, InterruptedException {
startCalled = false;
}
}
// It's possible that the call above to getZooKeeperRoot will
// try to connect to ZooKeeper to get the instanceId. Stop
// ZooKeeper after the call above.
control.stop(ServerType.ZOOKEEPER, null);
if (startCalled) {
final ServerContext ctx = getServerContext();
final String zRoot = ctx.getZooKeeperRoot();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,15 @@ private ServerContext(ServerInfo info) {
* Used during initialization to set the instance name and ID.
*/
public static ServerContext initialize(SiteConfiguration siteConfig, String instanceName,
InstanceId instanceID) {
return new ServerContext(ServerInfo.initialize(siteConfig, instanceName, instanceID));
InstanceId instanceId) {
return new ServerContext(ServerInfo.initialize(siteConfig, instanceName, instanceId)) {

@Override
public InstanceId getInstanceID() {
return instanceId;
}

};
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Optional;
import java.util.Properties;
import java.util.function.BiFunction;
import java.util.function.Function;
Expand Down Expand Up @@ -57,18 +58,17 @@ public class ServerInfo implements ClientInfo {
// set things up using the config file, the instanceId from HDFS, and ZK for the instanceName
static ServerInfo fromServerConfig(SiteConfiguration siteConfig) {
final Function<ServerInfo,String> instanceNameFromZk = si -> {
InstanceId iid = VolumeManager.getInstanceIDFromHdfs(
si.getServerDirs().getInstanceIdLocation(si.getVolumeManager().getFirst()),
si.getHadoopConf());
try (var zk =
si.getZooKeeperSupplier(ServerInfo.class.getSimpleName() + ".getInstanceName()", "")
.get()) {
return ZooUtil.getInstanceName(zk, si.getInstanceId());
return ZooUtil.getInstanceName(zk, iid);
}
};
final Function<ServerInfo,
InstanceId> instanceIdFromHdfs = si -> VolumeManager.getInstanceIDFromHdfs(
si.getServerDirs().getInstanceIdLocation(si.getVolumeManager().getFirst()),
si.getHadoopConf());
return new ServerInfo(siteConfig, GET_ZK_HOSTS_FROM_CONFIG, GET_ZK_TIMEOUT_FROM_CONFIG,
instanceNameFromZk, instanceIdFromHdfs);
instanceNameFromZk, Optional.empty());
}

// set things up using a provided instanceName and InstanceId to initialize the system, but still
Expand All @@ -79,15 +79,14 @@ static ServerInfo initialize(SiteConfiguration siteConfig, String instanceName,
requireNonNull(instanceName);
requireNonNull(instanceId);
return new ServerInfo(siteConfig, GET_ZK_HOSTS_FROM_CONFIG, GET_ZK_TIMEOUT_FROM_CONFIG,
si -> instanceName, si -> instanceId);
si -> instanceName, Optional.of(instanceId));
}

// set things up using the config file, and the client config for a server-side CLI utility
static ServerInfo fromServerAndClientConfig(SiteConfiguration siteConfig, ClientInfo info) {
// ClientInfo.getInstanceId looks up the ID in ZK using the provided instance name
return new ServerInfo(siteConfig, si -> info.getZooKeepers(),
si -> info.getZooKeepersSessionTimeOut(), si -> info.getInstanceName(),
si -> info.getInstanceId());
si -> info.getZooKeepersSessionTimeOut(), si -> info.getInstanceName(), Optional.empty());
}

static ServerInfo forTesting(SiteConfiguration siteConfig, String instanceName, String zooKeepers,
Expand All @@ -108,7 +107,6 @@ static ServerInfo forTesting(SiteConfiguration siteConfig, String instanceName,
private final Supplier<ServerDirs> serverDirs;
private final Supplier<String> zooKeepers;
private final Supplier<Integer> zooKeepersSessionTimeOut; // can't memoize IntSupplier
private final Supplier<InstanceId> instanceId;
private final Supplier<String> instanceName;
private final Supplier<Credentials> credentials;
private final BiFunction<String,String,ZooSession> zooSessionForName;
Expand All @@ -121,13 +119,12 @@ static ServerInfo forTesting(SiteConfiguration siteConfig, String instanceName,
// direction only
private ServerInfo(SiteConfiguration siteConfig, Function<ServerInfo,String> zkHostsFunction,
ToIntFunction<ServerInfo> zkTimeoutFunction, Function<ServerInfo,String> instanceNameFunction,
Function<ServerInfo,InstanceId> instanceIdFunction) {
Optional<InstanceId> instanceId) {
SingletonManager.setMode(Mode.SERVER);
this.siteConfig = requireNonNull(siteConfig);
requireNonNull(zkHostsFunction);
requireNonNull(zkTimeoutFunction);
requireNonNull(instanceNameFunction);
requireNonNull(instanceIdFunction);

this.hadoopConf = memoize(Configuration::new);
this.volumeManager = memoize(() -> {
Expand All @@ -138,16 +135,20 @@ private ServerInfo(SiteConfiguration siteConfig, Function<ServerInfo,String> zkH
}
});
this.serverDirs = memoize(() -> new ServerDirs(getSiteConfiguration(), getHadoopConf()));
this.credentials =
memoize(() -> SystemCredentials.get(getInstanceId(), getSiteConfiguration()));
this.credentials = memoize(() -> {
InstanceId iid = instanceId.isPresent() ? instanceId.orElseThrow()
: VolumeManager.getInstanceIDFromHdfs(
getServerDirs().getInstanceIdLocation(getVolumeManager().getFirst()),
getHadoopConf());
return SystemCredentials.get(iid, getSiteConfiguration());
});

this.zooSessionForName = (name, rootPath) -> new ZooSession(name, getZooKeepers() + rootPath,
getZooKeepersSessionTimeOut(), getSiteConfiguration().get(Property.INSTANCE_SECRET));

// from here on, set up the suppliers based on what was passed in, to support different cases
this.zooKeepers = memoize(() -> zkHostsFunction.apply(this));
this.zooKeepersSessionTimeOut = memoize(() -> zkTimeoutFunction.applyAsInt(this));
this.instanceId = memoize(() -> instanceIdFunction.apply(this));
this.instanceName = memoize(() -> instanceNameFunction.apply(this));
}

Expand All @@ -159,11 +160,6 @@ public VolumeManager getVolumeManager() {
return volumeManager.get();
}

@Override
public InstanceId getInstanceId() {
return instanceId.get();
}

@Override
public Supplier<ZooSession> getZooKeeperSupplier(String clientName, String rootPath) {
return () -> zooSessionForName.apply(requireNonNull(clientName), requireNonNull(rootPath));
Expand Down
Loading

0 comments on commit 1cac6d4

Please sign in to comment.