Skip to content

Commit

Permalink
Storages: refine SegmentReadTaskScheduler (#8557)
Browse files Browse the repository at this point in the history
ref #6834
  • Loading branch information
JinheLin authored Jan 3, 2024
1 parent f6ea813 commit 64c1ae5
Show file tree
Hide file tree
Showing 11 changed files with 420 additions and 441 deletions.
7 changes: 3 additions & 4 deletions dbms/src/Flash/Pipeline/Schedule/Tasks/RFWaitTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,14 @@ class RFWaitTask : public Task

static void submitReadyRfsAndSegmentTaskPool(
const RuntimeFilteList & ready_rf_list,
const DM::SegmentReadTaskPoolPtr & task_pool,
const LoggerPtr & log)
const DM::SegmentReadTaskPoolPtr & task_pool)
{
for (const RuntimeFilterPtr & rf : ready_rf_list)
{
auto rs_operator = rf->parseToRSOperator(task_pool->getColumnToRead());
task_pool->appendRSOperator(rs_operator);
}
DM::SegmentReadTaskScheduler::instance().add(task_pool, log);
DM::SegmentReadTaskScheduler::instance().add(task_pool);
}

private:
Expand All @@ -83,7 +82,7 @@ class RFWaitTask : public Task
filterAndMoveReadyRfs(waiting_rf_list, ready_rf_list);
if (waiting_rf_list.empty() || stopwatch.elapsed() >= max_wait_time_ns)
{
submitReadyRfsAndSegmentTaskPool(ready_rf_list, task_pool, log);
submitReadyRfsAndSegmentTaskPool(ready_rf_list, task_pool);
return ExecTaskStatus::FINISHED;
}
return ExecTaskStatus::WAITING;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Operators/UnorderedSourceOp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ void UnorderedSourceOp::operatePrefixImpl()
std::call_once(task_pool->addToSchedulerFlag(), [&]() {
if (waiting_rf_list.empty())
{
DM::SegmentReadTaskScheduler::instance().add(task_pool, log);
DM::SegmentReadTaskScheduler::instance().add(task_pool);
}
else
{
Expand All @@ -74,7 +74,7 @@ void UnorderedSourceOp::operatePrefixImpl()

if (max_wait_time_ms <= 0 || waiting_rf_list.empty())
{
RFWaitTask::submitReadyRfsAndSegmentTaskPool(ready_rf_list, task_pool, log);
RFWaitTask::submitReadyRfsAndSegmentTaskPool(ready_rf_list, task_pool);
}
else
{
Expand Down
110 changes: 0 additions & 110 deletions dbms/src/Storages/DeltaMerge/ReadThread/CircularScanList.h

This file was deleted.

4 changes: 4 additions & 0 deletions dbms/src/Storages/DeltaMerge/ReadThread/MergedTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,11 @@ class MergedTask
}
}

#ifndef DBMS_PUBLIC_GTEST
private:
#else
public:
#endif
void initOnce();
int readOneBlock();
void setUnitFinish(int i) { finished_count += units[i].setFinish(); }
Expand Down
Loading

0 comments on commit 64c1ae5

Please sign in to comment.