Skip to content

Commit

Permalink
Refine SegmentReadTasks.
Browse files Browse the repository at this point in the history
  • Loading branch information
JinheLin committed Oct 11, 2022
1 parent 187a46c commit 573f7bb
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 40 deletions.
6 changes: 4 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -912,7 +912,8 @@ BlockInputStreams DeltaMergeStore::readRaw(const Context & db_context,
/* do_delete_mark_filter_for_raw = */ false,
std::move(tasks),
after_segment_read,
req_info);
req_info,
enable_read_thread);

BlockInputStreams res;
for (size_t i = 0; i < final_num_stream; ++i)
Expand Down Expand Up @@ -996,7 +997,8 @@ BlockInputStreams DeltaMergeStore::read(const Context & db_context,
/* do_delete_mark_filter_for_raw = */ is_fast_scan,
std::move(tasks),
after_segment_read,
log_tracing_id);
log_tracing_id,
enable_read_thread);

BlockInputStreams res;
for (size_t i = 0; i < final_num_stream; ++i)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,23 +37,18 @@ void SegmentReadTaskScheduler::add(const SegmentReadTaskPoolPtr & pool)
Stopwatch sw_do_add;
read_pools.add(pool);

std::unordered_set<uint64_t> seg_ids;
for (const auto & task : pool->getTasks())
const auto & tasks = pool->getTasks();
for (const auto & pa : tasks)
{
auto seg_id = task->segment->segmentId();
auto seg_id = pa.first;
merging_segments[pool->tableId()][seg_id].push_back(pool->poolId());
if (!seg_ids.insert(seg_id).second)
{
throw DB::Exception(fmt::format("Not support split segment task. segment_ids={} => segment_id={} already exist.", seg_ids, seg_id));
}
}
auto block_slots = pool->getFreeBlockSlots();
LOG_DEBUG(log, "Added, pool_id={} table_id={} block_slots={} segment_count={} segments={} pool_count={} cost={}ns do_add_cost={}ns", //
LOG_DEBUG(log, "Added, pool_id={} table_id={} block_slots={} segment_count={} pool_count={} cost={}ns do_add_cost={}ns", //
pool->poolId(),
pool->tableId(),
block_slots,
seg_ids.size(),
seg_ids,
tasks.size(),
read_pools.size(),
sw_add.elapsed(),
sw_do_add.elapsed());
Expand Down
88 changes: 76 additions & 12 deletions dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,63 @@ SegmentReadTasks SegmentReadTask::trySplitReadTasks(const SegmentReadTasks & tas
return result_tasks;
}


// Builds the task container in one of two representations:
// - enable_read_thread == true: an unordered_map keyed by segment id, for
//   fast random access by the read-thread scheduler.
// - enable_read_thread == false: the original ordered list, consumed
//   sequentially via nextTask().
// Throws DB::Exception if two tasks share a segment id (split segment tasks
// are not supported in the read-thread path).
SegmentReadTasksWrapper::SegmentReadTasksWrapper(bool enable_read_thread_, SegmentReadTasks && ordered_tasks_)
    : enable_read_thread(enable_read_thread_)
{
    if (enable_read_thread)
    {
        for (auto & t : ordered_tasks_)
        {
            // Capture the id before moving `t`: on a failed emplace the
            // argument may already be moved-from.
            auto seg_id = t->segment->segmentId();
            // Move the shared_ptr into the map instead of copying it, to
            // avoid a useless atomic refcount round-trip per task.
            auto [itr, inserted] = unordered_tasks.emplace(seg_id, std::move(t));
            if (!inserted)
            {
                throw DB::Exception(fmt::format("segment_id={} already exist.", seg_id));
            }
        }
    }
    else
    {
        ordered_tasks = std::move(ordered_tasks_);
    }
}

// Hands out tasks in their original order, one per call; returns nullptr
// once the list is exhausted. Only legal when `enable_read_thread` is false.
SegmentReadTaskPtr SegmentReadTasksWrapper::nextTask()
{
    RUNTIME_CHECK(!enable_read_thread);
    SegmentReadTaskPtr next;
    if (!ordered_tasks.empty())
    {
        next = ordered_tasks.front();
        ordered_tasks.pop_front();
    }
    return next;
}

// Removes and returns the task of `seg_id`, or nullptr if it is not (or no
// longer) present. Only legal when `enable_read_thread` is true.
SegmentReadTaskPtr SegmentReadTasksWrapper::getTask(UInt64 seg_id)
{
    RUNTIME_CHECK(enable_read_thread);
    // extract() performs the lookup and the removal in a single step.
    if (auto node = unordered_tasks.extract(seg_id); !node.empty())
    {
        return std::move(node.mapped());
    }
    return nullptr;
}

// Read-only view of the remaining tasks, keyed by segment id.
// Only legal when `enable_read_thread` is true.
const std::unordered_map<UInt64, SegmentReadTaskPtr> & SegmentReadTasksWrapper::getTasks() const
{
    RUNTIME_CHECK(enable_read_thread);
    return unordered_tasks;
}

// True when no tasks remain. Checking both containers covers both modes,
// since the constructor only ever populates one of them.
bool SegmentReadTasksWrapper::empty() const
{
    return ordered_tasks.empty() && unordered_tasks.empty();
}

BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t)
{
MemoryTrackerSetter setter(true, mem_tracker.get());
Expand All @@ -120,7 +177,7 @@ void SegmentReadTaskPool::finishSegment(const SegmentPtr & seg)
{
std::lock_guard lock(mutex);
active_segment_ids.erase(seg->segmentId());
pool_finished = active_segment_ids.empty() && tasks.empty();
pool_finished = active_segment_ids.empty() && tasks_wrapper.empty();
}
LOG_DEBUG(log, "finishSegment pool_id={} segment_id={} pool_finished={}", pool_id, seg->segmentId(), pool_finished);
if (pool_finished)
Expand All @@ -129,21 +186,27 @@ void SegmentReadTaskPool::finishSegment(const SegmentPtr & seg)
}
}

