Skip to content

Commit

Permalink
Adds some missing FATE functionality (#5263)
Browse files Browse the repository at this point in the history
- Previously, if a user configured fewer FATE threads to work on transactions (via the MANAGER_FATE_THREADPOOL_SIZE property), the pool size would not actually be decreased. These changes safely stop excess workers when the property value is decreased.
- Added test FatePoolResizeIT (tests both META and USER transactions: MetaFatePoolResizeIT and UserFatePoolResizeIT) to ensure the pool size is correctly increased and decreased with configuration changes.
- Fate.shutdown() was not waiting on termination of the fate pool watcher when needed. Added the missing wait.

---------

Co-authored-by: Keith Turner <kturner@apache.org>
  • Loading branch information
kevinrr888 and keith-turner authored Jan 21, 2025
1 parent 46b934f commit ee81a3c
Show file tree
Hide file tree
Showing 5 changed files with 402 additions and 53 deletions.
150 changes: 100 additions & 50 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@
import static org.apache.accumulo.core.util.ShutdownUtil.isIOException;

import java.time.Duration;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedTransferQueue;
Expand Down Expand Up @@ -64,6 +67,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

/**
* Fault tolerant executor
*/
Expand All @@ -76,6 +82,7 @@ public class Fate<T> {
private final T environment;
private final ScheduledThreadPoolExecutor fatePoolWatcher;
private final ExecutorService transactionExecutor;
private final Set<TransactionRunner> runningTxRunners;
private final ExecutorService deadResCleanerExecutor;

private static final EnumSet<TStatus> FINISHED_STATES = EnumSet.of(FAILED, SUCCESSFUL, UNKNOWN);
Expand Down Expand Up @@ -179,9 +186,12 @@ public void run() {
}

private class TransactionRunner implements Runnable {
// used to signal a TransactionRunner to stop in the case where there are too many running
// i.e., the property for the pool size decreased and we have excess TransactionRunners
private final AtomicBoolean stop = new AtomicBoolean(false);

private Optional<FateTxStore<T>> reserveFateTx() throws InterruptedException {
while (keepRunning.get()) {
while (keepRunning.get() && !stop.get()) {
FateId unreservedFateId = workQueue.poll(100, MILLISECONDS);

if (unreservedFateId == null) {
Expand All @@ -198,55 +208,61 @@ private Optional<FateTxStore<T>> reserveFateTx() throws InterruptedException {

@Override
public void run() {
while (keepRunning.get()) {
FateTxStore<T> txStore = null;
ExecutionState state = new ExecutionState();
try {
var optionalopStore = reserveFateTx();
if (optionalopStore.isPresent()) {
txStore = optionalopStore.orElseThrow();
} else {
continue;
}
state.status = txStore.getStatus();
state.op = txStore.top();
if (state.status == FAILED_IN_PROGRESS) {
processFailed(txStore, state.op);
} else if (state.status == SUBMITTED || state.status == IN_PROGRESS) {
try {
execute(txStore, state);
if (state.op != null && state.deferTime != 0) {
// The current op is not ready to execute
continue;
}
} catch (StackOverflowException e) {
// the op that failed to push onto the stack was never executed, so no need to undo
// it just transition to failed and undo the ops that executed
transitionToFailed(txStore, e);
continue;
} catch (Exception e) {
blockIfHadoopShutdown(txStore.getID(), e);
transitionToFailed(txStore, e);
runningTxRunners.add(this);
try {
while (keepRunning.get() && !stop.get()) {
FateTxStore<T> txStore = null;
ExecutionState state = new ExecutionState();
try {
var optionalopStore = reserveFateTx();
if (optionalopStore.isPresent()) {
txStore = optionalopStore.orElseThrow();
} else {
continue;
}
state.status = txStore.getStatus();
state.op = txStore.top();
if (state.status == FAILED_IN_PROGRESS) {
processFailed(txStore, state.op);
} else if (state.status == SUBMITTED || state.status == IN_PROGRESS) {
try {
execute(txStore, state);
if (state.op != null && state.deferTime != 0) {
// The current op is not ready to execute
continue;
}
} catch (StackOverflowException e) {
// the op that failed to push onto the stack was never executed, so no need to undo
// it just transition to failed and undo the ops that executed
transitionToFailed(txStore, e);
continue;
} catch (Exception e) {
blockIfHadoopShutdown(txStore.getID(), e);
transitionToFailed(txStore, e);
continue;
}

if (state.op == null) {
// transaction is finished
String ret = state.prevOp.getReturn();
if (ret != null) {
txStore.setTransactionInfo(TxInfo.RETURN_VALUE, ret);
if (state.op == null) {
// transaction is finished
String ret = state.prevOp.getReturn();
if (ret != null) {
txStore.setTransactionInfo(TxInfo.RETURN_VALUE, ret);
}
txStore.setStatus(SUCCESSFUL);
doCleanUp(txStore);
}
txStore.setStatus(SUCCESSFUL);
doCleanUp(txStore);
}
}
} catch (Exception e) {
runnerLog.error("Uncaught exception in FATE runner thread.", e);
} finally {
if (txStore != null) {
txStore.unreserve(Duration.ofMillis(state.deferTime));
} catch (Exception e) {
runnerLog.error("Uncaught exception in FATE runner thread.", e);
} finally {
if (txStore != null) {
txStore.unreserve(Duration.ofMillis(state.deferTime));
}
}
}
} finally {
log.trace("A TransactionRunner is exiting...");
Preconditions.checkState(runningTxRunners.remove(this));
}
}

Expand Down Expand Up @@ -357,6 +373,14 @@ private void undo(FateId fateId, Repo<T> op) {
}
}

/**
 * Requests that this TransactionRunner stop by atomically switching its {@code stop} flag from
 * {@code false} to {@code true}. The flag is consulted by the runner's loops alongside
 * {@code keepRunning}.
 *
 * @return {@code true} if this call changed the flag, {@code false} if the runner had already
 *         been flagged to stop
 */
private boolean flagStop() {
  final boolean newlyFlagged = stop.compareAndSet(false, true);
  return newlyFlagged;
}

/**
 * Reports whether this TransactionRunner has been flagged to stop. This only reads the
 * {@code stop} flag; it does not modify it.
 *
 * @return the current value of the {@code stop} flag
 */
private boolean isFlaggedToStop() {
  final boolean flagged = stop.get();
  return flagged;
}

}

protected long executeIsReady(FateId fateId, Repo<T> op) throws Exception {
Expand Down Expand Up @@ -400,15 +424,16 @@ public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createExecutorService(conf,
Property.MANAGER_FATE_THREADPOOL_SIZE, true);
this.workQueue = new LinkedTransferQueue<>();
this.runningTxRunners = Collections.synchronizedSet(new HashSet<>());
this.fatePoolWatcher =
ThreadPools.getServerThreadPools().createGeneralScheduledExecutorService(conf);
ThreadPools.watchCriticalScheduledTask(fatePoolWatcher.scheduleWithFixedDelay(() -> {
// resize the pool if the property changed
ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
// If the pool grew, then ensure that there is a TransactionRunner for each thread
final int configured = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
final int needed = configured - pool.getActiveCount();
final int needed = configured - runningTxRunners.size();
if (needed > 0) {
// If the pool grew, then ensure that there is a TransactionRunner for each thread
for (int i = 0; i < needed; i++) {
try {
pool.execute(new TransactionRunner());
Expand All @@ -425,11 +450,26 @@ public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
}
}
idleCountHistory.clear();
} else if (needed < 0) {
// If we need the pool to shrink, then ensure excess TransactionRunners are safely stopped.
// Flag the necessary number of TransactionRunners to stop safely once they are done working
// on a transaction.
int numFlagged =
(int) runningTxRunners.stream().filter(TransactionRunner::isFlaggedToStop).count();
int numToStop = -1 * (numFlagged + needed);
for (var runner : runningTxRunners) {
if (numToStop <= 0) {
break;
}
if (runner.flagStop()) {
log.trace("Flagging a TransactionRunner to stop...");
numToStop--;
}
}
} else {
// The property did not change, but should it based on idle Fate threads? Maintain
// count of the last X minutes of idle Fate threads. If zero 95% of the time, then suggest
// that the
// MANAGER_FATE_THREADPOOL_SIZE be increased.
// that the MANAGER_FATE_THREADPOOL_SIZE be increased.
final long interval = Math.min(60, TimeUnit.MILLISECONDS
.toMinutes(conf.getTimeInMillis(Property.MANAGER_FATE_IDLE_CHECK_INTERVAL)));
if (interval == 0) {
Expand Down Expand Up @@ -488,6 +528,11 @@ public Duration getPoolWatcherDelay() {
return POOL_WATCHER_DELAY;
}

/**
 * Returns the number of TransactionRunners currently registered in {@code runningTxRunners}.
 * A runner registers itself when its {@code run()} method starts and removes itself when that
 * method exits, so runners that have been flagged to stop but have not yet exited are still
 * counted.
 *
 * @return the current size of the running TransactionRunner set
 */
@VisibleForTesting
public int getTxRunnersActive() {
  final int activeRunners = runningTxRunners.size();
  return activeRunners;
}

// get a transaction id back to the requester before doing any work
public FateId startTransaction() {
return store.create();
Expand Down Expand Up @@ -622,10 +667,15 @@ public void shutdown(long timeout, TimeUnit timeUnit) {
if (timeout > 0) {
long start = System.nanoTime();

while ((System.nanoTime() - start) < timeUnit.toNanos(timeout)
&& (workFinder.isAlive() || !transactionExecutor.isTerminated()
|| (deadResCleanerExecutor != null && !deadResCleanerExecutor.isTerminated()))) {
while ((System.nanoTime() - start) < timeUnit.toNanos(timeout) && (workFinder.isAlive()
|| !transactionExecutor.isTerminated() || !fatePoolWatcher.isTerminated()
|| (deadResCleanerExecutor != null && !deadResCleanerExecutor.isTerminated()))) {
try {
if (!fatePoolWatcher.awaitTermination(1, SECONDS)) {
log.debug("Fate {} is waiting for pool watcher to terminate", store.type());
continue;
}

if (!transactionExecutor.awaitTermination(1, SECONDS)) {
log.debug("Fate {} is waiting for worker threads to terminate", store.type());
continue;
Expand Down
14 changes: 11 additions & 3 deletions test/src/main/java/org/apache/accumulo/test/fate/FastFate.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,13 @@
import org.apache.accumulo.core.fate.Repo;

/**
* A FATE which performs the dead reservation cleanup with a much shorter delay between. Useful for
* shortening test times for tests that are waiting for a cleanup to occur.
* A FATE which performs the dead reservation cleanup and the pool size check with a much shorter
* delay between runs. Useful for shortening test times for tests that are waiting for one of
* these actions to occur.
*/
public class FastFate<T> extends Fate<T> {
private static final Duration DEAD_RES_CLEANUP_DELAY = Duration.ofSeconds(5);
private static final Duration POOL_WATCHER_DELAY = Duration.ofSeconds(5);

public FastFate(T environment, FateStore<T> store, boolean runDeadResCleaner,
Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) {
Expand All @@ -39,6 +42,11 @@ public FastFate(T environment, FateStore<T> store, boolean runDeadResCleaner,

@Override
public Duration getDeadResCleanupDelay() {
return Duration.ofSeconds(5);
return DEAD_RES_CLEANUP_DELAY;
}

/**
 * Supplies the shortened pool watcher delay used by this test-oriented FATE.
 *
 * @return the {@code POOL_WATCHER_DELAY} constant declared on this class
 */
@Override
public Duration getPoolWatcherDelay() {
  final Duration watcherDelay = POOL_WATCHER_DELAY;
  return watcherDelay;
}
}
Loading

0 comments on commit ee81a3c

Please sign in to comment.