[minor](exchange) Rename shuffle partition type
Gabriel39 committed Feb 12, 2025
1 parent 2a5530d commit dd88373
Showing 18 changed files with 62 additions and 65 deletions.
26 changes: 13 additions & 13 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -104,7 +104,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_wait_broadcast_buffer_timer = ADD_CHILD_TIMER(_profile, "WaitForBroadcastBuffer", timer_name);
// shuffle the channels to ensure sufficient randomness
if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM ||
-        _part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
+        _part_type == TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED) {
std::random_device rd;
std::mt19937 g(rd());
shuffle(channels.begin(), channels.end(), g);
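For context on the branch above: for unpartitioned and random exchanges, the sink randomizes the channel order once at init time so that concurrent sink instances do not all start sending to the same downstream instance. A minimal standalone sketch of that shuffle, with a plain int vector standing in for the real channel list:

```cpp
#include <algorithm>
#include <iostream>
#include <random>
#include <vector>

int main() {
    // Stand-in for ExchangeSinkLocalState::channels; the real code
    // shuffles channel objects, not ints.
    std::vector<int> channels = {0, 1, 2, 3, 4};

    std::random_device rd; // non-deterministic seed
    std::mt19937 g(rd());  // same engine the diff uses
    std::shuffle(channels.begin(), channels.end(), g);

    for (int c : channels) {
        std::cout << c << ' ';
    }
    std::cout << '\n';
    return 0;
}
```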
@@ -146,7 +146,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})", _partition_count));
-    } else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
+    } else if (_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED) {
_partition_count = channels.size();
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})", _partition_count));
@@ -155,7 +155,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
p._tablet_sink_partition, p._tablet_sink_location, p._tablet_sink_tuple_id, this);
RETURN_IF_ERROR(_partitioner->init({}));
RETURN_IF_ERROR(_partitioner->prepare(state, {}));
-    } else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
+    } else if (_part_type == TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED) {
_partition_count =
channels.size() * config::table_sink_partition_write_max_partition_nums_per_writer;
_partitioner = std::make_unique<vectorized::ScaleWriterPartitioner>(
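Both hash-partitioned sink types above report themselves as "Crc32HashPartitioner(N)" in the profile; the OLAP sink uses one partition per channel, while the Hive sink scales N by table_sink_partition_write_max_partition_nums_per_writer. A hedged sketch of what crc32-based partition selection looks like, assuming zlib's crc32 (the actual vectorized partitioner implementation is not part of this diff):

```cpp
#include <cstddef>
#include <string>
#include <zlib.h> // assumption: zlib provides crc32

// Map a row key to one of `partition_count` partitions. The real
// Crc32HashPartitioner hashes whole columns in a vectorized pass;
// this sketch hashes a single string key per call.
std::size_t pick_partition(const std::string& key, std::size_t partition_count) {
    const auto hash = crc32(0L, reinterpret_cast<const Bytef*>(key.data()),
                            static_cast<uInt>(key.size()));
    return static_cast<std::size_t>(hash) % partition_count;
}
```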
@@ -239,8 +239,8 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {

if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
-        _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED ||
-        _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
+        _part_type == TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED ||
+        _part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED) {
RETURN_IF_ERROR(_partitioner->open(state));
}
return Status::OK();
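The guard above enumerates exactly the partition types that carry a partitioner. A small illustrative predicate capturing that set (a hypothetical helper, not a function in the codebase):

```cpp
// True for the partition types whose local state owns a partitioner
// and therefore must open it, per ExchangeSinkLocalState::open.
// Note: BUCKET_SHFFULE_HASH_PARTITIONED is the enum's actual spelling.
bool needs_partitioner(TPartitionType::type t) {
    return t == TPartitionType::HASH_PARTITIONED ||
           t == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
           t == TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED ||
           t == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED;
}
```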
@@ -283,10 +283,10 @@ ExchangeSinkOperatorX::ExchangeSinkOperatorX(
sink.output_partition.type == TPartitionType::HASH_PARTITIONED ||
sink.output_partition.type == TPartitionType::RANDOM ||
sink.output_partition.type == TPartitionType::RANGE_PARTITIONED ||
-           sink.output_partition.type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED ||
+           sink.output_partition.type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED ||
sink.output_partition.type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
-           sink.output_partition.type == TPartitionType::TABLE_SINK_HASH_PARTITIONED ||
-           sink.output_partition.type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED);
+           sink.output_partition.type == TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED ||
+           sink.output_partition.type == TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED);
_name = "ExchangeSinkOperatorX";
_pool = std::make_shared<ObjectPool>();
if (sink.__isset.output_tuple_id) {
@@ -299,7 +299,7 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
if (_part_type == TPartitionType::RANGE_PARTITIONED) {
return Status::InternalError("TPartitionType::RANGE_PARTITIONED should not be used");
}
-    if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
+    if (_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(*_t_tablet_sink_exprs,
_tablet_sink_expr_ctxs));
}
@@ -310,7 +310,7 @@ Status ExchangeSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<ExchangeSinkLocalState>::open(state));
_state = state;
_compression_type = state->fragement_transmission_compression_type();
-    if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
+    if (_part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED) {
if (_output_tuple_id == -1) {
RETURN_IF_ERROR(
vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, _child->row_desc()));
@@ -442,10 +442,10 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
(local_state.current_channel_idx + 1) % local_state.channels.size();
} else if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
-               _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED ||
-               _part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
+               _part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED ||
+               _part_type == TPartitionType::HIVE_TABLE_SINK_HASH_PARTITIONED) {
RETURN_IF_ERROR(local_state._writer->write(&local_state, state, block, eos));
-    } else if (_part_type == TPartitionType::TABLE_SINK_RANDOM_PARTITIONED) {
+    } else if (_part_type == TPartitionType::HIVE_TABLE_SINK_UNPARTITIONED) {
// Control the number of channels according to the flow, thereby controlling the number of table sink writers.
// 1. select channel
auto& current_channel = local_state.channels[local_state.current_channel_idx];
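The HIVE_TABLE_SINK_UNPARTITIONED branch above advances a cursor through the channel list instead of hashing, which is what bounds the number of active table sink writers. A minimal sketch of that round-robin selection (the template parameter stands in for the real channel type):

```cpp
#include <cstddef>
#include <vector>

// Pick the current channel and advance the cursor modulo the channel
// count, mirroring the (current_channel_idx + 1) % channels.size()
// step in ExchangeSinkOperatorX::sink.
template <typename Channel>
Channel& next_channel(std::vector<Channel>& channels, std::size_t& current_idx) {
    Channel& picked = channels[current_idx];
    current_idx = (current_idx + 1) % channels.size();
    return picked;
}
```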
2 changes: 1 addition & 1 deletion be/src/vec/sink/tablet_sink_hash_partitioner.cpp
@@ -59,7 +59,7 @@ Status TabletSinkHashPartitioner::open(RuntimeState* state) {
for (size_t i = 0; i < _tablet_sink_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(ctxs[i]->clone(state, _tablet_sink_expr_ctxs[i]));
}
-    // if _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED, we handle the processing of auto_increment column
+    // if _part_type == TPartitionType::OLAP_TABLE_SINK_HASH_PARTITIONED, we handle the processing of auto_increment column
// on exchange node rather than on TabletWriter
_block_convertor =
std::make_unique<vectorized::OlapTableBlockConvertor>(_tablet_sink_tuple_desc);
2 changes: 1 addition & 1 deletion be/src/vec/sink/writer/vtablet_writer.cpp
@@ -1255,7 +1255,7 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) {
}

_block_convertor = std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc);
-    // if partition_type is TABLET_SINK_SHUFFLE_PARTITIONED, we handle the processing of auto_increment column
+    // if partition_type is OLAP_TABLE_SINK_HASH_PARTITIONED, we handle the processing of auto_increment column
// on exchange node rather than on TabletWriter
_block_convertor->init_autoinc_info(
_schema->db_id(), _schema->table_id(), _state->batch_size(),
2 changes: 1 addition & 1 deletion be/src/vec/sink/writer/vtablet_writer_v2.cpp
@@ -217,7 +217,7 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) {
}

_block_convertor = std::make_unique<OlapTableBlockConvertor>(_output_tuple_desc);
-    // if partition_type is TABLET_SINK_SHUFFLE_PARTITIONED, we handle the processing of auto_increment column
+    // if partition_type is OLAP_TABLE_SINK_HASH_PARTITIONED, we handle the processing of auto_increment column
// on exchange node rather than on TabletWriter
_block_convertor->init_autoinc_info(
_schema->db_id(), _schema->table_id(), _state->batch_size(),
@@ -74,12 +74,12 @@
import org.apache.doris.nereids.properties.DistributionSpecExecutionAny;
import org.apache.doris.nereids.properties.DistributionSpecGather;
import org.apache.doris.nereids.properties.DistributionSpecHash;
+ import org.apache.doris.nereids.properties.DistributionSpecHiveTableSinkHashPartitioned;
+ import org.apache.doris.nereids.properties.DistributionSpecHiveTableSinkUnPartitioned;
+ import org.apache.doris.nereids.properties.DistributionSpecOlapTableSinkHashPartitioned;
import org.apache.doris.nereids.properties.DistributionSpecReplicated;
import org.apache.doris.nereids.properties.DistributionSpecStorageAny;
import org.apache.doris.nereids.properties.DistributionSpecStorageGather;
- import org.apache.doris.nereids.properties.DistributionSpecTableSinkHashPartitioned;
- import org.apache.doris.nereids.properties.DistributionSpecTableSinkRandomPartitioned;
- import org.apache.doris.nereids.properties.DistributionSpecTabletIdShuffle;
import org.apache.doris.nereids.properties.OrderKey;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.implementation.LogicalWindowToPhysicalWindow.WindowFrameGroup;
@@ -2664,21 +2664,21 @@ private DataPartition toDataPartition(DistributionSpec distributionSpec,
+ distributionSpecHash.getShuffleType());
}
return new DataPartition(partitionType, partitionExprs);
-        } else if (distributionSpec instanceof DistributionSpecTabletIdShuffle) {
+        } else if (distributionSpec instanceof DistributionSpecOlapTableSinkHashPartitioned) {
return DataPartition.TABLET_ID;
-        } else if (distributionSpec instanceof DistributionSpecTableSinkHashPartitioned) {
-            DistributionSpecTableSinkHashPartitioned partitionSpecHash =
-                    (DistributionSpecTableSinkHashPartitioned) distributionSpec;
+        } else if (distributionSpec instanceof DistributionSpecHiveTableSinkHashPartitioned) {
+            DistributionSpecHiveTableSinkHashPartitioned partitionSpecHash =
+                    (DistributionSpecHiveTableSinkHashPartitioned) distributionSpec;
List<Expr> partitionExprs = Lists.newArrayList();
List<ExprId> partitionExprIds = partitionSpecHash.getOutputColExprIds();
for (ExprId partitionExprId : partitionExprIds) {
if (childOutputIds.contains(partitionExprId)) {
partitionExprs.add(context.findSlotRef(partitionExprId));
}
}
-            return new DataPartition(TPartitionType.TABLE_SINK_HASH_PARTITIONED, partitionExprs);
-        } else if (distributionSpec instanceof DistributionSpecTableSinkRandomPartitioned) {
-            return new DataPartition(TPartitionType.TABLE_SINK_RANDOM_PARTITIONED);
+            return new DataPartition(TPartitionType.HIVE_TABLE_SINK_HASH_PARTITIONED, partitionExprs);
+        } else if (distributionSpec instanceof DistributionSpecHiveTableSinkUnPartitioned) {
+            return new DataPartition(TPartitionType.HIVE_TABLE_SINK_UNPARTITIONED);
} else {
throw new RuntimeException("Unknown DistributionSpec: " + distributionSpec);
}
@@ -24,11 +24,11 @@
/**
* use for shuffle data by partition keys before sink.
*/
- public class DistributionSpecTableSinkHashPartitioned extends DistributionSpec {
+ public class DistributionSpecHiveTableSinkHashPartitioned extends DistributionSpec {

private List<ExprId> outputColExprIds;

-    public DistributionSpecTableSinkHashPartitioned() {
+    public DistributionSpecHiveTableSinkHashPartitioned() {
super();
}

@@ -42,6 +42,6 @@ public void setOutputColExprIds(List<ExprId> outputColExprIds) {

@Override
public boolean satisfy(DistributionSpec other) {
-        return other instanceof DistributionSpecTableSinkHashPartitioned;
+        return other instanceof DistributionSpecHiveTableSinkHashPartitioned;
}
}
@@ -20,17 +20,17 @@
/**
* use for Round Robin by data sink.
*/
- public class DistributionSpecTableSinkRandomPartitioned extends DistributionSpec {
+ public class DistributionSpecHiveTableSinkUnPartitioned extends DistributionSpec {

-    public static final DistributionSpecTableSinkRandomPartitioned INSTANCE =
-            new DistributionSpecTableSinkRandomPartitioned();
+    public static final DistributionSpecHiveTableSinkUnPartitioned INSTANCE =
+            new DistributionSpecHiveTableSinkUnPartitioned();

-    private DistributionSpecTableSinkRandomPartitioned() {
+    private DistributionSpecHiveTableSinkUnPartitioned() {
super();
}

@Override
public boolean satisfy(DistributionSpec other) {
-        return other instanceof DistributionSpecTableSinkRandomPartitioned;
+        return other instanceof DistributionSpecHiveTableSinkUnPartitioned;
}
}
@@ -20,16 +20,17 @@
/**
* use for shuffle data by tablet-id before sink.
*/
- public class DistributionSpecTabletIdShuffle extends DistributionSpec {
+ public class DistributionSpecOlapTableSinkHashPartitioned extends DistributionSpec {

-    public static final DistributionSpecTabletIdShuffle INSTANCE = new DistributionSpecTabletIdShuffle();
+    public static final DistributionSpecOlapTableSinkHashPartitioned
+            INSTANCE = new DistributionSpecOlapTableSinkHashPartitioned();

-    private DistributionSpecTabletIdShuffle() {
+    private DistributionSpecOlapTableSinkHashPartitioned() {
super();
}

@Override
public boolean satisfy(DistributionSpec other) {
-        return other instanceof DistributionSpecTabletIdShuffle;
+        return other instanceof DistributionSpecOlapTableSinkHashPartitioned;
}
}
@@ -47,10 +47,10 @@ public class PhysicalProperties {
public static PhysicalProperties MUST_SHUFFLE = new PhysicalProperties(DistributionSpecMustShuffle.INSTANCE);

public static PhysicalProperties TABLET_ID_SHUFFLE
-            = new PhysicalProperties(DistributionSpecTabletIdShuffle.INSTANCE);
+            = new PhysicalProperties(DistributionSpecOlapTableSinkHashPartitioned.INSTANCE);

public static PhysicalProperties SINK_RANDOM_PARTITIONED
-            = new PhysicalProperties(DistributionSpecTableSinkRandomPartitioned.INSTANCE);
+            = new PhysicalProperties(DistributionSpecHiveTableSinkUnPartitioned.INSTANCE);

private final OrderSpec orderSpec;

@@ -125,7 +125,7 @@ public void finalizeSink(PlanFragment fragment, DataSink sink, PhysicalSink phys

// set schema and partition info for tablet id shuffle exchange
if (fragment.getPlanRoot() instanceof ExchangeNode
-                && fragment.getDataPartition().getType() == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED) {
+                && fragment.getDataPartition().getType() == TPartitionType.OLAP_TABLE_SINK_HASH_PARTITIONED) {
DataSink childFragmentSink = fragment.getChild(0).getSink();
DataStreamSink dataStreamSink = null;
if (childFragmentSink instanceof MultiCastDataSink) {
@@ -21,7 +21,7 @@
import org.apache.doris.datasource.hive.HMSExternalDatabase;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.nereids.memo.GroupExpression;
- import org.apache.doris.nereids.properties.DistributionSpecTableSinkHashPartitioned;
+ import org.apache.doris.nereids.properties.DistributionSpecHiveTableSinkHashPartitioned;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.ExprId;
@@ -123,7 +123,7 @@ public PhysicalProperties getRequirePhysicalProperties() {
List<ExprId> exprIds = columnIdx.stream()
.map(idx -> child().getOutput().get(idx).getExprId())
.collect(Collectors.toList());
-        DistributionSpecTableSinkHashPartitioned shuffleInfo = new DistributionSpecTableSinkHashPartitioned();
+        DistributionSpecHiveTableSinkHashPartitioned shuffleInfo = new DistributionSpecHiveTableSinkHashPartitioned();
shuffleInfo.setOutputColExprIds(exprIds);
return new PhysicalProperties(shuffleInfo);
}
@@ -21,7 +21,7 @@
import org.apache.doris.datasource.iceberg.IcebergExternalDatabase;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.nereids.memo.GroupExpression;
- import org.apache.doris.nereids.properties.DistributionSpecTableSinkHashPartitioned;
+ import org.apache.doris.nereids.properties.DistributionSpecHiveTableSinkHashPartitioned;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.trees.expressions.ExprId;
@@ -124,7 +124,7 @@ public PhysicalProperties getRequirePhysicalProperties() {
List<ExprId> exprIds = columnIdx.stream()
.map(idx -> child().getOutput().get(idx).getExprId())
.collect(Collectors.toList());
-        DistributionSpecTableSinkHashPartitioned shuffleInfo = new DistributionSpecTableSinkHashPartitioned();
+        DistributionSpecHiveTableSinkHashPartitioned shuffleInfo = new DistributionSpecHiveTableSinkHashPartitioned();
shuffleInfo.setOutputColExprIds(exprIds);
return new PhysicalProperties(shuffleInfo);
}
@@ -47,7 +47,7 @@ public class DataPartition {

public static final DataPartition UNPARTITIONED = new DataPartition(TPartitionType.UNPARTITIONED);
public static final DataPartition RANDOM = new DataPartition(TPartitionType.RANDOM);
-    public static final DataPartition TABLET_ID = new DataPartition(TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED);
+    public static final DataPartition TABLET_ID = new DataPartition(TPartitionType.OLAP_TABLE_SINK_HASH_PARTITIONED);

private final TPartitionType type;
// for hash partition: exprs used to compute hash value
@@ -58,7 +58,7 @@ public DataPartition(TPartitionType type, List<Expr> exprs) {
Preconditions.checkState(!exprs.isEmpty());
Preconditions.checkState(type == TPartitionType.HASH_PARTITIONED
|| type == TPartitionType.RANGE_PARTITIONED
-                || type == TPartitionType.TABLE_SINK_HASH_PARTITIONED
+                || type == TPartitionType.HIVE_TABLE_SINK_HASH_PARTITIONED
|| type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED);
this.type = type;
this.partitionExprs = ImmutableList.copyOf(exprs);
@@ -67,8 +67,8 @@ public DataPartition(TPartitionType type) {
public DataPartition(TPartitionType type) {
Preconditions.checkState(type == TPartitionType.UNPARTITIONED
|| type == TPartitionType.RANDOM
-                || type == TPartitionType.TABLE_SINK_RANDOM_PARTITIONED
-                || type == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED);
+                || type == TPartitionType.HIVE_TABLE_SINK_UNPARTITIONED
+                || type == TPartitionType.OLAP_TABLE_SINK_HASH_PARTITIONED);
this.type = type;
this.partitionExprs = ImmutableList.of();
}
@@ -90,10 +90,6 @@ public boolean isBucketShuffleHashPartition() {
return type == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED;
}

-    public boolean isTabletSinkShufflePartition() {
-        return type == TPartitionType.TABLET_SINK_SHUFFLE_PARTITIONED;
-    }
-
public TPartitionType getType() {
return type;
}
@@ -219,7 +219,7 @@ public void setRightChildOfBroadcastHashJoin(boolean value) {
* FRAGMENT 0:
* Merging Exchange (id = 1)
* NL Join (id = 2)
-     * DataStreamSender (id = 3, dst_id = 3) (TABLET_SINK_SHUFFLE_PARTITIONED)
+     * DataStreamSender (id = 3, dst_id = 3) (OLAP_TABLE_SINK_HASH_PARTITIONED)
*
* FRAGMENT 1:
* Exchange (id = 3)