i#6831 sched refactor, step 6: Split set_initial_schedule() #7082

Merged · 4 commits · Nov 14, 2024
5 changes: 3 additions & 2 deletions clients/drcachesim/scheduler/scheduler.cpp
@@ -59,8 +59,9 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
                                 scheduler_impl_deleter_t>(
             new scheduler_replay_tmpl_t<RecordType, ReaderType>);
     } else {
-        // Non-dynamic and non-replay fixed modes such as analyzer serial and
-        // parallel modes.
+        // Non-dynamic and non-replay fixed modes such as analyzer
+        // parallel mode with a static mapping of inputs to outputs and analyzer
+        // serial mode with a simple time interleaving of all inputs onto one output.
         impl_ = std::unique_ptr<scheduler_impl_tmpl_t<RecordType, ReaderType>,
                                 scheduler_impl_deleter_t>(
             new scheduler_fixed_tmpl_t<RecordType, ReaderType>);
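As context for the comment change above: init() selects one of three scheduler implementations depending on the options. Below is a rough, hypothetical sketch of that three-way dispatch with simplified stand-in types; the real code uses the templated scheduler_replay_tmpl_t, scheduler_dynamic_tmpl_t, and scheduler_fixed_tmpl_t classes and the full options struct.

// Simplified, illustrative sketch only; names other than the MAP_* option
// values are invented for this example.
#include <cstdio>
#include <memory>

enum class mapping_t { MAP_TO_CONSISTENT_OUTPUT, MAP_TO_ANY_OUTPUT, MAP_TO_RECORDED_OUTPUT };

struct impl_t {
    virtual ~impl_t() = default;
    virtual const char *name() const = 0;
};
struct replay_impl_t : impl_t { const char *name() const override { return "replay"; } };
struct dynamic_impl_t : impl_t { const char *name() const override { return "dynamic"; } };
struct fixed_impl_t : impl_t { const char *name() const override { return "fixed"; } };

std::unique_ptr<impl_t>
make_impl(mapping_t mapping, bool replaying_recorded_schedule)
{
    if (replaying_recorded_schedule)
        return std::make_unique<replay_impl_t>();
    if (mapping == mapping_t::MAP_TO_ANY_OUTPUT)
        return std::make_unique<dynamic_impl_t>();
    // Non-dynamic, non-replay fixed modes: analyzer parallel mode's static
    // input-to-output mapping, or serial mode's time interleaving.
    return std::make_unique<fixed_impl_t>();
}

int
main()
{
    // Prints "fixed": a static mapping is neither replay nor dynamic.
    std::printf("%s\n", make_impl(mapping_t::MAP_TO_CONSISTENT_OUTPUT, false)->name());
    return 0;
}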
356 changes: 303 additions & 53 deletions clients/drcachesim/scheduler/scheduler_dynamic.cpp

Large diffs are not rendered by default.

72 changes: 57 additions & 15 deletions clients/drcachesim/scheduler/scheduler_fixed.cpp
@@ -35,11 +35,9 @@
 #include "scheduler.h"
 #include "scheduler_impl.h"
 
-#include <atomic>
 #include <cinttypes>
 #include <cstdint>
 #include <mutex>
-#include <thread>
 
 #include "memref.h"
 #include "mutex_dbg_owned.h"
@@ -50,45 +48,89 @@
 namespace dynamorio {
 namespace drmemtrace {
 
+template <typename RecordType, typename ReaderType>
+typename scheduler_tmpl_t<RecordType, ReaderType>::scheduler_status_t
+scheduler_fixed_tmpl_t<RecordType, ReaderType>::set_initial_schedule(
+    std::unordered_map<int, std::vector<int>> &workload2inputs)
+{
+    if (options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) {
+        // Assign the inputs up front to avoid locks once we're in parallel mode.
+        // We use a simple round-robin static assignment for now.
+        for (int i = 0; i < static_cast<input_ordinal_t>(inputs_.size()); ++i) {
+            size_t index = i % outputs_.size();
+            if (outputs_[index].input_indices.empty())
+                set_cur_input(static_cast<input_ordinal_t>(index), i);
+            outputs_[index].input_indices.push_back(i);
+            VPRINT(this, 2, "Assigning input #%d to output #%zd\n", i, index);
+        }
+    } else if (options_.mapping == sched_type_t::MAP_TO_RECORDED_OUTPUT) {
+        if (options_.replay_as_traced_istream != nullptr) {
+            return sched_type_t::STATUS_ERROR_INVALID_PARAMETER;
+        } else if (outputs_.size() > 1) {
+            return sched_type_t::STATUS_ERROR_INVALID_PARAMETER;
+        } else if (inputs_.size() == 1) {
+            set_cur_input(0, 0);
+        } else {
+            // The old file_reader_t interleaving would output the top headers for every
+            // thread first and then pick the oldest timestamp once it reached a
+            // timestamp. We instead queue those headers so we can start directly with the
+            // oldest timestamp's thread.
+            uint64_t min_time = std::numeric_limits<uint64_t>::max();
+            input_ordinal_t min_input = -1;
+            for (int i = 0; i < static_cast<input_ordinal_t>(inputs_.size()); ++i) {
+                if (inputs_[i].next_timestamp < min_time) {
+                    min_time = inputs_[i].next_timestamp;
+                    min_input = i;
+                }
+            }
+            if (min_input < 0)
+                return sched_type_t::STATUS_ERROR_INVALID_PARAMETER;
+            set_cur_input(0, static_cast<input_ordinal_t>(min_input));
+        }
+    } else {
+        return sched_type_t::STATUS_ERROR_INVALID_PARAMETER;
+    }
+    return sched_type_t::STATUS_SUCCESS;
+}
+
 template <typename RecordType, typename ReaderType>
 typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
 scheduler_fixed_tmpl_t<RecordType, ReaderType>::pick_next_input_for_mode(
     output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index,
     input_ordinal_t &index)
 {
-    if (this->options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) {
+    if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) {
         uint64_t min_time = std::numeric_limits<uint64_t>::max();
-        for (size_t i = 0; i < this->inputs_.size(); ++i) {
-            std::lock_guard<mutex_dbg_owned> lock(*this->inputs_[i].lock);
-            if (!this->inputs_[i].at_eof && this->inputs_[i].next_timestamp > 0 &&
-                this->inputs_[i].next_timestamp < min_time) {
-                min_time = this->inputs_[i].next_timestamp;
+        for (size_t i = 0; i < inputs_.size(); ++i) {
+            std::lock_guard<mutex_dbg_owned> lock(*inputs_[i].lock);
+            if (!inputs_[i].at_eof && inputs_[i].next_timestamp > 0 &&
+                inputs_[i].next_timestamp < min_time) {
+                min_time = inputs_[i].next_timestamp;
                 index = static_cast<int>(i);
             }
         }
         if (index < 0) {
             stream_status_t status = this->eof_or_idle(output, prev_index);
             if (status != sched_type_t::STATUS_STOLE)
                 return status;
-            index = this->outputs_[output].cur_input;
+            index = outputs_[output].cur_input;
             return sched_type_t::STATUS_OK;
         }
         VPRINT(this, 2,
                "next_record[%d]: advancing to timestamp %" PRIu64 " == input #%d\n",
                output, min_time, index);
-    } else if (this->options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) {
+    } else if (options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) {
         // We're done with the prior thread; take the next one that was
         // pre-allocated to this output (pre-allocated to avoid locks). Invariant:
         // the same output will not be accessed by two different threads
         // simultaneously in this mode, allowing us to support a lock-free
         // parallel-friendly increment here.
-        int indices_index = ++this->outputs_[output].input_indices_index;
-        if (indices_index >=
-            static_cast<int>(this->outputs_[output].input_indices.size())) {
+        int indices_index = ++outputs_[output].input_indices_index;
+        if (indices_index >= static_cast<int>(outputs_[output].input_indices.size())) {
             VPRINT(this, 2, "next_record[%d]: all at eof\n", output);
             return sched_type_t::STATUS_EOF;
         }
-        index = this->outputs_[output].input_indices[indices_index];
+        index = outputs_[output].input_indices[indices_index];
         VPRINT(this, 2, "next_record[%d]: advancing to local index %d == input #%d\n",
                output, indices_index, index);
     } else
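The round-robin assignment in set_initial_schedule() above boils down to input i landing on output i % outputs_.size(), after which each output walks its own pre-built list without cross-output locking. A tiny standalone illustration of that arithmetic (illustrative names only, not the scheduler's real data structures):

// Sketch of the MAP_TO_CONSISTENT_OUTPUT round-robin static assignment.
#include <cstdio>
#include <vector>

int
main()
{
    const int num_inputs = 5;
    const size_t num_outputs = 2;
    std::vector<std::vector<int>> input_indices(num_outputs);
    for (int i = 0; i < num_inputs; ++i) {
        size_t index = i % num_outputs;
        input_indices[index].push_back(i);
    }
    // With 5 inputs and 2 outputs: output #0 gets inputs {0, 2, 4} and
    // output #1 gets inputs {1, 3}.
    for (size_t o = 0; o < num_outputs; ++o) {
        std::printf("output #%zu:", o);
        for (int i : input_indices[o])
            std::printf(" %d", i);
        std::printf("\n");
    }
    return 0;
}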
@@ -103,7 +145,7 @@ scheduler_fixed_tmpl_t<RecordType, ReaderType>::check_for_input_switch(
     output_ordinal_t output, RecordType &record, input_info_t *input, uint64_t cur_time,
     bool &need_new_input, bool &preempt, uint64_t &blocked_time)
 {
-    if (this->options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS &&
+    if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS &&
         this->record_type_is_timestamp(record, input->next_timestamp))
         need_new_input = true;
     return sched_type_t::STATUS_OK;
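For serial mode under DEPENDENCY_TIMESTAMPS, both set_initial_schedule() and pick_next_input_for_mode() use the same core step: scan all inputs and pick the one with the smallest pending timestamp. A minimal sketch of that selection, with the per-input locking and EOF/idle handling stripped out and illustrative field names:

// Simplified stand-in for the oldest-timestamp-first scan; not the real
// scheduler types, and without the mutex_dbg_owned locking the real code uses.
#include <cstdint>
#include <cstdio>
#include <limits>
#include <vector>

struct input_t {
    bool at_eof = false;
    uint64_t next_timestamp = 0; // 0 means no timestamp seen yet.
};

// Returns the ordinal of the live input with the smallest pending timestamp,
// or -1 if no input qualifies (all at EOF or no timestamps yet).
int
pick_oldest(const std::vector<input_t> &inputs)
{
    uint64_t min_time = std::numeric_limits<uint64_t>::max();
    int index = -1;
    for (size_t i = 0; i < inputs.size(); ++i) {
        if (!inputs[i].at_eof && inputs[i].next_timestamp > 0 &&
            inputs[i].next_timestamp < min_time) {
            min_time = inputs[i].next_timestamp;
            index = static_cast<int>(i);
        }
    }
    return index;
}

int
main()
{
    // Input #2 is at EOF despite the oldest timestamp, so input #1 wins.
    std::vector<input_t> inputs = { { false, 300 }, { false, 100 }, { true, 50 } };
    std::printf("next input: #%d\n", pick_oldest(inputs)); // Prints "next input: #1".
    return 0;
}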