Skip to content

Commit

Permalink
[Improvement](schema scan) Use async scanner for schema scanners (#38403
Browse files Browse the repository at this point in the history
)
  • Loading branch information
Gabriel39 authored and dataroaring committed Aug 2, 2024
1 parent b6dac2c commit e679879
Show file tree
Hide file tree
Showing 54 changed files with 163 additions and 69 deletions.
69 changes: 68 additions & 1 deletion be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@
#include "exec/schema_scanner/schema_workload_groups_scanner.h"
#include "exec/schema_scanner/schema_workload_sched_policy_scanner.h"
#include "olap/hll.h"
#include "pipeline/dependency.h"
#include "runtime/define_primitive_type.h"
#include "runtime/fragment_mgr.h"
#include "runtime/types.h"
#include "util/string_util.h"
#include "util/types.h"
#include "vec/columns/column.h"
Expand All @@ -65,6 +68,7 @@
#include "vec/core/column_with_type_and_name.h"
#include "vec/core/types.h"
#include "vec/data_types/data_type.h"
#include "vec/data_types/data_type_factory.hpp"

namespace doris {
class ObjectPool;
Expand All @@ -85,7 +89,60 @@ Status SchemaScanner::start(RuntimeState* state) {
return Status::OK();
}

Status SchemaScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaScanner::get_next_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
if (_data_block == nullptr) {
return Status::InternalError("No data left!");
}
DCHECK(_async_thread_running == false);
RETURN_IF_ERROR(_scanner_status.status());
for (size_t i = 0; i < block->columns(); i++) {
std::move(*block->get_by_position(i).column)
.mutate()
->insert_range_from(*_data_block->get_by_position(i).column, 0,
_data_block->rows());
}
_data_block->clear_column_data();
*eos = _eos;
if (!*eos) {
RETURN_IF_ERROR(get_next_block_async(state));
}
return Status::OK();
}

Status SchemaScanner::get_next_block_async(RuntimeState* state) {
_dependency->block();
auto task_ctx = state->get_task_execution_context();
RETURN_IF_ERROR(ExecEnv::GetInstance()->fragment_mgr()->get_thread_pool()->submit_func(
[this, task_ctx, state]() {
DCHECK(_async_thread_running == false);
auto task_lock = task_ctx.lock();
if (task_lock == nullptr) {
_scanner_status.update(Status::InternalError("Task context not exists!"));
return;
}
SCOPED_ATTACH_TASK(state);
_dependency->block();
_async_thread_running = true;
_finish_dependency->block();
if (!_opened) {
_data_block = vectorized::Block::create_unique();
_init_block(_data_block.get());
_scanner_status.update(start(state));
_opened = true;
}
bool eos = false;
_scanner_status.update(get_next_block_internal(_data_block.get(), &eos));
_eos = eos;
_async_thread_running = false;
_dependency->set_ready();
if (eos) {
_finish_dependency->set_ready();
}
}));
return Status::OK();
}

Status SchemaScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("used before initialized.");
}
Expand Down Expand Up @@ -176,6 +233,16 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
}
}

void SchemaScanner::_init_block(vectorized::Block* src_block) {
const std::vector<SchemaScanner::ColumnDesc>& columns_desc(get_column_desc());
for (int i = 0; i < columns_desc.size(); ++i) {
TypeDescriptor descriptor(columns_desc[i].type);
auto data_type = vectorized::DataTypeFactory::instance().create_data_type(descriptor, true);
src_block->insert(vectorized::ColumnWithTypeAndName(data_type->create_column(), data_type,
columns_desc[i].name));
}
}

