Skip to content

Commit

Permalink
Simplify writer count properties
Browse files Browse the repository at this point in the history
Now, we have only two properties

task.min-writer-count:
Minimum number of local parallel table
writers per task when preferred partitioning and
task writer scaling are not used.

task.max-writer-count:
Maximum number of local parallel table writers
per task when either task writer scaling or preferred
partitioning is used.
  • Loading branch information
gaurav8297 authored and sopel39 committed Sep 27, 2023
1 parent 609ac97 commit e4411b3
Show file tree
Hide file tree
Showing 25 changed files with 178 additions and 193 deletions.
37 changes: 13 additions & 24 deletions core/trino-main/src/main/java/io/trino/SystemSessionProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ public final class SystemSessionProperties
public static final String MIN_HASH_PARTITION_COUNT = "min_hash_partition_count";
public static final String MIN_HASH_PARTITION_COUNT_FOR_WRITE = "min_hash_partition_count_for_write";
public static final String PREFER_STREAMING_OPERATORS = "prefer_streaming_operators";
public static final String TASK_WRITER_COUNT = "task_writer_count";
public static final String TASK_PARTITIONED_WRITER_COUNT = "task_partitioned_writer_count";
public static final String TASK_MIN_WRITER_COUNT = "task_min_writer_count";
public static final String TASK_MAX_WRITER_COUNT = "task_max_writer_count";
public static final String TASK_CONCURRENCY = "task_concurrency";
public static final String TASK_SHARE_INDEX_LOADING = "task_share_index_loading";
public static final String QUERY_MAX_MEMORY = "query_max_memory";
Expand All @@ -84,7 +84,6 @@ public final class SystemSessionProperties
public static final String SCALE_WRITERS = "scale_writers";
public static final String TASK_SCALE_WRITERS_ENABLED = "task_scale_writers_enabled";
public static final String MAX_WRITER_TASKS_COUNT = "max_writer_tasks_count";
public static final String TASK_SCALE_WRITERS_MAX_WRITER_COUNT = "task_scale_writers_max_writer_count";
public static final String WRITER_SCALING_MIN_DATA_PROCESSED = "writer_scaling_min_data_processed";
public static final String SKEWED_PARTITION_MIN_DATA_PROCESSED_REBALANCE_THRESHOLD = "skewed_partition_min_data_processed_rebalance_threshold";
public static final String MAX_MEMORY_PER_PARTITION_WRITER = "max_memory_per_partition_writer";
Expand Down Expand Up @@ -297,15 +296,15 @@ public SystemSessionProperties(
false,
false),
integerProperty(
TASK_WRITER_COUNT,
"Number of local parallel table writers per task when prefer partitioning and task writer scaling are not used",
taskManagerConfig.getWriterCount(),
TASK_MIN_WRITER_COUNT,
"Minimum number of local parallel table writers per task when preferred partitioning and task writer scaling are not used",
taskManagerConfig.getMinWriterCount(),
false),
integerProperty(
TASK_PARTITIONED_WRITER_COUNT,
"Number of local parallel table writers per task when prefer partitioning is used",
taskManagerConfig.getPartitionedWriterCount(),
value -> validateValueIsPowerOfTwo(value, TASK_PARTITIONED_WRITER_COUNT),
TASK_MAX_WRITER_COUNT,
"Maximum number of local parallel table writers per task when either task writer scaling or preferred partitioning is used",
taskManagerConfig.getMaxWriterCount(),
value -> validateValueIsPowerOfTwo(value, TASK_MAX_WRITER_COUNT),
false),
booleanProperty(
REDISTRIBUTE_WRITES,
Expand Down Expand Up @@ -333,11 +332,6 @@ public SystemSessionProperties(
"Scale the number of concurrent table writers per task based on throughput",
taskManagerConfig.isScaleWritersEnabled(),
false),
integerProperty(
TASK_SCALE_WRITERS_MAX_WRITER_COUNT,
"Maximum number of writers per task up to which scaling will happen if task.scale-writers.enabled is set",
taskManagerConfig.getScaleWritersMaxWriterCount(),
true),
dataSizeProperty(
WRITER_SCALING_MIN_DATA_PROCESSED,
"Minimum amount of uncompressed output data processed by writers before writer scaling can happen",
Expand Down Expand Up @@ -1135,14 +1129,14 @@ public static boolean preferStreamingOperators(Session session)
return session.getSystemProperty(PREFER_STREAMING_OPERATORS, Boolean.class);
}

public static int getTaskWriterCount(Session session)
public static int getTaskMinWriterCount(Session session)
{
return session.getSystemProperty(TASK_WRITER_COUNT, Integer.class);
return session.getSystemProperty(TASK_MIN_WRITER_COUNT, Integer.class);
}

public static int getTaskPartitionedWriterCount(Session session)
public static int getTaskMaxWriterCount(Session session)
{
return session.getSystemProperty(TASK_PARTITIONED_WRITER_COUNT, Integer.class);
return session.getSystemProperty(TASK_MAX_WRITER_COUNT, Integer.class);
}

public static boolean isRedistributeWrites(Session session)
Expand All @@ -1165,11 +1159,6 @@ public static boolean isTaskScaleWritersEnabled(Session session)
return session.getSystemProperty(TASK_SCALE_WRITERS_ENABLED, Boolean.class);
}

