From 39b5c2ae8f399620bdd06da8007bb3ad1a87e8be Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Sat, 14 Dec 2024 18:51:53 +0000 Subject: [PATCH 1/4] Executes conditional mutations in a thread pool Conditional mutations currently execute in the thrift thread pool which can grow unbounded in size. Having an unbounded number of conditional mutations executing concurrently could degrade a tablet servers health in some cases. This change adds threads pools for executing conditional updates. This will help limit the CPU and memory used by executing conditional mutations. Conditional mutations can also contain data as part of the mutation that needs to be checked, if this is large that could still cause memory issues. This change more limits the memory used by reading current tablet data to check the condition. Limiting the memory used by conditional mutations waiting to execute is something that would need to somehow be controlled by thrift. So this change protects memory and CPU resources for conditional mutations that are executing but does not protect memory for ones that are waiting to execute. To protect memory for those waiting to execute would need to end the current practice of letting the thrift thread pool grow unbounded, but that is a long existing problem. --- .../apache/accumulo/core/conf/Property.java | 9 +++++ .../core/util/threads/ThreadPoolNames.java | 3 ++ .../core/util/threads/ThreadPools.java | 24 +++++++++++++ .../accumulo/tserver/TabletClientHandler.java | 34 +++++++++++++------ .../tserver/TabletServerResourceManager.java | 34 +++++++++++++++++++ 5 files changed, 93 insertions(+), 11 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 122c7f414be..ff79307ec2d 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -767,6 +767,15 @@ public enum Property { "Resource group name for this TabletServer. Resource groups can be defined to dedicate resources " + " to specific tables (e.g. balancing tablets for table(s) within a group, see TableLoadBalancer).", "4.0.0"), + TSERV_CONDITIONAL_UPDATE_THREADS_ROOT("tserver.conditionalupdate.threads.root", "16", + PropertyType.COUNT, "Numbers of threads for executing conditional updates on the root table.", + "4.0.0"), + TSERV_CONDITIONAL_UPDATE_THREADS_META("tserver.conditionalupdate.threads.meta", "16", + PropertyType.COUNT, "Numbers of threads for executing conditional updates on the root table.", + "4.0.0"), + TSERV_CONDITIONAL_UPDATE_THREADS_USER("tserver.conditionalupdate.threads.user", "16", + PropertyType.COUNT, "Numbers of threads for executing conditional updates on the root table.", + "4.0.0"), // accumulo garbage collector properties GC_PREFIX("gc.", null, PropertyType.PREFIX, diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java index b57baf0d5d5..b360e41b6b5 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPoolNames.java @@ -62,6 +62,9 @@ public enum ThreadPoolNames { TSERVER_TABLET_MIGRATION_POOL("accumulo.pool.tserver.tablet.migration"), TSERVER_WAL_CREATOR_POOL("accumulo.pool.tserver.wal.creator"), TSERVER_WAL_SORT_CONCURRENT_POOL("accumulo.pool.tserver.wal.sort.concurrent"), + TSERVER_CONDITIONAL_UPDATE_ROOT_POOL("accumulo.pool.tserver.conditionalupdate.root"), + TSERVER_CONDITIONAL_UPDATE_META_POOL("accumulo.pool.tserver.conditionalupdate.meta"), + TSERVER_CONDITIONAL_UPDATE_USER_POOL("accumulo.pool.tserver.conditionalupdate.user"), UTILITY_CHECK_FILE_TASKS("accumulo.pool.util.check.file.tasks"), UTILITY_VERIFY_TABLET_ASSIGNMENTS("accumulo.pool.util.check.tablet.servers"); diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index 44fd7d64e7e..12e2567bdf6 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@ -28,6 +28,9 @@ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_STATUS_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SCHED_FUTURE_CHECKER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_ASSIGNMENT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_META_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_ROOT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_USER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MIGRATIONS_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL; @@ -343,6 +346,27 @@ public ThreadPoolExecutor createExecutorService(final AccumuloConfiguration conf builder.enableThreadPoolMetrics(); } return builder.build(); + case TSERV_CONDITIONAL_UPDATE_THREADS_ROOT: + builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_ROOT_POOL) + .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); + case TSERV_CONDITIONAL_UPDATE_THREADS_META: + builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_META_POOL) + .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); + case TSERV_CONDITIONAL_UPDATE_THREADS_USER: + builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_USER_POOL) + .numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS); + if (emitThreadPoolMetrics) { + builder.enableThreadPoolMetrics(); + } + return builder.build(); case GC_DELETE_THREADS: return getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build(); default: diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java index 0b5c2466473..117d87a96d5 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java @@ -794,19 +794,30 @@ public List conditionalUpdate(TInfo tinfo, long sessID, } } - ArrayList results = new ArrayList<>(); - - Map> deferred = - conditionalUpdate(cs, updates, results, symbols); + var lambdaCs = cs; + // Conditional updates read data into memory, examine it, and then make an update. This can be + // CPU, I/O, and memory intensive. Using a thread pool directly limits CPU usage and + // indirectly limits memory and I/O usage. + Future> future = + server.resourceManager.executeConditionalUpdate(cs.tableId, () -> { + ArrayList results = new ArrayList<>(); + + Map> deferred = + conditionalUpdate(lambdaCs, updates, results, symbols); + + while (!deferred.isEmpty()) { + deferred = conditionalUpdate(lambdaCs, deferred, results, symbols); + } - while (!deferred.isEmpty()) { - deferred = conditionalUpdate(cs, deferred, results, symbols); - } + return results; + }); - return results; - } catch (IOException ioe) { - throw new TException(ioe); - } catch (Exception e) { + return future.get(); + } catch (ExecutionException | InterruptedException e) { + log.warn("Exception returned for conditionalUpdate. tableId: {}, opid: {}", + cs == null ? null : cs.tableId, opid, e); + throw new TException(e); + } catch (RuntimeException e) { log.warn("Exception returned for conditionalUpdate. tableId: {}, opid: {}", cs == null ? null : cs.tableId, opid, e); throw e; @@ -814,6 +825,7 @@ public List conditionalUpdate(TInfo tinfo, long sessID, if (opid != null) { writeTracker.finishWrite(opid); } + if (cs != null) { server.sessionManager.unreserveSession(sessID); } diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java index ff23188a5ad..f09165c4508 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java @@ -27,6 +27,9 @@ import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_ASSIGNMENT_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_MIGRATION_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TABLET_ASSIGNMENT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_META_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_ROOT_POOL; +import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_USER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_FILE_RETRIEVER_POOL; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL; @@ -46,8 +49,10 @@ import java.util.OptionalInt; import java.util.Queue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; @@ -62,9 +67,11 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration.ScanExecutorConfig; import org.apache.accumulo.core.conf.ConfigurationTypeHelper; import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory; import org.apache.accumulo.core.file.blockfile.impl.ScanCacheProvider; +import org.apache.accumulo.core.metadata.schema.Ample; import org.apache.accumulo.core.spi.cache.BlockCache; import org.apache.accumulo.core.spi.cache.BlockCacheManager; import org.apache.accumulo.core.spi.cache.CacheType; @@ -118,6 +125,8 @@ public class TabletServerResourceManager { private final Map scanExecutors; private final Map scanExecutorChoices; + private final Map conditionalMutationExecutors; + private final ConcurrentHashMap activeAssignments; private final FileManager fileManager; @@ -385,6 +394,27 @@ public TabletServerResourceManager(ServerContext context, TabletHostingServer ts memMgmt = new MemoryManagementFramework(); memMgmt.startThreads(); + var rootConditionalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, + Property.TSERV_CONDITIONAL_UPDATE_THREADS_ROOT, enableMetrics); + modifyThreadPoolSizesAtRuntime( + () -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_ROOT), + TSERVER_CONDITIONAL_UPDATE_ROOT_POOL.poolName, rootConditionalPool); + + var metaConditionalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, + Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER, enableMetrics); + modifyThreadPoolSizesAtRuntime( + () -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER), + TSERVER_CONDITIONAL_UPDATE_META_POOL.poolName, metaConditionalPool); + + var userConditionalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf, + Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER, enableMetrics); + modifyThreadPoolSizesAtRuntime( + () -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER), + TSERVER_CONDITIONAL_UPDATE_USER_POOL.poolName, userConditionalPool); + + conditionalMutationExecutors = Map.of(Ample.DataLevel.ROOT, rootConditionalPool, + Ample.DataLevel.METADATA, metaConditionalPool, Ample.DataLevel.USER, userConditionalPool); + // We can use the same map for both metadata and normal assignments since the keyspace (extent) // is guaranteed to be unique. Schedule the task once, the task will reschedule itself. ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().schedule( @@ -835,6 +865,10 @@ public void addMigration(KeyExtent tablet, Runnable migrationHandler) { } } + public Future executeConditionalUpdate(TableId tableId, Callable updateTask) { + return conditionalMutationExecutors.get(Ample.DataLevel.of(tableId)).submit(updateTask); + } + public BlockCache getIndexCache() { return _iCache; } From 6118918730a582003bfe6677f011f0c2d2b4ae30 Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Sat, 14 Dec 2024 16:17:53 -0500 Subject: [PATCH 2/4] Update core/src/main/java/org/apache/accumulo/core/conf/Property.java Co-authored-by: Christopher L. Shannon --- core/src/main/java/org/apache/accumulo/core/conf/Property.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index ff79307ec2d..a8ebadc30dc 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -771,7 +771,7 @@ public enum Property { PropertyType.COUNT, "Numbers of threads for executing conditional updates on the root table.", "4.0.0"), TSERV_CONDITIONAL_UPDATE_THREADS_META("tserver.conditionalupdate.threads.meta", "16", - PropertyType.COUNT, "Numbers of threads for executing conditional updates on the root table.", + PropertyType.COUNT, "Numbers of threads for executing conditional updates on the metadata table.", "4.0.0"), TSERV_CONDITIONAL_UPDATE_THREADS_USER("tserver.conditionalupdate.threads.user", "16", PropertyType.COUNT, "Numbers of threads for executing conditional updates on the root table.", From d5fad56d0d6c25022ca91f2fb4e5565a02e8a10a Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Sat, 14 Dec 2024 16:17:59 -0500 Subject: [PATCH 3/4] Update core/src/main/java/org/apache/accumulo/core/conf/Property.java Co-authored-by: Christopher L. Shannon --- core/src/main/java/org/apache/accumulo/core/conf/Property.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index a8ebadc30dc..3a410a664dd 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -774,7 +774,7 @@ public enum Property { PropertyType.COUNT, "Numbers of threads for executing conditional updates on the metadata table.", "4.0.0"), TSERV_CONDITIONAL_UPDATE_THREADS_USER("tserver.conditionalupdate.threads.user", "16", - PropertyType.COUNT, "Numbers of threads for executing conditional updates on the root table.", + PropertyType.COUNT, "Numbers of threads for executing conditional updates on the user table.", "4.0.0"), // accumulo garbage collector properties From 637403abf38da7869237afff5a85bda26eb6369b Mon Sep 17 00:00:00 2001 From: Keith Turner Date: Sat, 14 Dec 2024 21:43:35 +0000 Subject: [PATCH 4/4] Adjust defaults --- .../java/org/apache/accumulo/core/conf/Property.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 3a410a664dd..63cbd397acb 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -770,11 +770,11 @@ public enum Property { TSERV_CONDITIONAL_UPDATE_THREADS_ROOT("tserver.conditionalupdate.threads.root", "16", PropertyType.COUNT, "Numbers of threads for executing conditional updates on the root table.", "4.0.0"), - TSERV_CONDITIONAL_UPDATE_THREADS_META("tserver.conditionalupdate.threads.meta", "16", - PropertyType.COUNT, "Numbers of threads for executing conditional updates on the metadata table.", - "4.0.0"), - TSERV_CONDITIONAL_UPDATE_THREADS_USER("tserver.conditionalupdate.threads.user", "16", - PropertyType.COUNT, "Numbers of threads for executing conditional updates on the user table.", + TSERV_CONDITIONAL_UPDATE_THREADS_META("tserver.conditionalupdate.threads.meta", "64", + PropertyType.COUNT, + "Numbers of threads for executing conditional updates on the metadata table.", "4.0.0"), + TSERV_CONDITIONAL_UPDATE_THREADS_USER("tserver.conditionalupdate.threads.user", "64", + PropertyType.COUNT, "Numbers of threads for executing conditional updates on user tables.", "4.0.0"), // accumulo garbage collector properties