limits memory and cpu used by compaction reservation request (apache#5185)

Added thread pools to execute compaction reservation requests in order
to limit the memory and cpu used by executing reservations.  Requests
queued up for a pool could still potentially use a lot of memory, so two
things were done to control the memory used by queued requests.  First,
a compactor process is only allowed to have one reservation processing
at a time.  Second, the data related to a reservation request is held
via a soft reference, which allows it to be garbage collected if memory
gets low while the request sits in the queue.  Once the request starts
executing it obtains a strong reference to the data, so it can no longer
be garbage collected.

fixes apache#5177
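
As an illustration of the two mechanisms described above, here is a minimal,
self-contained sketch of the pattern (class and method names are hypothetical,
not the actual Accumulo API; the real implementation is in
CompactionCoordinator below):

import java.lang.ref.SoftReference;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

class ReservationQueueSketch<T> {
  // Mechanism 1: at most one in-flight reservation request per compactor address.
  private final Set<String> active = ConcurrentHashMap.newKeySet();

  CompletableFuture<T> submit(ExecutorService pool, String compactorAddress, T requestData) {
    if (!active.add(compactorAddress)) {
      // This compactor already has a request queued or running; reject the new one.
      return CompletableFuture.completedFuture(null);
    }
    // Mechanism 2: hold the request data softly while it waits in the queue, so the
    // GC can reclaim it if memory gets low.
    var ref = new SoftReference<>(requestData);
    return CompletableFuture.supplyAsync(() -> {
      try {
        // Taking a strong reference pins the data for the duration of execution; null
        // means the data was garbage collected while the request sat in the queue.
        return ref.get();
      } finally {
        active.remove(compactorAddress);
      }
    }, pool);
  }
}

A caller submits at most one request per compactor and treats a null result as
"try again later", which mirrors how the coordinator code below handles a
collected soft reference or a duplicate request.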
keith-turner authored Dec 15, 2024
1 parent 47b75d3 commit 632dbc2
Showing 4 changed files with 190 additions and 60 deletions.
12 changes: 12 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1189,6 +1189,18 @@ public enum Property {
COMPACTION_COORDINATOR_PREFIX("compaction.coordinator.", null, PropertyType.PREFIX,
"Properties in this category affect the behavior of the accumulo compaction coordinator server.",
"2.1.0"),
COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT("compaction.coordinator.reservation.threads.root",
"1", PropertyType.COUNT,
"The number of threads used to reserve files for compaction in a tablet for the root tablet.",
"4.0.0"),
COMPACTION_COORDINATOR_RESERVATION_THREADS_META("compaction.coordinator.reservation.threads.meta",
"1", PropertyType.COUNT,
"The number of threads used to reserve files for compaction in a tablet for accumulo.metadata tablets.",
"4.0.0"),
COMPACTION_COORDINATOR_RESERVATION_THREADS_USER("compaction.coordinator.reservation.threads.user",
"64", PropertyType.COUNT,
"The number of threads used to reserve files for compaction in a tablet for user tables.",
"4.0.0"),
@Experimental
COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL(
"compaction.coordinator.compactor.dead.check.interval", "5m", PropertyType.TIMEDURATION,
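For reference, these limits could be tuned in accumulo.properties like any
other property; the values below are only illustrative, not recommendations:

compaction.coordinator.reservation.threads.root=1
compaction.coordinator.reservation.threads.meta=2
compaction.coordinator.reservation.threads.user=32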
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java
@@ -34,6 +34,9 @@ public enum ThreadPoolNames {
CONDITIONAL_WRITER_CLEANUP_POOL("accumulo.pool.client.context.conditional.writer.cleanup"),
COORDINATOR_FINALIZER_BACKGROUND_POOL("accumulo.pool.compaction.finalizer.background.pool"),
COORDINATOR_FINALIZER_NOTIFIER_POOL("accumulo.pool.compaction.coordinator.compaction.finalizer"),
COORDINATOR_RESERVATION_ROOT_POOL("accumulo.pool.compaction.coordinator.reservation.root"),
COORDINATOR_RESERVATION_META_POOL("accumulo.pool.compaction.coordinator.reservation.meta"),
COORDINATOR_RESERVATION_USER_POOL("accumulo.pool.compaction.coordinator.reservation.user"),
GC_DELETE_POOL("accumulo.pool.gc.threads.delete"),
GENERAL_SERVER_POOL("accumulo.pool.general.server"),
SERVICE_LOCK_POOL("accumulo.pool.service.lock"),
24 changes: 24 additions & 0 deletions core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -22,6 +22,9 @@
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_META_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_ROOT_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_USER_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_DELETE_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_FATE_POOL;
@@ -369,6 +372,27 @@ public ThreadPoolExecutor createExecutorService(final AccumuloConfiguration conf
return builder.build();
case GC_DELETE_THREADS:
return getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
case COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT:
builder = getPoolBuilder(COORDINATOR_RESERVATION_ROOT_POOL).numCoreThreads(conf.getCount(p))
.withTimeOut(60L, MILLISECONDS);
if (emitThreadPoolMetrics) {
builder.enableThreadPoolMetrics();
}
return builder.build();
case COMPACTION_COORDINATOR_RESERVATION_THREADS_META:
builder = getPoolBuilder(COORDINATOR_RESERVATION_META_POOL).numCoreThreads(conf.getCount(p))
.withTimeOut(60L, MILLISECONDS);
if (emitThreadPoolMetrics) {
builder.enableThreadPoolMetrics();
}
return builder.build();
case COMPACTION_COORDINATOR_RESERVATION_THREADS_USER:
builder = getPoolBuilder(COORDINATOR_RESERVATION_USER_POOL).numCoreThreads(conf.getCount(p))
.withTimeOut(60L, MILLISECONDS);
if (emitThreadPoolMetrics) {
builder.enableThreadPoolMetrics();
}
return builder.build();
default:
throw new IllegalArgumentException("Unhandled thread pool property: " + p);
}
151 changes: 151 additions & 60 deletions server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -31,6 +31,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.ref.SoftReference;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
@@ -43,13 +44,17 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;

import org.apache.accumulo.core.Constants;
@@ -193,6 +198,9 @@ public class CompactionCoordinator

private volatile long coordinatorStartTime;

private final Map<DataLevel,ThreadPoolExecutor> reservationPools;
private final Set<String> activeCompactorReservationRequest = ConcurrentHashMap.newKeySet();

public CompactionCoordinator(ServerContext ctx, SecurityOperation security,
AtomicReference<Map<FateInstanceType,Fate<Manager>>> fateInstances, Manager manager) {
this.ctx = ctx;
@@ -232,6 +240,18 @@ public CompactionCoordinator(ServerContext ctx, SecurityOperation security,
deadCompactionDetector =
new DeadCompactionDetector(this.ctx, this, schedExecutor, fateInstances);

var rootReservationPool = ThreadPools.getServerThreadPools().createExecutorService(
ctx.getConfiguration(), Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT, true);

var metaReservationPool = ThreadPools.getServerThreadPools().createExecutorService(
ctx.getConfiguration(), Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_META, true);

var userReservationPool = ThreadPools.getServerThreadPools().createExecutorService(
ctx.getConfiguration(), Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_USER, true);

reservationPools = Map.of(Ample.DataLevel.ROOT, rootReservationPool, Ample.DataLevel.METADATA,
metaReservationPool, Ample.DataLevel.USER, userReservationPool);

compactorCounts = ctx.getCaches().createNewBuilder(CacheName.COMPACTOR_COUNTS, false)
.expireAfterWrite(30, TimeUnit.SECONDS).build(this::countCompactors);
// At this point the manager does not have its lock so no actions should be taken yet
@@ -250,6 +270,9 @@ public void start() {

public void shutdown() {
shutdown.countDown();

reservationPools.values().forEach(ExecutorService::shutdownNow);

var localThread = serviceThread;
if (localThread != null) {
try {
@@ -528,82 +551,142 @@ protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job,

}

  private class ReserveCompactionTask implements Supplier<CompactionMetadata> {

    // Use a soft reference for this in case free memory gets low while this is sitting in the
    // queue waiting to process. This object can contain the tablet's list of files, and if there
    // are lots of tablets with lots of files then that could start to cause memory problems. This
    // hack could be removed if #5188 were implemented.
    private final SoftReference<CompactionJobQueues.MetaJob> metaJobRef;
    private final String compactorAddress;
    private final ExternalCompactionId externalCompactionId;

    private ReserveCompactionTask(CompactionJobQueues.MetaJob metaJob, String compactorAddress,
        ExternalCompactionId externalCompactionId) {
      Preconditions.checkArgument(metaJob.getJob().getKind() == CompactionKind.SYSTEM
          || metaJob.getJob().getKind() == CompactionKind.USER);
      this.metaJobRef = new SoftReference<>(Objects.requireNonNull(metaJob));
      this.compactorAddress = Objects.requireNonNull(compactorAddress);
      this.externalCompactionId = Objects.requireNonNull(externalCompactionId);
      Preconditions.checkState(activeCompactorReservationRequest.add(compactorAddress),
          "compactor %s already has a reservation in flight, cannot process %s", compactorAddress,
          externalCompactionId);
    }

    @Override
    public CompactionMetadata get() {
      try {
        var metaJob = metaJobRef.get();
        if (metaJob == null) {
          LOG.warn("Compaction reservation request for {} {} was garbage collected.",
              compactorAddress, externalCompactionId);
          return null;
        }

        var tabletMetadata = metaJob.getTabletMetadata();

        var jobFiles = metaJob.getJob().getFiles().stream()
            .map(CompactableFileImpl::toStoredTabletFile).collect(Collectors.toSet());

        Retry retry = Retry.builder().maxRetries(5).retryAfter(Duration.ofMillis(100))
            .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10))
            .backOffFactor(1.5).logInterval(Duration.ofMinutes(3)).createRetry();

        while (retry.canRetry()) {
          try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) {
            var extent = metaJob.getTabletMetadata().getExtent();

            if (!canReserveCompaction(tabletMetadata, metaJob.getJob().getKind(), jobFiles, ctx,
                manager.getSteadyTime())) {
              return null;
            }

            var ecm = createExternalCompactionMetadata(metaJob.getJob(), jobFiles, tabletMetadata,
                compactorAddress, externalCompactionId);

            // Any data that is read from the tablet to make a decision about whether it can
            // compact or not must be checked for changes in the conditional mutation.
            var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation()
                .requireFiles(jobFiles).requireNotCompacting(jobFiles);
            if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
              // For system compactions the user compaction requested column is examined when
              // deciding if a compaction can start, so need to check for changes to this column.
              tabletMutator.requireSame(tabletMetadata, SELECTED, USER_COMPACTION_REQUESTED);
            } else {
              tabletMutator.requireSame(tabletMetadata, SELECTED);
            }

            if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) {
              var selectedFiles = tabletMetadata.getSelectedFiles();
              var reserved =
                  getFilesReservedBySelection(tabletMetadata, manager.getSteadyTime(), ctx);

              // If there is a selectedFiles column and the reserved set is empty, this means that
              // either no user jobs were completed yet or the selection expiration time has
              // passed, so the column is eligible to be deleted and a system job can run instead.
              if (selectedFiles != null && reserved.isEmpty()
                  && !Collections.disjoint(jobFiles, selectedFiles.getFiles())) {
                LOG.debug("Deleting user compaction selected files for {} {}", extent,
                    externalCompactionId);
                tabletMutator.deleteSelectedFiles();
              }
            }

            tabletMutator.putExternalCompaction(externalCompactionId, ecm);
            tabletMutator
                .submit(tm -> tm.getExternalCompactions().containsKey(externalCompactionId));

            var result = tabletsMutator.process().get(extent);

            if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) {
              return ecm;
            } else {
              tabletMetadata = result.readMetadata();
            }
          }

          retry.useRetry();
          try {
            retry.waitForNextAttempt(LOG,
                "Reserved compaction for " + metaJob.getTabletMetadata().getExtent());
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          }
        }

        return null;
      } finally {
        Preconditions.checkState(activeCompactorReservationRequest.remove(compactorAddress),
            "compactorAddress:%s", compactorAddress);
      }
    }
  }

  protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJob,
      String compactorAddress, ExternalCompactionId externalCompactionId) {

    if (activeCompactorReservationRequest.contains(compactorAddress)) {
      // In this case the compactor has a previously submitted reservation request that is still
      // processing. Do not want to let it queue up another reservation request. One possible
      // cause of this is that the compactor timed out waiting for its last request to process
      // and is now making another request. The previously submitted request cannot be used
      // because the compactor generates a new uuid for each request it makes. So the best thing
      // to do is to return null and wait for this situation to resolve. This will likely happen
      // when some part of the distributed system is not working well, so at this point want to
      // avoid making problems worse instead of trying to reserve a job.
      LOG.warn(
          "Ignoring request from {} to reserve compaction job because it has a reservation request in progress.",
          compactorAddress);
      return null;
    }

    var dataLevel = DataLevel.of(metaJob.getTabletMetadata().getTableId());
    var future = CompletableFuture.supplyAsync(
        new ReserveCompactionTask(metaJob, compactorAddress, externalCompactionId),
        reservationPools.get(dataLevel));
    return future.join();
  }

protected TExternalCompactionJob createThriftJob(String externalCompactionId,
@@ -1123,6 +1206,14 @@ public void cleanUpInternalState() {
// 5. Log compactors with no groups
// 6. Log groups with compactors and queued jobs that have not checked in

var config = ctx.getConfiguration();
ThreadPools.resizePool(reservationPools.get(DataLevel.ROOT), config,
Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT);
ThreadPools.resizePool(reservationPools.get(DataLevel.METADATA), config,
Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_META);
ThreadPools.resizePool(reservationPools.get(DataLevel.USER), config,
Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_USER);

// grab a snapshot of the ids in the set before reading the metadata table. This is done to
// avoid removing things that are added while reading the metadata.
final Set<ExternalCompactionId> idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet());
