Skip to content

Commit

Permalink
Executes conditional mutations in a thread pool (apache#5184)
Browse files Browse the repository at this point in the history
Conditional mutations currently execute in the thrift thread pool which
can grow unbounded in size.  Having an unbounded number of conditional
mutations executing concurrently could degrade a tablet servers health
in some cases.  This change adds threads pools for executing conditional
updates.  This will help limit the CPU and memory used by executing
conditional mutations.  Conditional mutations can also contain data as
part of the mutation that needs to be checked, if this is large that
could still cause memory issues.  This change more limits the memory
used by reading current tablet data to check the condition.  Limiting
the memory used by conditional mutations waiting to execute is something
that would need to somehow be controlled by thrift.  So this change
protects memory and CPU resources for conditional mutations that are
executing but does not protect memory for ones that are waiting to
execute.  To protect memory for those waiting to execute would need to
end the current practice of letting the thrift thread pool grow
unbounded, but that is a long existing problem.


Co-authored-by: Christopher L. Shannon <cshannon@apache.org>
  • Loading branch information
keith-turner and cshannon authored Dec 14, 2024
1 parent 8f35f72 commit 47b75d3
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,15 @@ public enum Property {
"Resource group name for this TabletServer. Resource groups can be defined to dedicate resources "
+ " to specific tables (e.g. balancing tablets for table(s) within a group, see TableLoadBalancer).",
"4.0.0"),
TSERV_CONDITIONAL_UPDATE_THREADS_ROOT("tserver.conditionalupdate.threads.root", "16",
PropertyType.COUNT, "Numbers of threads for executing conditional updates on the root table.",
"4.0.0"),
TSERV_CONDITIONAL_UPDATE_THREADS_META("tserver.conditionalupdate.threads.meta", "64",
PropertyType.COUNT,
"Numbers of threads for executing conditional updates on the metadata table.", "4.0.0"),
TSERV_CONDITIONAL_UPDATE_THREADS_USER("tserver.conditionalupdate.threads.user", "64",
PropertyType.COUNT, "Numbers of threads for executing conditional updates on user tables.",
"4.0.0"),

// accumulo garbage collector properties
GC_PREFIX("gc.", null, PropertyType.PREFIX,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ public enum ThreadPoolNames {
TSERVER_TABLET_MIGRATION_POOL("accumulo.pool.tserver.tablet.migration"),
TSERVER_WAL_CREATOR_POOL("accumulo.pool.tserver.wal.creator"),
TSERVER_WAL_SORT_CONCURRENT_POOL("accumulo.pool.tserver.wal.sort.concurrent"),
TSERVER_CONDITIONAL_UPDATE_ROOT_POOL("accumulo.pool.tserver.conditionalupdate.root"),
TSERVER_CONDITIONAL_UPDATE_META_POOL("accumulo.pool.tserver.conditionalupdate.meta"),
TSERVER_CONDITIONAL_UPDATE_USER_POOL("accumulo.pool.tserver.conditionalupdate.user"),
UTILITY_CHECK_FILE_TASKS("accumulo.pool.util.check.file.tasks"),
UTILITY_VERIFY_TABLET_ASSIGNMENTS("accumulo.pool.util.check.tablet.servers");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_STATUS_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SCHED_FUTURE_CHECKER_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_ASSIGNMENT_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_META_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_ROOT_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_USER_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MIGRATIONS_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL;
Expand Down Expand Up @@ -343,6 +346,27 @@ public ThreadPoolExecutor createExecutorService(final AccumuloConfiguration conf
builder.enableThreadPoolMetrics();
}
return builder.build();
case TSERV_CONDITIONAL_UPDATE_THREADS_ROOT:
builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_ROOT_POOL)
.numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS);
if (emitThreadPoolMetrics) {
builder.enableThreadPoolMetrics();
}
return builder.build();
case TSERV_CONDITIONAL_UPDATE_THREADS_META:
builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_META_POOL)
.numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS);
if (emitThreadPoolMetrics) {
builder.enableThreadPoolMetrics();
}
return builder.build();
case TSERV_CONDITIONAL_UPDATE_THREADS_USER:
builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_USER_POOL)
.numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS);
if (emitThreadPoolMetrics) {
builder.enableThreadPoolMetrics();
}
return builder.build();
case GC_DELETE_THREADS:
return getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -794,26 +794,38 @@ public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID,
}
}