SegmentReadTaskPtr SegmentReadTaskPool::getTask(uint64_t seg_id)
SegmentReadTaskPtr SegmentReadTaskPool::nextTask()
{
std::lock_guard lock(mutex);
// TODO(jinhelin): use unordered_map
auto itr = std::find_if(tasks.begin(), tasks.end(), [seg_id](const SegmentReadTaskPtr & task) { return task->segment->segmentId() == seg_id; });
if (itr == tasks.end())
{
throw Exception(fmt::format("{} pool_id={} segment_id={} not found", __PRETTY_FUNCTION__, pool_id, seg_id));
}
auto t = *(itr);
tasks.erase(itr);
return tasks_wrapper.nextTask();
}

// Takes ownership of the read task for `seg_id` and records the segment as
// actively being read (finishSegment() erases it again when the read is done).
// Fails the RUNTIME_CHECK if this pool has no pending task for `seg_id`.
SegmentReadTaskPtr SegmentReadTaskPool::getTask(UInt64 seg_id)
{
    std::lock_guard lock(mutex);
    auto t = tasks_wrapper.getTask(seg_id);
    RUNTIME_CHECK(t != nullptr, pool_id, seg_id);
    active_segment_ids.insert(seg_id);
    return t;
}

// Returns the not-yet-dispatched tasks, keyed by segment id.
// NOTE(review): the returned reference aliases state guarded by `mutex`, but
// the lock is released on return — the caller must not race with
// getTask()/nextTask(); confirm callers only use this from the scheduler.
const std::unordered_map<UInt64, SegmentReadTaskPtr> & SegmentReadTaskPool::getTasks()
{
    std::lock_guard lock(mutex);
    return tasks_wrapper.getTasks();
}