Status SchemaScanner::fill_dest_column_for_range(vectorized::Block* block, size_t pos,
const std::vector<void*>& datas) {
const ColumnDesc& col_desc = _columns[pos];
Expand Down
24 changes: 23 additions & 1 deletion be/src/exec/schema_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <stddef.h>
#include <stdint.h>

#include <condition_variable>
#include <memory>
#include <string>
#include <vector>
Expand All @@ -43,6 +44,10 @@ namespace vectorized {
class Block;
}

namespace pipeline {
class Dependency;
}

struct SchemaScannerCommonParam {
SchemaScannerCommonParam()
: db(nullptr),
Expand Down Expand Up @@ -94,15 +99,23 @@ class SchemaScanner {

// init object need information, schema etc.
virtual Status init(SchemaScannerParam* param, ObjectPool* pool);
Status get_next_block(RuntimeState* state, vectorized::Block* block, bool* eos);
// Start to work
virtual Status start(RuntimeState* state);
virtual Status get_next_block(vectorized::Block* block, bool* eos);
virtual Status get_next_block_internal(vectorized::Block* block, bool* eos);
const std::vector<ColumnDesc>& get_column_desc() const { return _columns; }
// factory function
static std::unique_ptr<SchemaScanner> create(TSchemaTableType::type type);
TSchemaTableType::type type() const { return _schema_table_type; }
void set_dependency(std::shared_ptr<pipeline::Dependency> dep,
std::shared_ptr<pipeline::Dependency> fin_dep) {
_dependency = dep;
_finish_dependency = fin_dep;
}
Status get_next_block_async(RuntimeState* state);

protected:
void _init_block(vectorized::Block* src_block);
Status fill_dest_column_for_range(vectorized::Block* block, size_t pos,
const std::vector<void*>& datas);

Expand All @@ -125,6 +138,15 @@ class SchemaScanner {
RuntimeProfile::Counter* _get_table_timer = nullptr;
RuntimeProfile::Counter* _get_describe_timer = nullptr;
RuntimeProfile::Counter* _fill_block_timer = nullptr;

std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
std::shared_ptr<pipeline::Dependency> _finish_dependency = nullptr;

std::unique_ptr<vectorized::Block> _data_block;
AtomicStatus _scanner_status;
std::atomic<bool> _eos = false;
std::atomic<bool> _opened = false;
std::atomic<bool> _async_thread_running = false;
};

} // namespace doris
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ Status SchemaActiveQueriesScanner::_get_active_queries_block_from_fe() {
return Status::OK();
}

Status SchemaActiveQueriesScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaActiveQueriesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_active_queries_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class SchemaActiveQueriesScanner : public SchemaScanner {
~SchemaActiveQueriesScanner() override;

Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;

Expand Down
3 changes: 2 additions & 1 deletion be/src/exec/schema_scanner/schema_backend_active_tasks.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ Status SchemaBackendActiveTasksScanner::start(RuntimeState* state) {
return Status::OK();
}

Status SchemaBackendActiveTasksScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaBackendActiveTasksScanner::get_next_block_internal(vectorized::Block* block,
bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_backend_active_tasks.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class SchemaBackendActiveTasksScanner : public SchemaScanner {
~SchemaBackendActiveTasksScanner() override;

Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_charsets_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ SchemaCharsetsScanner::SchemaCharsetsScanner()

SchemaCharsetsScanner::~SchemaCharsetsScanner() {}

Status SchemaCharsetsScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaCharsetsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("call this before initial.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_charsets_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class SchemaCharsetsScanner : public SchemaScanner {
SchemaCharsetsScanner();
~SchemaCharsetsScanner() override;

Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

private:
struct CharsetStruct {
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_collations_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ SchemaCollationsScanner::SchemaCollationsScanner()

SchemaCollationsScanner::~SchemaCollationsScanner() {}

Status SchemaCollationsScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaCollationsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("call this before initial.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_collations_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class SchemaCollationsScanner : public SchemaScanner {
SchemaCollationsScanner();
~SchemaCollationsScanner() override;

Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

private:
struct CollationStruct {
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_columns_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ Status SchemaColumnsScanner::_get_new_table() {
return Status::OK();
}

Status SchemaColumnsScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaColumnsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("use this class before inited.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_columns_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class SchemaColumnsScanner : public SchemaScanner {
SchemaColumnsScanner();
~SchemaColumnsScanner() override;
Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

private:
Status _get_new_table();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_dummy_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Status SchemaDummyScanner::start(RuntimeState* state) {
return Status::OK();
}

Status SchemaDummyScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaDummyScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
*eos = true;
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_dummy_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ class SchemaDummyScanner : public SchemaScanner {
SchemaDummyScanner();
~SchemaDummyScanner() override;
Status start(RuntimeState* state = nullptr) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;
};

} // namespace doris
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_files_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ Status SchemaFilesScanner::start(RuntimeState* state) {
return Status::OK();
}

Status SchemaFilesScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaFilesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_files_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class SchemaFilesScanner : public SchemaScanner {
~SchemaFilesScanner() override;

Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

int _db_index;
int _table_index;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ Status SchemaMetadataNameIdsScanner::_fill_block_impl(vectorized::Block* block)
return Status::OK();
}

Status SchemaMetadataNameIdsScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaMetadataNameIdsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class SchemaMetadataNameIdsScanner : public SchemaScanner {
~SchemaMetadataNameIdsScanner() override;

Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

private:
Status _get_new_table();
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_partitions_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ Status SchemaPartitionsScanner::start(RuntimeState* state) {
return Status::OK();
}

Status SchemaPartitionsScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaPartitionsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_partitions_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class SchemaPartitionsScanner : public SchemaScanner {
~SchemaPartitionsScanner() override;

Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

int _db_index;
int _table_index;
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_processlist_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ Status SchemaProcessListScanner::start(RuntimeState* state) {
return Status::OK();
}

Status SchemaProcessListScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaProcessListScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("call this before initial.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_processlist_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SchemaProcessListScanner : public SchemaScanner {
~SchemaProcessListScanner() override;

Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

static std::vector<SchemaScanner::ColumnDesc> _s_processlist_columns;

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_profiling_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ Status SchemaProfilingScanner::start(RuntimeState* state) {
return Status::OK();
}

Status SchemaProfilingScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaProfilingScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_profiling_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class SchemaProfilingScanner : public SchemaScanner {
~SchemaProfilingScanner() override;

Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
};
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_routine_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ Status SchemaRoutinesScanner::get_block_from_fe() {
return Status::OK();
}

Status SchemaRoutinesScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaRoutinesScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_routine_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class SchemaRoutinesScanner : public SchemaScanner {
~SchemaRoutinesScanner() override = default;

Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;

Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_rowsets_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ Status SchemaRowsetsScanner::_get_all_rowsets() {
return Status::OK();
}

Status SchemaRowsetsScanner::get_next_block(vectorized::Block* block, bool* eos) {
Status SchemaRowsetsScanner::get_next_block_internal(vectorized::Block* block, bool* eos) {
if (!_is_init) {
return Status::InternalError("Used before initialized.");
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/exec/schema_scanner/schema_rowsets_scanner.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class SchemaRowsetsScanner : public SchemaScanner {
~SchemaRowsetsScanner() override = default;

Status start(RuntimeState* state) override;
Status get_next_block(vectorized::Block* block, bool* eos) override;
Status get_next_block_internal(vectorized::Block* block, bool* eos) override;

private:
Status _get_all_rowsets();
Expand Down
Loading

0 comments on commit e679879

Please sign in to comment.