Add multipleWritersPerPartitionSupported flag in ConnectorTableLayout + Refactor scale writer #14956

Merged · 4 commits · Nov 10, 2022
Changes from all commits

----------------------------------------------------------------

@@ -52,7 +52,7 @@
import static io.trino.sql.planner.SystemPartitioningHandle.COORDINATOR_DISTRIBUTION;
import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION;
import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION;
-import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION;
+import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION;
import static io.trino.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
import static io.trino.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION;
import static io.trino.sql.planner.plan.ExchangeNode.Type.REPLICATE;
@@ -176,7 +176,7 @@ private SplitAssigner createSplitAssigner(
.addAll(replicatedSources)
.build());
}
-if (partitioning.equals(FIXED_ARBITRARY_DISTRIBUTION) || partitioning.equals(SCALED_WRITER_DISTRIBUTION) || partitioning.equals(SOURCE_DISTRIBUTION)) {
+if (partitioning.equals(FIXED_ARBITRARY_DISTRIBUTION) || partitioning.equals(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION) || partitioning.equals(SOURCE_DISTRIBUTION)) {
return new ArbitraryDistributionSplitAssigner(
partitioning.getCatalogHandle(),
partitionedSources,

----------------------------------------------------------------

@@ -135,7 +135,7 @@
import static io.trino.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static io.trino.spi.StandardErrorCode.REMOTE_TASK_FAILED;
import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION;
-import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION;
+import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION;
import static io.trino.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION;
import static io.trino.sql.planner.optimizations.PlanNodeSearcher.searchFrom;
import static io.trino.sql.planner.plan.ExchangeNode.Type.REPLICATE;
@@ -952,7 +952,7 @@ private static Optional<int[]> getBucketToPartition(
PlanNode fragmentRoot,
List<RemoteSourceNode> remoteSourceNodes)
{
-if (partitioningHandle.equals(SOURCE_DISTRIBUTION) || partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
+if (partitioningHandle.equals(SOURCE_DISTRIBUTION) || partitioningHandle.equals(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION)) {
return Optional.of(new int[1]);
}
if (searchFrom(fragmentRoot).where(node -> node instanceof TableScanNode).findFirst().isPresent()) {
@@ -986,7 +986,7 @@ private static Map<PlanFragmentId, PipelinedOutputBufferManager> createOutputBuf
if (partitioningHandle.equals(FIXED_BROADCAST_DISTRIBUTION)) {
outputBufferManager = new BroadcastPipelinedOutputBufferManager();
}
-else if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
+else if (partitioningHandle.equals(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION)) {
outputBufferManager = new ScaledPipelinedOutputBufferManager();
}
else {
@@ -1058,7 +1058,7 @@ public void stateChanged(QueryState newState)
() -> childStageExecutions.stream().anyMatch(StageExecution::isAnyTaskBlocked));
}

-if (partitioningHandle.equals(SCALED_WRITER_DISTRIBUTION)) {
+if (partitioningHandle.equals(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION)) {
Supplier<Collection<TaskStatus>> sourceTasksProvider = () -> childStageExecutions.stream()
.map(StageExecution::getTaskStatuses)
.flatMap(List::stream)
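
A scaled-writer stage in the pipelined scheduler polls the task statuses of its child stages (the `sourceTasksProvider` built above) and also checks whether any downstream task is blocked on a full buffer (`isAnyTaskBlocked`). The sketch below shows a scale-up rule in that spirit; it is illustrative only, with invented names, and is not the actual ScaledWriterScheduler logic:

```java
import java.util.List;

// Illustrative only: one plausible rule for deciding when a scaled-writer
// stage should bring another writer task online. writerMinSizeBytes mirrors
// the writerMinSize setting that appears later in this PR.
final class ScaleUpRule
{
    private ScaleUpRule() {}

    static boolean shouldAddWriter(List<Long> writtenBytesPerWriter, long writerMinSizeBytes, boolean anyTaskBlocked)
    {
        long totalWrittenBytes = 0;
        for (long bytes : writtenBytesPerWriter) {
            totalWrittenBytes += bytes;
        }
        // Scale out only when writers are the bottleneck (a consumer is
        // blocked) and every active writer has, on average, already produced
        // writerMinSize of data; scaling earlier tends to create many small files.
        return anyTaskBlocked && totalWrittenBytes >= writerMinSizeBytes * writtenBytesPerWriter.size();
    }
}
```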

----------------------------------------------------------------

@@ -96,7 +96,7 @@
import static io.trino.sql.planner.SystemPartitioningHandle.COORDINATOR_DISTRIBUTION;
import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION;
import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION;
-import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION;
+import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION;
import static io.trino.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
import static io.trino.sql.planner.SystemPartitioningHandle.SOURCE_DISTRIBUTION;
import static io.trino.sql.planner.plan.ExchangeNode.Type.REPLICATE;
@@ -160,7 +160,7 @@ public TaskSource create(
if (partitioning.equals(SINGLE_DISTRIBUTION) || partitioning.equals(COORDINATOR_DISTRIBUTION)) {
return SingleDistributionTaskSource.create(fragment, exchangeSourceHandles, nodeManager, partitioning.equals(COORDINATOR_DISTRIBUTION));
}
-if (partitioning.equals(FIXED_ARBITRARY_DISTRIBUTION) || partitioning.equals(SCALED_WRITER_DISTRIBUTION)) {
+if (partitioning.equals(FIXED_ARBITRARY_DISTRIBUTION) || partitioning.equals(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION)) {
return ArbitraryDistributionTaskSource.create(
fragment,
exchangeSourceHandles,

----------------------------------------------------------------

@@ -13,7 +13,6 @@
*/
package io.trino.operator.exchange;

-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import io.airlift.slice.XxHash64;
@@ -41,6 +40,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -53,7 +53,7 @@
import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION;
import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_HASH_DISTRIBUTION;
import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_PASSTHROUGH_DISTRIBUTION;
-import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION;
+import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION;
import static io.trino.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
import static java.util.Objects.requireNonNull;
import static java.util.function.Function.identity;
@@ -67,6 +67,9 @@ public class LocalExchange

private final LocalExchangeMemoryManager memoryManager;

+// Physical written bytes for each writer in the same order as source buffers
+private final List<Supplier<Long>> physicalWrittenBytesSuppliers = new CopyOnWriteArrayList<>();
+
@GuardedBy("this")
private boolean allSourcesFinished;

@@ -92,7 +95,6 @@ public LocalExchange(
Optional<Integer> partitionHashChannel,
DataSize maxBufferedBytes,
BlockTypeOperators blockTypeOperators,
-Supplier<Long> physicalWrittenBytesSupplier,
DataSize writerMinSize)
{
ImmutableList.Builder<LocalExchangeSource> sources = ImmutableList.builder();
@@ -123,12 +125,19 @@ else if (partitioning.equals(FIXED_PASSTHROUGH_DISTRIBUTION)) {
return new PassthroughExchanger(sourceIterator.next(), maxBufferedBytes.toBytes() / bufferCount, memoryManager::updateMemoryUsage);
};
}
-else if (partitioning.equals(SCALED_WRITER_DISTRIBUTION)) {
+else if (partitioning.equals(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION)) {
exchangerSupplier = () -> new ScaleWriterExchanger(
buffers,
memoryManager,
maxBufferedBytes.toBytes(),
-physicalWrittenBytesSupplier,
+() -> {
+// Avoid using stream api for performance reasons
+long physicalWrittenBytes = 0;
+for (Supplier<Long> physicalWrittenBytesSupplier : physicalWrittenBytesSuppliers) {
+physicalWrittenBytes += physicalWrittenBytesSupplier.get();
+}
+return physicalWrittenBytes;
+},
writerMinSize);
}
else if (partitioning.equals(FIXED_HASH_DISTRIBUTION) || partitioning.getCatalogHandle().isPresent() ||
@@ -143,18 +152,10 @@ else if (partitioning.equals(FIXED_HASH_DISTRIBUTION) || partitioning.getCatalog
partitionChannels,
partitionChannelTypes,
partitionHashChannel);
-Function<Page, Page> partitionPagePreparer;
-if (isSystemPartitioning(partitioning)) {
-partitionPagePreparer = identity();
-}
-else {
-int[] partitionChannelsArray = Ints.toArray(partitionChannels);
-partitionPagePreparer = page -> page.getColumns(partitionChannelsArray);
-}
return new PartitioningExchanger(
buffers,
memoryManager,
-partitionPagePreparer,
+createPartitionPagePreparer(partitioning, partitionChannels),
partitionFunction);
};
}
@@ -181,18 +182,26 @@ public synchronized LocalExchangeSinkFactory createSinkFactory()
return newFactory;
}

-public synchronized LocalExchangeSource getNextSource()
+public synchronized LocalExchangeSource getNextSource(Supplier<Long> physicalWrittenBytesSupplier)
{
checkState(nextSourceIndex < sources.size(), "All operators already created");
LocalExchangeSource result = sources.get(nextSourceIndex);
+physicalWrittenBytesSuppliers.add(physicalWrittenBytesSupplier);
nextSourceIndex++;
return result;
}

-@VisibleForTesting
-LocalExchangeSource getSource(int partitionIndex)
+private static Function<Page, Page> createPartitionPagePreparer(PartitioningHandle partitioning, List<Integer> partitionChannels)
{
-return sources.get(partitionIndex);
+Function<Page, Page> partitionPagePreparer;
+if (partitioning.getConnectorHandle() instanceof SystemPartitioningHandle) {
+partitionPagePreparer = identity();
+}
+else {
+int[] partitionChannelsArray = Ints.toArray(partitionChannels);
+partitionPagePreparer = page -> page.getColumns(partitionChannelsArray);
+}
+return partitionPagePreparer;
}

private static PartitionFunction createPartitionFunction(
@@ -358,7 +367,7 @@ else if (partitioning.equals(FIXED_PASSTHROUGH_DISTRIBUTION)) {
bufferCount = defaultConcurrency;
checkArgument(partitionChannels.isEmpty(), "Passthrough exchange must not have partition channels");
}
-else if (partitioning.equals(SCALED_WRITER_DISTRIBUTION)) {
+else if (partitioning.equals(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION)) {
// Even when scale writers is enabled, the buffer count or the number of drivers will remain constant.
// However, only some of them are actively doing the work.
bufferCount = defaultConcurrency;
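
The LocalExchange changes above replace the single task-level byte counter with one written-bytes supplier per exchange source, collected in a `CopyOnWriteArrayList` and summed with a plain loop (streams are deliberately avoided on this hot path). A minimal, self-contained sketch of that pattern, with names of my own choosing:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;

// Sketch of the registration/aggregation pattern used by LocalExchange above.
class WrittenBytesTracker
{
    // CopyOnWriteArrayList fits here: registration happens once per driver,
    // while reads happen repeatedly on the hot path.
    private final List<Supplier<Long>> suppliers = new CopyOnWriteArrayList<>();

    void register(Supplier<Long> writtenBytesSupplier)
    {
        suppliers.add(writtenBytesSupplier);
    }

    long totalWrittenBytes()
    {
        // Plain loop instead of a stream, mirroring the comment in the diff.
        long total = 0;
        for (Supplier<Long> supplier : suppliers) {
            total += supplier.get();
        }
        return total;
    }
}
```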

----------------------------------------------------------------

@@ -48,7 +48,7 @@ public Operator createOperator(DriverContext driverContext)
checkState(!closed, "Factory is already closed");

OperatorContext operatorContext = driverContext.addOperatorContext(operatorId, planNodeId, LocalExchangeSourceOperator.class.getSimpleName());
-return new LocalExchangeSourceOperator(operatorContext, localExchange.getNextSource());
+return new LocalExchangeSourceOperator(operatorContext, localExchange.getNextSource(driverContext::getPhysicalWrittenDataSize));
}

@Override

----------------------------------------------------------------

@@ -77,7 +77,7 @@ public Operator createOperator(DriverContext driverContext)
PageWithPositionComparator comparator = orderingCompiler.compilePageWithPositionComparator(types, sortChannels, orderings);
List<LocalExchangeSource> sources = IntStream.range(0, localExchange.getBufferCount())
.boxed()
-.map(index -> localExchange.getNextSource())
+.map(index -> localExchange.getNextSource(driverContext::getPhysicalWrittenDataSize))
.collect(toImmutableList());
return new LocalMergeSourceOperator(operatorContext, sources, types, comparator);
}

----------------------------------------------------------------

@@ -342,7 +342,7 @@
import static io.trino.sql.planner.SystemPartitioningHandle.COORDINATOR_DISTRIBUTION;
import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_ARBITRARY_DISTRIBUTION;
import static io.trino.sql.planner.SystemPartitioningHandle.FIXED_BROADCAST_DISTRIBUTION;
-import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_DISTRIBUTION;
+import static io.trino.sql.planner.SystemPartitioningHandle.SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION;
import static io.trino.sql.planner.SystemPartitioningHandle.SINGLE_DISTRIBUTION;
import static io.trino.sql.planner.optimizations.PlanNodeSearcher.searchFrom;
import static io.trino.sql.planner.plan.AggregationNode.Step.FINAL;
@@ -516,7 +516,7 @@ public LocalExecutionPlan plan(

if (partitioningScheme.getPartitioning().getHandle().equals(FIXED_BROADCAST_DISTRIBUTION) ||
partitioningScheme.getPartitioning().getHandle().equals(FIXED_ARBITRARY_DISTRIBUTION) ||
-partitioningScheme.getPartitioning().getHandle().equals(SCALED_WRITER_DISTRIBUTION) ||
+partitioningScheme.getPartitioning().getHandle().equals(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION) ||
partitioningScheme.getPartitioning().getHandle().equals(SINGLE_DISTRIBUTION) ||
partitioningScheme.getPartitioning().getHandle().equals(COORDINATOR_DISTRIBUTION)) {
return plan(taskContext, plan, outputLayout, types, partitionedSourceOrder, new TaskOutputFactory(outputBuffer));
@@ -3506,7 +3506,7 @@ private boolean isLocalScaledWriterExchange(PlanNode node)

return result.isPresent()
&& result.get() instanceof ExchangeNode
-&& ((ExchangeNode) result.get()).getPartitioningScheme().getPartitioning().getHandle().equals(SCALED_WRITER_DISTRIBUTION);
+&& ((ExchangeNode) result.get()).getPartitioningScheme().getPartitioning().getHandle().isScaleWriters();
}

private PhysicalOperation createLocalMerge(ExchangeNode node, LocalExecutionPlanContext context)
@@ -3533,7 +3533,6 @@ private PhysicalOperation createLocalMerge(ExchangeNode node, LocalExecutionPlan
Optional.empty(),
maxLocalExchangeBufferSize,
blockTypeOperators,
-context.getTaskContext()::getPhysicalWrittenDataSize,
getWriterMinSize(session));

List<Symbol> expectedLayout = node.getInputs().get(0);
@@ -3611,7 +3610,6 @@ else if (context.getDriverInstanceCount().isPresent()) {
hashChannel,
maxLocalExchangeBufferSize,
blockTypeOperators,
-context.getTaskContext()::getPhysicalWrittenDataSize,
getWriterMinSize(session));
for (int i = 0; i < node.getSources().size(); i++) {
DriverFactoryParameters driverFactoryParameters = driverFactoryParametersList.get(i);

----------------------------------------------------------------

@@ -30,17 +30,28 @@ public class PartitioningHandle
private final Optional<CatalogHandle> catalogHandle;
private final Optional<ConnectorTransactionHandle> transactionHandle;
private final ConnectorPartitioningHandle connectorHandle;
+private final boolean scaleWriters;

+public PartitioningHandle(
+Optional<CatalogHandle> catalogHandle,
+Optional<ConnectorTransactionHandle> transactionHandle,
+ConnectorPartitioningHandle connectorHandle)
+{
+this(catalogHandle, transactionHandle, connectorHandle, false);
+}
+
@JsonCreator
public PartitioningHandle(
@JsonProperty("catalogHandle") Optional<CatalogHandle> catalogHandle,
@JsonProperty("transactionHandle") Optional<ConnectorTransactionHandle> transactionHandle,
@JsonProperty("connectorHandle") ConnectorPartitioningHandle connectorHandle)
@JsonProperty("connectorHandle") ConnectorPartitioningHandle connectorHandle,
@JsonProperty("scaleWriters") boolean scaleWriters)
{
this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null");
this.transactionHandle = requireNonNull(transactionHandle, "transactionHandle is null");
checkArgument(catalogHandle.isEmpty() || transactionHandle.isPresent(), "transactionHandle is required when catalogHandle is present");
this.connectorHandle = requireNonNull(connectorHandle, "connectorHandle is null");
+this.scaleWriters = scaleWriters;
}

@JsonProperty
@@ -61,6 +72,12 @@ public ConnectorPartitioningHandle getConnectorHandle()
return connectorHandle;
}

+@JsonProperty
+public boolean isScaleWriters()
+{
+return scaleWriters;
+}
+
public boolean isSingleNode()
{
return connectorHandle.isSingleNode();
@@ -84,21 +101,26 @@ public boolean equals(Object o)

return Objects.equals(catalogHandle, that.catalogHandle) &&
Objects.equals(transactionHandle, that.transactionHandle) &&
-Objects.equals(connectorHandle, that.connectorHandle);
+Objects.equals(connectorHandle, that.connectorHandle) &&
+scaleWriters == that.scaleWriters;
}

@Override
public int hashCode()
{
-return Objects.hash(catalogHandle, transactionHandle, connectorHandle);
+return Objects.hash(catalogHandle, transactionHandle, connectorHandle, scaleWriters);
}

@Override
public String toString()
{
+String result = connectorHandle.toString();
+if (scaleWriters) {
+result = result + " (scale writers)";
+}
if (catalogHandle.isPresent()) {
-return catalogHandle.get() + ":" + connectorHandle;
+result = catalogHandle.get() + ":" + result;
}
-return connectorHandle.toString();
+return result;
}
}
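
Because equals, hashCode, and toString now cover the flag, scaleWriters is part of a handle's identity, which is what lets LocalExecutionPlanner test `isScaleWriters()` above instead of comparing against the single system handle. A minimal sketch of the resulting behavior, assuming an anonymous `ConnectorPartitioningHandle` (relying on the SPI's default methods) is an acceptable stand-in:

```java
import java.util.Optional;

import io.trino.spi.connector.ConnectorPartitioningHandle;
import io.trino.sql.planner.PartitioningHandle;

class ScaleWritersFlagExample
{
    public static void main(String[] args)
    {
        // Stand-in handle; real callers pass a connector's partitioning handle.
        ConnectorPartitioningHandle connectorHandle = new ConnectorPartitioningHandle() {};

        // The three-argument constructor defaults scaleWriters to false.
        PartitioningHandle plain = new PartitioningHandle(Optional.empty(), Optional.empty(), connectorHandle);
        // The new four-argument constructor opts the same partitioning into writer scaling.
        PartitioningHandle scaled = new PartitioningHandle(Optional.empty(), Optional.empty(), connectorHandle, true);

        System.out.println(plain.isScaleWriters());   // false
        System.out.println(scaled.isScaleWriters());  // true
        System.out.println(plain.equals(scaled));     // false: the flag is part of identity
        System.out.println(scaled);                   // toString() carries a " (scale writers)" suffix
    }
}
```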