Skip to content

Commit

Permalink
Merge branch 'main' into accumulo-5014
Browse files Browse the repository at this point in the history
  • Loading branch information
cshannon committed Dec 20, 2024
2 parents 346f936 + e5370d1 commit 10830fe
Show file tree
Hide file tree
Showing 27 changed files with 1,069 additions and 717 deletions.
12 changes: 12 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -1189,6 +1189,18 @@ public enum Property {
COMPACTION_COORDINATOR_PREFIX("compaction.coordinator.", null, PropertyType.PREFIX,
"Properties in this category affect the behavior of the accumulo compaction coordinator server.",
"2.1.0"),
COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT("compaction.coordinator.reservation.threads.root",
"1", PropertyType.COUNT,
"The number of threads used to reserve files for compaction in a tablet for the root tablet.",
"4.0.0"),
COMPACTION_COORDINATOR_RESERVATION_THREADS_META("compaction.coordinator.reservation.threads.meta",
"1", PropertyType.COUNT,
"The number of threads used to reserve files for compaction in a tablet for accumulo.metadata tablets.",
"4.0.0"),
COMPACTION_COORDINATOR_RESERVATION_THREADS_USER("compaction.coordinator.reservation.threads.user",
"64", PropertyType.COUNT,
"The number of threads used to reserve files for compaction in a tablet for user tables.",
"4.0.0"),
@Experimental
COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL(
"compaction.coordinator.compactor.dead.check.interval", "5m", PropertyType.TIMEDURATION,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -75,12 +76,22 @@ public FateId newRandomId(FateInstanceType instanceType) {
return FateId.from(instanceType, UUID.randomUUID());
}
};
protected static final int MAX_REPOS = 100;

// The ZooKeeper lock for the process that's running this store instance
protected final ZooUtil.LockID lockID;
protected final Predicate<ZooUtil.LockID> isLockHeld;
protected final Map<FateId,CountDownTimer> deferred;
protected final FateIdGenerator fateIdGenerator;
// the statuses required to perform operations
public static final Set<TStatus> REQ_PUSH_STATUS = Set.of(TStatus.IN_PROGRESS, TStatus.NEW);
public static final Set<TStatus> REQ_POP_STATUS =
Set.of(TStatus.FAILED_IN_PROGRESS, TStatus.SUCCESSFUL);
public static final Set<TStatus> REQ_DELETE_STATUS =
Set.of(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, TStatus.FAILED);
// all but UNKNOWN
public static final Set<TStatus> REQ_FORCE_DELETE_STATUS = Set.of(TStatus.NEW, TStatus.SUBMITTED,
TStatus.SUCCESSFUL, TStatus.FAILED, TStatus.FAILED_IN_PROGRESS, TStatus.IN_PROGRESS);
private final int maxDeferred;
private final AtomicBoolean deferredOverflow = new AtomicBoolean();

Expand Down Expand Up @@ -415,7 +426,7 @@ protected void seededTx() {
unreservedRunnableCount.increment();
}

protected byte[] serializeTxInfo(Serializable so) {
protected static byte[] serializeTxInfo(Serializable so) {
if (so instanceof String) {
return ("S " + so).getBytes(UTF_8);
} else {
Expand All @@ -428,7 +439,7 @@ protected byte[] serializeTxInfo(Serializable so) {
}
}

protected Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
protected static Serializable deserializeTxInfo(TxInfo txInfo, byte[] data) {
if (data[0] == 'O') {
byte[] sera = new byte[data.length - 2];
System.arraycopy(data, 2, sera, 0, sera.length);
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
ThreadPools.resizePool(pool, conf, Property.MANAGER_FATE_THREADPOOL_SIZE);
// If the pool grew, then ensure that there is a TransactionRunner for each thread
final int configured = conf.getCount(Property.MANAGER_FATE_THREADPOOL_SIZE);
final int needed = configured - pool.getQueue().size();
final int needed = configured - pool.getActiveCount();
if (needed > 0) {
for (int i = 0; i < needed; i++) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
*/
package org.apache.accumulo.core.fate;

public class StackOverflowException extends Exception {
public class StackOverflowException extends RuntimeException {

public StackOverflowException(String msg) {
super(msg);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,8 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
private final String tableName;

private static final FateInstanceType fateInstanceType = FateInstanceType.USER;
private static final int maxRepos = 100;
private static final com.google.common.collect.Range<Integer> REPO_RANGE =
com.google.common.collect.Range.closed(1, maxRepos);
com.google.common.collect.Range.closed(1, MAX_REPOS);

public UserFateStore(ClientContext context, String tableName, ZooUtil.LockID lockID,
Predicate<ZooUtil.LockID> isLockHeld) {
Expand Down Expand Up @@ -457,12 +456,12 @@ public void push(Repo<T> repo) throws StackOverflowException {

Optional<Integer> top = findTop();

if (top.filter(t -> t >= maxRepos).isPresent()) {
if (top.filter(t -> t >= MAX_REPOS).isPresent()) {
throw new StackOverflowException("Repo stack size too large");
}

FateMutator<T> fateMutator =
newMutator(fateId).requireStatus(TStatus.IN_PROGRESS, TStatus.NEW);
newMutator(fateId).requireStatus(REQ_PUSH_STATUS.toArray(TStatus[]::new));
fateMutator.putRepo(top.map(t -> t + 1).orElse(1), repo).mutate();
}

Expand All @@ -471,8 +470,8 @@ public void pop() {
verifyReservedAndNotDeleted(true);

Optional<Integer> top = findTop();
top.ifPresent(t -> newMutator(fateId)
.requireStatus(TStatus.FAILED_IN_PROGRESS, TStatus.SUCCESSFUL).deleteRepo(t).mutate());
top.ifPresent(t -> newMutator(fateId).requireStatus(REQ_POP_STATUS.toArray(TStatus[]::new))
.deleteRepo(t).mutate());
}

@Override
Expand All @@ -497,7 +496,7 @@ public void delete() {
verifyReservedAndNotDeleted(true);

var mutator = newMutator(fateId);
mutator.requireStatus(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, TStatus.FAILED);
mutator.requireStatus(REQ_DELETE_STATUS.toArray(TStatus[]::new));
mutator.delete().mutate();
this.deleted = true;
}
Expand All @@ -507,9 +506,7 @@ public void forceDelete() {
verifyReservedAndNotDeleted(true);

var mutator = newMutator(fateId);
// allow deletion of all txns other than UNKNOWN
mutator.requireStatus(TStatus.NEW, TStatus.SUBMITTED, TStatus.SUCCESSFUL, TStatus.FAILED,
TStatus.FAILED_IN_PROGRESS, TStatus.IN_PROGRESS);
mutator.requireStatus(REQ_FORCE_DELETE_STATUS.toArray(TStatus[]::new));
mutator.delete().mutate();
this.deleted = true;
}
Expand Down Expand Up @@ -537,14 +534,14 @@ protected void unreserve() {

static Text invertRepo(int position) {
Preconditions.checkArgument(REPO_RANGE.contains(position),
"Position %s is not in the valid range of [0,%s]", position, maxRepos);
return new Text(String.format("%02d", maxRepos - position));
"Position %s is not in the valid range of [0,%s]", position, MAX_REPOS);
return new Text(String.format("%02d", MAX_REPOS - position));
}

static Integer restoreRepo(Text invertedPosition) {
int position = maxRepos - Integer.parseInt(invertedPosition.toString());
int position = MAX_REPOS - Integer.parseInt(invertedPosition.toString());
Preconditions.checkArgument(REPO_RANGE.contains(position),
"Position %s is not in the valid range of [0,%s]", position, maxRepos);
"Position %s is not in the valid range of [0,%s]", position, MAX_REPOS);
return position;
}
}
Loading

0 comments on commit 10830fe

Please sign in to comment.