Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ensure PartitionedOutputOperator is run with fixed local distribution #13834

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public final class SystemSessionProperties
public static final String USE_EXACT_PARTITIONING = "use_exact_partitioning";
public static final String FORCE_SPILLING_JOIN = "force_spilling_join";
public static final String FAULT_TOLERANT_EXECUTION_EVENT_DRIVEN_SCHEDULER_ENABLED = "fault_tolerant_execution_event_driven_scheduler_enabled";
public static final String FORCE_FIXED_DISTRIBUTION_FOR_PARTITIONED_OUTPUT_OPERATOR_ENABLED = "force_fixed_distribution_for_partitioned_output_operator_enabled";

private final List<PropertyMetadata<?>> sessionProperties;

Expand Down Expand Up @@ -865,6 +866,11 @@ public SystemSessionProperties(
FAULT_TOLERANT_EXECUTION_EVENT_DRIVEN_SCHEDULER_ENABLED,
"Enable event driven scheduler for fault tolerant execution",
queryManagerConfig.isFaultTolerantExecutionEventDrivenSchedulerEnabled(),
true),
booleanProperty(
FORCE_FIXED_DISTRIBUTION_FOR_PARTITIONED_OUTPUT_OPERATOR_ENABLED,
"Force partitioned output operator to be run with fixed distribution",
optimizerConfig.isForceFixedDistributionForPartitionedOutputOperatorEnabled(),
true));
}

