Skip to content

Commit

Permalink
Merge branch 'master' into 20231214_set_user_prop
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored Dec 17, 2023
2 parents 8a912ba + 03e989b commit 8867435
Show file tree
Hide file tree
Showing 408 changed files with 2,673 additions and 2,158 deletions.
1 change: 0 additions & 1 deletion be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1493,7 +1493,6 @@ void PublishVersionWorkerPool::publish_version_callback(const TAgentTaskRequest&
.tag("retry_time", retry_time)
.error(status);
++retry_time;
std::this_thread::sleep_for(std::chrono::seconds(1));
}

for (auto& item : discontinuous_version_tablets) {
Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -967,6 +967,7 @@ DEFINE_Bool(enable_debug_points, "false");

DEFINE_Int32(pipeline_executor_size, "0");
DEFINE_Bool(enable_workload_group_for_scan, "false");
DEFINE_mInt64(workload_group_scan_task_wait_timeout_ms, "10000");
// 128 MB
DEFINE_mInt64(local_exchange_buffer_mem_limit, "134217728");

Expand Down
1 change: 1 addition & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,7 @@ DECLARE_Bool(enable_debug_points);

DECLARE_Int32(pipeline_executor_size);
DECLARE_Bool(enable_workload_group_for_scan);
DECLARE_mInt64(workload_group_scan_task_wait_timeout_ms);

// Temp config. True to use optimization for bitmap_index apply predicate except leaf node of the and node.
// Will remove after fully test.
Expand Down
12 changes: 6 additions & 6 deletions be/src/geo/geo_types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,26 +212,26 @@ GeoShape* GeoShape::from_wkb(const char* data, size_t size, GeoParseStatus* stat
return shape;
}

GeoShape* GeoShape::from_encoded(const void* ptr, size_t size) {
std::unique_ptr<GeoShape> GeoShape::from_encoded(const void* ptr, size_t size) {
if (size < 2 || ((const char*)ptr)[0] != 0X00) {
return nullptr;
}
std::unique_ptr<GeoShape> shape;
switch (((const char*)ptr)[1]) {
case GEO_SHAPE_POINT: {
shape.reset(GeoPoint::create_unique().release());
shape = GeoPoint::create_unique();
break;
}
case GEO_SHAPE_LINE_STRING: {
shape.reset(GeoLine::create_unique().release());
shape = GeoLine::create_unique();
break;
}
case GEO_SHAPE_POLYGON: {
shape.reset(GeoPolygon::create_unique().release());
shape = GeoPolygon::create_unique();
break;
}
case GEO_SHAPE_CIRCLE: {
shape.reset(GeoCircle::create_unique().release());
shape = GeoCircle::create_unique();
break;
}
default:
Expand All @@ -241,7 +241,7 @@ GeoShape* GeoShape::from_encoded(const void* ptr, size_t size) {
if (!res) {
return nullptr;
}
return shape.release();
return shape;
}

GeoParseStatus GeoPoint::from_coord(double x, double y) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/geo/geo_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class GeoShape {
virtual GeoShapeType type() const = 0;

// decode from serialized data
static GeoShape* from_encoded(const void* data, size_t size);
static std::unique_ptr<GeoShape> from_encoded(const void* data, size_t size);
// try to construct a GeoShape from a WKT. If construct successfully, a GeoShape will
// be returned, and the client should delete it when don't need it.
// return nullptr if convert failed, and reason will be set in status
Expand Down
8 changes: 6 additions & 2 deletions be/src/olap/full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,12 @@ Status FullCompaction::modify_rowsets(const Merger::Statistics* stats) {
_full_compaction_update_delete_bitmap(_output_rowset, _output_rs_writer.get()));
}
std::vector<RowsetSharedPtr> output_rowsets(1, _output_rowset);
RETURN_IF_ERROR(_tablet->modify_rowsets(output_rowsets, _input_rowsets, true));
_tablet->save_meta();
{
std::lock_guard<std::mutex> rowset_update_wlock(_tablet->get_rowset_update_lock());
std::lock_guard<std::shared_mutex> meta_wlock(_tablet->get_header_lock());
RETURN_IF_ERROR(_tablet->modify_rowsets(output_rowsets, _input_rowsets, true));
_tablet->save_meta();
}
return Status::OK();
}

Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/rowset/segment_v2/segment_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1019,8 +1019,8 @@ Status SegmentWriter::finalize(uint64_t* segment_file_size, uint64_t* index_size
timer.start();
// check disk capacity
if (_data_dir != nullptr && _data_dir->reach_capacity_limit((int64_t)estimate_segment_size())) {
return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed capacity limit.",
_data_dir->path_hash());
return Status::Error<DISK_REACH_CAPACITY_LIMIT>("disk {} exceed capacity limit, path: {}",
_data_dir->path_hash(), _data_dir->path());
}
// write data
RETURN_IF_ERROR(finalize_columns_data());
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -652,8 +652,8 @@ Status TxnManager::delete_txn(OlapMeta* meta, TPartitionId partition_id,
<< (rowset != nullptr ? rowset->rowset_id().to_string() : "0");
}
}
it->second.erase(load_itr);
}
it->second.erase(load_itr);
if (it->second.empty()) {
txn_tablet_map.erase(it);
_clear_txn_partition_map_unlocked(transaction_id, partition_id);
Expand Down
7 changes: 7 additions & 0 deletions be/src/pipeline/exec/nested_loop_join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,13 @@ class NestedLoopJoinProbeOperatorX final
return _old_version_flag ? _row_descriptor : *_intermediate_row_desc;
}

ExchangeType get_local_exchange_type() const override {
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
return ExchangeType::NOOP;
}
return ExchangeType::ADAPTIVE_PASSTHROUGH;
}

