Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature](insert-overwrite) Support create partition for auto partition table when insert overwrite #38628

Merged
merged 3 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,7 @@ Status VOlapTablePartitionParam::replace_partitions(

// add new partitions with new id.
_partitions.emplace_back(part);
VLOG_NOTICE << "params add new partition " << part->id;

// replace items in _partition_maps
if (_is_in_partition) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ Status ExchangeSinkLocalState::_send_new_partition_batch() {
vectorized::Block tmp_block =
_row_distribution._batching_block->to_block(); // Borrow out, for lval ref
auto& p = _parent->cast<ExchangeSinkOperatorX>();
// these order is only.
// these order is unique.
// 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
// 2. deal batched block
// 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
Expand Down
177 changes: 117 additions & 60 deletions be/src/vec/sink/vrow_distribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

#include <cstdint>
#include <memory>
#include <sstream>
#include <string>

#include "common/logging.h"
#include "common/status.h"
Expand Down Expand Up @@ -116,6 +116,10 @@ Status VRowDistribution::automatic_create_partition() {
if (result.status.status_code == TStatusCode::OK) {
// add new created partitions
RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions));
for (const auto& part : result.partitions) {
_new_partition_ids.insert(part.id);
VLOG_TRACE << "record new id: " << part.id;
}
RETURN_IF_ERROR(_create_partition_callback(_caller, &result));
}

Expand All @@ -134,7 +138,7 @@ static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult& arg

// use _partitions and replace them
Status VRowDistribution::_replace_overwriting_partition() {
SCOPED_TIMER(_add_partition_request_timer);
SCOPED_TIMER(_add_partition_request_timer); // also for replace_partition
TReplacePartitionRequest request;
TReplacePartitionResult result;
request.__set_overwrite_group_id(_vpartition->get_overwrite_group_id());
Expand All @@ -144,16 +148,20 @@ Status VRowDistribution::_replace_overwriting_partition() {
// only request for partitions not recorded for replacement
std::set<int64_t> id_deduper;
for (const auto* part : _partitions) {
if (part == nullptr) [[unlikely]] {
return Status::InternalError(
"Cannot found origin partitions in auto detect overwriting, stop processing");
}
if (_new_partition_ids.contains(part->id)) {
// this is a new partition. dont replace again.
} else {
// request for replacement
id_deduper.insert(part->id);
}
if (part != nullptr) {
if (_new_partition_ids.contains(part->id)) {
// this is a new partition. dont replace again.
VLOG_TRACE << "skip new partition: " << part->id;
} else {
// request for replacement
id_deduper.insert(part->id);
}
} else if (_missing_map.empty()) {
// no origin partition. and not allow to create.
return Status::InvalidArgument(
"Cannot found origin partitions in auto detect overwriting, stop "
"processing");
} // else: part is null and _missing_map is not empty. dealed outside using auto-partition way. nothing to do here.
}
if (id_deduper.empty()) {
return Status::OK(); // no need to request
Expand Down Expand Up @@ -182,6 +190,7 @@ Status VRowDistribution::_replace_overwriting_partition() {
// record new partitions
for (const auto& part : result.partitions) {
_new_partition_ids.insert(part.id);
VLOG_TRACE << "record new id: " << part.id;
}
// replace data in _partitions
RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids, result.partitions));
Expand Down Expand Up @@ -304,6 +313,52 @@ Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition(
return Status::OK();
}

Status VRowDistribution::_deal_missing_map(vectorized::Block* block,
const std::vector<uint16_t>& partition_cols_idx,
int64_t& rows_stat_val) {
// for missing partition keys, calc the missing partition and save in _partitions_need_create
auto [part_ctxs, part_exprs] = _get_partition_function();
auto part_col_num = part_exprs.size();
// the two vectors are in column-first-order
std::vector<std::vector<std::string>> col_strs;
std::vector<const NullMap*> col_null_maps;
col_strs.resize(part_col_num);
col_null_maps.reserve(part_col_num);

for (int i = 0; i < part_col_num; ++i) {
auto return_type = part_exprs[i]->data_type();
// expose the data column. the return type would be nullable
const auto& [range_left_col, col_const] =
unpack_if_const(block->get_by_position(partition_cols_idx[i]).column);
if (range_left_col->is_nullable()) {
col_null_maps.push_back(&(
assert_cast<const ColumnNullable*>(range_left_col.get())->get_null_map_data()));
} else {
col_null_maps.push_back(nullptr);
}
for (auto row : _missing_map) {
col_strs[i].push_back(
return_type->to_string(*range_left_col, index_check_const(row, col_const)));
}
}

// calc the end value and save them. in the end of sending, we will create partitions for them and deal them.
RETURN_IF_ERROR(
_save_missing_values(col_strs, part_col_num, block, _missing_map, col_null_maps));

size_t new_bt_rows = _batching_block->rows();
size_t new_bt_bytes = _batching_block->bytes();
rows_stat_val -= new_bt_rows - _batching_rows;
_state->update_num_rows_load_total(_batching_rows - new_bt_rows);
_state->update_num_bytes_load_total(_batching_bytes - new_bt_bytes);
DorisMetrics::instance()->load_rows->increment(_batching_rows - new_bt_rows);
DorisMetrics::instance()->load_bytes->increment(_batching_bytes - new_bt_bytes);
_batching_rows = new_bt_rows;
_batching_bytes = new_bt_bytes;

return Status::OK();
}

Status VRowDistribution::_generate_rows_distribution_for_auto_partition(
vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
Expand All @@ -329,63 +384,64 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_partition(
RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids));

if (!_missing_map.empty()) {
// for missing partition keys, calc the missing partition and save in _partitions_need_create
auto [part_ctxs, part_exprs] = _get_partition_function();
auto part_col_num = part_exprs.size();
// the two vectors are in column-first-order
std::vector<std::vector<std::string>> col_strs;
std::vector<const NullMap*> col_null_maps;
col_strs.resize(part_col_num);
col_null_maps.reserve(part_col_num);

for (int i = 0; i < part_col_num; ++i) {
auto return_type = part_exprs[i]->data_type();
// expose the data column. the return type would be nullable
const auto& [range_left_col, col_const] =
unpack_if_const(block->get_by_position(partition_cols_idx[i]).column);
if (range_left_col->is_nullable()) {
col_null_maps.push_back(&(assert_cast<const ColumnNullable*>(range_left_col.get())
->get_null_map_data()));
} else {
col_null_maps.push_back(nullptr);
}
for (auto row : _missing_map) {
col_strs[i].push_back(
return_type->to_string(*range_left_col, index_check_const(row, col_const)));
}
}

// calc the end value and save them. in the end of sending, we will create partitions for them and deal them.
RETURN_IF_ERROR(
_save_missing_values(col_strs, part_col_num, block, _missing_map, col_null_maps));

size_t new_bt_rows = _batching_block->rows();
size_t new_bt_bytes = _batching_block->bytes();
rows_stat_val -= new_bt_rows - _batching_rows;
_state->update_num_rows_load_total(_batching_rows - new_bt_rows);
_state->update_num_bytes_load_total(_batching_bytes - new_bt_bytes);
DorisMetrics::instance()->load_rows->increment(_batching_rows - new_bt_rows);
DorisMetrics::instance()->load_bytes->increment(_batching_bytes - new_bt_bytes);
_batching_rows = new_bt_rows;
_batching_bytes = new_bt_bytes;
RETURN_IF_ERROR(_deal_missing_map(block, partition_cols_idx, rows_stat_val));
}
return Status::OK();
}

Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
vectorized::Block* block, bool has_filtered_rows,
std::vector<RowPartTabletIds>& row_part_tablet_ids) {
vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
int64_t& rows_stat_val) {
auto num_rows = block->rows();

// for non-auto-partition situation, goes into two 'else' branch. just find the origin partitions, replace them by rpc,
// and find the new partitions to use.
// for auto-partition's, find and save origins in _partitions and replace them. at meanwhile save the missing values for auto
// partition. then we find partition again to get replaced partitions in _partitions. this time _missing_map is ignored cuz
// we already saved missing values.
bool stop_processing = false;
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
_tablet_indexes, stop_processing, _skip));
if (_vpartition->is_auto_partition() &&
_state->query_options().enable_auto_create_when_overwrite) {
// allow auto create partition for missing rows.
std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys();
auto partition_col = block->get_by_position(partition_keys[0]);
_missing_map.clear();
_missing_map.reserve(partition_col.column->size());

RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
_tablet_indexes, stop_processing, _skip,
&_missing_map));

