From 632dbc2ed0d7bfed30a2f9da7053257b07720962 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Sun, 15 Dec 2024 14:27:19 -0500 Subject: [PATCH] limits memory and cpu used by compaction reservation request (#5185) Added thread pools to execute compaction reservation requests in order to limit memory and cpu used by executing reservations. Requests queued up for the pool could still potentially use a lot of memory. Did two things to control memory of things in the queue. First, only allow a compactor process to have one reservation processing at a time. Second, made the data related to a reservation request a soft reference, which should allow it to be garbage collected if memory gets low while it is sitting in the queue. Once the request starts executing it obtains a strong reference to the data so it can no longer be garbage collected. fixes #5177 --- .../apache/accumulo/core/conf/Property.java | 12 + .../core/util/threads/ThreadPoolNames.java | 3 + .../core/util/threads/ThreadPools.java | 24 ++ .../coordinator/CompactionCoordinator.java | 211 +++++++++++++----- 4 files changed, 190 insertions(+), 60 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 63cbd397acb..77fa9c461ad 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -1189,6 +1189,18 @@ public enum Property { COMPACTION_COORDINATOR_PREFIX("compaction.coordinator.", null, PropertyType.PREFIX, "Properties in this category affect the behavior of the accumulo compaction coordinator server.", "2.1.0"), + COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT("compaction.coordinator.reservation.threads.root", + "1", PropertyType.COUNT, + "The number of threads used to reserve files for compaction in a tablet for the root tablet.", + "4.0.0"), +
COMPACTION_COORDINATOR_RESERVATION_THREADS_META("compaction.coordinator.reservation.threads.meta", + "1", PropertyType.COUNT, + "The number of threads used to reserve files for compaction in a tablet for accumulo.metadata tablets.", + "4.0.0"), + COMPACTION_COORDINATOR_RESERVATION_THREADS_USER("compaction.coordinator.reservation.threads.user", + "64", PropertyType.COUNT, + "The number of threads used to reserve files for compaction in a tablet for user tables.", + "4.0.0"), @Experimental COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL( "compaction.coordinator.compactor.dead.check.interval", "5m", PropertyType.TIMEDURATION, diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java index b360e41b6b5..6c025a6815b 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java @@ -34,6 +34,9 @@ public enum ThreadPoolNames { CONDITIONAL_WRITER_CLEANUP_POOL("accumulo.pool.client.context.conditional.writer.cleanup"), COORDINATOR_FINALIZER_BACKGROUND_POOL("accumulo.pool.compaction.finalizer.background.pool"), COORDINATOR_FINALIZER_NOTIFIER_POOL("accumulo.pool.compaction.coordinator.compaction.finalizer"), + COORDINATOR_RESERVATION_ROOT_POOL("accumulo.pool.compaction.coordinator.reservation.root"), + COORDINATOR_RESERVATION_META_POOL("accumulo.pool.compaction.coordinator.reservation.meta"), + COORDINATOR_RESERVATION_USER_POOL("accumulo.pool.compaction.coordinator.reservation.user"), GC_DELETE_POOL("accumulo.pool.gc.threads.delete"), GENERAL_SERVER_POOL("accumulo.pool.general.server"), SERVICE_LOCK_POOL("accumulo.pool.service.lock"), diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index 12e2567bdf6..08d1d829899 100644 --- 
a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@ -22,6 +22,9 @@ import static java.util.concurrent.TimeUnit.MINUTES; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.ACCUMULO_POOL_PREFIX; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_META_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_ROOT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.COORDINATOR_RESERVATION_USER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GC_DELETE_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.GENERAL_SERVER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_FATE_POOL; @@ -369,6 +372,27 @@ public ThreadPoolExecutor createExecutorService(final AccumuloConfiguration conf return builder.build(); case GC_DELETE_THREADS: return getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build(); + case COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT: + builder = getPoolBuilder(COORDINATOR_RESERVATION_ROOT_POOL).numCoreThreads(conf.getCount(p)) + .withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); + case COMPACTION_COORDINATOR_RESERVATION_THREADS_META: + builder = getPoolBuilder(COORDINATOR_RESERVATION_META_POOL).numCoreThreads(conf.getCount(p)) + .withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); + case COMPACTION_COORDINATOR_RESERVATION_THREADS_USER: + builder = getPoolBuilder(COORDINATOR_RESERVATION_USER_POOL).numCoreThreads(conf.getCount(p)) + .withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + 
return builder.build(); default: throw new IllegalArgumentException("Unhandled thread pool property: " + p); } diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java index 2640ed4bcce..6f2eca37565 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java @@ -31,6 +31,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.UncheckedIOException; +import java.lang.ref.SoftReference; import java.time.Duration; import java.util.ArrayList; import java.util.Collection; @@ -43,13 +44,17 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.accumulo.core.Constants; @@ -193,6 +198,9 @@ public class CompactionCoordinator private volatile long coordinatorStartTime; + private final Map reservationPools; + private final Set activeCompactorReservationRequest = ConcurrentHashMap.newKeySet(); + public CompactionCoordinator(ServerContext ctx, SecurityOperation security, AtomicReference>> fateInstances, Manager manager) { this.ctx = ctx; @@ -232,6 +240,18 @@ public CompactionCoordinator(ServerContext ctx, SecurityOperation security, 
deadCompactionDetector = new DeadCompactionDetector(this.ctx, this, schedExecutor, fateInstances); + var rootReservationPool = ThreadPools.getServerThreadPools().createExecutorService( + ctx.getConfiguration(), Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT, true); + + var metaReservationPool = ThreadPools.getServerThreadPools().createExecutorService( + ctx.getConfiguration(), Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_META, true); + + var userReservationPool = ThreadPools.getServerThreadPools().createExecutorService( + ctx.getConfiguration(), Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_USER, true); + + reservationPools = Map.of(Ample.DataLevel.ROOT, rootReservationPool, Ample.DataLevel.METADATA, + metaReservationPool, Ample.DataLevel.USER, userReservationPool); + compactorCounts = ctx.getCaches().createNewBuilder(CacheName.COMPACTOR_COUNTS, false) .expireAfterWrite(30, TimeUnit.SECONDS).build(this::countCompactors); // At this point the manager does not have its lock so no actions should be taken yet @@ -250,6 +270,9 @@ public void start() { public void shutdown() { shutdown.countDown(); + + reservationPools.values().forEach(ExecutorService::shutdownNow); + var localThread = serviceThread; if (localThread != null) { try { @@ -528,82 +551,142 @@ protected CompactionMetadata createExternalCompactionMetadata(CompactionJob job, } - protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJob, - String compactorAddress, ExternalCompactionId externalCompactionId) { + private class ReserveCompactionTask implements Supplier { + + // Use a soft reference for this in case free memory gets low while this is sitting in the queue + // waiting to process. This object can contain the tablets list of files and if there are lots + // of tablet with lots of files then that could start to cause memory problems. This hack could + // be removed if #5188 were implemented. 
+ private final SoftReference metaJobRef; + private final String compactorAddress; + private final ExternalCompactionId externalCompactionId; + + private ReserveCompactionTask(CompactionJobQueues.MetaJob metaJob, String compactorAddress, + ExternalCompactionId externalCompactionId) { + Preconditions.checkArgument(metaJob.getJob().getKind() == CompactionKind.SYSTEM + || metaJob.getJob().getKind() == CompactionKind.USER); + this.metaJobRef = new SoftReference<>(Objects.requireNonNull(metaJob)); + this.compactorAddress = Objects.requireNonNull(compactorAddress); + this.externalCompactionId = Objects.requireNonNull(externalCompactionId); + Preconditions.checkState(activeCompactorReservationRequest.add(compactorAddress), + "compactor %s already on has a reservation in flight, cannot process %s", + compactorAddress, externalCompactionId); + } - Preconditions.checkArgument(metaJob.getJob().getKind() == CompactionKind.SYSTEM - || metaJob.getJob().getKind() == CompactionKind.USER); + @Override + public CompactionMetadata get() { + try { + var metaJob = metaJobRef.get(); + if (metaJob == null) { + LOG.warn("Compaction reservation request for {} {} was garbage collected.", + compactorAddress, externalCompactionId); + return null; + } - var tabletMetadata = metaJob.getTabletMetadata(); + var tabletMetadata = metaJob.getTabletMetadata(); - var jobFiles = metaJob.getJob().getFiles().stream().map(CompactableFileImpl::toStoredTabletFile) - .collect(Collectors.toSet()); + var jobFiles = metaJob.getJob().getFiles().stream() + .map(CompactableFileImpl::toStoredTabletFile).collect(Collectors.toSet()); - Retry retry = Retry.builder().maxRetries(5).retryAfter(Duration.ofMillis(100)) - .incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5) - .logInterval(Duration.ofMinutes(3)).createRetry(); + Retry retry = Retry.builder().maxRetries(5).retryAfter(Duration.ofMillis(100)) + 
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(10)).backOffFactor(1.5) + .logInterval(Duration.ofMinutes(3)).createRetry(); - while (retry.canRetry()) { - try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { - var extent = metaJob.getTabletMetadata().getExtent(); + while (retry.canRetry()) { + try (var tabletsMutator = ctx.getAmple().conditionallyMutateTablets()) { + var extent = metaJob.getTabletMetadata().getExtent(); - if (!canReserveCompaction(tabletMetadata, metaJob.getJob().getKind(), jobFiles, ctx, - manager.getSteadyTime())) { - return null; - } + if (!canReserveCompaction(tabletMetadata, metaJob.getJob().getKind(), jobFiles, ctx, + manager.getSteadyTime())) { + return null; + } - var ecm = createExternalCompactionMetadata(metaJob.getJob(), jobFiles, tabletMetadata, - compactorAddress, externalCompactionId); - - // any data that is read from the tablet to make a decision about if it can compact or not - // must be checked for changes in the conditional mutation. - var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation() - .requireFiles(jobFiles).requireNotCompacting(jobFiles); - if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) { - // For system compactions the user compaction requested column is examined when deciding - // if a compaction can start so need to check for changes to this column. - tabletMutator.requireSame(tabletMetadata, SELECTED, USER_COMPACTION_REQUESTED); - } else { - tabletMutator.requireSame(tabletMetadata, SELECTED); - } + var ecm = createExternalCompactionMetadata(metaJob.getJob(), jobFiles, tabletMetadata, + compactorAddress, externalCompactionId); + + // any data that is read from the tablet to make a decision about if it can compact or + // not + // must be checked for changes in the conditional mutation. 
+ var tabletMutator = tabletsMutator.mutateTablet(extent).requireAbsentOperation() + .requireFiles(jobFiles).requireNotCompacting(jobFiles); + if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) { + // For system compactions the user compaction requested column is examined when + // deciding + // if a compaction can start so need to check for changes to this column. + tabletMutator.requireSame(tabletMetadata, SELECTED, USER_COMPACTION_REQUESTED); + } else { + tabletMutator.requireSame(tabletMetadata, SELECTED); + } - if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) { - var selectedFiles = tabletMetadata.getSelectedFiles(); - var reserved = getFilesReservedBySelection(tabletMetadata, manager.getSteadyTime(), ctx); - - // If there is a selectedFiles column, and the reserved set is empty this means that - // either no user jobs were completed yet or the selection expiration time has passed - // so the column is eligible to be deleted so a system job can run instead - if (selectedFiles != null && reserved.isEmpty() - && !Collections.disjoint(jobFiles, selectedFiles.getFiles())) { - LOG.debug("Deleting user compaction selected files for {} {}", extent, - externalCompactionId); - tabletMutator.deleteSelectedFiles(); - } - } + if (metaJob.getJob().getKind() == CompactionKind.SYSTEM) { + var selectedFiles = tabletMetadata.getSelectedFiles(); + var reserved = + getFilesReservedBySelection(tabletMetadata, manager.getSteadyTime(), ctx); + + // If there is a selectedFiles column, and the reserved set is empty this means that + // either no user jobs were completed yet or the selection expiration time has passed + // so the column is eligible to be deleted so a system job can run instead + if (selectedFiles != null && reserved.isEmpty() + && !Collections.disjoint(jobFiles, selectedFiles.getFiles())) { + LOG.debug("Deleting user compaction selected files for {} {}", extent, + externalCompactionId); + tabletMutator.deleteSelectedFiles(); + } + } - 
tabletMutator.putExternalCompaction(externalCompactionId, ecm); - tabletMutator.submit(tm -> tm.getExternalCompactions().containsKey(externalCompactionId)); + tabletMutator.putExternalCompaction(externalCompactionId, ecm); + tabletMutator + .submit(tm -> tm.getExternalCompactions().containsKey(externalCompactionId)); - var result = tabletsMutator.process().get(extent); + var result = tabletsMutator.process().get(extent); - if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { - return ecm; - } else { - tabletMetadata = result.readMetadata(); + if (result.getStatus() == Ample.ConditionalResult.Status.ACCEPTED) { + return ecm; + } else { + tabletMetadata = result.readMetadata(); + } + } + + retry.useRetry(); + try { + retry.waitForNextAttempt(LOG, + "Reserved compaction for " + metaJob.getTabletMetadata().getExtent()); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } } - } - retry.useRetry(); - try { - retry.waitForNextAttempt(LOG, - "Reserved compaction for " + metaJob.getTabletMetadata().getExtent()); - } catch (InterruptedException e) { - throw new RuntimeException(e); + return null; + } finally { + Preconditions.checkState(activeCompactorReservationRequest.remove(compactorAddress), + "compactorAddress:%s", compactorAddress); } } + } + + protected CompactionMetadata reserveCompaction(CompactionJobQueues.MetaJob metaJob, + String compactorAddress, ExternalCompactionId externalCompactionId) { + + if (activeCompactorReservationRequest.contains(compactorAddress)) { + // In this case the compactor has a previously submitted reservation request that is still + // processing. Do not want to let it queue up another reservation request. One possible cause + // of this is that compactor timed out waiting for its last request to process and is now + // making another request. The previously submitted request can not be used because the + // compactor generates a new uuid for each request it makes. 
So the best thing to do is to + // return null and wait for this situation to resolve. This will likely happen when some part + // of the distributed system is not working well, so at this point want to avoid making + // problems worse instead of trying to reserve a job. + LOG.warn( + "Ignoring request from {} to reserve compaction job because it has a reservation request in progress.", + compactorAddress); + return null; + } - return null; + var dataLevel = DataLevel.of(metaJob.getTabletMetadata().getTableId()); + var future = CompletableFuture.supplyAsync( + new ReserveCompactionTask(metaJob, compactorAddress, externalCompactionId), + reservationPools.get(dataLevel)); + return future.join(); } protected TExternalCompactionJob createThriftJob(String externalCompactionId, @@ -1123,6 +1206,14 @@ public void cleanUpInternalState() { // 5. Log compactors with no groups // 6. Log groups with compactors and queued jos that have not checked in + var config = ctx.getConfiguration(); + ThreadPools.resizePool(reservationPools.get(DataLevel.ROOT), config, + Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_ROOT); + ThreadPools.resizePool(reservationPools.get(DataLevel.METADATA), config, + Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_META); + ThreadPools.resizePool(reservationPools.get(DataLevel.USER), config, + Property.COMPACTION_COORDINATOR_RESERVATION_THREADS_USER); + // grab a snapshot of the ids in the set before reading the metadata table. This is done to // avoid removing things that are added while reading the metadata. final Set idsSnapshot = Set.copyOf(RUNNING_CACHE.keySet());