Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Executes conditional mutations in a thread pool #5184

Merged
merged 4 commits into from
Dec 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -767,6 +767,15 @@ public enum Property {
"Resource group name for this TabletServer. Resource groups can be defined to dedicate resources "
+ " to specific tables (e.g. balancing tablets for table(s) within a group, see TableLoadBalancer).",
"4.0.0"),
TSERV_CONDITIONAL_UPDATE_THREADS_ROOT("tserver.conditionalupdate.threads.root", "16",
PropertyType.COUNT, "Numbers of threads for executing conditional updates on the root table.",
"4.0.0"),
TSERV_CONDITIONAL_UPDATE_THREADS_META("tserver.conditionalupdate.threads.meta", "64",
PropertyType.COUNT,
"Numbers of threads for executing conditional updates on the metadata table.", "4.0.0"),
TSERV_CONDITIONAL_UPDATE_THREADS_USER("tserver.conditionalupdate.threads.user", "64",
PropertyType.COUNT, "Numbers of threads for executing conditional updates on user tables.",
"4.0.0"),

// accumulo garbage collector properties
GC_PREFIX("gc.", null, PropertyType.PREFIX,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ public enum ThreadPoolNames {
TSERVER_TABLET_MIGRATION_POOL("accumulo.pool.tserver.tablet.migration"),
TSERVER_WAL_CREATOR_POOL("accumulo.pool.tserver.wal.creator"),
TSERVER_WAL_SORT_CONCURRENT_POOL("accumulo.pool.tserver.wal.sort.concurrent"),
TSERVER_CONDITIONAL_UPDATE_ROOT_POOL("accumulo.pool.tserver.conditionalupdate.root"),
TSERVER_CONDITIONAL_UPDATE_META_POOL("accumulo.pool.tserver.conditionalupdate.meta"),
TSERVER_CONDITIONAL_UPDATE_USER_POOL("accumulo.pool.tserver.conditionalupdate.user"),
UTILITY_CHECK_FILE_TASKS("accumulo.pool.util.check.file.tasks"),
UTILITY_VERIFY_TABLET_ASSIGNMENTS("accumulo.pool.util.check.tablet.servers");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.MANAGER_STATUS_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SCHED_FUTURE_CHECKER_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_ASSIGNMENT_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_META_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_ROOT_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_USER_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MIGRATIONS_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL;
Expand Down Expand Up @@ -343,6 +346,27 @@ public ThreadPoolExecutor createExecutorService(final AccumuloConfiguration conf
builder.enableThreadPoolMetrics();
}
return builder.build();
case TSERV_CONDITIONAL_UPDATE_THREADS_ROOT:
builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_ROOT_POOL)
.numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS);
if (emitThreadPoolMetrics) {
builder.enableThreadPoolMetrics();
}
return builder.build();
case TSERV_CONDITIONAL_UPDATE_THREADS_META:
builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_META_POOL)
.numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS);
if (emitThreadPoolMetrics) {
builder.enableThreadPoolMetrics();
}
return builder.build();
case TSERV_CONDITIONAL_UPDATE_THREADS_USER:
builder = getPoolBuilder(TSERVER_CONDITIONAL_UPDATE_USER_POOL)
.numCoreThreads(conf.getCount(p)).withTimeOut(60L, MILLISECONDS);
if (emitThreadPoolMetrics) {
builder.enableThreadPoolMetrics();
}
return builder.build();
case GC_DELETE_THREADS:
return getPoolBuilder(GC_DELETE_POOL).numCoreThreads(conf.getCount(p)).build();
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -794,26 +794,38 @@ public List<TCMResult> conditionalUpdate(TInfo tinfo, long sessID,
}
}

ArrayList<TCMResult> results = new ArrayList<>();

Map<KeyExtent,List<ServerConditionalMutation>> deferred =
conditionalUpdate(cs, updates, results, symbols);
var lambdaCs = cs;
// Conditional updates read data into memory, examine it, and then make an update. This can be
// CPU, I/O, and memory intensive. Using a thread pool directly limits CPU usage and
// indirectly limits memory and I/O usage.
Future<ArrayList<TCMResult>> future =
server.resourceManager.executeConditionalUpdate(cs.tableId, () -> {
ArrayList<TCMResult> results = new ArrayList<>();

Map<KeyExtent,List<ServerConditionalMutation>> deferred =
conditionalUpdate(lambdaCs, updates, results, symbols);

while (!deferred.isEmpty()) {
deferred = conditionalUpdate(lambdaCs, deferred, results, symbols);
}

while (!deferred.isEmpty()) {
deferred = conditionalUpdate(cs, deferred, results, symbols);
}
return results;
});

