Skip to content

Commit

Permalink
[fix](group commit) Fix group commit in nereids
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi committed Jun 28, 2024
1 parent 3f3a6eb commit 228eabe
Show file tree
Hide file tree
Showing 15 changed files with 197 additions and 302 deletions.
6 changes: 5 additions & 1 deletion be/src/pipeline/exec/group_commit_block_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,14 @@ Status GroupCommitBlockSinkLocalState::open(RuntimeState* state) {
Status GroupCommitBlockSinkLocalState::_initialize_load_queue() {
auto& p = _parent->cast<GroupCommitBlockSinkOperatorX>();
if (_state->exec_env()->wal_mgr()->is_running()) {
std::string label;
int64_t txn_id;
RETURN_IF_ERROR(_state->exec_env()->group_commit_mgr()->get_first_block_load_queue(
p._db_id, p._table_id, p._base_schema_version, p._load_id, _load_block_queue,
_state->be_exec_version(), _state->query_mem_tracker(), _create_plan_dependency,
_put_block_dependency));
_put_block_dependency, label, txn_id));
_state->set_import_label(label);
_state->set_wal_id(txn_id); // wal_id is txn_id
return Status::OK();
} else {
return Status::InternalError("be is stopping");
Expand Down
4 changes: 4 additions & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,10 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
params.__set_tracking_url(
to_load_error_http_path(rs->get_error_log_file_path()));
}
if (rs->wal_id() > 0) {
params.__set_txn_id(rs->wal_id());
params.__set_label(rs->import_label());
}
}
}
if (!req.runtime_state->export_output_files().empty()) {
Expand Down
9 changes: 5 additions & 4 deletions be/src/runtime/group_commit_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ Status GroupCommitTable::get_first_block_load_queue(
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
std::shared_ptr<pipeline::Dependency> put_block_dep) {
std::shared_ptr<pipeline::Dependency> put_block_dep, std::string& label, int64_t& txn_id) {
DCHECK(table_id == _table_id);
std::unique_lock l(_lock);
auto try_to_get_matched_queue = [&]() -> Status {
Expand All @@ -266,7 +266,8 @@ Status GroupCommitTable::get_first_block_load_queue(
if (base_schema_version == inner_block_queue->schema_version) {
if (inner_block_queue->add_load_id(load_id, put_block_dep).ok()) {
load_block_queue = inner_block_queue;

label = inner_block_queue->label;
txn_id = inner_block_queue->txn_id;
return Status::OK();
}
} else {
Expand Down Expand Up @@ -588,7 +589,7 @@ Status GroupCommitMgr::get_first_block_load_queue(
std::shared_ptr<LoadBlockQueue>& load_block_queue, int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
std::shared_ptr<pipeline::Dependency> put_block_dep) {
std::shared_ptr<pipeline::Dependency> put_block_dep, std::string& label, int64_t& txn_id) {
std::shared_ptr<GroupCommitTable> group_commit_table;
{
std::lock_guard wlock(_lock);
Expand All @@ -601,7 +602,7 @@ Status GroupCommitMgr::get_first_block_load_queue(
}
RETURN_IF_ERROR(group_commit_table->get_first_block_load_queue(
table_id, base_schema_version, load_id, load_block_queue, be_exe_version, mem_tracker,
create_plan_dep, put_block_dep));
create_plan_dep, put_block_dep, label, txn_id));
return Status::OK();
}

Expand Down
6 changes: 4 additions & 2 deletions be/src/runtime/group_commit_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,8 @@ class GroupCommitTable {
int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
std::shared_ptr<pipeline::Dependency> put_block_dep);
std::shared_ptr<pipeline::Dependency> put_block_dep,
std::string& label, int64_t& txn_id);
Status get_load_block_queue(const TUniqueId& instance_id,
std::shared_ptr<LoadBlockQueue>& load_block_queue,
std::shared_ptr<pipeline::Dependency> get_block_dep);
Expand Down Expand Up @@ -207,7 +208,8 @@ class GroupCommitMgr {
int be_exe_version,
std::shared_ptr<MemTrackerLimiter> mem_tracker,
std::shared_ptr<pipeline::Dependency> create_plan_dep,
std::shared_ptr<pipeline::Dependency> put_block_dep);
std::shared_ptr<pipeline::Dependency> put_block_dep,
std::string& label, int64_t& txn_id);
std::promise<Status> debug_promise;
std::future<Status> debug_future = debug_promise.get_future();

Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,10 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor
if (cte.isPresent()) {
this.logicalQuery = ((LogicalPlan) cte.get().withChildren(logicalQuery));
}

if (this.logicalQuery instanceof UnboundTableSink) {
OlapGroupCommitInsertExecutor.analyzeGroupCommit(ctx, targetTableIf,
(UnboundTableSink<?>) this.logicalQuery);
}
LogicalPlanAdapter logicalPlanAdapter = new LogicalPlanAdapter(logicalQuery, ctx.getStatementContext());
NereidsPlanner planner = new NereidsPlanner(ctx.getStatementContext());
planner.plan(logicalPlanAdapter, ctx.getSessionVariable().toThrift());
Expand All @@ -167,17 +170,16 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor

if (physicalSink instanceof PhysicalOlapTableSink) {
boolean emptyInsert = childIsEmptyRelation(physicalSink);
if (GroupCommitInsertExecutor.canGroupCommit(ctx, sink, physicalSink, planner)) {
insertExecutor = new GroupCommitInsertExecutor(ctx, targetTableIf, label, planner, insertCtx,
emptyInsert);
targetTableIf.readUnlock();
return insertExecutor;
}
OlapTable olapTable = (OlapTable) targetTableIf;
// the insertCtx contains some variables to adjust SinkNode
insertExecutor = ctx.isTxnModel()
? new OlapTxnInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert)
: new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert);
if (ctx.isTxnModel()) {
insertExecutor = new OlapTxnInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert);
} else if (ctx.isGroupCommit()) {
insertExecutor = new OlapGroupCommitInsertExecutor(ctx, olapTable, label, planner, insertCtx,
emptyInsert);
} else {
insertExecutor = new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert);
}

boolean isEnableMemtableOnSinkNode =
olapTable.getTableProperty().getUseSchemaLightChange()
Expand Down
Loading

0 comments on commit 228eabe

Please sign in to comment.