From 6360656a68a27e4b53e82eb3f6c54dc17f02dfbb Mon Sep 17 00:00:00 2001 From: Derek Bruening Date: Tue, 12 Nov 2024 20:10:41 -0500 Subject: [PATCH] i#6831 sched refactor, step 3: Split pick_next_input() (#7078) Splits the bulk of pick_next_input() into a new virtual method pick_next_input_for_mode() which is implemented by new subclasses scheduler_{dynamic,fixed,replay}.cpp. Moves pick_next_input_as_previously() into the _replay version. Issue: #6831 --- clients/drcachesim/CMakeLists.txt | 6 + clients/drcachesim/scheduler/scheduler.cpp | 22 +- .../scheduler/scheduler_dynamic.cpp | 246 ++++++++++ .../drcachesim/scheduler/scheduler_fixed.cpp | 95 ++++ .../drcachesim/scheduler/scheduler_impl.cpp | 437 +----------------- clients/drcachesim/scheduler/scheduler_impl.h | 87 +++- .../drcachesim/scheduler/scheduler_replay.cpp | 220 +++++++++ 7 files changed, 672 insertions(+), 441 deletions(-) create mode 100644 clients/drcachesim/scheduler/scheduler_dynamic.cpp create mode 100644 clients/drcachesim/scheduler/scheduler_fixed.cpp create mode 100644 clients/drcachesim/scheduler/scheduler_replay.cpp diff --git a/clients/drcachesim/CMakeLists.txt b/clients/drcachesim/CMakeLists.txt index 436f51b06e1..a67e5b64504 100644 --- a/clients/drcachesim/CMakeLists.txt +++ b/clients/drcachesim/CMakeLists.txt @@ -262,6 +262,9 @@ set(drcachesim_srcs launcher.cpp scheduler/scheduler.cpp scheduler/scheduler_impl.cpp + scheduler/scheduler_dynamic.cpp + scheduler/scheduler_replay.cpp + scheduler/scheduler_fixed.cpp scheduler/speculator.cpp analyzer.cpp analyzer_multi.cpp @@ -325,6 +328,9 @@ add_exported_library(drmemtrace_analyzer STATIC analyzer.cpp scheduler/scheduler.cpp scheduler/scheduler_impl.cpp + scheduler/scheduler_dynamic.cpp + scheduler/scheduler_replay.cpp + scheduler/scheduler_fixed.cpp scheduler/speculator.cpp common/trace_entry.cpp reader/reader.cpp diff --git a/clients/drcachesim/scheduler/scheduler.cpp b/clients/drcachesim/scheduler/scheduler.cpp index 8e17655a1eb..cc1c6ddfd72 100644 --- a/clients/drcachesim/scheduler/scheduler.cpp +++ b/clients/drcachesim/scheduler/scheduler.cpp @@ -48,11 +48,23 @@ scheduler_tmpl_t::init( std::vector &workload_inputs, int output_count, scheduler_options_t options) { - // TODO i#6831: Split up scheduler_impl_tmpl_t by mapping_t mode and create the - // mode-appropriate subclass here. - impl_ = std::unique_ptr, - scheduler_impl_deleter_t>( - new scheduler_impl_tmpl_t); + if (options.mapping == sched_type_t::MAP_TO_ANY_OUTPUT) { + impl_ = std::unique_ptr, + scheduler_impl_deleter_t>( + new scheduler_dynamic_tmpl_t); + } else if (options.mapping == sched_type_t::MAP_AS_PREVIOUSLY || + (options.mapping == sched_type_t::MAP_TO_RECORDED_OUTPUT && + options.replay_as_traced_istream != nullptr)) { + impl_ = std::unique_ptr, + scheduler_impl_deleter_t>( + new scheduler_replay_tmpl_t); + } else { + // Non-dynamic and non-replay fixed modes such as analyzer serial and + // parallel modes. + impl_ = std::unique_ptr, + scheduler_impl_deleter_t>( + new scheduler_fixed_tmpl_t); + } return impl_->init(workload_inputs, output_count, std::move(options)); } diff --git a/clients/drcachesim/scheduler/scheduler_dynamic.cpp b/clients/drcachesim/scheduler/scheduler_dynamic.cpp new file mode 100644 index 00000000000..93d9f10ef83 --- /dev/null +++ b/clients/drcachesim/scheduler/scheduler_dynamic.cpp @@ -0,0 +1,246 @@ +/* ********************************************************** + * Copyright (c) 2023-2024 Google, Inc. All rights reserved. + * **********************************************************/ + +/* + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * * Neither the name of Google, Inc. nor the names of its contributors may be + * used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL VMWARE, INC. OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH + * DAMAGE. + */ + +/* Scheduler dynamic rescheduling-specific code. */ + +#include "scheduler.h" +#include "scheduler_impl.h" + +#include + +namespace dynamorio { +namespace drmemtrace { + +template +typename scheduler_tmpl_t::stream_status_t +scheduler_dynamic_tmpl_t::pick_next_input_for_mode( + output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index, + input_ordinal_t &index) +{ + uint64_t cur_time = this->get_output_time(output); + uint64_t last_time = this->last_rebalance_time_.load(std::memory_order_acquire); + if (last_time == 0) { + // Initialize. + this->last_rebalance_time_.store(cur_time, std::memory_order_release); + } else { + // Guard against time going backward, which happens: i#6966. + if (cur_time > last_time && + cur_time - last_time >= + static_cast(this->options_.rebalance_period_us * + this->options_.time_units_per_us) && + this->rebalancer_.load(std::memory_order_acquire) == std::thread::id()) { + VPRINT(this, 2, + "Output %d hit rebalance period @%" PRIu64 " (last rebalance @%" PRIu64 + ")\n", + output, cur_time, last_time); + stream_status_t status = this->rebalance_queues(output, {}); + if (status != sched_type_t::STATUS_OK) + return status; + } + } + if (blocked_time > 0 && prev_index != sched_type_t::INVALID_INPUT_ORDINAL) { + std::lock_guard lock(*this->inputs_[prev_index].lock); + if (this->inputs_[prev_index].blocked_time == 0) { + VPRINT(this, 2, "next_record[%d]: blocked time %" PRIu64 "\n", output, + blocked_time); + this->inputs_[prev_index].blocked_time = blocked_time; + this->inputs_[prev_index].blocked_start_time = this->get_output_time(output); + } + } + if (prev_index != sched_type_t::INVALID_INPUT_ORDINAL && + this->inputs_[prev_index].switch_to_input != + sched_type_t::INVALID_INPUT_ORDINAL) { + input_info_t *target = &this->inputs_[this->inputs_[prev_index].switch_to_input]; + this->inputs_[prev_index].switch_to_input = sched_type_t::INVALID_INPUT_ORDINAL; + std::unique_lock target_input_lock(*target->lock); + // XXX i#5843: Add an invariant check that the next timestamp of the + // target is later than the pre-switch-syscall timestamp? + if (target->containing_output != sched_type_t::INVALID_OUTPUT_ORDINAL) { + output_ordinal_t target_output = target->containing_output; + output_info_t &out = this->outputs_[target->containing_output]; + // We cannot hold an input lock when we acquire an output lock. + target_input_lock.unlock(); + { + auto target_output_lock = + this->acquire_scoped_output_lock_if_necessary(target_output); + target_input_lock.lock(); + if (out.ready_queue.queue.find(target)) { + VPRINT(this, 2, + "next_record[%d]: direct switch from input %d to " + "input %d " + "@%" PRIu64 "\n", + output, prev_index, target->index, + this->inputs_[prev_index].reader->get_last_timestamp()); + out.ready_queue.queue.erase(target); + index = target->index; + // Erase any remaining wait time for the target. + if (target->blocked_time > 0) { + VPRINT(this, 3, + "next_record[%d]: direct switch erasing " + "blocked time " + "for input %d\n", + output, target->index); + --out.ready_queue.num_blocked; + target->blocked_time = 0; + target->unscheduled = false; + } + if (target->containing_output != output) { + ++this->outputs_[output] + .stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS]; + } + ++this->outputs_[output] + .stats[memtrace_stream_t::SCHED_STAT_DIRECT_SWITCH_SUCCESSES]; + } // Else, actively running. + target_input_lock.unlock(); + } + target_input_lock.lock(); + } + std::lock_guard unsched_lock(*this->unscheduled_priority_.lock); + if (index == sched_type_t::INVALID_INPUT_ORDINAL && + this->unscheduled_priority_.queue.find(target)) { + target->unscheduled = false; + this->unscheduled_priority_.queue.erase(target); + index = target->index; + VPRINT(this, 2, + "next_record[%d]: direct switch from input %d to " + "was-unscheduled input %d " + "@%" PRIu64 "\n", + output, prev_index, target->index, + this->inputs_[prev_index].reader->get_last_timestamp()); + if (target->prev_output != sched_type_t::INVALID_OUTPUT_ORDINAL && + target->prev_output != output) { + ++this->outputs_[output].stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS]; + } + ++this->outputs_[output] + .stats[memtrace_stream_t::SCHED_STAT_DIRECT_SWITCH_SUCCESSES]; + } + if (index == sched_type_t::INVALID_INPUT_ORDINAL) { + // We assume that inter-input dependencies are captured in + // the _DIRECT_THREAD_SWITCH, _UNSCHEDULE, and _SCHEDULE markers + // and that if a switch request targets a thread running elsewhere + // that means there isn't a dependence and this is really a + // dynamic switch to whoever happens to be available (and + // different timing between tracing and analysis has caused this + // miss). + VPRINT(this, 2, + "Direct switch (from %d) target input #%d is running " + "elsewhere; picking a different target @%" PRIu64 "\n", + prev_index, target->index, + this->inputs_[prev_index].reader->get_last_timestamp()); + // We do ensure the missed target doesn't wait indefinitely. + // XXX i#6822: It's not clear this is always the right thing to + // do. + target->skip_next_unscheduled = true; + } + } + if (index != sched_type_t::INVALID_INPUT_ORDINAL) { + // We found a direct switch target above. + } + // XXX: We're grabbing the output ready_queue lock 3x here: + // ready_queue_empty(), set_cur_input()'s add_to_ready_queue(), + // and pop_from_ready_queue(). We could call versions of those + // that let the caller hold the lock: but holding it across other + // calls in between here adds complexity. + else if (this->ready_queue_empty(output) && blocked_time == 0) { + // There's nothing else to run so either stick with the + // current input or if it's invalid go idle/eof. + if (prev_index == sched_type_t::INVALID_INPUT_ORDINAL) { + stream_status_t status = this->eof_or_idle(output, prev_index); + if (status != sched_type_t::STATUS_STOLE) + return status; + // eof_or_idle stole an input for us, now in .cur_input. + index = this->outputs_[output].cur_input; + return sched_type_t::STATUS_OK; + } else { + auto lock = + std::unique_lock(*this->inputs_[prev_index].lock); + // If we can't go back to the current input because it's EOF + // or unscheduled indefinitely (we already checked blocked_time + // above: it's 0 here), this output is either idle or EOF. + if (this->inputs_[prev_index].at_eof || + this->inputs_[prev_index].unscheduled) { + lock.unlock(); + stream_status_t status = this->eof_or_idle(output, prev_index); + if (status != sched_type_t::STATUS_STOLE) + return status; + index = this->outputs_[output].cur_input; + return sched_type_t::STATUS_OK; + } else + index = prev_index; // Go back to prior. + } + } else { + // There's something else to run, or we'll soon be in the queue + // even if it's empty now. + // Give up the input before we go to the queue so we can add + // ourselves to the queue. If we're the highest priority we + // shouldn't switch. The queue preserves FIFO for same-priority + // cases so we will switch if someone of equal priority is + // waiting. + this->set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL); + input_info_t *queue_next = nullptr; + stream_status_t status = this->pop_from_ready_queue(output, output, queue_next); + if (status != sched_type_t::STATUS_OK) { + if (status == sched_type_t::STATUS_IDLE) { + this->outputs_[output].waiting = true; + if (this->options_.schedule_record_ostream != nullptr) { + stream_status_t record_status = this->record_schedule_segment( + output, schedule_record_t::IDLE_BY_COUNT, 0, + // Start prior to this idle. + this->outputs_[output].idle_count - 1, 0); + if (record_status != sched_type_t::STATUS_OK) + return record_status; + } + if (prev_index != sched_type_t::INVALID_INPUT_ORDINAL) { + ++this->outputs_[output] + .stats[memtrace_stream_t::SCHED_STAT_SWITCH_INPUT_TO_IDLE]; + } + } + return status; + } + if (queue_next == nullptr) { + status = this->eof_or_idle(output, prev_index); + if (status != sched_type_t::STATUS_STOLE) + return status; + index = this->outputs_[output].cur_input; + return sched_type_t::STATUS_OK; + } else + index = queue_next->index; + } + return sched_type_t::STATUS_OK; +} + +template class scheduler_dynamic_tmpl_t; +template class scheduler_dynamic_tmpl_t; + +} // namespace drmemtrace +} // namespace dynamorio diff --git a/clients/drcachesim/scheduler/scheduler_fixed.cpp b/clients/drcachesim/scheduler/scheduler_fixed.cpp new file mode 100644 index 00000000000..e465290d5bc --- /dev/null +++ b/clients/drcachesim/scheduler/scheduler_fixed.cpp @@ -0,0 +1,95 @@ +/* ********************************************************** + * Copyright (c) 2023-2024 Google, Inc. All rights reserved. + * **********************************************************/ + +/* + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * * Neither the name of Google, Inc. nor the names of its contributors may be + * used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL VMWARE, INC. OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH + * DAMAGE. + */ + +/* Scheduler fixed-schedule-specific code. */ + +#include "scheduler.h" +#include "scheduler_impl.h" + +#include + +namespace dynamorio { +namespace drmemtrace { + +template +typename scheduler_tmpl_t::stream_status_t +scheduler_fixed_tmpl_t::pick_next_input_for_mode( + output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index, + input_ordinal_t &index) +{ + if (this->options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) { + uint64_t min_time = std::numeric_limits::max(); + for (size_t i = 0; i < this->inputs_.size(); ++i) { + std::lock_guard lock(*this->inputs_[i].lock); + if (!this->inputs_[i].at_eof && this->inputs_[i].next_timestamp > 0 && + this->inputs_[i].next_timestamp < min_time) { + min_time = this->inputs_[i].next_timestamp; + index = static_cast(i); + } + } + if (index < 0) { + stream_status_t status = this->eof_or_idle(output, prev_index); + if (status != sched_type_t::STATUS_STOLE) + return status; + index = this->outputs_[output].cur_input; + return sched_type_t::STATUS_OK; + } + VPRINT(this, 2, + "next_record[%d]: advancing to timestamp %" PRIu64 " == input #%d\n", + output, min_time, index); + } else if (this->options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) { + // We're done with the prior thread; take the next one that was + // pre-allocated to this output (pre-allocated to avoid locks). Invariant: + // the same output will not be accessed by two different threads + // simultaneously in this mode, allowing us to support a lock-free + // parallel-friendly increment here. + int indices_index = ++this->outputs_[output].input_indices_index; + if (indices_index >= + static_cast(this->outputs_[output].input_indices.size())) { + VPRINT(this, 2, "next_record[%d]: all at eof\n", output); + return sched_type_t::STATUS_EOF; + } + index = this->outputs_[output].input_indices[indices_index]; + VPRINT(this, 2, "next_record[%d]: advancing to local index %d == input #%d\n", + output, indices_index, index); + } else + return sched_type_t::STATUS_INVALID; + + return sched_type_t::STATUS_OK; +} + +template class scheduler_fixed_tmpl_t; +template class scheduler_fixed_tmpl_t; + +} // namespace drmemtrace +} // namespace dynamorio diff --git a/clients/drcachesim/scheduler/scheduler_impl.cpp b/clients/drcachesim/scheduler/scheduler_impl.cpp index 6561f9bd640..d483a1ff550 100644 --- a/clients/drcachesim/scheduler/scheduler_impl.cpp +++ b/clients/drcachesim/scheduler/scheduler_impl.cpp @@ -80,23 +80,6 @@ #include "directory_iterator.h" #include "utils.h" -#undef VPRINT -// We make logging available in release build to help in diagnosing issues -// and understanding scheduler behavior. -// We assume the extra branches do not add undue overhead. -#define VPRINT(obj, level, ...) \ - do { \ - if ((obj)->verbosity_ >= (level)) { \ - fprintf(stderr, "%s ", (obj)->output_prefix_); \ - fprintf(stderr, __VA_ARGS__); \ - } \ - } while (0) -#define VDO(obj, level, statement) \ - do { \ - if ((obj)->verbosity_ >= (level)) \ - statement \ - } while (0) - namespace dynamorio { namespace drmemtrace { @@ -2982,153 +2965,6 @@ scheduler_impl_tmpl_t::set_cur_input( return sched_type_t::STATUS_OK; } -template -typename scheduler_tmpl_t::stream_status_t -scheduler_impl_tmpl_t::pick_next_input_as_previously( - output_ordinal_t output, input_ordinal_t &index) -{ - // Our own index is only modified by us so we can cache it here. - int record_index = outputs_[output].record_index->load(std::memory_order_acquire); - if (record_index + 1 >= static_cast(outputs_[output].record.size())) { - if (!outputs_[output].at_eof) { - outputs_[output].at_eof = true; - live_replay_output_count_.fetch_add(-1, std::memory_order_release); - } - return eof_or_idle(output, outputs_[output].cur_input); - } - schedule_record_t &segment = outputs_[output].record[record_index + 1]; - if (segment.type == schedule_record_t::IDLE || - segment.type == schedule_record_t::IDLE_BY_COUNT) { - outputs_[output].waiting = true; - if (segment.type == schedule_record_t::IDLE) { - // Convert a legacy idle duration from microseconds to record counts. - segment.value.idle_duration = static_cast( - options_.time_units_per_us * segment.value.idle_duration); - } - outputs_[output].idle_start_count = outputs_[output].idle_count; - outputs_[output].record_index->fetch_add(1, std::memory_order_release); - ++outputs_[output].idle_count; - VPRINT(this, 5, "%s[%d]: next replay segment idle for %" PRIu64 "\n", - __FUNCTION__, output, segment.value.idle_duration); - return sched_type_t::STATUS_IDLE; - } - index = segment.key.input; - VPRINT(this, 5, - "%s[%d]: next replay segment in=%d (@%" PRId64 ") type=%d start=%" PRId64 - " end=%" PRId64 "\n", - __FUNCTION__, output, index, get_instr_ordinal(inputs_[index]), segment.type, - segment.value.start_instruction, segment.stop_instruction); - { - std::lock_guard lock(*inputs_[index].lock); - if (get_instr_ordinal(inputs_[index]) > segment.value.start_instruction) { - VPRINT(this, 1, - "WARNING: next_record[%d]: input %d wants instr #%" PRId64 - " but it is already at #%" PRId64 "\n", - output, index, segment.value.start_instruction, - get_instr_ordinal(inputs_[index])); - } - if (get_instr_ordinal(inputs_[index]) < segment.value.start_instruction && - // Don't wait for an ROI that starts at the beginning. - segment.value.start_instruction > 1 && - // The output may have begun in the wait state. - (record_index == -1 || - // When we skip our separator+timestamp markers are at the - // prior instr ord so do not wait for that. - (outputs_[output].record[record_index].type != schedule_record_t::SKIP && - // Don't wait if we're at the end and just need the end record. - segment.type != schedule_record_t::SYNTHETIC_END))) { - // Some other output stream has not advanced far enough, and we do - // not support multiple positions in one input stream: we wait. - // XXX i#5843: We may want to provide a kernel-mediated wait - // feature so a multi-threaded simulator doesn't have to do a - // spinning poll loop. - // XXX i#5843: For replaying a schedule as it was traced with - // sched_type_t::MAP_TO_RECORDED_OUTPUT there may have been true idle periods - // during tracing where some other process than the traced workload was - // scheduled on a core. If we could identify those, we should return - // sched_type_t::STATUS_IDLE rather than sched_type_t::STATUS_WAIT. - VPRINT(this, 3, "next_record[%d]: waiting for input %d instr #%" PRId64 "\n", - output, index, segment.value.start_instruction); - // Give up this input and go into a wait state. - // We'll come back here on the next next_record() call. - set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL, - // Avoid livelock if prev input == cur input which happens - // with back-to-back segments with the same input. - index == outputs_[output].cur_input); - outputs_[output].waiting = true; - return sched_type_t::STATUS_WAIT; - } - } - // Also wait if this segment is ahead of the next-up segment on another - // output. We only have a timestamp per context switch so we can't - // enforce finer-grained timing replay. - if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) { - for (int i = 0; i < static_cast(outputs_.size()); ++i) { - if (i == output) - continue; - // Do an atomic load once and use it to de-reference if it's not at the end. - // This is safe because if the target advances to the end concurrently it - // will only cause an extra wait that will just come back here and then - // continue. - int other_index = outputs_[i].record_index->load(std::memory_order_acquire); - if (other_index + 1 < static_cast(outputs_[i].record.size()) && - segment.timestamp > outputs_[i].record[other_index + 1].timestamp) { - VPRINT(this, 3, - "next_record[%d]: waiting because timestamp %" PRIu64 - " is ahead of output %d\n", - output, segment.timestamp, i); - // Give up this input and go into a wait state. - // We'll come back here on the next next_record() call. - // XXX: We should add a timeout just in case some timestamps are out of - // order due to using prior values, to avoid hanging. We try to avoid - // this by using wall-clock time in record_schedule_segment() rather than - // the stored output time. - set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL); - outputs_[output].waiting = true; - return sched_type_t::STATUS_WAIT; - } - } - } - if (segment.type == schedule_record_t::SYNTHETIC_END) { - std::lock_guard lock(*inputs_[index].lock); - // We're past the final region of interest and we need to insert - // a synthetic thread exit record. We need to first throw out the - // queued candidate record, if any. - clear_input_queue(inputs_[index]); - inputs_[index].queue.push_back(create_thread_exit(inputs_[index].tid)); - VPRINT(this, 2, "early end for input %d\n", index); - // We're done with this entry but we need the queued record to be read, - // so we do not move past the entry. - outputs_[output].record_index->fetch_add(1, std::memory_order_release); - stream_status_t status = mark_input_eof(inputs_[index]); - if (status != sched_type_t::STATUS_OK) - return status; - return sched_type_t::STATUS_SKIPPED; - } else if (segment.type == schedule_record_t::SKIP) { - std::lock_guard lock(*inputs_[index].lock); - uint64_t cur_reader_instr = inputs_[index].reader->get_instruction_ordinal(); - VPRINT(this, 2, - "next_record[%d]: skipping from %" PRId64 " to %" PRId64 - " in %d for schedule\n", - output, cur_reader_instr, segment.stop_instruction, index); - auto status = skip_instructions(inputs_[index], - segment.stop_instruction - cur_reader_instr - - 1 /*exclusive*/); - // Increment the region to get window id markers with ordinals. - inputs_[index].cur_region++; - if (status != sched_type_t::STATUS_SKIPPED) - return sched_type_t::STATUS_INVALID; - // We're done with the skip so move to and past it. - outputs_[output].record_index->fetch_add(2, std::memory_order_release); - return sched_type_t::STATUS_SKIPPED; - } else { - VPRINT(this, 2, "next_record[%d]: advancing to input %d instr #%" PRId64 "\n", - output, index, segment.value.start_instruction); - } - outputs_[output].record_index->fetch_add(1, std::memory_order_release); - return sched_type_t::STATUS_OK; -} - template bool scheduler_impl_tmpl_t::need_output_lock() @@ -3173,274 +3009,11 @@ scheduler_impl_tmpl_t::pick_next_input(output_ordinal_t while (true) { ++iters; if (index < 0) { - // XXX i#6831: Refactor to use subclasses or templates to specialize - // scheduler code based on mapping options, to avoid these top-level - // conditionals in many functions? - if (options_.mapping == sched_type_t::MAP_AS_PREVIOUSLY) { - res = pick_next_input_as_previously(output, index); - VDO(this, 2, { - // Our own index is only modified by us so we can cache it here. - int record_index = - outputs_[output].record_index->load(std::memory_order_acquire); - if (record_index >= 0 && - record_index < static_cast(outputs_[output].record.size())) { - const schedule_record_t &segment = - outputs_[output].record[record_index]; - int input = segment.key.input; - VPRINT(this, - (res == sched_type_t::STATUS_IDLE || - res == sched_type_t::STATUS_WAIT) - ? 3 - : 2, - "next_record[%d]: replay segment in=%d (@%" PRId64 - ") type=%d start=%" PRId64 " end=%" PRId64 "\n", - output, input, get_instr_ordinal(inputs_[input]), - segment.type, segment.value.start_instruction, - segment.stop_instruction); - } - }); - if (res == sched_type_t::STATUS_SKIPPED) - break; - if (res != sched_type_t::STATUS_OK) - return res; - } else if (options_.mapping == sched_type_t::MAP_TO_ANY_OUTPUT) { - uint64_t cur_time = get_output_time(output); - uint64_t last_time = last_rebalance_time_.load(std::memory_order_acquire); - if (last_time == 0) { - // Initialize. - last_rebalance_time_.store(cur_time, std::memory_order_release); - } else { - // Guard against time going backward, which happens: i#6966. - if (cur_time > last_time && - cur_time - last_time >= - static_cast(options_.rebalance_period_us * - options_.time_units_per_us) && - rebalancer_.load(std::memory_order_acquire) == - std::thread::id()) { - VPRINT(this, 2, - "Output %d hit rebalance period @%" PRIu64 - " (last rebalance @%" PRIu64 ")\n", - output, cur_time, last_time); - stream_status_t status = rebalance_queues(output, {}); - if (status != sched_type_t::STATUS_OK) - return status; - } - } - if (blocked_time > 0 && - prev_index != sched_type_t::INVALID_INPUT_ORDINAL) { - std::lock_guard lock(*inputs_[prev_index].lock); - if (inputs_[prev_index].blocked_time == 0) { - VPRINT(this, 2, "next_record[%d]: blocked time %" PRIu64 "\n", - output, blocked_time); - inputs_[prev_index].blocked_time = blocked_time; - inputs_[prev_index].blocked_start_time = get_output_time(output); - } - } - if (prev_index != sched_type_t::INVALID_INPUT_ORDINAL && - inputs_[prev_index].switch_to_input != - sched_type_t::INVALID_INPUT_ORDINAL) { - input_info_t *target = &inputs_[inputs_[prev_index].switch_to_input]; - inputs_[prev_index].switch_to_input = - sched_type_t::INVALID_INPUT_ORDINAL; - std::unique_lock target_input_lock(*target->lock); - // XXX i#5843: Add an invariant check that the next timestamp of the - // target is later than the pre-switch-syscall timestamp? - if (target->containing_output != - sched_type_t::INVALID_OUTPUT_ORDINAL) { - output_ordinal_t target_output = target->containing_output; - output_info_t &out = outputs_[target->containing_output]; - // We cannot hold an input lock when we acquire an output lock. - target_input_lock.unlock(); - { - auto target_output_lock = - acquire_scoped_output_lock_if_necessary(target_output); - target_input_lock.lock(); - if (out.ready_queue.queue.find(target)) { - VPRINT(this, 2, - "next_record[%d]: direct switch from input %d to " - "input %d " - "@%" PRIu64 "\n", - output, prev_index, target->index, - inputs_[prev_index].reader->get_last_timestamp()); - out.ready_queue.queue.erase(target); - index = target->index; - // Erase any remaining wait time for the target. - if (target->blocked_time > 0) { - VPRINT(this, 3, - "next_record[%d]: direct switch erasing " - "blocked time " - "for input %d\n", - output, target->index); - --out.ready_queue.num_blocked; - target->blocked_time = 0; - target->unscheduled = false; - } - if (target->containing_output != output) { - ++outputs_[output].stats - [memtrace_stream_t::SCHED_STAT_MIGRATIONS]; - } - ++outputs_[output] - .stats[memtrace_stream_t:: - SCHED_STAT_DIRECT_SWITCH_SUCCESSES]; - } // Else, actively running. - target_input_lock.unlock(); - } - target_input_lock.lock(); - } - std::lock_guard unsched_lock( - *unscheduled_priority_.lock); - if (index == sched_type_t::INVALID_INPUT_ORDINAL && - unscheduled_priority_.queue.find(target)) { - target->unscheduled = false; - unscheduled_priority_.queue.erase(target); - index = target->index; - VPRINT(this, 2, - "next_record[%d]: direct switch from input %d to " - "was-unscheduled input %d " - "@%" PRIu64 "\n", - output, prev_index, target->index, - inputs_[prev_index].reader->get_last_timestamp()); - if (target->prev_output != sched_type_t::INVALID_OUTPUT_ORDINAL && - target->prev_output != output) { - ++outputs_[output] - .stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS]; - } - ++outputs_[output].stats - [memtrace_stream_t::SCHED_STAT_DIRECT_SWITCH_SUCCESSES]; - } - if (index == sched_type_t::INVALID_INPUT_ORDINAL) { - // We assume that inter-input dependencies are captured in - // the _DIRECT_THREAD_SWITCH, _UNSCHEDULE, and _SCHEDULE markers - // and that if a switch request targets a thread running elsewhere - // that means there isn't a dependence and this is really a - // dynamic switch to whoever happens to be available (and - // different timing between tracing and analysis has caused this - // miss). - VPRINT(this, 2, - "Direct switch (from %d) target input #%d is running " - "elsewhere; picking a different target @%" PRIu64 "\n", - prev_index, target->index, - inputs_[prev_index].reader->get_last_timestamp()); - // We do ensure the missed target doesn't wait indefinitely. - // XXX i#6822: It's not clear this is always the right thing to - // do. - target->skip_next_unscheduled = true; - } - } - if (index != sched_type_t::INVALID_INPUT_ORDINAL) { - // We found a direct switch target above. - } - // XXX: We're grabbing the output ready_queue lock 3x here: - // ready_queue_empty(), set_cur_input()'s add_to_ready_queue(), - // and pop_from_ready_queue(). We could call versions of those - // that let the caller hold the lock: but holding it across other - // calls in between here adds complexity. - else if (ready_queue_empty(output) && blocked_time == 0) { - // There's nothing else to run so either stick with the - // current input or if it's invalid go idle/eof. - if (prev_index == sched_type_t::INVALID_INPUT_ORDINAL) { - stream_status_t status = eof_or_idle(output, prev_index); - if (status != sched_type_t::STATUS_STOLE) - return status; - // eof_or_idle stole an input for us, now in .cur_input. - index = outputs_[output].cur_input; - res = sched_type_t::STATUS_OK; - } else { - auto lock = - std::unique_lock(*inputs_[prev_index].lock); - // If we can't go back to the current input because it's EOF - // or unscheduled indefinitely (we already checked blocked_time - // above: it's 0 here), this output is either idle or EOF. - if (inputs_[prev_index].at_eof || - inputs_[prev_index].unscheduled) { - lock.unlock(); - stream_status_t status = eof_or_idle(output, prev_index); - if (status != sched_type_t::STATUS_STOLE) - return status; - index = outputs_[output].cur_input; - res = sched_type_t::STATUS_OK; - } else - index = prev_index; // Go back to prior. - } - } else { - // There's something else to run, or we'll soon be in the queue - // even if it's empty now. - // Give up the input before we go to the queue so we can add - // ourselves to the queue. If we're the highest priority we - // shouldn't switch. The queue preserves FIFO for same-priority - // cases so we will switch if someone of equal priority is - // waiting. - set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL); - input_info_t *queue_next = nullptr; - stream_status_t status = - pop_from_ready_queue(output, output, queue_next); - if (status != sched_type_t::STATUS_OK) { - if (status == sched_type_t::STATUS_IDLE) { - outputs_[output].waiting = true; - if (options_.schedule_record_ostream != nullptr) { - stream_status_t record_status = record_schedule_segment( - output, schedule_record_t::IDLE_BY_COUNT, 0, - // Start prior to this idle. - outputs_[output].idle_count - 1, 0); - if (record_status != sched_type_t::STATUS_OK) - return record_status; - } - if (prev_index != sched_type_t::INVALID_INPUT_ORDINAL) { - ++outputs_[output] - .stats[memtrace_stream_t:: - SCHED_STAT_SWITCH_INPUT_TO_IDLE]; - } - } - return status; - } - if (queue_next == nullptr) { - status = eof_or_idle(output, prev_index); - if (status != sched_type_t::STATUS_STOLE) - return status; - index = outputs_[output].cur_input; - res = sched_type_t::STATUS_OK; - } else - index = queue_next->index; - } - } else if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) { - uint64_t min_time = std::numeric_limits::max(); - for (size_t i = 0; i < inputs_.size(); ++i) { - std::lock_guard lock(*inputs_[i].lock); - if (!inputs_[i].at_eof && inputs_[i].next_timestamp > 0 && - inputs_[i].next_timestamp < min_time) { - min_time = inputs_[i].next_timestamp; - index = static_cast(i); - } - } - if (index < 0) { - stream_status_t status = eof_or_idle(output, prev_index); - if (status != sched_type_t::STATUS_STOLE) - return status; - index = outputs_[output].cur_input; - res = sched_type_t::STATUS_OK; - } - VPRINT(this, 2, - "next_record[%d]: advancing to timestamp %" PRIu64 - " == input #%d\n", - output, min_time, index); - } else if (options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) { - // We're done with the prior thread; take the next one that was - // pre-allocated to this output (pre-allocated to avoid locks). Invariant: - // the same output will not be accessed by two different threads - // simultaneously in this mode, allowing us to support a lock-free - // parallel-friendly increment here. - int indices_index = ++outputs_[output].input_indices_index; - if (indices_index >= - static_cast(outputs_[output].input_indices.size())) { - VPRINT(this, 2, "next_record[%d]: all at eof\n", output); - return sched_type_t::STATUS_EOF; - } - index = outputs_[output].input_indices[indices_index]; - VPRINT(this, 2, - "next_record[%d]: advancing to local index %d == input #%d\n", - output, indices_index, index); - } else - return sched_type_t::STATUS_INVALID; + res = pick_next_input_for_mode(output, blocked_time, prev_index, index); + if (res == sched_type_t::STATUS_SKIPPED) + break; + if (res != sched_type_t::STATUS_OK) + return res; // reader_t::at_eof_ is true until init() is called. std::lock_guard lock(*inputs_[index].lock); if (inputs_[index].needs_init) { diff --git a/clients/drcachesim/scheduler/scheduler_impl.h b/clients/drcachesim/scheduler/scheduler_impl.h index 55eebde621a..64f5a106f39 100644 --- a/clients/drcachesim/scheduler/scheduler_impl.h +++ b/clients/drcachesim/scheduler/scheduler_impl.h @@ -66,6 +66,24 @@ #include "trace_entry.h" #include "utils.h" +#undef VPRINT +// We make logging available in release build to help in diagnosing issues +// and understanding scheduler behavior. +// We assume the extra branches do not add undue overhead. +#define VPRINT(obj, level, ...) \ + do { \ + if ((obj)->verbosity_ >= (level)) { \ + fprintf(stderr, "%s ", (obj)->output_prefix_); \ + fprintf(stderr, __VA_ARGS__); \ + } \ + } while (0) +#define VDO(obj, level, statement) \ + do { \ + if ((obj)->verbosity_ >= (level)) { \ + statement \ + } \ + } while (0) + namespace dynamorio { namespace drmemtrace { @@ -74,7 +92,7 @@ namespace drmemtrace { // of scheduler_impl_tmpl_t inside the same unchanging-type outer class created by // the user. template class scheduler_impl_tmpl_t { -private: +protected: using sched_type_t = scheduler_tmpl_t; using input_ordinal_t = typename sched_type_t::input_ordinal_t; using output_ordinal_t = typename sched_type_t::output_ordinal_t; @@ -734,10 +752,16 @@ template class scheduler_impl_tmpl_t stream_status_t pick_next_input(output_ordinal_t output, uint64_t blocked_time); - // Helper for pick_next_input() for MAP_AS_PREVIOUSLY. + // Helper for pick_next_input() specialized by mapping_t mode. // No input_info_t lock can be held on entry. - stream_status_t - pick_next_input_as_previously(output_ordinal_t output, input_ordinal_t &index); + virtual stream_status_t + pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, + input_ordinal_t prev_index, input_ordinal_t &index) + { + // Return an error, rather than being pure virtual, to make subclassing + // in tests easier. + return sched_type_t::STATUS_NOT_IMPLEMENTED; + } // If the given record has a thread id field, returns true and the value. bool @@ -976,6 +1000,61 @@ typedef scheduler_impl_tmpl_t scheduler_impl_t; typedef scheduler_impl_tmpl_t record_scheduler_impl_t; +// Specialized code for dynamic schedules (MAP_TO_ANY_OUTPUT). +template +class scheduler_dynamic_tmpl_t : public scheduler_impl_tmpl_t { +private: + using sched_type_t = scheduler_tmpl_t; + using input_ordinal_t = typename sched_type_t::input_ordinal_t; + using output_ordinal_t = typename sched_type_t::output_ordinal_t; + using stream_status_t = typename sched_type_t::stream_status_t; + using input_info_t = + typename scheduler_impl_tmpl_t::input_info_t; + using output_info_t = + typename scheduler_impl_tmpl_t::output_info_t; + using schedule_record_t = + typename scheduler_impl_tmpl_t::schedule_record_t; + +protected: + stream_status_t + pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, + input_ordinal_t prev_index, input_ordinal_t &index) override; +}; + +// Specialized code for replaying schedules: either a recorded dynamic schedule +// or an as-traced schedule. +template +class scheduler_replay_tmpl_t : public scheduler_impl_tmpl_t { +private: + using sched_type_t = scheduler_tmpl_t; + using input_ordinal_t = typename sched_type_t::input_ordinal_t; + using output_ordinal_t = typename sched_type_t::output_ordinal_t; + using stream_status_t = typename sched_type_t::stream_status_t; + using schedule_record_t = + typename scheduler_impl_tmpl_t::schedule_record_t; + +protected: + stream_status_t + pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, + input_ordinal_t prev_index, input_ordinal_t &index) override; +}; + +// Specialized code for fixed "schedules": typically serial or parallel analyzer +// modes. +template +class scheduler_fixed_tmpl_t : public scheduler_impl_tmpl_t { +private: + using sched_type_t = scheduler_tmpl_t; + using input_ordinal_t = typename sched_type_t::input_ordinal_t; + using output_ordinal_t = typename sched_type_t::output_ordinal_t; + using stream_status_t = typename sched_type_t::stream_status_t; + +protected: + stream_status_t + pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, + input_ordinal_t prev_index, input_ordinal_t &index) override; +}; + /* For testing, where schedule_record_t is not accessible. */ class replay_file_checker_t { public: diff --git a/clients/drcachesim/scheduler/scheduler_replay.cpp b/clients/drcachesim/scheduler/scheduler_replay.cpp new file mode 100644 index 00000000000..ffc1e13aceb --- /dev/null +++ b/clients/drcachesim/scheduler/scheduler_replay.cpp @@ -0,0 +1,220 @@ +/* ********************************************************** + * Copyright (c) 2023-2024 Google, Inc. All rights reserved. + * **********************************************************/ + +/* + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * * Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * + * * Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * + * * Neither the name of Google, Inc. nor the names of its contributors may be + * used to endorse or promote products derived from this software without + * specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL VMWARE, INC. OR CONTRIBUTORS BE LIABLE + * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR + * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER + * CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT + * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY + * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH + * DAMAGE. + */ + +/* Scheduler replay code. */ + +#include "scheduler.h" +#include "scheduler_impl.h" + +#include + +namespace dynamorio { +namespace drmemtrace { + +template +typename scheduler_tmpl_t::stream_status_t +scheduler_replay_tmpl_t::pick_next_input_for_mode( + output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index, + input_ordinal_t &index) +{ + // Our own index is only modified by us so we can cache it here. + int record_index = + this->outputs_[output].record_index->load(std::memory_order_acquire); + if (record_index + 1 >= static_cast(this->outputs_[output].record.size())) { + if (!this->outputs_[output].at_eof) { + this->outputs_[output].at_eof = true; + this->live_replay_output_count_.fetch_add(-1, std::memory_order_release); + } + return this->eof_or_idle(output, this->outputs_[output].cur_input); + } + schedule_record_t &segment = this->outputs_[output].record[record_index + 1]; + if (segment.type == schedule_record_t::IDLE || + segment.type == schedule_record_t::IDLE_BY_COUNT) { + this->outputs_[output].waiting = true; + if (segment.type == schedule_record_t::IDLE) { + // Convert a legacy idle duration from microseconds to record counts. + segment.value.idle_duration = static_cast( + this->options_.time_units_per_us * segment.value.idle_duration); + } + this->outputs_[output].idle_start_count = this->outputs_[output].idle_count; + this->outputs_[output].record_index->fetch_add(1, std::memory_order_release); + ++this->outputs_[output].idle_count; + VPRINT(this, 5, "%s[%d]: next replay segment idle for %" PRIu64 "\n", + __FUNCTION__, output, segment.value.idle_duration); + return sched_type_t::STATUS_IDLE; + } + index = segment.key.input; + VPRINT(this, 5, + "%s[%d]: next replay segment in=%d (@%" PRId64 ") type=%d start=%" PRId64 + " end=%" PRId64 "\n", + __FUNCTION__, output, index, this->get_instr_ordinal(this->inputs_[index]), + segment.type, segment.value.start_instruction, segment.stop_instruction); + { + std::lock_guard lock(*this->inputs_[index].lock); + if (this->get_instr_ordinal(this->inputs_[index]) > + segment.value.start_instruction) { + VPRINT(this, 1, + "WARNING: next_record[%d]: input %d wants instr #%" PRId64 + " but it is already at #%" PRId64 "\n", + output, index, segment.value.start_instruction, + this->get_instr_ordinal(this->inputs_[index])); + } + if (this->get_instr_ordinal(this->inputs_[index]) < + segment.value.start_instruction && + // Don't wait for an ROI that starts at the beginning. + segment.value.start_instruction > 1 && + // The output may have begun in the wait state. + (record_index == -1 || + // When we skip our separator+timestamp markers are at the + // prior instr ord so do not wait for that. + (this->outputs_[output].record[record_index].type != + schedule_record_t::SKIP && + // Don't wait if we're at the end and just need the end record. + segment.type != schedule_record_t::SYNTHETIC_END))) { + // Some other output stream has not advanced far enough, and we do + // not support multiple positions in one input stream: we wait. + // XXX i#5843: We may want to provide a kernel-mediated wait + // feature so a multi-threaded simulator doesn't have to do a + // spinning poll loop. + // XXX i#5843: For replaying a schedule as it was traced with + // sched_type_t::MAP_TO_RECORDED_OUTPUT there may have been true idle periods + // during tracing where some other process than the traced workload was + // scheduled on a core. If we could identify those, we should return + // sched_type_t::STATUS_IDLE rather than sched_type_t::STATUS_WAIT. + VPRINT(this, 3, "next_record[%d]: waiting for input %d instr #%" PRId64 "\n", + output, index, segment.value.start_instruction); + // Give up this input and go into a wait state. + // We'll come back here on the next next_record() call. + this->set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL, + // Avoid livelock if prev input == cur input which happens + // with back-to-back segments with the same input. + index == this->outputs_[output].cur_input); + this->outputs_[output].waiting = true; + return sched_type_t::STATUS_WAIT; + } + } + // Also wait if this segment is ahead of the next-up segment on another + // output. We only have a timestamp per context switch so we can't + // enforce finer-grained timing replay. + if (this->options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) { + for (int i = 0; i < static_cast(this->outputs_.size()); ++i) { + if (i == output) + continue; + // Do an atomic load once and use it to de-reference if it's not at the end. + // This is safe because if the target advances to the end concurrently it + // will only cause an extra wait that will just come back here and then + // continue. + int other_index = + this->outputs_[i].record_index->load(std::memory_order_acquire); + if (other_index + 1 < static_cast(this->outputs_[i].record.size()) && + segment.timestamp > this->outputs_[i].record[other_index + 1].timestamp) { + VPRINT(this, 3, + "next_record[%d]: waiting because timestamp %" PRIu64 + " is ahead of output %d\n", + output, segment.timestamp, i); + // Give up this input and go into a wait state. + // We'll come back here on the next next_record() call. + // XXX: We should add a timeout just in case some timestamps are out of + // order due to using prior values, to avoid hanging. We try to avoid + // this by using wall-clock time in record_schedule_segment() rather than + // the stored output time. + this->set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL); + this->outputs_[output].waiting = true; + return sched_type_t::STATUS_WAIT; + } + } + } + if (segment.type == schedule_record_t::SYNTHETIC_END) { + std::lock_guard lock(*this->inputs_[index].lock); + // We're past the final region of interest and we need to insert + // a synthetic thread exit record. We need to first throw out the + // queued candidate record, if any. + this->clear_input_queue(this->inputs_[index]); + this->inputs_[index].queue.push_back( + this->create_thread_exit(this->inputs_[index].tid)); + VPRINT(this, 2, "early end for input %d\n", index); + // We're done with this entry but we need the queued record to be read, + // so we do not move past the entry. + this->outputs_[output].record_index->fetch_add(1, std::memory_order_release); + stream_status_t status = this->mark_input_eof(this->inputs_[index]); + if (status != sched_type_t::STATUS_OK) + return status; + return sched_type_t::STATUS_SKIPPED; + } else if (segment.type == schedule_record_t::SKIP) { + std::lock_guard lock(*this->inputs_[index].lock); + uint64_t cur_reader_instr = + this->inputs_[index].reader->get_instruction_ordinal(); + VPRINT(this, 2, + "next_record[%d]: skipping from %" PRId64 " to %" PRId64 + " in %d for schedule\n", + output, cur_reader_instr, segment.stop_instruction, index); + auto status = this->skip_instructions(this->inputs_[index], + segment.stop_instruction - + cur_reader_instr - 1 /*exclusive*/); + // Increment the region to get window id markers with ordinals. + this->inputs_[index].cur_region++; + if (status != sched_type_t::STATUS_SKIPPED) + return sched_type_t::STATUS_INVALID; + // We're done with the skip so move to and past it. + this->outputs_[output].record_index->fetch_add(2, std::memory_order_release); + return sched_type_t::STATUS_SKIPPED; + } else { + VPRINT(this, 2, "next_record[%d]: advancing to input %d instr #%" PRId64 "\n", + output, index, segment.value.start_instruction); + } + this->outputs_[output].record_index->fetch_add(1, std::memory_order_release); + VDO(this, 2, { + // Our own index is only modified by us so we can cache it here. + int local_index = + this->outputs_[output].record_index->load(std::memory_order_acquire); + if (local_index >= 0 && + local_index < static_cast(this->outputs_[output].record.size())) { + const schedule_record_t &local_segment = + this->outputs_[output].record[local_index]; + int input = local_segment.key.input; + VPRINT(this, 2, + "next_record[%d]: replay segment in=%d (@%" PRId64 + ") type=%d start=%" PRId64 " end=%" PRId64 "\n", + output, input, this->get_instr_ordinal(this->inputs_[input]), + local_segment.type, local_segment.value.start_instruction, + local_segment.stop_instruction); + } + }); + return sched_type_t::STATUS_OK; +} + +template class scheduler_replay_tmpl_t; +template class scheduler_replay_tmpl_t; + +} // namespace drmemtrace +} // namespace dynamorio