public static int getTaskScaleWritersMaxWriterCount(Session session)
{
return session.getSystemProperty(TASK_SCALE_WRITERS_MAX_WRITER_COUNT, Integer.class);
}

public static int getMaxWriterTaskCount(Session session)
{
return session.getSystemProperty(MAX_WRITER_TASKS_COUNT, Integer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,17 +79,13 @@ public class TaskManagerConfig
private Duration interruptStuckSplitTasksDetectionInterval = new Duration(2, TimeUnit.MINUTES);

private boolean scaleWritersEnabled = true;
// Set the value of default max writer count to the number of processors * 2 and cap it to 64. We can set this value
// higher because preferred write partitioning is always enabled for local exchange thus partitioned inserts will never
// use this property. Additionally, we have a mechanism to stop scaling if local memory utilization is high.
private int scaleWritersMaxWriterCount = min(getAvailablePhysicalProcessorCount() * 2, 64);
private int writerCount = 1;
// Default value of partitioned task writer count should be above 1, otherwise it can create a plan
// with a single gather exchange node on the coordinator due to a single available processor. Whereas,
// on the worker nodes due to more available processors, the default value could be above 1. Therefore,
// it can cause error due to config mismatch during execution. Additionally, cap it to 64 in order to
// avoid small pages produced by local partitioning exchanges.
private int partitionedWriterCount = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount() * 2), 2), 64);
private int minWriterCount = 1;
// Set the value of default max writer count to the number of processors * 2 and cap it to 64. It should be
// above 1, otherwise it can create a plan with a single gather exchange node on the coordinator due to a single
// available processor. Whereas, on the worker nodes due to more available processors, the default value could
// be above 1. Therefore, it can cause error due to config mismatch during execution. Additionally, cap
// it to 64 in order to avoid small pages produced by local partitioning exchanges.
private int maxWriterCount = min(max(nextPowerOfTwo(getAvailablePhysicalProcessorCount() * 2), 2), 64);
// Default value of task concurrency should be above 1, otherwise it can create a plan with a single gather
// exchange node on the coordinator due to a single available processor. Whereas, on the worker nodes due to
// more available processors, the default value could be above 1. Therefore, it can cause error due to config
Expand Down Expand Up @@ -460,46 +456,50 @@ public TaskManagerConfig setScaleWritersEnabled(boolean scaleWritersEnabled)
return this;
}

@Min(1)
public int getScaleWritersMaxWriterCount()
{
return scaleWritersMaxWriterCount;
}

@Config("task.scale-writers.max-writer-count")
@Deprecated
@LegacyConfig(value = "task.scale-writers.max-writer-count", replacedBy = "task.max-writer-count")
@ConfigDescription("Maximum number of writers per task up to which scaling will happen if task.scale-writers.enabled is set")
public TaskManagerConfig setScaleWritersMaxWriterCount(int scaleWritersMaxWriterCount)
{
this.scaleWritersMaxWriterCount = scaleWritersMaxWriterCount;
this.maxWriterCount = scaleWritersMaxWriterCount;
return this;
}

@Min(1)
public int getWriterCount()
public int getMinWriterCount()
{
return writerCount;
return minWriterCount;
}

