i#6831 sched refactor, step 3: Split pick_next_input() (#7078)
Splits the bulk of pick_next_input() into a new virtual method
pick_next_input_for_mode(), which is implemented by new subclasses in
scheduler_{dynamic,fixed,replay}.cpp.
Moves pick_next_input_as_previously() into the _replay version.

Issue: #6831
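
For orientation, here is a minimal, self-contained sketch of the dispatch shape this step introduces: a shared driver in the base implementation class delegates the mode-specific policy to a virtual hook that each subclass overrides. The type names status_t and ordinal_t below are simplified stand-ins for the scheduler's real typedefs, which appear in the diffs further down; the stub logic is illustrative only.

#include <cstdint>
#include <iostream>
#include <memory>

// Simplified stand-ins for the scheduler's stream_status_t and ordinal typedefs.
enum class status_t { OK, AT_EOF };
using ordinal_t = int;

class scheduler_impl_t {
public:
    virtual ~scheduler_impl_t() = default;
    // Shared driver: common bookkeeping stays here in the base class...
    status_t
    pick_next_input(ordinal_t output, ordinal_t &index)
    {
        // ...while the mapping-mode-specific policy is delegated to the subclass.
        return pick_next_input_for_mode(output, /*blocked_time=*/0,
                                        /*prev_index=*/-1, index);
    }

protected:
    virtual status_t
    pick_next_input_for_mode(ordinal_t output, uint64_t blocked_time,
                             ordinal_t prev_index, ordinal_t &index) = 0;
};

class scheduler_dynamic_t : public scheduler_impl_t {
protected:
    status_t
    pick_next_input_for_mode(ordinal_t output, uint64_t blocked_time,
                             ordinal_t prev_index, ordinal_t &index) override
    {
        // A real implementation consults ready queues, priorities, and blocked
        // times; this stub just picks input 0.
        index = 0;
        return status_t::OK;
    }
};

int
main()
{
    // scheduler.cpp's init() (see its diff below) selects the subclass from
    // options.mapping; here we construct the dynamic flavor directly.
    std::unique_ptr<scheduler_impl_t> impl(new scheduler_dynamic_t);
    ordinal_t index = -1;
    if (impl->pick_next_input(/*output=*/0, index) == status_t::OK)
        std::cout << "next input: " << index << "\n";
    return 0;
}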
derekbruening authored Nov 13, 2024
1 parent 52ba775 commit 6360656
Showing 7 changed files with 672 additions and 441 deletions.
6 changes: 6 additions & 0 deletions clients/drcachesim/CMakeLists.txt
@@ -262,6 +262,9 @@ set(drcachesim_srcs
launcher.cpp
scheduler/scheduler.cpp
scheduler/scheduler_impl.cpp
+scheduler/scheduler_dynamic.cpp
+scheduler/scheduler_replay.cpp
+scheduler/scheduler_fixed.cpp
scheduler/speculator.cpp
analyzer.cpp
analyzer_multi.cpp
@@ -325,6 +328,9 @@ add_exported_library(drmemtrace_analyzer STATIC
analyzer.cpp
scheduler/scheduler.cpp
scheduler/scheduler_impl.cpp
+scheduler/scheduler_dynamic.cpp
+scheduler/scheduler_replay.cpp
+scheduler/scheduler_fixed.cpp
scheduler/speculator.cpp
common/trace_entry.cpp
reader/reader.cpp
22 changes: 17 additions & 5 deletions clients/drcachesim/scheduler/scheduler.cpp
@@ -48,11 +48,23 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
std::vector<input_workload_t> &workload_inputs, int output_count,
scheduler_options_t options)
{
-    // TODO i#6831: Split up scheduler_impl_tmpl_t by mapping_t mode and create the
-    // mode-appropriate subclass here.
-    impl_ = std::unique_ptr<scheduler_impl_tmpl_t<RecordType, ReaderType>,
-                            scheduler_impl_deleter_t>(
-        new scheduler_impl_tmpl_t<RecordType, ReaderType>);
+    if (options.mapping == sched_type_t::MAP_TO_ANY_OUTPUT) {
+        impl_ = std::unique_ptr<scheduler_impl_tmpl_t<RecordType, ReaderType>,
+                                scheduler_impl_deleter_t>(
+            new scheduler_dynamic_tmpl_t<RecordType, ReaderType>);
+    } else if (options.mapping == sched_type_t::MAP_AS_PREVIOUSLY ||
+               (options.mapping == sched_type_t::MAP_TO_RECORDED_OUTPUT &&
+                options.replay_as_traced_istream != nullptr)) {
+        impl_ = std::unique_ptr<scheduler_impl_tmpl_t<RecordType, ReaderType>,
+                                scheduler_impl_deleter_t>(
+            new scheduler_replay_tmpl_t<RecordType, ReaderType>);
+    } else {
+        // Non-dynamic and non-replay fixed modes such as analyzer serial and
+        // parallel modes.
+        impl_ = std::unique_ptr<scheduler_impl_tmpl_t<RecordType, ReaderType>,
+                                scheduler_impl_deleter_t>(
+            new scheduler_fixed_tmpl_t<RecordType, ReaderType>);
+    }
return impl_->init(workload_inputs, output_count, std::move(options));
}
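
The factory above keeps the public API unchanged: clients still construct a scheduler_t and call init(), and the mapping option alone decides which subclass does the work. A hedged caller-side sketch follows; the workload and option constructors are used as I understand the public scheduler.h API, so treat the exact signatures as approximate rather than verified, and /path/to/trace is a hypothetical location.

#include <string>
#include <utility>
#include <vector>

#include "scheduler.h"

using namespace dynamorio::drmemtrace;

// Requests dynamic scheduling, so init() will instantiate
// scheduler_dynamic_tmpl_t internally without the caller noticing.
bool
create_dynamic_scheduler(scheduler_t &scheduler, int output_count)
{
    std::vector<scheduler_t::input_workload_t> inputs;
    inputs.emplace_back(std::string("/path/to/trace"));
    scheduler_t::scheduler_options_t ops(scheduler_t::MAP_TO_ANY_OUTPUT,
                                         scheduler_t::DEPENDENCY_TIMESTAMPS,
                                         scheduler_t::SCHEDULER_DEFAULTS,
                                         /*verbosity=*/0);
    return scheduler.init(inputs, output_count, std::move(ops)) ==
        scheduler_t::STATUS_SUCCESS;
}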

246 changes: 246 additions & 0 deletions clients/drcachesim/scheduler/scheduler_dynamic.cpp
@@ -0,0 +1,246 @@
/* **********************************************************
* Copyright (c) 2023-2024 Google, Inc. All rights reserved.
* **********************************************************/

/*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of Google, Inc. nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL VMWARE, INC. OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
* DAMAGE.
*/

/* Scheduler dynamic rescheduling-specific code. */

#include "scheduler.h"
#include "scheduler_impl.h"

#include <cinttypes>

namespace dynamorio {
namespace drmemtrace {

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::pick_next_input_for_mode(
output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index,
input_ordinal_t &index)
{
uint64_t cur_time = this->get_output_time(output);
uint64_t last_time = this->last_rebalance_time_.load(std::memory_order_acquire);
if (last_time == 0) {
// Initialize.
this->last_rebalance_time_.store(cur_time, std::memory_order_release);
} else {
// Guard against time going backward, which happens: i#6966.
if (cur_time > last_time &&
cur_time - last_time >=
static_cast<uint64_t>(this->options_.rebalance_period_us *
this->options_.time_units_per_us) &&
this->rebalancer_.load(std::memory_order_acquire) == std::thread::id()) {
VPRINT(this, 2,
"Output %d hit rebalance period @%" PRIu64 " (last rebalance @%" PRIu64
")\n",
output, cur_time, last_time);
stream_status_t status = this->rebalance_queues(output, {});
if (status != sched_type_t::STATUS_OK)
return status;
}
}
if (blocked_time > 0 && prev_index != sched_type_t::INVALID_INPUT_ORDINAL) {
std::lock_guard<mutex_dbg_owned> lock(*this->inputs_[prev_index].lock);
if (this->inputs_[prev_index].blocked_time == 0) {
VPRINT(this, 2, "next_record[%d]: blocked time %" PRIu64 "\n", output,
blocked_time);
this->inputs_[prev_index].blocked_time = blocked_time;
this->inputs_[prev_index].blocked_start_time = this->get_output_time(output);
}
}
if (prev_index != sched_type_t::INVALID_INPUT_ORDINAL &&
this->inputs_[prev_index].switch_to_input !=
sched_type_t::INVALID_INPUT_ORDINAL) {
input_info_t *target = &this->inputs_[this->inputs_[prev_index].switch_to_input];
this->inputs_[prev_index].switch_to_input = sched_type_t::INVALID_INPUT_ORDINAL;
std::unique_lock<mutex_dbg_owned> target_input_lock(*target->lock);
// XXX i#5843: Add an invariant check that the next timestamp of the
// target is later than the pre-switch-syscall timestamp?
if (target->containing_output != sched_type_t::INVALID_OUTPUT_ORDINAL) {
output_ordinal_t target_output = target->containing_output;
output_info_t &out = this->outputs_[target->containing_output];
// We cannot hold an input lock when we acquire an output lock.
target_input_lock.unlock();
{
auto target_output_lock =
this->acquire_scoped_output_lock_if_necessary(target_output);
target_input_lock.lock();
if (out.ready_queue.queue.find(target)) {
VPRINT(this, 2,
"next_record[%d]: direct switch from input %d to "
"input %d "
"@%" PRIu64 "\n",
output, prev_index, target->index,
this->inputs_[prev_index].reader->get_last_timestamp());
out.ready_queue.queue.erase(target);
index = target->index;
// Erase any remaining wait time for the target.
if (target->blocked_time > 0) {
VPRINT(this, 3,
"next_record[%d]: direct switch erasing "
"blocked time "
"for input %d\n",
output, target->index);
--out.ready_queue.num_blocked;
target->blocked_time = 0;
target->unscheduled = false;
}
if (target->containing_output != output) {
++this->outputs_[output]
.stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS];
}
++this->outputs_[output]
.stats[memtrace_stream_t::SCHED_STAT_DIRECT_SWITCH_SUCCESSES];
} // Else, actively running.
target_input_lock.unlock();
}
target_input_lock.lock();
}
std::lock_guard<mutex_dbg_owned> unsched_lock(*this->unscheduled_priority_.lock);
if (index == sched_type_t::INVALID_INPUT_ORDINAL &&
this->unscheduled_priority_.queue.find(target)) {
target->unscheduled = false;
this->unscheduled_priority_.queue.erase(target);
index = target->index;
VPRINT(this, 2,
"next_record[%d]: direct switch from input %d to "
"was-unscheduled input %d "
"@%" PRIu64 "\n",
output, prev_index, target->index,
this->inputs_[prev_index].reader->get_last_timestamp());
if (target->prev_output != sched_type_t::INVALID_OUTPUT_ORDINAL &&
target->prev_output != output) {
++this->outputs_[output].stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS];
}
++this->outputs_[output]
.stats[memtrace_stream_t::SCHED_STAT_DIRECT_SWITCH_SUCCESSES];
}
if (index == sched_type_t::INVALID_INPUT_ORDINAL) {
// We assume that inter-input dependencies are captured in
// the _DIRECT_THREAD_SWITCH, _UNSCHEDULE, and _SCHEDULE markers
// and that if a switch request targets a thread running elsewhere
// that means there isn't a dependence and this is really a
// dynamic switch to whoever happens to be available (and
// different timing between tracing and analysis has caused this
// miss).
VPRINT(this, 2,
"Direct switch (from %d) target input #%d is running "
"elsewhere; picking a different target @%" PRIu64 "\n",
prev_index, target->index,
this->inputs_[prev_index].reader->get_last_timestamp());
// We do ensure the missed target doesn't wait indefinitely.
// XXX i#6822: It's not clear this is always the right thing to
// do.
target->skip_next_unscheduled = true;
}
}
if (index != sched_type_t::INVALID_INPUT_ORDINAL) {
// We found a direct switch target above.
}
// XXX: We're grabbing the output ready_queue lock 3x here:
// ready_queue_empty(), set_cur_input()'s add_to_ready_queue(),
// and pop_from_ready_queue(). We could call versions of those
// that let the caller hold the lock: but holding it across other
// calls in between here adds complexity.
else if (this->ready_queue_empty(output) && blocked_time == 0) {
// There's nothing else to run so either stick with the
// current input or if it's invalid go idle/eof.
if (prev_index == sched_type_t::INVALID_INPUT_ORDINAL) {
stream_status_t status = this->eof_or_idle(output, prev_index);
if (status != sched_type_t::STATUS_STOLE)
return status;
// eof_or_idle stole an input for us, now in .cur_input.
index = this->outputs_[output].cur_input;
return sched_type_t::STATUS_OK;
} else {
auto lock =
std::unique_lock<mutex_dbg_owned>(*this->inputs_[prev_index].lock);
// If we can't go back to the current input because it's EOF
// or unscheduled indefinitely (we already checked blocked_time
// above: it's 0 here), this output is either idle or EOF.
if (this->inputs_[prev_index].at_eof ||
this->inputs_[prev_index].unscheduled) {
lock.unlock();
stream_status_t status = this->eof_or_idle(output, prev_index);
if (status != sched_type_t::STATUS_STOLE)
return status;
index = this->outputs_[output].cur_input;
return sched_type_t::STATUS_OK;
} else
index = prev_index; // Go back to prior.
}
} else {
// There's something else to run, or we'll soon be in the queue
// even if it's empty now.
// Give up the input before we go to the queue so we can add
// ourselves to the queue. If we're the highest priority we
// shouldn't switch. The queue preserves FIFO for same-priority
// cases so we will switch if someone of equal priority is
// waiting.
this->set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL);
input_info_t *queue_next = nullptr;
stream_status_t status = this->pop_from_ready_queue(output, output, queue_next);
if (status != sched_type_t::STATUS_OK) {
if (status == sched_type_t::STATUS_IDLE) {
this->outputs_[output].waiting = true;
if (this->options_.schedule_record_ostream != nullptr) {
stream_status_t record_status = this->record_schedule_segment(
output, schedule_record_t::IDLE_BY_COUNT, 0,
// Start prior to this idle.
this->outputs_[output].idle_count - 1, 0);
if (record_status != sched_type_t::STATUS_OK)
return record_status;
}
if (prev_index != sched_type_t::INVALID_INPUT_ORDINAL) {
++this->outputs_[output]
.stats[memtrace_stream_t::SCHED_STAT_SWITCH_INPUT_TO_IDLE];
}
}
return status;
}
if (queue_next == nullptr) {
status = this->eof_or_idle(output, prev_index);
if (status != sched_type_t::STATUS_STOLE)
return status;
index = this->outputs_[output].cur_input;
return sched_type_t::STATUS_OK;
} else
index = queue_next->index;
}
return sched_type_t::STATUS_OK;
}

template class scheduler_dynamic_tmpl_t<memref_t, reader_t>;
template class scheduler_dynamic_tmpl_t<trace_entry_t,
dynamorio::drmemtrace::record_reader_t>;

} // namespace drmemtrace
} // namespace dynamorio
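
The rebalance gate at the top of pick_next_input_for_mode() above is a small pattern worth isolating: an atomic last-rebalance timestamp, lazily initialized, with an explicit guard against the clock appearing to go backward (i#6966). Below is a minimal, self-contained sketch of just that gating logic; the period constant is hypothetical, and the real code additionally checks via rebalancer_ that no other output is already rebalancing.

#include <atomic>
#include <cstdint>
#include <iostream>

std::atomic<uint64_t> last_rebalance_time{ 0 };
constexpr uint64_t kPeriod = 1000; // Hypothetical period in time units.

// Returns true only when the clock has moved forward past the period since
// the last recorded rebalance.
bool
should_rebalance(uint64_t cur_time)
{
    uint64_t last = last_rebalance_time.load(std::memory_order_acquire);
    if (last == 0) {
        last_rebalance_time.store(cur_time, std::memory_order_release);
        return false; // First call just initializes.
    }
    return cur_time > last && cur_time - last >= kPeriod;
}

int
main()
{
    std::cout << should_rebalance(500) << "\n";  // 0: initializes.
    std::cout << should_rebalance(400) << "\n";  // 0: time went backward.
    std::cout << should_rebalance(1600) << "\n"; // 1: period elapsed.
    return 0;
}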
95 changes: 95 additions & 0 deletions clients/drcachesim/scheduler/scheduler_fixed.cpp
@@ -0,0 +1,95 @@
/* **********************************************************
* Copyright (c) 2023-2024 Google, Inc. All rights reserved.
* **********************************************************/

/*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
*
* * Redistributions in binary form must reproduce the above copyright notice,
* this list of conditions and the following disclaimer in the documentation
* and/or other materials provided with the distribution.
*
* * Neither the name of Google, Inc. nor the names of its contributors may be
* used to endorse or promote products derived from this software without
* specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL VMWARE, INC. OR CONTRIBUTORS BE LIABLE
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
* DAMAGE.
*/

/* Scheduler fixed-schedule-specific code. */

#include "scheduler.h"
#include "scheduler_impl.h"

#include <cinttypes>

namespace dynamorio {
namespace drmemtrace {

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t
scheduler_fixed_tmpl_t<RecordType, ReaderType>::pick_next_input_for_mode(
output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index,
input_ordinal_t &index)
{
if (this->options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) {
uint64_t min_time = std::numeric_limits<uint64_t>::max();
for (size_t i = 0; i < this->inputs_.size(); ++i) {
std::lock_guard<mutex_dbg_owned> lock(*this->inputs_[i].lock);
if (!this->inputs_[i].at_eof && this->inputs_[i].next_timestamp > 0 &&
this->inputs_[i].next_timestamp < min_time) {
min_time = this->inputs_[i].next_timestamp;
index = static_cast<int>(i);
}
}
if (index < 0) {
stream_status_t status = this->eof_or_idle(output, prev_index);
if (status != sched_type_t::STATUS_STOLE)
return status;
index = this->outputs_[output].cur_input;
return sched_type_t::STATUS_OK;
}
VPRINT(this, 2,
"next_record[%d]: advancing to timestamp %" PRIu64 " == input #%d\n",
output, min_time, index);
} else if (this->options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) {
// We're done with the prior thread; take the next one that was
// pre-allocated to this output (pre-allocated to avoid locks). Invariant:
// the same output will not be accessed by two different threads
// simultaneously in this mode, allowing us to support a lock-free
// parallel-friendly increment here.
int indices_index = ++this->outputs_[output].input_indices_index;
if (indices_index >=
static_cast<int>(this->outputs_[output].input_indices.size())) {
VPRINT(this, 2, "next_record[%d]: all at eof\n", output);
return sched_type_t::STATUS_EOF;
}
index = this->outputs_[output].input_indices[indices_index];
VPRINT(this, 2, "next_record[%d]: advancing to local index %d == input #%d\n",
output, indices_index, index);
} else
return sched_type_t::STATUS_INVALID;

return sched_type_t::STATUS_OK;
}

template class scheduler_fixed_tmpl_t<memref_t, reader_t>;
template class scheduler_fixed_tmpl_t<trace_entry_t,
dynamorio::drmemtrace::record_reader_t>;

} // namespace drmemtrace
} // namespace dynamorio
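
The DEPENDENCY_TIMESTAMPS branch above is essentially one step of a k-way merge keyed on each input's next timestamp. A minimal, self-contained illustration of that selection rule, with the per-input locking, EOF stealing, and the MAP_TO_CONSISTENT_OUTPUT branch omitted:

#include <cstdint>
#include <iostream>
#include <limits>
#include <vector>

struct input_t {
    bool at_eof;
    uint64_t next_timestamp;
};

// Returns the ordinal of the non-EOF input with the smallest positive
// next_timestamp, or -1 if no input qualifies.
int
pick_min_timestamp_input(const std::vector<input_t> &inputs)
{
    uint64_t min_time = std::numeric_limits<uint64_t>::max();
    int index = -1;
    for (size_t i = 0; i < inputs.size(); ++i) {
        if (!inputs[i].at_eof && inputs[i].next_timestamp > 0 &&
            inputs[i].next_timestamp < min_time) {
            min_time = inputs[i].next_timestamp;
            index = static_cast<int>(i);
        }
    }
    return index;
}

int
main()
{
    std::vector<input_t> inputs = { { false, 300 }, { false, 100 }, { true, 50 } };
    std::cout << "next input: " << pick_min_timestamp_input(inputs) << "\n"; // Prints 1.
    return 0;
}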
(Diffs for the remaining 3 changed files, including the new scheduler_replay.cpp, did not load and are not shown.)
