-
Notifications
You must be signed in to change notification settings - Fork 562
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
i#6831 sched refactor, step 3: Split pick_next_input()
Splits the bulk of pick_next_input() into a new virtual method pick_next_input_try() which is implemented by new subclasses scheduler_{dynamic,fixed,replay}.cpp. Moves pick_next_input_as_previously() into the _replay version. Issue: #6831
- Loading branch information
1 parent
52ba775
commit 0b7a65b
Showing
7 changed files
with
664 additions
and
440 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,246 @@ | ||
/* ********************************************************** | ||
* Copyright (c) 2023-2024 Google, Inc. All rights reserved. | ||
* **********************************************************/ | ||
|
||
/* | ||
* Redistribution and use in source and binary forms, with or without | ||
* modification, are permitted provided that the following conditions are met: | ||
* | ||
* * Redistributions of source code must retain the above copyright notice, | ||
* this list of conditions and the following disclaimer. | ||
* | ||
* * Redistributions in binary form must reproduce the above copyright notice, | ||
* this list of conditions and the following disclaimer in the documentation | ||
* and/or other materials provided with the distribution. | ||
* | ||
* * Neither the name of Google, Inc. nor the names of its contributors may be | ||
* used to endorse or promote products derived from this software without | ||
* specific prior written permission. | ||
* | ||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | ||
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | ||
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | ||
* ARE DISCLAIMED. IN NO EVENT SHALL VMWARE, INC. OR CONTRIBUTORS BE LIABLE | ||
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | ||
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR | ||
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER | ||
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT | ||
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY | ||
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH | ||
* DAMAGE. | ||
*/ | ||
|
||
/* Scheduler dynamic rescheduling-specific code. */ | ||
|
||
#include "scheduler.h" | ||
#include "scheduler_impl.h" | ||
|
||
#include <cinttypes> | ||
|
||
namespace dynamorio { | ||
namespace drmemtrace { | ||
|
||
template <typename RecordType, typename ReaderType> | ||
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t | ||
scheduler_dynamic_tmpl_t<RecordType, ReaderType>::pick_next_input_try( | ||
output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index, | ||
input_ordinal_t &index) | ||
{ | ||
uint64_t cur_time = this->get_output_time(output); | ||
uint64_t last_time = this->last_rebalance_time_.load(std::memory_order_acquire); | ||
if (last_time == 0) { | ||
// Initialize. | ||
this->last_rebalance_time_.store(cur_time, std::memory_order_release); | ||
} else { | ||
// Guard against time going backward, which happens: i#6966. | ||
if (cur_time > last_time && | ||
cur_time - last_time >= | ||
static_cast<uint64_t>(this->options_.rebalance_period_us * | ||
this->options_.time_units_per_us) && | ||
this->rebalancer_.load(std::memory_order_acquire) == std::thread::id()) { | ||
VPRINT(this, 2, | ||
"Output %d hit rebalance period @%" PRIu64 " (last rebalance @%" PRIu64 | ||
")\n", | ||
output, cur_time, last_time); | ||
stream_status_t status = this->rebalance_queues(output, {}); | ||
if (status != sched_type_t::STATUS_OK) | ||
return status; | ||
} | ||
} | ||
if (blocked_time > 0 && prev_index != sched_type_t::INVALID_INPUT_ORDINAL) { | ||
std::lock_guard<mutex_dbg_owned> lock(*this->inputs_[prev_index].lock); | ||
if (this->inputs_[prev_index].blocked_time == 0) { | ||
VPRINT(this, 2, "next_record[%d]: blocked time %" PRIu64 "\n", output, | ||
blocked_time); | ||
this->inputs_[prev_index].blocked_time = blocked_time; | ||
this->inputs_[prev_index].blocked_start_time = this->get_output_time(output); | ||
} | ||
} | ||
if (prev_index != sched_type_t::INVALID_INPUT_ORDINAL && | ||
this->inputs_[prev_index].switch_to_input != | ||
sched_type_t::INVALID_INPUT_ORDINAL) { | ||
input_info_t *target = &this->inputs_[this->inputs_[prev_index].switch_to_input]; | ||
this->inputs_[prev_index].switch_to_input = sched_type_t::INVALID_INPUT_ORDINAL; | ||
std::unique_lock<mutex_dbg_owned> target_input_lock(*target->lock); | ||
// XXX i#5843: Add an invariant check that the next timestamp of the | ||
// target is later than the pre-switch-syscall timestamp? | ||
if (target->containing_output != sched_type_t::INVALID_OUTPUT_ORDINAL) { | ||
output_ordinal_t target_output = target->containing_output; | ||
output_info_t &out = this->outputs_[target->containing_output]; | ||
// We cannot hold an input lock when we acquire an output lock. | ||
target_input_lock.unlock(); | ||
{ | ||
auto target_output_lock = | ||
this->acquire_scoped_output_lock_if_necessary(target_output); | ||
target_input_lock.lock(); | ||
if (out.ready_queue.queue.find(target)) { | ||
VPRINT(this, 2, | ||
"next_record[%d]: direct switch from input %d to " | ||
"input %d " | ||
"@%" PRIu64 "\n", | ||
output, prev_index, target->index, | ||
this->inputs_[prev_index].reader->get_last_timestamp()); | ||
out.ready_queue.queue.erase(target); | ||
index = target->index; | ||
// Erase any remaining wait time for the target. | ||
if (target->blocked_time > 0) { | ||
VPRINT(this, 3, | ||
"next_record[%d]: direct switch erasing " | ||
"blocked time " | ||
"for input %d\n", | ||
output, target->index); | ||
--out.ready_queue.num_blocked; | ||
target->blocked_time = 0; | ||
target->unscheduled = false; | ||
} | ||
if (target->containing_output != output) { | ||
++this->outputs_[output] | ||
.stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS]; | ||
} | ||
++this->outputs_[output] | ||
.stats[memtrace_stream_t::SCHED_STAT_DIRECT_SWITCH_SUCCESSES]; | ||
} // Else, actively running. | ||
target_input_lock.unlock(); | ||
} | ||
target_input_lock.lock(); | ||
} | ||
std::lock_guard<mutex_dbg_owned> unsched_lock(*this->unscheduled_priority_.lock); | ||
if (index == sched_type_t::INVALID_INPUT_ORDINAL && | ||
this->unscheduled_priority_.queue.find(target)) { | ||
target->unscheduled = false; | ||
this->unscheduled_priority_.queue.erase(target); | ||
index = target->index; | ||
VPRINT(this, 2, | ||
"next_record[%d]: direct switch from input %d to " | ||
"was-unscheduled input %d " | ||
"@%" PRIu64 "\n", | ||
output, prev_index, target->index, | ||
this->inputs_[prev_index].reader->get_last_timestamp()); | ||
if (target->prev_output != sched_type_t::INVALID_OUTPUT_ORDINAL && | ||
target->prev_output != output) { | ||
++this->outputs_[output].stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS]; | ||
} | ||
++this->outputs_[output] | ||
.stats[memtrace_stream_t::SCHED_STAT_DIRECT_SWITCH_SUCCESSES]; | ||
} | ||
if (index == sched_type_t::INVALID_INPUT_ORDINAL) { | ||
// We assume that inter-input dependencies are captured in | ||
// the _DIRECT_THREAD_SWITCH, _UNSCHEDULE, and _SCHEDULE markers | ||
// and that if a switch request targets a thread running elsewhere | ||
// that means there isn't a dependence and this is really a | ||
// dynamic switch to whoever happens to be available (and | ||
// different timing between tracing and analysis has caused this | ||
// miss). | ||
VPRINT(this, 2, | ||
"Direct switch (from %d) target input #%d is running " | ||
"elsewhere; picking a different target @%" PRIu64 "\n", | ||
prev_index, target->index, | ||
this->inputs_[prev_index].reader->get_last_timestamp()); | ||
// We do ensure the missed target doesn't wait indefinitely. | ||
// XXX i#6822: It's not clear this is always the right thing to | ||
// do. | ||
target->skip_next_unscheduled = true; | ||
} | ||
} | ||
if (index != sched_type_t::INVALID_INPUT_ORDINAL) { | ||
// We found a direct switch target above. | ||
} | ||
// XXX: We're grabbing the output ready_queue lock 3x here: | ||
// ready_queue_empty(), set_cur_input()'s add_to_ready_queue(), | ||
// and pop_from_ready_queue(). We could call versions of those | ||
// that let the caller hold the lock: but holding it across other | ||
// calls in between here adds complexity. | ||
else if (this->ready_queue_empty(output) && blocked_time == 0) { | ||
// There's nothing else to run so either stick with the | ||
// current input or if it's invalid go idle/eof. | ||
if (prev_index == sched_type_t::INVALID_INPUT_ORDINAL) { | ||
stream_status_t status = this->eof_or_idle(output, prev_index); | ||
if (status != sched_type_t::STATUS_STOLE) | ||
return status; | ||
// eof_or_idle stole an input for us, now in .cur_input. | ||
index = this->outputs_[output].cur_input; | ||
return sched_type_t::STATUS_OK; | ||
} else { | ||
auto lock = | ||
std::unique_lock<mutex_dbg_owned>(*this->inputs_[prev_index].lock); | ||
// If we can't go back to the current input because it's EOF | ||
// or unscheduled indefinitely (we already checked blocked_time | ||
// above: it's 0 here), this output is either idle or EOF. | ||
if (this->inputs_[prev_index].at_eof || | ||
this->inputs_[prev_index].unscheduled) { | ||
lock.unlock(); | ||
stream_status_t status = this->eof_or_idle(output, prev_index); | ||
if (status != sched_type_t::STATUS_STOLE) | ||
return status; | ||
index = this->outputs_[output].cur_input; | ||
return sched_type_t::STATUS_OK; | ||
} else | ||
index = prev_index; // Go back to prior. | ||
} | ||
} else { | ||
// There's something else to run, or we'll soon be in the queue | ||
// even if it's empty now. | ||
// Give up the input before we go to the queue so we can add | ||
// ourselves to the queue. If we're the highest priority we | ||
// shouldn't switch. The queue preserves FIFO for same-priority | ||
// cases so we will switch if someone of equal priority is | ||
// waiting. | ||
this->set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL); | ||
input_info_t *queue_next = nullptr; | ||
stream_status_t status = this->pop_from_ready_queue(output, output, queue_next); | ||
if (status != sched_type_t::STATUS_OK) { | ||
if (status == sched_type_t::STATUS_IDLE) { | ||
this->outputs_[output].waiting = true; | ||
if (this->options_.schedule_record_ostream != nullptr) { | ||
stream_status_t record_status = this->record_schedule_segment( | ||
output, schedule_record_t::IDLE_BY_COUNT, 0, | ||
// Start prior to this idle. | ||
this->outputs_[output].idle_count - 1, 0); | ||
if (record_status != sched_type_t::STATUS_OK) | ||
return record_status; | ||
} | ||
if (prev_index != sched_type_t::INVALID_INPUT_ORDINAL) { | ||
++this->outputs_[output] | ||
.stats[memtrace_stream_t::SCHED_STAT_SWITCH_INPUT_TO_IDLE]; | ||
} | ||
} | ||
return status; | ||
} | ||
if (queue_next == nullptr) { | ||
status = this->eof_or_idle(output, prev_index); | ||
if (status != sched_type_t::STATUS_STOLE) | ||
return status; | ||
index = this->outputs_[output].cur_input; | ||
return sched_type_t::STATUS_OK; | ||
} else | ||
index = queue_next->index; | ||
} | ||
return sched_type_t::STATUS_OK; | ||
} | ||
|
||
template class scheduler_dynamic_tmpl_t<memref_t, reader_t>; | ||
template class scheduler_dynamic_tmpl_t<trace_entry_t, | ||
dynamorio::drmemtrace::record_reader_t>; | ||
|
||
} // namespace drmemtrace | ||
} // namespace dynamorio |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,95 @@ | ||
/* ********************************************************** | ||
* Copyright (c) 2023-2024 Google, Inc. All rights reserved. | ||
* **********************************************************/ | ||
|
||
/* | ||
* Redistribution and use in source and binary forms, with or without | ||
* modification, are permitted provided that the following conditions are met: | ||
* | ||
* * Redistributions of source code must retain the above copyright notice, | ||
* this list of conditions and the following disclaimer. | ||
* | ||
* * Redistributions in binary form must reproduce the above copyright notice, | ||
* this list of conditions and the following disclaimer in the documentation | ||
* and/or other materials provided with the distribution. | ||
* | ||
* * Neither the name of Google, Inc. nor the names of its contributors may be | ||
* used to endorse or promote products derived from this software without | ||
* specific prior written permission. | ||
* | ||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" | ||
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE | ||
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE | ||
* ARE DISCLAIMED. IN NO EVENT SHALL VMWARE, INC. OR CONTRIBUTORS BE LIABLE | ||
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL | ||
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR | ||
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER | ||
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT | ||
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY | ||
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH | ||
* DAMAGE. | ||
*/ | ||
|
||
/* Scheduler fixed-schedule-specific code. */ | ||
|
||
#include "scheduler.h" | ||
#include "scheduler_impl.h" | ||
|
||
#include <cinttypes> | ||
|
||
namespace dynamorio { | ||
namespace drmemtrace { | ||
|
||
template <typename RecordType, typename ReaderType> | ||
typename scheduler_tmpl_t<RecordType, ReaderType>::stream_status_t | ||
scheduler_fixed_tmpl_t<RecordType, ReaderType>::pick_next_input_try( | ||
output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index, | ||
input_ordinal_t &index) | ||
{ | ||
if (this->options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) { | ||
uint64_t min_time = std::numeric_limits<uint64_t>::max(); | ||
for (size_t i = 0; i < this->inputs_.size(); ++i) { | ||
std::lock_guard<mutex_dbg_owned> lock(*this->inputs_[i].lock); | ||
if (!this->inputs_[i].at_eof && this->inputs_[i].next_timestamp > 0 && | ||
this->inputs_[i].next_timestamp < min_time) { | ||
min_time = this->inputs_[i].next_timestamp; | ||
index = static_cast<int>(i); | ||
} | ||
} | ||
if (index < 0) { | ||
stream_status_t status = this->eof_or_idle(output, prev_index); | ||
if (status != sched_type_t::STATUS_STOLE) | ||
return status; | ||
index = this->outputs_[output].cur_input; | ||
return sched_type_t::STATUS_OK; | ||
} | ||
VPRINT(this, 2, | ||
"next_record[%d]: advancing to timestamp %" PRIu64 " == input #%d\n", | ||
output, min_time, index); | ||
} else if (this->options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) { | ||
// We're done with the prior thread; take the next one that was | ||
// pre-allocated to this output (pre-allocated to avoid locks). Invariant: | ||
// the same output will not be accessed by two different threads | ||
// simultaneously in this mode, allowing us to support a lock-free | ||
// parallel-friendly increment here. | ||
int indices_index = ++this->outputs_[output].input_indices_index; | ||
if (indices_index >= | ||
static_cast<int>(this->outputs_[output].input_indices.size())) { | ||
VPRINT(this, 2, "next_record[%d]: all at eof\n", output); | ||
return sched_type_t::STATUS_EOF; | ||
} | ||
index = this->outputs_[output].input_indices[indices_index]; | ||
VPRINT(this, 2, "next_record[%d]: advancing to local index %d == input #%d\n", | ||
output, indices_index, index); | ||
} else | ||
return sched_type_t::STATUS_INVALID; | ||
|
||
return sched_type_t::STATUS_OK; | ||
} | ||
|
||
template class scheduler_fixed_tmpl_t<memref_t, reader_t>; | ||
template class scheduler_fixed_tmpl_t<trace_entry_t, | ||
dynamorio::drmemtrace::record_reader_t>; | ||
|
||
} // namespace drmemtrace | ||
} // namespace dynamorio |
Oops, something went wrong.