return results;
} catch (IOException ioe) {
throw new TException(ioe);
} catch (Exception e) {
return future.get();
} catch (ExecutionException | InterruptedException e) {
log.warn("Exception returned for conditionalUpdate. tableId: {}, opid: {}",
cs == null ? null : cs.tableId, opid, e);
throw new TException(e);
} catch (RuntimeException e) {
log.warn("Exception returned for conditionalUpdate. tableId: {}, opid: {}",
cs == null ? null : cs.tableId, opid, e);
throw e;
} finally {
if (opid != null) {
writeTracker.finishWrite(opid);
}

if (cs != null) {
server.sessionManager.unreserveSession(sessID);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_ASSIGNMENT_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.METADATA_TABLET_MIGRATION_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TABLET_ASSIGNMENT_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_META_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_ROOT_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_CONDITIONAL_UPDATE_USER_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_MINOR_COMPACTOR_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_FILE_RETRIEVER_POOL;
import static org.apache.accumulo.core.util.threads.ThreadPoolNames.TSERVER_SUMMARY_PARTITION_POOL;
Expand All @@ -46,8 +49,10 @@
import java.util.OptionalInt;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
Expand All @@ -62,9 +67,11 @@
import org.apache.accumulo.core.conf.AccumuloConfiguration.ScanExecutorConfig;
import org.apache.accumulo.core.conf.ConfigurationTypeHelper;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.file.blockfile.cache.impl.BlockCacheManagerFactory;
import org.apache.accumulo.core.file.blockfile.impl.ScanCacheProvider;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.spi.cache.BlockCache;
import org.apache.accumulo.core.spi.cache.BlockCacheManager;
import org.apache.accumulo.core.spi.cache.CacheType;
Expand Down Expand Up @@ -118,6 +125,8 @@ public class TabletServerResourceManager {
private final Map<String,ThreadPoolExecutor> scanExecutors;
private final Map<String,ScanExecutor> scanExecutorChoices;

private final Map<Ample.DataLevel,ThreadPoolExecutor> conditionalMutationExecutors;

private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> activeAssignments;

private final FileManager fileManager;
Expand Down Expand Up @@ -385,6 +394,27 @@ public TabletServerResourceManager(ServerContext context, TabletHostingServer ts
memMgmt = new MemoryManagementFramework();
memMgmt.startThreads();

var rootConditionalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf,
Property.TSERV_CONDITIONAL_UPDATE_THREADS_ROOT, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_ROOT),
TSERVER_CONDITIONAL_UPDATE_ROOT_POOL.poolName, rootConditionalPool);

var metaConditionalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf,
Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER),
TSERVER_CONDITIONAL_UPDATE_META_POOL.poolName, metaConditionalPool);

var userConditionalPool = ThreadPools.getServerThreadPools().createExecutorService(acuConf,
Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER, enableMetrics);
modifyThreadPoolSizesAtRuntime(
() -> context.getConfiguration().getCount(Property.TSERV_CONDITIONAL_UPDATE_THREADS_USER),
TSERVER_CONDITIONAL_UPDATE_USER_POOL.poolName, userConditionalPool);

conditionalMutationExecutors = Map.of(Ample.DataLevel.ROOT, rootConditionalPool,
Ample.DataLevel.METADATA, metaConditionalPool, Ample.DataLevel.USER, userConditionalPool);

// We can use the same map for both metadata and normal assignments since the keyspace (extent)
// is guaranteed to be unique. Schedule the task once, the task will reschedule itself.
ThreadPools.watchCriticalScheduledTask(context.getScheduledExecutor().schedule(
Expand Down Expand Up @@ -835,6 +865,10 @@ public void addMigration(KeyExtent tablet, Runnable migrationHandler) {
}
}

public <T> Future<T> executeConditionalUpdate(TableId tableId, Callable<T> updateTask) {
return conditionalMutationExecutors.get(Ample.DataLevel.of(tableId)).submit(updateTask);
}

public BlockCache getIndexCache() {
return _iCache;
}
Expand Down
Loading