const RowDescriptor& row_desc() override {
return _old_version_flag
? (_output_row_descriptor ? *_output_row_descriptor : _row_descriptor)
Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@
#include "vec/runtime/vdata_stream_mgr.h"

namespace doris::pipeline {
bvar::Adder<int64_t> g_pipeline_tasks_count("doris_pipeline_tasks_count");

PipelineFragmentContext::PipelineFragmentContext(
const TUniqueId& query_id, const TUniqueId& instance_id, const int fragment_id,
Expand Down Expand Up @@ -370,7 +371,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
_tasks.emplace_back(std::move(task));
_runtime_profile->add_child(pipeline->pipeline_profile(), true, nullptr);
}

g_pipeline_tasks_count << _total_tasks;
for (auto& task : _tasks) {
RETURN_IF_ERROR(task->prepare(_runtime_state.get()));
}
Expand Down Expand Up @@ -889,6 +890,7 @@ void PipelineFragmentContext::_close_fragment_instance() {

void PipelineFragmentContext::close_a_pipeline() {
std::lock_guard<std::mutex> l(_task_mutex);
g_pipeline_tasks_count << -1;
++_closed_tasks;
if (_closed_tasks == _total_tasks) {
_close_fragment_instance();
Expand Down
3 changes: 3 additions & 0 deletions be/src/pipeline/pipeline_x/dependency.h
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,7 @@ enum class ExchangeType : uint8_t {
PASSTHROUGH = 2,
BUCKET_HASH_SHUFFLE = 3,
BROADCAST = 4,
ADAPTIVE_PASSTHROUGH = 5,
};

inline std::string get_exchange_type_name(ExchangeType idx) {
Expand All @@ -591,6 +592,8 @@ inline std::string get_exchange_type_name(ExchangeType idx) {
return "BUCKET_HASH_SHUFFLE";
case ExchangeType::BROADCAST:
return "BROADCAST";
case ExchangeType::ADAPTIVE_PASSTHROUGH:
return "ADAPTIVE_PASSTHROUGH";
}
LOG(FATAL) << "__builtin_unreachable";
__builtin_unreachable();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class LocalExchangeSinkLocalState final
friend class BucketShuffleExchanger;
friend class PassthroughExchanger;
friend class BroadcastExchanger;
friend class AdaptivePassthroughExchanger;

Exchanger* _exchanger = nullptr;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class LocalExchangeSourceLocalState final
friend class ShuffleExchanger;
friend class PassthroughExchanger;
friend class BroadcastExchanger;
friend class AdaptivePassthroughExchanger;

Exchanger* _exchanger = nullptr;
int _channel_id;
Expand Down
196 changes: 196 additions & 0 deletions be/src/pipeline/pipeline_x/local_exchange/local_exchanger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,200 @@ Status BroadcastExchanger::get_block(RuntimeState* state, vectorized::Block* blo
return Status::OK();
}

Status AdaptivePassthroughExchanger::_passthrough_sink(RuntimeState* state,
vectorized::Block* in_block,
SourceState source_state,
LocalExchangeSinkLocalState& local_state) {
vectorized::Block new_block;
if (!_free_blocks.try_dequeue(new_block)) {
new_block = {in_block->clone_empty()};
}
new_block.swap(*in_block);
auto channel_id = (local_state._channel_id++) % _num_partitions;
local_state._shared_state->add_mem_usage(channel_id, new_block.allocated_bytes());
_passthrough_data_queue[channel_id].enqueue(std::move(new_block));
local_state._shared_state->set_ready_to_read(channel_id);

return Status::OK();
}

bool AdaptivePassthroughExchanger::_passthrough_get_block(
RuntimeState* state, vectorized::Block* block, SourceState& source_state,
LocalExchangeSourceLocalState& local_state) {
vectorized::Block next_block;
if (_running_sink_operators == 0) {
if (_passthrough_data_queue[local_state._channel_id].try_dequeue(next_block)) {
block->swap(next_block);
_free_blocks.enqueue(std::move(next_block));
local_state._shared_state->sub_mem_usage(local_state._channel_id,
block->allocated_bytes());
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
source_state = SourceState::FINISHED;
return false;
}
} else if (_passthrough_data_queue[local_state._channel_id].try_dequeue(next_block)) {
block->swap(next_block);
_free_blocks.enqueue(std::move(next_block));
local_state._shared_state->sub_mem_usage(local_state._channel_id, block->allocated_bytes());
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
return false;
}
return true;
}

Status AdaptivePassthroughExchanger::_shuffle_sink(RuntimeState* state, vectorized::Block* block,
SourceState source_state,
LocalExchangeSinkLocalState& local_state) {
std::vector<uint32_t> channel_ids;
const auto num_rows = block->rows();
channel_ids.resize(num_rows, 0);
if (num_rows <= _num_partitions) {
std::iota(channel_ids.begin(), channel_ids.end(), 0);
} else {
size_t i = 0;
for (; i < num_rows - _num_partitions; i += _num_partitions) {
std::iota(channel_ids.begin() + i, channel_ids.begin() + i + _num_partitions, 0);
}
if (i < num_rows - 1) {
std::iota(channel_ids.begin() + i, channel_ids.end(), 0);
}
}
return _split_rows(state, channel_ids.data(), block, source_state, local_state);
}

Status AdaptivePassthroughExchanger::_split_rows(RuntimeState* state,
const uint32_t* __restrict channel_ids,
vectorized::Block* block, SourceState source_state,
LocalExchangeSinkLocalState& local_state) {
auto& data_queue = _shuffle_data_queue;
const auto rows = block->rows();
auto row_idx = std::make_shared<std::vector<uint32_t>>(rows);
{
local_state._partition_rows_histogram.assign(_num_partitions + 1, 0);
for (size_t i = 0; i < rows; ++i) {
local_state._partition_rows_histogram[channel_ids[i]]++;
}
for (int32_t i = 1; i <= _num_partitions; ++i) {
local_state._partition_rows_histogram[i] +=
local_state._partition_rows_histogram[i - 1];
}

for (int32_t i = rows - 1; i >= 0; --i) {
(*row_idx)[local_state._partition_rows_histogram[channel_ids[i]] - 1] = i;
local_state._partition_rows_histogram[channel_ids[i]]--;
}
}

vectorized::Block data_block;
std::shared_ptr<ShuffleBlockWrapper> new_block_wrapper;
if (_free_blocks.try_enqueue(data_block)) {
new_block_wrapper = ShuffleBlockWrapper::create_shared(std::move(data_block));
} else {
new_block_wrapper = ShuffleBlockWrapper::create_shared(block->clone_empty());
}

new_block_wrapper->data_block.swap(*block);
if (new_block_wrapper->data_block.empty()) {
return Status::OK();
}
local_state._shared_state->add_total_mem_usage(new_block_wrapper->data_block.allocated_bytes());
new_block_wrapper->ref(_num_partitions);

for (size_t i = 0; i < _num_partitions; i++) {
size_t start = local_state._partition_rows_histogram[i];
size_t size = local_state._partition_rows_histogram[i + 1] - start;
if (size > 0) {
local_state._shared_state->add_mem_usage(
i, new_block_wrapper->data_block.allocated_bytes(), false);
data_queue[i].enqueue({new_block_wrapper, {row_idx, start, size}});
local_state._shared_state->set_ready_to_read(i);
} else {
new_block_wrapper->unref(local_state._shared_state);
}
}

return Status::OK();
}
bool AdaptivePassthroughExchanger::_shuffle_get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state,
LocalExchangeSourceLocalState& local_state) {
PartitionedBlock partitioned_block;
std::unique_ptr<vectorized::MutableBlock> mutable_block = nullptr;

auto get_data = [&](vectorized::Block* result_block) {
do {
const auto* offset_start = &((
*std::get<0>(partitioned_block.second))[std::get<1>(partitioned_block.second)]);
auto block_wrapper = partitioned_block.first;
local_state._shared_state->sub_mem_usage(
local_state._channel_id, block_wrapper->data_block.allocated_bytes(), false);
mutable_block->add_rows(&block_wrapper->data_block, offset_start,
offset_start + std::get<2>(partitioned_block.second));
block_wrapper->unref(local_state._shared_state);
} while (mutable_block->rows() < state->batch_size() &&
_shuffle_data_queue[local_state._channel_id].try_dequeue(partitioned_block));
*result_block = mutable_block->to_block();
};
if (_running_sink_operators == 0) {
if (_shuffle_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
SCOPED_TIMER(local_state._copy_data_timer);
mutable_block = vectorized::MutableBlock::create_unique(
partitioned_block.first->data_block.clone_empty());
get_data(block);
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
source_state = SourceState::FINISHED;
return false;
}
} else if (_shuffle_data_queue[local_state._channel_id].try_dequeue(partitioned_block)) {
SCOPED_TIMER(local_state._copy_data_timer);
mutable_block = vectorized::MutableBlock::create_unique(
partitioned_block.first->data_block.clone_empty());
get_data(block);
} else {
COUNTER_UPDATE(local_state._get_block_failed_counter, 1);
return false;
}
return true;
}

Status AdaptivePassthroughExchanger::sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state,
LocalExchangeSinkLocalState& local_state) {
if (_is_pass_through) {
return _passthrough_sink(state, in_block, source_state, local_state);
} else {
if (_total_block++ > _num_partitions) {
_is_pass_through = true;
}
return _shuffle_sink(state, in_block, source_state, local_state);
}
}

Status AdaptivePassthroughExchanger::get_block(RuntimeState* state, vectorized::Block* block,
SourceState& source_state,
LocalExchangeSourceLocalState& local_state) {
auto is_shuffle_success = false, is_passthrough_success = false;
SourceState shuffle_state = SourceState::MORE_DATA, passthrough_state = SourceState::MORE_DATA;

is_shuffle_success = _shuffle_get_block(state, block, shuffle_state, local_state);

if (is_shuffle_success) {
return Status::OK();
}

is_passthrough_success = _passthrough_get_block(state, block, passthrough_state, local_state);

if (is_passthrough_success) {
return Status::OK();
}

if (shuffle_state == SourceState::FINISHED && passthrough_state == SourceState::FINISHED) {
source_state = SourceState::FINISHED;
}
return Status::OK();
}

} // namespace doris::pipeline
Loading

0 comments on commit 8867435

Please sign in to comment.