Expand Down Expand Up @@ -1548,4 +1554,9 @@ public static boolean isFaultTolerantExecutionEventDriverSchedulerEnabled(Sessio
{
return session.getSystemProperty(FAULT_TOLERANT_EXECUTION_EVENT_DRIVEN_SCHEDULER_ENABLED, Boolean.class);
}

public static boolean isForceFixedDistributionForPartitionedOutputOperatorEnabled(Session session)
{
return session.getSystemProperty(FORCE_FIXED_DISTRIBUTION_FOR_PARTITIONED_OUTPUT_OPERATOR_ENABLED, Boolean.class);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add a comment that PageReference is no longer needed after removing broadcast exchange

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should I add it to the commit message? Not sure if it makes sense to keep mentions of the PageReference in the codebase, since the PageReference itself is going away

package io.trino.operator.exchange;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.primitives.Ints;
import io.airlift.slice.XxHash64;
Expand All @@ -38,14 +39,15 @@

import java.io.Closeable;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.IntStream;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
Expand All @@ -70,8 +72,6 @@ public class LocalExchange

private final List<LocalExchangeSource> sources;

private final LocalExchangeMemoryManager memoryManager;

// Physical written bytes for each writer in the same order as source buffers
private final List<Supplier<Long>> physicalWrittenBytesSuppliers = new CopyOnWriteArrayList<>();

Expand Down Expand Up @@ -102,37 +102,36 @@ public LocalExchange(
BlockTypeOperators blockTypeOperators,
DataSize writerMinSize)
{
ImmutableList.Builder<LocalExchangeSource> sources = ImmutableList.builder();
int bufferCount = computeBufferCount(partitioning, defaultConcurrency, partitionChannels);
for (int i = 0; i < bufferCount; i++) {
sources.add(new LocalExchangeSource(source -> checkAllSourcesFinished()));
}
this.sources = sources.build();

List<Consumer<PageReference>> buffers = this.sources.stream()
.map(buffer -> (Consumer<PageReference>) buffer::addPage)
.collect(toImmutableList());

this.memoryManager = new LocalExchangeMemoryManager(maxBufferedBytes.toBytes());
if (partitioning.equals(SINGLE_DISTRIBUTION)) {
exchangerSupplier = () -> new BroadcastExchanger(buffers, memoryManager);
}
else if (partitioning.equals(FIXED_BROADCAST_DISTRIBUTION)) {
exchangerSupplier = () -> new BroadcastExchanger(buffers, memoryManager);
}
else if (partitioning.equals(FIXED_ARBITRARY_DISTRIBUTION)) {
exchangerSupplier = () -> new RandomExchanger(buffers, memoryManager);
if (partitioning.equals(SINGLE_DISTRIBUTION) || partitioning.equals(FIXED_ARBITRARY_DISTRIBUTION)) {
LocalExchangeMemoryManager memoryManager = new LocalExchangeMemoryManager(maxBufferedBytes.toBytes());
sources = IntStream.range(0, bufferCount)
.mapToObj(i -> new LocalExchangeSource(memoryManager, source -> checkAllSourcesFinished()))
.collect(toImmutableList());
exchangerSupplier = () -> new RandomExchanger(asPageConsumers(sources), memoryManager);
}
else if (partitioning.equals(FIXED_PASSTHROUGH_DISTRIBUTION)) {
Iterator<LocalExchangeSource> sourceIterator = this.sources.iterator();
List<LocalExchangeMemoryManager> memoryManagers = IntStream.range(0, bufferCount)
.mapToObj(i -> new LocalExchangeMemoryManager(maxBufferedBytes.toBytes() / bufferCount))
.collect(toImmutableList());
sources = memoryManagers.stream()
.map(memoryManager -> new LocalExchangeSource(memoryManager, source -> checkAllSourcesFinished()))
.collect(toImmutableList());
AtomicInteger nextSource = new AtomicInteger();
exchangerSupplier = () -> {
checkState(sourceIterator.hasNext(), "no more sources");
return new PassthroughExchanger(sourceIterator.next(), maxBufferedBytes.toBytes() / bufferCount, memoryManager::updateMemoryUsage);
int currentSource = nextSource.getAndIncrement();
checkState(currentSource < sources.size(), "no more sources");
return new PassthroughExchanger(sources.get(currentSource), memoryManagers.get(currentSource));
};
}
else if (partitioning.equals(SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION)) {
LocalExchangeMemoryManager memoryManager = new LocalExchangeMemoryManager(maxBufferedBytes.toBytes());
sources = IntStream.range(0, bufferCount)
.mapToObj(i -> new LocalExchangeSource(memoryManager, source -> checkAllSourcesFinished()))
.collect(toImmutableList());
exchangerSupplier = () -> new ScaleWriterExchanger(
buffers,
asPageConsumers(sources),
memoryManager,
maxBufferedBytes.toBytes(),
() -> {
Expand All @@ -155,6 +154,11 @@ else if (isScaledWriterHashDistribution(partitioning)) {
bufferCount,
writerMinSize.toBytes());

LocalExchangeMemoryManager memoryManager = new LocalExchangeMemoryManager(maxBufferedBytes.toBytes());
sources = IntStream.range(0, bufferCount)
.mapToObj(i -> new LocalExchangeSource(memoryManager, source -> checkAllSourcesFinished()))
.collect(toImmutableList());

exchangerSupplier = () -> {
PartitionFunction partitionFunction = createPartitionFunction(
nodePartitioningManager,
Expand All @@ -166,7 +170,7 @@ else if (isScaledWriterHashDistribution(partitioning)) {
partitionChannelTypes,
partitionHashChannel);
ScaleWriterPartitioningExchanger exchanger = new ScaleWriterPartitioningExchanger(
buffers,
asPageConsumers(sources),
memoryManager,
maxBufferedBytes.toBytes(),
createPartitionPagePreparer(partitioning, partitionChannels),
Expand All @@ -179,6 +183,10 @@ else if (isScaledWriterHashDistribution(partitioning)) {
}
else if (partitioning.equals(FIXED_HASH_DISTRIBUTION) || partitioning.getCatalogHandle().isPresent() ||
(partitioning.getConnectorHandle() instanceof MergePartitioningHandle)) {
LocalExchangeMemoryManager memoryManager = new LocalExchangeMemoryManager(maxBufferedBytes.toBytes());
sources = IntStream.range(0, bufferCount)
.mapToObj(i -> new LocalExchangeSource(memoryManager, source -> checkAllSourcesFinished()))
.collect(toImmutableList());
exchangerSupplier = () -> {
PartitionFunction partitionFunction = createPartitionFunction(
nodePartitioningManager,
Expand All @@ -190,7 +198,7 @@ else if (partitioning.equals(FIXED_HASH_DISTRIBUTION) || partitioning.getCatalog
partitionChannelTypes,
partitionHashChannel);
return new PartitioningExchanger(
buffers,
asPageConsumers(sources),
memoryManager,
createPartitionPagePreparer(partitioning, partitionChannels),
partitionFunction);
Expand All @@ -206,11 +214,6 @@ public int getBufferCount()
return sources.size();
}

public long getBufferedBytes()
{
return memoryManager.getBufferedBytes();
}

public synchronized LocalExchangeSinkFactory createSinkFactory()
{
checkState(!noMoreSinkFactories, "No more sink factories already set");
Expand Down Expand Up @@ -394,6 +397,12 @@ private void checkAllSinksComplete()
sources.forEach(LocalExchangeSource::finish);
}

@VisibleForTesting
LocalExchangeSource getSource(int partitionIndex)
{
return sources.get(partitionIndex);
}

private static void checkNotHoldsLock(Object lock)
{
checkState(!Thread.holdsLock(lock), "Cannot execute this method while holding a lock");
Expand Down Expand Up @@ -440,6 +449,13 @@ else if (partitioning.equals(FIXED_HASH_DISTRIBUTION) || partitioning.getCatalog
return bufferCount;
}

private static List<Consumer<Page>> asPageConsumers(List<LocalExchangeSource> sources)
{
return sources.stream()
.map(buffer -> (Consumer<Page>) buffer::addPage)
.collect(toImmutableList());
}

// Sink factory is entirely a pass thought to LocalExchange.
// This class only exists as a separate entity to deal with the complex lifecycle caused
// by operator factories (e.g., duplicate and noMoreSinkFactories).
Expand Down
Loading