Skip to content

Commit

Permalink
Remove DynamicBucketNodeMap
Browse files Browse the repository at this point in the history
This class is a remnant of grouped execution.
  • Loading branch information
electrum committed Aug 9, 2022
1 parent 586b5fd commit c5c195b
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 178 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,35 +13,43 @@
*/
package io.trino.execution.scheduler;

import com.google.common.collect.ImmutableList;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Split;

import java.util.List;
import java.util.function.ToIntFunction;

import static java.util.Objects.requireNonNull;

public abstract class BucketNodeMap
public final class BucketNodeMap
{
private final List<InternalNode> bucketToNode;
private final ToIntFunction<Split> splitToBucket;

protected BucketNodeMap(ToIntFunction<Split> splitToBucket)
public BucketNodeMap(ToIntFunction<Split> splitToBucket, List<InternalNode> bucketToNode)
{
this.splitToBucket = requireNonNull(splitToBucket, "splitToBucket is null");
this.bucketToNode = ImmutableList.copyOf(requireNonNull(bucketToNode, "bucketToNode is null"));
}

public abstract int getBucketCount();

public abstract InternalNode getAssignedNode(int bucketedId);
public int getBucketCount()
{
return bucketToNode.size();
}

public abstract boolean isDynamic();
public int getBucket(Split split)
{
return splitToBucket.applyAsInt(split);
}

public final InternalNode getAssignedNode(Split split)
public InternalNode getAssignedNode(int bucketId)
{
return getAssignedNode(splitToBucket.applyAsInt(split));
return bucketToNode.get(bucketId);
}

public final int getBucket(Split split)
public InternalNode getAssignedNode(Split split)
{
return splitToBucket.applyAsInt(split);
return getAssignedNode(getBucket(split));
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -1438,8 +1438,7 @@ public void stateChanged(QueryState newState)
List<InternalNode> stageNodeList;
if (fragment.getRemoteSourceNodes().stream().allMatch(node -> node.getExchangeType() == REPLICATE)) {
// no remote source
bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, partitioningHandle, false);

bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, partitioningHandle);
stageNodeList = new ArrayList<>(nodeScheduler.createNodeSelector(session, catalogHandle).allNodes());
Collections.shuffle(stageNodeList);
}
Expand Down Expand Up @@ -1892,30 +1891,21 @@ private static BucketToPartition createBucketToPartitionMap(
return new BucketToPartition(Optional.of(IntStream.range(0, partitionCount).toArray()), Optional.empty());
}
if (partitioningHandle.getCatalogHandle().isPresent()) {
BucketNodeMap bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, partitioningHandle, true);
BucketNodeMap bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, partitioningHandle);
int bucketCount = bucketNodeMap.getBucketCount();
int[] bucketToPartition = new int[bucketCount];
if (bucketNodeMap.isDynamic()) {
int nextPartitionId = 0;
for (int bucket = 0; bucket < bucketCount; bucket++) {
bucketToPartition[bucket] = nextPartitionId % partitionCount;
// make sure all buckets mapped to the same node map to the same partition, such that locality requirements are respected in scheduling
Map<InternalNode, Integer> nodeToPartition = new HashMap<>();
int nextPartitionId = 0;
for (int bucket = 0; bucket < bucketCount; bucket++) {
InternalNode node = bucketNodeMap.getAssignedNode(bucket);
Integer partitionId = nodeToPartition.get(node);
if (partitionId == null) {
partitionId = nextPartitionId;
nextPartitionId++;
nodeToPartition.put(node, partitionId);
}
}
else {
// make sure all buckets mapped to the same node map to the same partition, such that locality requirements are respected in scheduling
Map<InternalNode, Integer> nodeToPartition = new HashMap<>();
int nextPartitionId = 0;
for (int bucket = 0; bucket < bucketCount; bucket++) {
InternalNode node = bucketNodeMap.getAssignedNode(bucket);
Integer partitionId = nodeToPartition.get(node);
if (partitionId == null) {
partitionId = nextPartitionId;
nextPartitionId++;
nodeToPartition.put(node, partitionId);
}
bucketToPartition[bucket] = partitionId;
}
bucketToPartition[bucket] = partitionId;
}
return new BucketToPartition(Optional.of(bucketToPartition), Optional.of(bucketNodeMap));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,7 @@ public synchronized ListenableFuture<List<TaskDescriptor>> getMoreTasks()
int bucket = bucketNodeMap.getBucket(split);
int partition = getPartitionForBucket(bucket);

if (!bucketNodeMap.isDynamic()) {
{
HostAddress requiredAddress = bucketNodeMap.getAssignedNode(split).getHostAndPort();
Set<HostAddress> existingRequirement = partitionToNodeMap.get(partition);
if (existingRequirement.isEmpty()) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.google.common.collect.ImmutableList;
import io.trino.execution.scheduler.BucketNodeMap;
import io.trino.execution.scheduler.FixedBucketNodeMap;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Split;

Expand Down Expand Up @@ -76,6 +75,6 @@ public BucketNodeMap asBucketNodeMap()
for (int partition : bucketToPartition) {
bucketToNode.add(partitionToNode.get(partition));
}
return new FixedBucketNodeMap(splitToBucket, bucketToNode.build());
return new BucketNodeMap(splitToBucket, bucketToNode.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@
import io.trino.connector.CatalogHandle;
import io.trino.connector.CatalogServiceProvider;
import io.trino.execution.scheduler.BucketNodeMap;
import io.trino.execution.scheduler.FixedBucketNodeMap;
import io.trino.execution.scheduler.NodeScheduler;
import io.trino.execution.scheduler.NodeSelector;
import io.trino.execution.scheduler.group.DynamicBucketNodeMap;
import io.trino.metadata.InternalNode;
import io.trino.metadata.Split;
import io.trino.operator.BucketPartitionFunction;
Expand Down Expand Up @@ -227,24 +225,18 @@ private NodePartitionMap systemNodePartitionMap(Session session, PartitioningHan
});
}

public BucketNodeMap getBucketNodeMap(Session session, PartitioningHandle partitioningHandle, boolean preferDynamic)
public BucketNodeMap getBucketNodeMap(Session session, PartitioningHandle partitioningHandle)
{
Optional<ConnectorBucketNodeMap> bucketNodeMap = getConnectorBucketNodeMap(session, partitioningHandle);

ToIntFunction<Split> splitToBucket = getSplitToBucket(session, partitioningHandle);
if (bucketNodeMap.map(ConnectorBucketNodeMap::hasFixedMapping).orElse(false)) {
return new FixedBucketNodeMap(splitToBucket, getFixedMapping(bucketNodeMap.get()));
}

if (preferDynamic) {
int bucketCount = bucketNodeMap.map(ConnectorBucketNodeMap::getBucketCount)
.orElseGet(() -> getNodeCount(session, partitioningHandle));
return new DynamicBucketNodeMap(splitToBucket, bucketCount);
return new BucketNodeMap(splitToBucket, getFixedMapping(bucketNodeMap.get()));
}

List<InternalNode> nodes = getAllNodes(session, requiredCatalogHandle(partitioningHandle));
int bucketCount = bucketNodeMap.map(ConnectorBucketNodeMap::getBucketCount).orElseGet(nodes::size);
return new FixedBucketNodeMap(splitToBucket, createArbitraryBucketToNode(nodes, bucketCount));
return new BucketNodeMap(splitToBucket, createArbitraryBucketToNode(nodes, bucketCount));
}

public int getNodeCount(Session session, PartitioningHandle partitioningHandle)
Expand Down
Loading

0 comments on commit c5c195b

Please sign in to comment.