ArrayList<TCMResult> results = new ArrayList<>();

Map<KeyExtent,List<ServerConditionalMutation>> deferred =
conditionalUpdate(cs, updates, results, symbols);
var lambdaCs = cs;
// Conditional updates read data into memory, examine it, and then make an update. This can be
// CPU, I/O, and memory intensive. Using a thread pool directly limits CPU usage and
// indirectly limits memory and I/O usage.
Future<ArrayList<TCMResult>> future =
server.resourceManager.executeConditionalUpdate(cs.tableId, () -> {
ArrayList<TCMResult> results = new ArrayList<>();

Map<KeyExtent,List<ServerConditionalMutation>> deferred =
conditionalUpdate(lambdaCs, updates, results, symbols);

while (!deferred.isEmpty()) {
deferred = conditionalUpdate(lambdaCs, deferred, results, symbols);
}

while (!deferred.isEmpty()) {
deferred = conditionalUpdate(cs, deferred, results, symbols);
}
return results;
});

return results;
} catch (IOException ioe) {
throw new TException(ioe);
} catch (Exception e) {
return future.get();
} catch (ExecutionException | InterruptedException e) {
log.warn("Exception returned for conditionalUpdate. tableId: {}, opid: {}",
cs == null ? null : cs.tableId, opid, e);
throw new TException(e);
} catch (RuntimeException e) {
log.warn("Exception returned for conditionalUpdate. tableId: {}, opid: {}",
cs == null ? null : cs.tableId, opid, e);
throw e;
} finally {
if (opid != null) {
writeTracker.finishWrite(opid);
}

if (cs != null) {
server.sessionManager.unreserveSession(sessID);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_ASSIGNMENT_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_MIGRATION_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TABLET_ASSIGNMENT_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_META_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_ROOT_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_USER_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_FILE_RETRIEVER_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL;
Expand All @@ -46,8 +49,10 @@
import java.util.OptionalInt;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -62,9 +67,11 @@
import org.apache.accumulo.core.conf.AccumuloConfiguration.ScanExecutorConfig;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory;
import org.apache.accumulo.core.file.blockfile.impl.ScanCacheProvider;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.spi.cache.BlockCache;
import org.apache.accumulo.core.spi.cache.BlockCacheManager;
import org.apache.accumulo.core.spi.cache.CacheType;
Expand Down Expand Up @@ -118,6 +125,8 @@ public class TabletServerResourceManager {
private final Map<String,ThreadPoolExecutor> scanExecutors;
private final Map<String,ScanExecutor> scanExecutorChoices;

private final Map<Ample.DataLevel,ThreadPoolExecutor> conditionalMutationExecutors;

private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> activeAssignments;

private final FileManager fileManager;
Expand Down Expand Up @@ -385,6 +394,27 @@ public TabletServerResourceManager(ServerContext context, TabletHostingServer ts
memMgmt = new MemoryManagementFramework();
memMgmt.startThreads();

var rootConditionalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf,
Property.TSERV_CONDITIONAL_UPDATE_THREADS_ROOT, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_ROOT),
TSERVER_CONDITIONAL_UPDATE_ROOT_POOL.poolName, rootConditionalPool);

var metaConditionalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf,
Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER),
TSERVER_CONDITIONAL_UPDATE_META_POOL.poolName, metaConditionalPool);

var userConditionalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf,
Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER),
TSERVER_CONDITIONAL_UPDATE_USER_POOL.poolName, userConditionalPool);

conditionalMutationExecutors = Map.of(Ample.DataLevel.ROOT, rootConditionalPool,
Ample.DataLevel.METADATA, metaConditionalPool, Ample.DataLevel.USER, userConditionalPool);

// We can use the same map for both metadata and normal assignments since the keyspace (extent)
// is guaranteed to be unique. Schedule the task once, the task will reschedule itself.
ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().schedule(
Expand Down Expand Up @@ -835,6 +865,10 @@ public void addMigration(KeyExtent tablet, Runnable migrationHandler) {
}
}

public <T> Future<T> executeConditionalUpdate(TableId tableId, Callable<T> updateTask) {
return conditionalMutationExecutors.get(Ample.DataLevel.of(tableId)).submit(updateTask);
}

public BlockCache getIndexCache() {
return _iCache;
}
Expand Down

0 comments on commit 47b75d3

Please sign in to comment.