Skip to content

Commit

Permalink
Merge branch '2.1' into 3.1
Browse files Browse the repository at this point in the history
  • Loading branch information
dlmarion committed Jan 31, 2025
2 parents e0478b4 + f8fb872 commit 9fee599
Show file tree
Hide file tree
Showing 35 changed files with 3,074 additions and 377 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,23 @@ public static class HAServiceLockWatcher implements AccumuloLockWatcher {
private static final Logger LOG = LoggerFactory.getLogger(HAServiceLockWatcher.class);

private final String serviceName;
private final Supplier<Boolean> shutdownComplete;
private volatile boolean acquiredLock = false;
private volatile boolean failedToAcquireLock = false;

public HAServiceLockWatcher(String serviceName) {
public HAServiceLockWatcher(String serviceName, Supplier<Boolean> shutdownComplete) {
this.serviceName = serviceName;
this.shutdownComplete = shutdownComplete;
}

@Override
public void lostLock(LockLossReason reason) {
Halt.halt(serviceName + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
if (shutdownComplete.get()) {
LOG.warn("{} lost lock (reason = {}), not halting because shutdown is complete.",
serviceName, reason);
} else {
Halt.halt(serviceName + " lock in zookeeper lost (reason = " + reason + "), exiting!", -1);
}
}

@Override
Expand Down Expand Up @@ -122,24 +129,27 @@ public static class ServiceLockWatcher implements LockWatcher {
private static final Logger LOG = LoggerFactory.getLogger(ServiceLockWatcher.class);

private final String serviceName;
private final Supplier<Boolean> shuttingDown;
private final Supplier<Boolean> shutdownComplete;
private final Consumer<String> lostLockAction;

public ServiceLockWatcher(String serviceName, Supplier<Boolean> shuttingDown,
public ServiceLockWatcher(String serviceName, Supplier<Boolean> shutdownComplete,
Consumer<String> lostLockAction) {
this.serviceName = serviceName;
this.shuttingDown = shuttingDown;
this.shutdownComplete = shutdownComplete;
this.lostLockAction = lostLockAction;
}

@Override
public void lostLock(final LockLossReason reason) {
Halt.halt(1, () -> {
if (!shuttingDown.get()) {
if (shutdownComplete.get()) {
LOG.warn("{} lost lock (reason = {}), not halting because shutdown is complete.",
serviceName, reason);
} else {
Halt.halt(1, () -> {
LOG.error("{} lost lock (reason = {}), exiting.", serviceName, reason);
}
lostLockAction.accept(serviceName);
});
lostLockAction.accept(serviceName);
});
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.core.rpc.clients;

import java.io.UncheckedIOException;
import java.net.UnknownHostException;

import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.process.thrift.ServerProcessService.Client;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;

import com.google.common.net.HostAndPort;

public class ServerProcessServiceThriftClient extends ThriftClientTypes<Client> {

protected ServerProcessServiceThriftClient(String serviceName) {
super(serviceName, new Client.Factory());
}

public Client getServerProcessConnection(ClientContext context, Logger log, String hostname,
int port) {
HostAndPort serverProcess = HostAndPort.fromParts(hostname, port);
try {
// Manager requests can take a long time: don't ever time out
return ThriftUtil.getClientNoTimeout(this, serverProcess, context);
} catch (TTransportException tte) {
Throwable cause = tte.getCause();
if (cause instanceof UnknownHostException) {
// do not expect to recover from this
throw new UncheckedIOException((UnknownHostException) cause);
}
log.debug("Failed to connect to process at " + serverProcess + ", will retry... ", tte);
return null;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ public abstract class ThriftClientTypes<C extends TServiceClient> {
public static final TabletManagementClientServiceThriftClient TABLET_MGMT =
new TabletManagementClientServiceThriftClient("tablet");

public static final ServerProcessServiceThriftClient SERVER_PROCESS =
new ServerProcessServiceThriftClient("process");

/**
* execute method with supplied client returning object of type R
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public enum ThreadPoolNames {
TSERVER_ASSIGNMENT_POOL("accumulo.pool.tserver.assignment"),
TSERVER_MIGRATIONS_POOL("accumulo.pool.tserver.migrations"),
TSERVER_MINOR_COMPACTOR_POOL("accumulo.pool.tserver.minor.compactor"),
TSERVER_SHUTDOWN_UNLOAD_TABLET_POOL("accumulo.pool.tserver.shutdown.tablet.unload"),
TSERVER_SUMMARY_FILE_RETRIEVER_POOL("accumulo.pool.tserver.summary.file.retriever.pool"),
TSERVER_SUMMARY_PARTITION_POOL("accumulo.pool.tserver.summary.partition"),
TSERVER_SUMMARY_REMOTE_POOL("accumulo.pool.tserver.summary.remote"),
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scripts/generate-thrift.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
[[ -z $REQUIRED_THRIFT_VERSION ]] && REQUIRED_THRIFT_VERSION='0.17.0'
[[ -z $INCLUDED_MODULES ]] && INCLUDED_MODULES=()
[[ -z $BASE_OUTPUT_PACKAGE ]] && BASE_OUTPUT_PACKAGE='org.apache.accumulo.core'
[[ -z $PACKAGES_TO_GENERATE ]] && PACKAGES_TO_GENERATE=(gc manager tabletserver securityImpl clientImpl dataImpl compaction tabletingest tablet tabletscan)
[[ -z $PACKAGES_TO_GENERATE ]] && PACKAGES_TO_GENERATE=(process gc manager tabletserver securityImpl clientImpl dataImpl compaction tabletingest tablet tabletscan)
[[ -z $BUILD_DIR ]] && BUILD_DIR='target'
[[ -z $LANGUAGES_TO_GENERATE ]] && LANGUAGES_TO_GENERATE=(java)
[[ -z $FINAL_DIR ]] && FINAL_DIR='src/main'
Expand Down
1 change: 1 addition & 0 deletions core/src/main/spotbugs/exclude-filter.xml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
<Package name="org.apache.accumulo.core.dataImpl.thrift" />
<Package name="org.apache.accumulo.core.gc.thrift" />
<Package name="org.apache.accumulo.core.manager.thrift" />
<Package name="org.apache.accumulo.core.process.thrift" />
<Package name="org.apache.accumulo.core.securityImpl.thrift" />
<Package name="org.apache.accumulo.core.tablet.thrift" />
<Package name="org.apache.accumulo.core.tabletingest.thrift" />
Expand Down
Loading

0 comments on commit 9fee599

Please sign in to comment.