@Config("task.writer-count")
@ConfigDescription("Number of local parallel table writers per task when prefer partitioning and task writer scaling are not used")
public TaskManagerConfig setWriterCount(int writerCount)
@Config("task.min-writer-count")
@ConfigDescription("Minimum number of local parallel table writers per task when preferred partitioning and task writer scaling are not used")
public TaskManagerConfig setMinWriterCount(int minWriterCount)
{
this.writerCount = writerCount;
this.minWriterCount = minWriterCount;
return this;
}

@Min(1)
@PowerOfTwo
public int getPartitionedWriterCount()
public int getMaxWriterCount()
{
return partitionedWriterCount;
return maxWriterCount;
}

@Config("task.max-writer-count")
@ConfigDescription("Maximum number of local parallel table writers per task when either task writer scaling or preferred partitioning is used")
public TaskManagerConfig setMaxWriterCount(int maxWriterCount)
{
this.maxWriterCount = maxWriterCount;
return this;
}

@Config("task.partitioned-writer-count")
@Deprecated
@LegacyConfig(value = "task.partitioned-writer-count", replacedBy = "task.max-writer-count")
@ConfigDescription("Number of local parallel table writers per task when prefer partitioning is used")
public TaskManagerConfig setPartitionedWriterCount(int partitionedWriterCount)
{
this.partitionedWriterCount = partitionedWriterCount;
this.maxWriterCount = partitionedWriterCount;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,8 +243,8 @@ private TestingTrinoServer(
.put("catalog.management", "dynamic")
.put("task.concurrency", "4")
.put("task.max-worker-threads", "4")
// Use task.writer-count > 1, as this allows to expose writer-concurrency related bugs.
.put("task.writer-count", "2")
// Use task.min-writer-count > 1, as this allows to expose writer-concurrency related bugs.
.put("task.min-writer-count", "2")
.put("exchange.client-threads", "4")
// Reduce memory footprint in tests
.put("exchange.max-buffer-size", "4MB")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,8 @@
import static io.trino.SystemSessionProperties.getPagePartitioningBufferPoolSize;
import static io.trino.SystemSessionProperties.getSkewedPartitionMinDataProcessedRebalanceThreshold;
import static io.trino.SystemSessionProperties.getTaskConcurrency;
import static io.trino.SystemSessionProperties.getTaskPartitionedWriterCount;
import static io.trino.SystemSessionProperties.getTaskScaleWritersMaxWriterCount;
import static io.trino.SystemSessionProperties.getTaskWriterCount;
import static io.trino.SystemSessionProperties.getTaskMaxWriterCount;
import static io.trino.SystemSessionProperties.getTaskMinWriterCount;
import static io.trino.SystemSessionProperties.getWriterScalingMinDataProcessed;
import static io.trino.SystemSessionProperties.isAdaptivePartialAggregationEnabled;
import static io.trino.SystemSessionProperties.isEnableCoordinatorDynamicFiltersDistribution;
Expand Down Expand Up @@ -3496,7 +3495,7 @@ private int getWriterCount(Session session, WriterScalingOptions connectorScalin
{
// This check is required because we don't know which writer count to use when exchange is
// single distribution. It could be possible that when scaling is enabled, a single distribution is
// selected for partitioned write using "task_partitioned_writer_count". However, we can't say for sure
// selected for partitioned write using "task_max_writer_count". However, we can't say for sure
// whether this single distribution comes from unpartitioned or partitioned writer count.
if (isSingleGatheringExchange(source)) {
return 1;
Expand All @@ -3507,20 +3506,20 @@ private int getWriterCount(Session session, WriterScalingOptions connectorScalin
// enough to use it for cases with or without scaling enabled. Additionally, it doesn't lead
// to too many small files when scaling is disabled because single partition will be written by
// a single writer only.
int partitionedWriterCount = getTaskPartitionedWriterCount(session);
int partitionedWriterCount = getTaskMaxWriterCount(session);
if (isLocalScaledWriterExchange(source)) {
partitionedWriterCount = connectorScalingOptions.perTaskMaxScaledWriterCount()
.map(writerCount -> min(writerCount, getTaskPartitionedWriterCount(session)))
.orElse(getTaskPartitionedWriterCount(session));
.map(writerCount -> min(writerCount, getTaskMaxWriterCount(session)))
.orElse(getTaskMaxWriterCount(session));
}
return getPartitionedWriterCountBasedOnMemory(partitionedWriterCount, session);
}

int unpartitionedWriterCount = getTaskWriterCount(session);
int unpartitionedWriterCount = getTaskMinWriterCount(session);
if (isLocalScaledWriterExchange(source)) {
unpartitionedWriterCount = connectorScalingOptions.perTaskMaxScaledWriterCount()
.map(writerCount -> min(writerCount, getTaskScaleWritersMaxWriterCount(session)))
.orElse(getTaskScaleWritersMaxWriterCount(session));
.map(writerCount -> min(writerCount, getTaskMaxWriterCount(session)))
.orElse(getTaskMaxWriterCount(session));
}
// Consider memory while calculating writer count.
return min(unpartitionedWriterCount, getMaxWritersBasedOnMemory(session));
Expand All @@ -3542,8 +3541,8 @@ public PhysicalOperation visitMergeWriter(MergeWriterNode node, LocalExecutionPl
{
// Todo: Implement writer scaling for merge. https://github.com/trinodb/trino/issues/14622
int writerCount = node.getPartitioningScheme()
.map(scheme -> getTaskPartitionedWriterCount(session))
.orElseGet(() -> getTaskWriterCount(session));
.map(scheme -> getTaskMaxWriterCount(session))
.orElseGet(() -> getTaskMinWriterCount(session));
context.setDriverInstanceCount(writerCount);

PhysicalOperation source = node.getSource().accept(this, context);
Expand Down Expand Up @@ -4109,7 +4108,7 @@ private OperatorFactory createHashAggregationOperatorFactory(

private int getPartitionedWriterCountBasedOnMemory(Session session)
{
return getPartitionedWriterCountBasedOnMemory(getTaskPartitionedWriterCount(session), session);
return getPartitionedWriterCountBasedOnMemory(getTaskMaxWriterCount(session), session);
}

private int getPartitionedWriterCountBasedOnMemory(int partitionedWriterCount, Session session)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@
import static com.google.common.collect.ImmutableList.toImmutableList;
import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static io.trino.SystemSessionProperties.getTaskConcurrency;
import static io.trino.SystemSessionProperties.getTaskPartitionedWriterCount;
import static io.trino.SystemSessionProperties.getTaskWriterCount;
import static io.trino.SystemSessionProperties.getTaskMaxWriterCount;
import static io.trino.SystemSessionProperties.getTaskMinWriterCount;
import static io.trino.SystemSessionProperties.isDistributedSortEnabled;
import static io.trino.SystemSessionProperties.isSpillEnabled;
import static io.trino.SystemSessionProperties.isTaskScaleWritersEnabled;
Expand Down Expand Up @@ -746,7 +746,7 @@ private PlanWithProperties visitUnpartitionedWriter(PlanNode node, PlanNode sour
return rebaseAndDeriveProperties(node, ImmutableList.of(exchange));
}

if (getTaskWriterCount(session) == 1) {
if (getTaskMinWriterCount(session) == 1) {
return planAndEnforceChildren(node, singleStream(), defaultParallelism(session));
}

Expand All @@ -755,7 +755,7 @@ private PlanWithProperties visitUnpartitionedWriter(PlanNode node, PlanNode sour

private PlanWithProperties visitPartitionedWriter(PlanNode node, PartitioningScheme partitioningScheme, PlanNode source, StreamPreferredProperties parentPreferences)
{
if (getTaskPartitionedWriterCount(session) == 1) {
if (getTaskMaxWriterCount(session) == 1) {
return planAndEnforceChildren(node, singleStream(), defaultParallelism(session));
}

Expand Down Expand Up @@ -784,7 +784,7 @@ private PlanWithProperties visitPartitionedWriter(PlanNode node, PartitioningSch

private PlanWithProperties visitScalePartitionedWriter(PlanNode node, PartitioningScheme partitioningScheme, PlanNode source)
{
if (getTaskPartitionedWriterCount(session) == 1) {
if (getTaskMaxWriterCount(session) == 1) {
return planAndEnforceChildren(node, singleStream(), defaultParallelism(session));
}

Expand Down
Loading

0 comments on commit e4411b3

Please sign in to comment.