// Choose a segment to read.
// Returns <segment_id, pool_ids>.
std::unordered_map<uint64_t, std::vector<uint64_t>>::const_iterator SegmentReadTaskPool::scheduleSegment(const std::unordered_map<uint64_t, std::vector<uint64_t>> & segments, uint64_t expected_merge_count)
Expand All @@ -156,12 +219,13 @@ std::unordered_map<uint64_t, std::vector<uint64_t>>::const_iterator SegmentReadT
}
static constexpr int max_iter_count = 32;
int iter_count = 0;
const auto & tasks = tasks_wrapper.getTasks();
for (const auto & task : tasks)
{
auto itr = segments.find(task->segment->segmentId());
auto itr = segments.find(task.first);
if (itr == segments.end())
{
throw DB::Exception(fmt::format("segment_id {} not found from merging segments", task->segment->segmentId()));
throw DB::Exception(fmt::format("segment_id {} not found from merging segments", task.first));
}
if (std::find(itr->second.begin(), itr->second.end(), poolId()) == itr->second.end())
{
Expand Down
46 changes: 30 additions & 16 deletions dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,32 @@ class BlockStat
std::atomic<int64_t> total_bytes;
};

// `SegmentReadTasksWrapper` is like `SegmentReadTasks` but supports fast access by segment id if `enable_read_thread` is true.
// `SegmentReadTasksWrapper` is not thread-safe.
class SegmentReadTasksWrapper
{
public:
    SegmentReadTasksWrapper(bool enable_read_thread_, SegmentReadTasks && ordered_tasks_);

    // `nextTask` pops tasks sequentially. This function is used when `enable_read_thread` is false.
    SegmentReadTaskPtr nextTask();

    // `getTask` and `getTasks` are used when `enable_read_thread` is true.
    // `getTask` removes and returns the task of `seg_id`; `getTasks` is a read-only view keyed by segment id.
    SegmentReadTaskPtr getTask(UInt64 seg_id);
    const std::unordered_map<UInt64, SegmentReadTaskPtr> & getTasks() const;

    // True when no tasks remain, whichever representation is in use.
    bool empty() const;

private:
    bool enable_read_thread;
    // Exactly one of the two containers is populated, selected by `enable_read_thread`.
    SegmentReadTasks ordered_tasks;
    std::unordered_map<UInt64, SegmentReadTaskPtr> unordered_tasks;
};

class SegmentReadTaskPool : private boost::noncopyable
{
public:
explicit SegmentReadTaskPool(
SegmentReadTaskPool(
int64_t table_id_,
const DMContextPtr & dm_context_,
const ColumnDefines & columns_to_read_,
Expand All @@ -125,7 +147,8 @@ class SegmentReadTaskPool : private boost::noncopyable
bool do_range_filter_for_raw_,
SegmentReadTasks && tasks_,
AfterSegmentRead after_segment_read_,
const String & tracing_id)
const String & tracing_id,
bool enable_read_thread_)
: pool_id(nextPoolId())
, table_id(table_id_)
, dm_context(dm_context_)
Expand All @@ -135,7 +158,7 @@ class SegmentReadTaskPool : private boost::noncopyable
, expected_block_size(expected_block_size_)
, is_raw(is_raw_)
, do_range_filter_for_raw(do_range_filter_for_raw_)
, tasks(std::move(tasks_))
, tasks_wrapper(enable_read_thread_, std::move(tasks_))
, after_segment_read(after_segment_read_)
, log(Logger::get("SegmentReadTaskPool", tracing_id))
, unordered_input_stream_ref_count(0)
Expand Down Expand Up @@ -164,22 +187,14 @@ class SegmentReadTaskPool : private boost::noncopyable
total_bytes / 1024.0 / 1024.0);
}

SegmentReadTaskPtr nextTask()
{
std::lock_guard lock(mutex);
if (tasks.empty())
return {};
auto task = tasks.front();
tasks.pop_front();
return task;
}
SegmentReadTaskPtr nextTask();
const std::unordered_map<UInt64, SegmentReadTaskPtr> & getTasks();
SegmentReadTaskPtr getTask(UInt64 seg_id);

uint64_t poolId() const { return pool_id; }

int64_t tableId() const { return table_id; }

const SegmentReadTasks & getTasks() const { return tasks; }

BlockInputStreamPtr buildInputStream(SegmentReadTaskPtr & t);

bool readOneBlock(BlockInputStreamPtr & stream, const SegmentPtr & seg);
Expand All @@ -194,7 +209,6 @@ class SegmentReadTaskPool : private boost::noncopyable
int64_t getFreeBlockSlots() const;
bool valid() const;
void setException(const DB::Exception & e);
SegmentReadTaskPtr getTask(uint64_t seg_id);

std::once_flag & addToSchedulerFlag()
{
Expand All @@ -216,7 +230,7 @@ class SegmentReadTaskPool : private boost::noncopyable
const size_t expected_block_size;
const bool is_raw;
const bool do_range_filter_for_raw;
SegmentReadTasks tasks;
SegmentReadTasksWrapper tasks_wrapper;
AfterSegmentRead after_segment_read;
std::mutex mutex;
std::unordered_set<uint64_t> active_segment_ids;
Expand Down

0 comments on commit 573f7bb

Please sign in to comment.