// allow and really need to create during auto-detect-overwriting.
if (!_missing_map.empty()) {
RETURN_IF_ERROR(_deal_missing_map(block, partition_cols_idx, rows_stat_val));
}
} else {
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
_tablet_indexes, stop_processing, _skip));
}
RETURN_IF_ERROR(_replace_overwriting_partition());

// regenerate locations for new partitions & tablets
_reset_find_tablets(num_rows);
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
_tablet_indexes, stop_processing, _skip));
if (_vpartition->is_auto_partition() &&
_state->query_options().enable_auto_create_when_overwrite) {
// here _missing_map is just a placeholder
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
_tablet_indexes, stop_processing, _skip,
&_missing_map));
if (VLOG_TRACE_IS_ON) {
std::string tmp;
for (auto v : _missing_map) {
tmp += std::to_string(v).append(", ");
}
VLOG_TRACE << "Trace missing map of " << this << ':' << tmp;
}
} else {
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
_tablet_indexes, stop_processing, _skip));
}
if (has_filtered_rows) {
for (int i = 0; i < num_rows; i++) {
_skip[i] = _skip[i] || _block_convertor->filter_map()[i];
Expand Down Expand Up @@ -456,10 +512,11 @@ Status VRowDistribution::generate_rows_distribution(
}

Status st = Status::OK();
if (_vpartition->is_auto_detect_overwrite()) {
if (_vpartition->is_auto_detect_overwrite() && !_deal_batched) {
zclllyybb marked this conversation as resolved.
Show resolved Hide resolved
// when overwrite, no auto create partition allowed.
st = _generate_rows_distribution_for_auto_overwrite(block.get(), has_filtered_rows,
row_part_tablet_ids);
st = _generate_rows_distribution_for_auto_overwrite(block.get(), partition_cols_idx,
has_filtered_rows, row_part_tablet_ids,
rows_stat_val);
} else if (_vpartition->is_auto_partition() && !_deal_batched) {
st = _generate_rows_distribution_for_auto_partition(block.get(), partition_cols_idx,
has_filtered_rows, row_part_tablet_ids,
Expand Down
9 changes: 7 additions & 2 deletions be/src/vec/sink/vrow_distribution.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,19 @@ class VRowDistribution {
vectorized::Block* block, const std::vector<uint16_t>& partition_col_idx,
bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
int64_t& rows_stat_val);
// the whole process to deal missing rows. will call _save_missing_values
Status _deal_missing_map(vectorized::Block* block,
const std::vector<uint16_t>& partition_cols_idx,
int64_t& rows_stat_val);

Status _generate_rows_distribution_for_non_auto_partition(
vectorized::Block* block, bool has_filtered_rows,
std::vector<RowPartTabletIds>& row_part_tablet_ids);

Status _generate_rows_distribution_for_auto_overwrite(
vectorized::Block* block, bool has_filtered_rows,
std::vector<RowPartTabletIds>& row_part_tablet_ids);
vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
int64_t& rows_stat_val);
Status _replace_overwriting_partition();

void _reset_row_part_tablet_ids(std::vector<RowPartTabletIds>& row_part_tablet_ids,
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1422,7 +1422,7 @@ Status VTabletWriter::_send_new_partition_batch() {

Block tmp_block = _row_distribution._batching_block->to_block(); // Borrow out, for lval ref

// these order is only.
// these order is unique.
// 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
// 2. deal batched block
// 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ Status VTabletWriterV2::_send_new_partition_batch() {

Block tmp_block = _row_distribution._batching_block->to_block(); // Borrow out, for lval ref

// these order is only.
// these order is unique.
// 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
// 2. deal batched block
// 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ public class NativeInsertStmt extends InsertStmt {

boolean hasEmptyTargetColumns = false;
private boolean allowAutoPartition = true;
private boolean withAutoDetectOverwrite = false;

enum InsertType {
NATIVE_INSERT("insert_"),
Expand Down Expand Up @@ -333,11 +332,6 @@ public boolean isTransactionBegin() {
return isTransactionBegin;
}

public NativeInsertStmt withAutoDetectOverwrite() {
this.withAutoDetectOverwrite = true;
return this;
}

protected void preCheckAnalyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,10 @@ public LogicalPlan visitInsertTable(InsertTableContext ctx) {
LogicalPlan plan = visitQuery(ctx.query());
// partitionSpec may be NULL. means auto detect partition. only available when IOT
Pair<Boolean, List<String>> partitionSpec = visitPartitionSpec(ctx.partitionSpec());
// partitionSpec.second :
// null - auto detect
// zero - whole table
// others - specific partitions
boolean isAutoDetect = partitionSpec.second == null;
LogicalSink<?> sink = UnboundTableSinkCreator.createUnboundTableSinkMaybeOverwrite(
tableName.build(),
Expand Down
Loading
Loading