From 388ffb025f8d4d21ac08422c27f85e34a0d1d567 Mon Sep 17 00:00:00 2001 From: Derek Bruening Date: Wed, 13 Nov 2024 10:19:19 -0500 Subject: [PATCH 1/5] i#6831 sched refactor, step 6: Split set_initial_schedule() Splits set_initial_schedule() into separate overrides in the 3 scheduler subclasses, sharing the initial content reading in init() prior to calling set_initial_schedule(). Splits set_output_active(): it is only valid in dynamic. Moves rebalance_queues() and two rebalance vars to dynamic. Moves read_recorded_schedule() and read_and_instantiate_traced_schedule() to replay. Adds more using statements to remove the need for "this->" prefixes on frequent base class member field references. Issue: #6831 --- clients/drcachesim/scheduler/scheduler.cpp | 3 +- .../scheduler/scheduler_dynamic.cpp | 356 +++++++++-- .../drcachesim/scheduler/scheduler_fixed.cpp | 72 ++- .../drcachesim/scheduler/scheduler_impl.cpp | 579 +----------------- clients/drcachesim/scheduler/scheduler_impl.h | 105 +++- .../drcachesim/scheduler/scheduler_replay.cpp | 352 +++++++++-- .../drcachesim/tests/scheduler_unit_tests.cpp | 54 +- 7 files changed, 781 insertions(+), 740 deletions(-) diff --git a/clients/drcachesim/scheduler/scheduler.cpp b/clients/drcachesim/scheduler/scheduler.cpp index cc1c6ddfd72..828c59be514 100644 --- a/clients/drcachesim/scheduler/scheduler.cpp +++ b/clients/drcachesim/scheduler/scheduler.cpp @@ -60,7 +60,8 @@ scheduler_tmpl_t::init( new scheduler_replay_tmpl_t); } else { // Non-dynamic and non-replay fixed modes such as analyzer serial and - // parallel modes. + // parallel modes. Although serial does do interleaving by timestamp + // it is closer to parallel mode than the file-based replay modes. impl_ = std::unique_ptr, scheduler_impl_deleter_t>( new scheduler_fixed_tmpl_t); diff --git a/clients/drcachesim/scheduler/scheduler_dynamic.cpp b/clients/drcachesim/scheduler/scheduler_dynamic.cpp index 92d47b1591d..9897d010f01 100644 --- a/clients/drcachesim/scheduler/scheduler_dynamic.cpp +++ b/clients/drcachesim/scheduler/scheduler_dynamic.cpp @@ -51,6 +51,123 @@ namespace dynamorio { namespace drmemtrace { +template +typename scheduler_tmpl_t::scheduler_status_t +scheduler_dynamic_tmpl_t::set_initial_schedule( + std::unordered_map> &workload2inputs) +{ + if (options_.mapping != sched_type_t::MAP_TO_ANY_OUTPUT) + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + // Assign initial inputs. + if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) { + // Compute the min timestamp (==base_timestamp) per workload and sort + // all inputs by relative time from the base. + for (int workload_idx = 0; + workload_idx < static_cast(workload2inputs.size()); ++workload_idx) { + uint64_t min_time = std::numeric_limits::max(); + input_ordinal_t min_input = -1; + for (int input_idx : workload2inputs[workload_idx]) { + if (inputs_[input_idx].next_timestamp < min_time) { + min_time = inputs_[input_idx].next_timestamp; + min_input = input_idx; + } + } + if (min_input < 0) + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + for (int input_idx : workload2inputs[workload_idx]) { + VPRINT(this, 4, + "workload %d: setting input %d base_timestamp to %" PRIu64 + " vs next_timestamp %zu\n", + workload_idx, input_idx, min_time, + inputs_[input_idx].next_timestamp); + inputs_[input_idx].base_timestamp = min_time; + inputs_[input_idx].order_by_timestamp = true; + } + } + // We'll pick the starting inputs below by sorting by relative time from + // each workload's base_timestamp, which our queue does for us. 
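        // A minimal sketch of the ordering the ready queue applies for us,
        // assuming the input_info_t fields used in this function (priority,
        // order_by_timestamp, base_timestamp, next_timestamp, queue_counter);
        // the authoritative comparator lives in scheduler_impl.h:
        //
        //   auto input_before = [](const input_info_t &a, const input_info_t &b) {
        //       if (a.priority != b.priority)
        //           return a.priority > b.priority; // Higher priority first.
        //       if (a.order_by_timestamp && b.order_by_timestamp &&
        //           a.next_timestamp - a.base_timestamp !=
        //               b.next_timestamp - b.base_timestamp) {
        //           // Smaller time relative to the workload base runs first.
        //           return a.next_timestamp - a.base_timestamp <
        //               b.next_timestamp - b.base_timestamp;
        //       }
        //       return a.queue_counter < b.queue_counter; // FIFO tie-break.
        //   };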
+ } + // First, put all inputs into a temporary queue to sort by priority and + // time for us. + flexible_queue_t allq; + for (int i = 0; i < static_cast(inputs_.size()); ++i) { + inputs_[i].queue_counter = i; + allq.push(&inputs_[i]); + } + // Now assign round-robin to the outputs. We have to obey bindings here: we + // just take the first. This isn't guaranteed to be perfect if there are + // many bindings, but we run a rebalancing afterward. + output_ordinal_t output = 0; + while (!allq.empty()) { + input_info_t *input = allq.top(); + allq.pop(); + output_ordinal_t target = output; + if (!input->binding.empty()) + target = *input->binding.begin(); + else + output = (output + 1) % outputs_.size(); + this->add_to_ready_queue(target, input); + } + stream_status_t status = rebalance_queues(0, {}); + if (status != sched_type_t::STATUS_OK) { + VPRINT(this, 0, "Failed to rebalance with status %d\n", status); + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } + for (int i = 0; i < static_cast(outputs_.size()); ++i) { + input_info_t *queue_next; +#ifndef NDEBUG + status = +#endif + this->pop_from_ready_queue(i, i, queue_next); + assert(status == sched_type_t::STATUS_OK || status == sched_type_t::STATUS_IDLE); + if (queue_next == nullptr) + set_cur_input(i, sched_type_t::INVALID_INPUT_ORDINAL); + else + set_cur_input(i, queue_next->index); + } + VPRINT(this, 2, "Initial queues:\n"); + VDO(this, 2, { this->print_queue_stats(); }); + + return sched_type_t::STATUS_SUCCESS; +} + +template +typename scheduler_tmpl_t::stream_status_t +scheduler_dynamic_tmpl_t::set_output_active( + output_ordinal_t output, bool active) +{ + if (outputs_[output].active->load(std::memory_order_acquire) == active) + return sched_type_t::STATUS_OK; + outputs_[output].active->store(active, std::memory_order_release); + VPRINT(this, 2, "Output stream %d is now %s\n", output, + active ? "active" : "inactive"); + std::vector ordinals; + if (!active) { + // Make the now-inactive output's input available for other cores. + // This will reset its quantum too. + // We aren't switching on a just-read instruction not passed to the consumer, + // if the queue is empty. + input_ordinal_t cur_input = outputs_[output].cur_input; + if (cur_input != sched_type_t::INVALID_INPUT_ORDINAL) { + if (inputs_[cur_input].queue.empty()) + inputs_[cur_input].switching_pre_instruction = true; + set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL); + } + // Move the ready_queue to other outputs. + { + auto lock = this->acquire_scoped_output_lock_if_necessary(output); + while (!outputs_[output].ready_queue.queue.empty()) { + input_info_t *tomove = outputs_[output].ready_queue.queue.top(); + ordinals.push_back(tomove->index); + outputs_[output].ready_queue.queue.pop(); + } + } + } else { + outputs_[output].waiting = true; + } + return rebalance_queues(output, ordinals); +} + template typename scheduler_tmpl_t::stream_status_t scheduler_dynamic_tmpl_t::pick_next_input_for_mode( @@ -58,17 +175,16 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( input_ordinal_t &index) { uint64_t cur_time = this->get_output_time(output); - uint64_t last_time = this->last_rebalance_time_.load(std::memory_order_acquire); + uint64_t last_time = last_rebalance_time_.load(std::memory_order_acquire); if (last_time == 0) { // Initialize. - this->last_rebalance_time_.store(cur_time, std::memory_order_release); + last_rebalance_time_.store(cur_time, std::memory_order_release); } else { // Guard against time going backward, which happens: i#6966. 
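        // The cur_time > last_time test below matters because these counters
        // are unsigned: if time regressed, cur_time - last_time would wrap to
        // a huge value and spuriously look like a full rebalance period.
        // A minimal illustration:
        //   uint64_t cur = 100, last = 200;
        //   uint64_t delta = cur - last; // 18446744073709551516, not -100.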
if (cur_time > last_time && - cur_time - last_time >= - static_cast(this->options_.rebalance_period_us * - this->options_.time_units_per_us) && - this->rebalancer_.load(std::memory_order_acquire) == std::thread::id()) { + cur_time - last_time >= static_cast(options_.rebalance_period_us * + options_.time_units_per_us) && + rebalancer_.load(std::memory_order_acquire) == std::thread::id()) { VPRINT(this, 2, "Output %d hit rebalance period @%" PRIu64 " (last rebalance @%" PRIu64 ")\n", @@ -79,25 +195,24 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( } } if (blocked_time > 0 && prev_index != sched_type_t::INVALID_INPUT_ORDINAL) { - std::lock_guard lock(*this->inputs_[prev_index].lock); - if (this->inputs_[prev_index].blocked_time == 0) { + std::lock_guard lock(*inputs_[prev_index].lock); + if (inputs_[prev_index].blocked_time == 0) { VPRINT(this, 2, "next_record[%d]: blocked time %" PRIu64 "\n", output, blocked_time); - this->inputs_[prev_index].blocked_time = blocked_time; - this->inputs_[prev_index].blocked_start_time = this->get_output_time(output); + inputs_[prev_index].blocked_time = blocked_time; + inputs_[prev_index].blocked_start_time = this->get_output_time(output); } } if (prev_index != sched_type_t::INVALID_INPUT_ORDINAL && - this->inputs_[prev_index].switch_to_input != - sched_type_t::INVALID_INPUT_ORDINAL) { - input_info_t *target = &this->inputs_[this->inputs_[prev_index].switch_to_input]; - this->inputs_[prev_index].switch_to_input = sched_type_t::INVALID_INPUT_ORDINAL; + inputs_[prev_index].switch_to_input != sched_type_t::INVALID_INPUT_ORDINAL) { + input_info_t *target = &inputs_[inputs_[prev_index].switch_to_input]; + inputs_[prev_index].switch_to_input = sched_type_t::INVALID_INPUT_ORDINAL; std::unique_lock target_input_lock(*target->lock); // XXX i#5843: Add an invariant check that the next timestamp of the // target is later than the pre-switch-syscall timestamp? if (target->containing_output != sched_type_t::INVALID_OUTPUT_ORDINAL) { output_ordinal_t target_output = target->containing_output; - output_info_t &out = this->outputs_[target->containing_output]; + output_info_t &out = outputs_[target->containing_output]; // We cannot hold an input lock when we acquire an output lock. target_input_lock.unlock(); { @@ -110,7 +225,7 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( "input %d " "@%" PRIu64 "\n", output, prev_index, target->index, - this->inputs_[prev_index].reader->get_last_timestamp()); + inputs_[prev_index].reader->get_last_timestamp()); out.ready_queue.queue.erase(target); index = target->index; // Erase any remaining wait time for the target. @@ -125,10 +240,10 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( target->unscheduled = false; } if (target->containing_output != output) { - ++this->outputs_[output] + ++outputs_[output] .stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS]; } - ++this->outputs_[output] + ++outputs_[output] .stats[memtrace_stream_t::SCHED_STAT_DIRECT_SWITCH_SUCCESSES]; } // Else, actively running. 
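                // To summarize the direct-switch cases handled here: the
                // target is (a) waiting on some output's ready queue (claimed
                // above, counted as a migration if that queue belongs to a
                // different output), (b) unscheduled (claimed further below),
                // or (c) actively running elsewhere, in which case a different
                // input is picked and the missed target is kept from waiting
                // indefinitely.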
target_input_lock.unlock(); @@ -146,12 +261,12 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( "was-unscheduled input %d " "@%" PRIu64 "\n", output, prev_index, target->index, - this->inputs_[prev_index].reader->get_last_timestamp()); + inputs_[prev_index].reader->get_last_timestamp()); if (target->prev_output != sched_type_t::INVALID_OUTPUT_ORDINAL && target->prev_output != output) { - ++this->outputs_[output].stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS]; + ++outputs_[output].stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS]; } - ++this->outputs_[output] + ++outputs_[output] .stats[memtrace_stream_t::SCHED_STAT_DIRECT_SWITCH_SUCCESSES]; } if (index == sched_type_t::INVALID_INPUT_ORDINAL) { @@ -166,7 +281,7 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( "Direct switch (from %d) target input #%d is running " "elsewhere; picking a different target @%" PRIu64 "\n", prev_index, target->index, - this->inputs_[prev_index].reader->get_last_timestamp()); + inputs_[prev_index].reader->get_last_timestamp()); // We do ensure the missed target doesn't wait indefinitely. // XXX i#6822: It's not clear this is always the right thing to // do. @@ -189,21 +304,19 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( if (status != sched_type_t::STATUS_STOLE) return status; // eof_or_idle stole an input for us, now in .cur_input. - index = this->outputs_[output].cur_input; + index = outputs_[output].cur_input; return sched_type_t::STATUS_OK; } else { - auto lock = - std::unique_lock(*this->inputs_[prev_index].lock); + auto lock = std::unique_lock(*inputs_[prev_index].lock); // If we can't go back to the current input because it's EOF // or unscheduled indefinitely (we already checked blocked_time // above: it's 0 here), this output is either idle or EOF. - if (this->inputs_[prev_index].at_eof || - this->inputs_[prev_index].unscheduled) { + if (inputs_[prev_index].at_eof || inputs_[prev_index].unscheduled) { lock.unlock(); stream_status_t status = this->eof_or_idle(output, prev_index); if (status != sched_type_t::STATUS_STOLE) return status; - index = this->outputs_[output].cur_input; + index = outputs_[output].cur_input; return sched_type_t::STATUS_OK; } else index = prev_index; // Go back to prior. @@ -216,22 +329,22 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( // shouldn't switch. The queue preserves FIFO for same-priority // cases so we will switch if someone of equal priority is // waiting. - this->set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL); + set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL); input_info_t *queue_next = nullptr; stream_status_t status = this->pop_from_ready_queue(output, output, queue_next); if (status != sched_type_t::STATUS_OK) { if (status == sched_type_t::STATUS_IDLE) { - this->outputs_[output].waiting = true; - if (this->options_.schedule_record_ostream != nullptr) { + outputs_[output].waiting = true; + if (options_.schedule_record_ostream != nullptr) { stream_status_t record_status = this->record_schedule_segment( output, schedule_record_t::IDLE_BY_COUNT, 0, // Start prior to this idle. 
- this->outputs_[output].idle_count - 1, 0); + outputs_[output].idle_count - 1, 0); if (record_status != sched_type_t::STATUS_OK) return record_status; } if (prev_index != sched_type_t::INVALID_INPUT_ORDINAL) { - ++this->outputs_[output] + ++outputs_[output] .stats[memtrace_stream_t::SCHED_STAT_SWITCH_INPUT_TO_IDLE]; } } @@ -241,7 +354,7 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( status = this->eof_or_idle(output, prev_index); if (status != sched_type_t::STATUS_STOLE) return status; - index = this->outputs_[output].cur_input; + index = outputs_[output].cur_input; return sched_type_t::STATUS_OK; } else index = queue_next->index; @@ -267,8 +380,7 @@ scheduler_dynamic_tmpl_t::check_for_input_switch( // boundaries so we live with those being before the switch. // XXX: Once we insert kernel traces, we may have to try harder // to stop before the post-syscall records. - if (this->record_type_is_instr_boundary(record, - this->outputs_[output].last_record)) { + if (this->record_type_is_instr_boundary(record, outputs_[output].last_record)) { if (input->switch_to_input != sched_type_t::INVALID_INPUT_ORDINAL) { // The switch request overrides any latency threshold. need_new_input = true; @@ -300,12 +412,12 @@ scheduler_dynamic_tmpl_t::check_for_input_switch( input->syscall_timeout_arg = 0; } } - if (this->outputs_[output].hit_switch_code_end) { + if (outputs_[output].hit_switch_code_end) { // We have to delay so the end marker is still in_context_switch_code. - this->outputs_[output].in_context_switch_code = false; - this->outputs_[output].hit_switch_code_end = false; + outputs_[output].in_context_switch_code = false; + outputs_[output].hit_switch_code_end = false; // We're now back "on the clock". - if (this->options_.quantum_unit == sched_type_t::QUANTUM_TIME) + if (options_.quantum_unit == sched_type_t::QUANTUM_TIME) input->prev_time_in_quantum = cur_time; // XXX: If we add a skip feature triggered on the output stream, // we'll want to make sure skipping while in these switch and kernel @@ -314,11 +426,11 @@ scheduler_dynamic_tmpl_t::check_for_input_switch( if (this->record_type_is_marker(record, marker_type, marker_value)) { this->process_marker(*input, output, marker_type, marker_value); } - if (this->options_.quantum_unit == sched_type_t::QUANTUM_INSTRUCTIONS && - this->record_type_is_instr_boundary(record, this->outputs_[output].last_record) && - !this->outputs_[output].in_kernel_code) { + if (options_.quantum_unit == sched_type_t::QUANTUM_INSTRUCTIONS && + this->record_type_is_instr_boundary(record, outputs_[output].last_record) && + !outputs_[output].in_kernel_code) { ++input->instrs_in_quantum; - if (input->instrs_in_quantum > this->options_.quantum_duration_instrs) { + if (input->instrs_in_quantum > options_.quantum_duration_instrs) { // We again prefer to switch to another input even if the current // input has the oldest timestamp, prioritizing context switches // over timestamp ordering. 
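// A worked example of the quantum accounting in this function, using
// illustrative numbers: if time_units_per_us were 1000 and quantum_duration_us
// were 5000, the time-based branch further below would preempt an input once
// it accumulates 5,000,000 time units (5 simulated ms) on this output, though
// only at an instruction boundary; the instruction-count branch above instead
// preempts once instrs_in_quantum exceeds quantum_duration_instrs.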
@@ -327,10 +439,9 @@ scheduler_dynamic_tmpl_t::check_for_input_switch( preempt = true; need_new_input = true; input->instrs_in_quantum = 0; - ++this->outputs_[output] - .stats[memtrace_stream_t::SCHED_STAT_QUANTUM_PREEMPTS]; + ++outputs_[output].stats[memtrace_stream_t::SCHED_STAT_QUANTUM_PREEMPTS]; } - } else if (this->options_.quantum_unit == sched_type_t::QUANTUM_TIME) { + } else if (options_.quantum_unit == sched_type_t::QUANTUM_TIME) { if (cur_time == 0 || cur_time < input->prev_time_in_quantum) { VPRINT(this, 1, "next_record[%d]: invalid time %" PRIu64 " vs start %" PRIu64 "\n", @@ -340,13 +451,12 @@ scheduler_dynamic_tmpl_t::check_for_input_switch( input->time_spent_in_quantum += cur_time - input->prev_time_in_quantum; input->prev_time_in_quantum = cur_time; double elapsed_micros = static_cast(input->time_spent_in_quantum) / - this->options_.time_units_per_us; - if (elapsed_micros >= this->options_.quantum_duration_us && + options_.time_units_per_us; + if (elapsed_micros >= options_.quantum_duration_us && // We only switch on instruction boundaries. We could possibly switch // in between (e.g., scatter/gather long sequence of reads/writes) by // setting input->switching_pre_instruction. - this->record_type_is_instr_boundary(record, - this->outputs_[output].last_record)) { + this->record_type_is_instr_boundary(record, outputs_[output].last_record)) { VPRINT(this, 4, "next_record[%d]: input %d hit end of time quantum after %" PRIu64 "\n", @@ -354,8 +464,7 @@ scheduler_dynamic_tmpl_t::check_for_input_switch( preempt = true; need_new_input = true; input->time_spent_in_quantum = 0; - ++this->outputs_[output] - .stats[memtrace_stream_t::SCHED_STAT_QUANTUM_PREEMPTS]; + ++outputs_[output].stats[memtrace_stream_t::SCHED_STAT_QUANTUM_PREEMPTS]; } } // For sched_type_t::DEPENDENCY_TIMESTAMPS: enforcing asked-for @@ -366,6 +475,147 @@ scheduler_dynamic_tmpl_t::check_for_input_switch( return sched_type_t::STATUS_OK; } +template +typename scheduler_tmpl_t::stream_status_t +scheduler_dynamic_tmpl_t::rebalance_queues( + output_ordinal_t triggering_output, std::vector inputs_to_add) +{ + std::thread::id nobody; + if (!rebalancer_.compare_exchange_weak(nobody, std::this_thread::get_id(), + std::memory_order_release, + std::memory_order_relaxed)) { + // Someone else is rebalancing. + return sched_type_t::STATUS_OK; + } + stream_status_t status = sched_type_t::STATUS_OK; + assert(options_.mapping == sched_type_t::MAP_TO_ANY_OUTPUT); + VPRINT(this, 1, "Output %d triggered a rebalance @%" PRIu64 ":\n", triggering_output, + this->get_output_time(triggering_output)); + // First, update the time to avoid more threads coming here. + last_rebalance_time_.store(this->get_output_time(triggering_output), + std::memory_order_release); + VPRINT(this, 2, "Before rebalance:\n"); + VDO(this, 2, { this->print_queue_stats(); }); + ++outputs_[triggering_output] + .stats[memtrace_stream_t::SCHED_STAT_RUNQUEUE_REBALANCES]; + + // Workaround to avoid hangs when _SCHEDULE and/or _DIRECT_THREAD_SWITCH + // directives miss their targets (due to running with a subset of the + // original threads, or other scenarios) and we end up with no scheduled + // inputs but a set of unscheduled inputs who will never be scheduled. + // TODO i#6959: Just exit early instead, maybe under a flag. + // It would help to see what % of total records we've processed. 
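    // A hypothetical sketch of the progress metric the TODO above mentions,
    // assuming per-input counters that do not exist today (records_consumed
    // and records_expected are illustrative names only):
    //
    //   auto percent_processed = [&]() {
    //       uint64_t seen = 0, total = 0;
    //       for (const auto &in : inputs_) {
    //           seen += in.records_consumed;  // Hypothetical counter.
    //           total += in.records_expected; // Hypothetical known total.
    //       }
    //       return total == 0 ? 0. : 100. * seen / static_cast<double>(total);
    //   };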
+ size_t unsched_size = 0; + { + std::lock_guard unsched_lock(*unscheduled_priority_.lock); + unsched_size = unscheduled_priority_.queue.size(); + } + if (this->live_input_count_.load(std::memory_order_acquire) == + static_cast(unsched_size)) { + VPRINT( + this, 1, + "rebalancing moving entire unscheduled queue (%zu entries) to ready_queues\n", + unsched_size); + { + std::lock_guard unsched_lock(*unscheduled_priority_.lock); + while (!unscheduled_priority_.queue.empty()) { + input_info_t *tomove = unscheduled_priority_.queue.top(); + inputs_to_add.push_back(tomove->index); + unscheduled_priority_.queue.pop(); + } + } + for (input_ordinal_t input : inputs_to_add) { + std::lock_guard lock(*inputs_[input].lock); + inputs_[input].unscheduled = false; + } + } + + int live_inputs = this->live_input_count_.load(std::memory_order_acquire); + int live_outputs = 0; + for (unsigned int i = 0; i < outputs_.size(); ++i) { + if (outputs_[i].active->load(std::memory_order_acquire)) + ++live_outputs; + } + double avg_per_output = live_inputs / static_cast(live_outputs); + unsigned int avg_ceiling = static_cast(std::ceil(avg_per_output)); + unsigned int avg_floor = static_cast(std::floor(avg_per_output)); + int iteration = 0; + do { + // Walk the outputs, filling too-short queues from inputs_to_add and + // shrinking too-long queues into inputs_to_add. We may need a 2nd pass + // for this; and a 3rd pass if bindings prevent even splitting. + VPRINT( + this, 3, + "Rebalance iteration %d inputs_to_add size=%zu avg_per_output=%4.1f %d-%d\n", + iteration, inputs_to_add.size(), avg_per_output, avg_floor, avg_ceiling); + // We're giving up the output locks as we go, so there may be some stealing + // in the middle of our operation, but the rebalancing is approximate anyway. + for (unsigned int i = 0; i < outputs_.size(); ++i) { + if (!outputs_[i].active->load(std::memory_order_acquire)) + continue; + auto lock = this->acquire_scoped_output_lock_if_necessary(i); + // Only remove on the 1st iteration; later we can exceed due to binding + // constraints. + while (iteration == 0 && outputs_[i].ready_queue.queue.size() > avg_ceiling) { + input_info_t *queue_next = nullptr; + // We use our regular pop_from_ready_queue which means we leave + // blocked inputs on the queue: those do not get rebalanced. + // XXX: Should we revisit that? + // + // We remove from the back to avoid penalizing the next-to-run entries + // at the front of the queue by putting them at the back of another + // queue. + status = this->pop_from_ready_queue_hold_locks( + i, sched_type_t::INVALID_OUTPUT_ORDINAL, queue_next, + /*from_back=*/true); + if (status == sched_type_t::STATUS_OK && queue_next != nullptr) { + VPRINT(this, 3, + "Rebalance iteration %d: output %d giving up input %d\n", + iteration, i, queue_next->index); + inputs_to_add.push_back(queue_next->index); + } else + break; + } + std::vector incompatible_inputs; + // If we reach the 3rd iteration, we have fussy inputs with bindings. + // Try to add them to every output. 
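            // Worked example: 7 live inputs across 3 active outputs gives
            // avg_per_output = 2.33, avg_floor = 2, and avg_ceiling = 3.
            // Iteration 0 trims queues above 3 entries and fills queues below
            // 3; iteration 1 re-offers any leftovers; iteration 2 offers
            // binding-constrained inputs to every output regardless of the
            // ceiling, per the loop condition below.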
+ while ( + (outputs_[i].ready_queue.queue.size() < avg_ceiling || iteration > 1) && + !inputs_to_add.empty()) { + input_ordinal_t ordinal = inputs_to_add.back(); + inputs_to_add.pop_back(); + input_info_t &input = inputs_[ordinal]; + std::lock_guard input_lock(*input.lock); + if (input.binding.empty() || + input.binding.find(i) != input.binding.end()) { + VPRINT(this, 3, "Rebalance iteration %d: output %d taking input %d\n", + iteration, i, ordinal); + this->add_to_ready_queue_hold_locks(i, &input); + } else { + incompatible_inputs.push_back(ordinal); + } + } + inputs_to_add.insert(inputs_to_add.end(), incompatible_inputs.begin(), + incompatible_inputs.end()); + } + ++iteration; + if (iteration >= 3 && !inputs_to_add.empty()) { + // This is possible with bindings limited to inactive outputs. + // XXX: Rather than return an error, we could add to the unscheduled queue, + // but do not mark the input unscheduled. Then when an output is + // marked active, we could walk the unscheduled queue and take + // inputs not marked unscheduled. + VPRINT(this, 1, "Rebalance hit impossible binding\n"); + status = sched_type_t::STATUS_IMPOSSIBLE_BINDING; + break; + } + } while (!inputs_to_add.empty()); + VPRINT(this, 2, "After:\n"); + VDO(this, 2, { this->print_queue_stats(); }); + rebalancer_.store(std::thread::id(), std::memory_order_release); + return status; +} + template class scheduler_dynamic_tmpl_t; template class scheduler_dynamic_tmpl_t; diff --git a/clients/drcachesim/scheduler/scheduler_fixed.cpp b/clients/drcachesim/scheduler/scheduler_fixed.cpp index e9c1ed9f9e7..2b7adee2be7 100644 --- a/clients/drcachesim/scheduler/scheduler_fixed.cpp +++ b/clients/drcachesim/scheduler/scheduler_fixed.cpp @@ -35,11 +35,9 @@ #include "scheduler.h" #include "scheduler_impl.h" -#include #include #include #include -#include #include "memref.h" #include "mutex_dbg_owned.h" @@ -50,19 +48,64 @@ namespace dynamorio { namespace drmemtrace { +template +typename scheduler_tmpl_t::scheduler_status_t +scheduler_fixed_tmpl_t::set_initial_schedule( + std::unordered_map> &workload2inputs) +{ + if (options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) { + // Assign the inputs up front to avoid locks once we're in parallel mode. + // We use a simple round-robin static assignment for now. + for (int i = 0; i < static_cast(inputs_.size()); ++i) { + size_t index = i % outputs_.size(); + if (outputs_[index].input_indices.empty()) + set_cur_input(static_cast(index), i); + outputs_[index].input_indices.push_back(i); + VPRINT(this, 2, "Assigning input #%d to output #%zd\n", i, index); + } + } else if (options_.mapping == sched_type_t::MAP_TO_RECORDED_OUTPUT) { + if (options_.replay_as_traced_istream != nullptr) { + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } else if (outputs_.size() > 1) { + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } else if (inputs_.size() == 1) { + set_cur_input(0, 0); + } else { + // The old file_reader_t interleaving would output the top headers for every + // thread first and then pick the oldest timestamp once it reached a + // timestamp. We instead queue those headers so we can start directly with the + // oldest timestamp's thread. 
+ uint64_t min_time = std::numeric_limits::max(); + input_ordinal_t min_input = -1; + for (int i = 0; i < static_cast(inputs_.size()); ++i) { + if (inputs_[i].next_timestamp < min_time) { + min_time = inputs_[i].next_timestamp; + min_input = i; + } + } + if (min_input < 0) + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + set_cur_input(0, static_cast(min_input)); + } + } else { + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } + return sched_type_t::STATUS_SUCCESS; +} + template typename scheduler_tmpl_t::stream_status_t scheduler_fixed_tmpl_t::pick_next_input_for_mode( output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index, input_ordinal_t &index) { - if (this->options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) { + if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) { uint64_t min_time = std::numeric_limits::max(); - for (size_t i = 0; i < this->inputs_.size(); ++i) { - std::lock_guard lock(*this->inputs_[i].lock); - if (!this->inputs_[i].at_eof && this->inputs_[i].next_timestamp > 0 && - this->inputs_[i].next_timestamp < min_time) { - min_time = this->inputs_[i].next_timestamp; + for (size_t i = 0; i < inputs_.size(); ++i) { + std::lock_guard lock(*inputs_[i].lock); + if (!inputs_[i].at_eof && inputs_[i].next_timestamp > 0 && + inputs_[i].next_timestamp < min_time) { + min_time = inputs_[i].next_timestamp; index = static_cast(i); } } @@ -70,25 +113,24 @@ scheduler_fixed_tmpl_t::pick_next_input_for_mode( stream_status_t status = this->eof_or_idle(output, prev_index); if (status != sched_type_t::STATUS_STOLE) return status; - index = this->outputs_[output].cur_input; + index = outputs_[output].cur_input; return sched_type_t::STATUS_OK; } VPRINT(this, 2, "next_record[%d]: advancing to timestamp %" PRIu64 " == input #%d\n", output, min_time, index); - } else if (this->options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) { + } else if (options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) { // We're done with the prior thread; take the next one that was // pre-allocated to this output (pre-allocated to avoid locks). Invariant: // the same output will not be accessed by two different threads // simultaneously in this mode, allowing us to support a lock-free // parallel-friendly increment here. 
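        // The static partitioning set up in set_initial_schedule() is what
        // makes the unsynchronized increment below safe: each output stream
        // has exactly one consumer thread in this mode, so input_indices_index
        // needs no atomic, in contrast to the shared rebalancing state in the
        // dynamic scheduler.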
- int indices_index = ++this->outputs_[output].input_indices_index; - if (indices_index >= - static_cast(this->outputs_[output].input_indices.size())) { + int indices_index = ++outputs_[output].input_indices_index; + if (indices_index >= static_cast(outputs_[output].input_indices.size())) { VPRINT(this, 2, "next_record[%d]: all at eof\n", output); return sched_type_t::STATUS_EOF; } - index = this->outputs_[output].input_indices[indices_index]; + index = outputs_[output].input_indices[indices_index]; VPRINT(this, 2, "next_record[%d]: advancing to local index %d == input #%d\n", output, indices_index, index); } else @@ -103,7 +145,7 @@ scheduler_fixed_tmpl_t::check_for_input_switch( output_ordinal_t output, RecordType &record, input_info_t *input, uint64_t cur_time, bool &need_new_input, bool &preempt, uint64_t &blocked_time) { - if (this->options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS && + if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS && this->record_type_is_timestamp(record, input->next_timestamp)) need_new_input = true; return sched_type_t::STATUS_OK; diff --git a/clients/drcachesim/scheduler/scheduler_impl.cpp b/clients/drcachesim/scheduler/scheduler_impl.cpp index a5a553a3e67..10e53782773 100644 --- a/clients/drcachesim/scheduler/scheduler_impl.cpp +++ b/clients/drcachesim/scheduler/scheduler_impl.cpp @@ -871,6 +871,35 @@ scheduler_impl_tmpl_t::init( if (res != sched_type_t::STATUS_SUCCESS) return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + // Determine whether we need to read ahead in the inputs. There are cases where we + // do not want to do that as it would block forever if the inputs are not available + // (e.g., online analysis IPC readers); it also complicates ordinals so we avoid it + // if we can and enumerate all the cases that do need it. + bool gather_timestamps = false; + if (((options_.mapping == sched_type_t::MAP_AS_PREVIOUSLY || + options_.mapping == sched_type_t::MAP_TO_ANY_OUTPUT) && + options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) || + (options_.mapping == sched_type_t::MAP_TO_RECORDED_OUTPUT && + options_.replay_as_traced_istream == nullptr && inputs_.size() > 1)) { + gather_timestamps = true; + if (!options_.read_inputs_in_init) { + error_string_ = "Timestamp dependencies require read_inputs_in_init"; + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } + } + // The filetype, if present, is before the first timestamp. If we only need the + // filetype we avoid going as far as the timestamp. + bool gather_filetype = options_.read_inputs_in_init; + if (gather_filetype || gather_timestamps) { + scheduler_status_t res = this->get_initial_input_content(gather_timestamps); + if (res != sched_type_t::STATUS_SUCCESS) { + error_string_ = "Failed to read initial input contents for filetype"; + if (gather_timestamps) + error_string_ += " and initial timestamps"; + return res; + } + } + return set_initial_schedule(workload2inputs); } @@ -941,176 +970,6 @@ scheduler_impl_tmpl_t::legacy_field_support() return sched_type_t::STATUS_SUCCESS; } -template -typename scheduler_tmpl_t::scheduler_status_t -scheduler_impl_tmpl_t::set_initial_schedule( - std::unordered_map> &workload2inputs) -{ - // Determine whether we need to read ahead in the inputs. There are cases where we - // do not want to do that as it would block forever if the inputs are not available - // (e.g., online analysis IPC readers); it also complicates ordinals so we avoid it - // if we can and enumerate all the cases that do need it. 
- bool gather_timestamps = false; - if (((options_.mapping == sched_type_t::MAP_AS_PREVIOUSLY || - options_.mapping == sched_type_t::MAP_TO_ANY_OUTPUT) && - options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) || - (options_.mapping == sched_type_t::MAP_TO_RECORDED_OUTPUT && - options_.replay_as_traced_istream == nullptr && inputs_.size() > 1)) { - gather_timestamps = true; - if (!options_.read_inputs_in_init) { - error_string_ = "Timestamp dependencies require read_inputs_in_init"; - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - } - } - // The filetype, if present, is before the first timestamp. If we only need the - // filetype we avoid going as far as the timestamp. - bool gather_filetype = options_.read_inputs_in_init; - if (gather_filetype || gather_timestamps) { - scheduler_status_t res = get_initial_input_content(gather_timestamps); - if (res != sched_type_t::STATUS_SUCCESS) { - error_string_ = "Failed to read initial input contents for filetype"; - if (gather_timestamps) - error_string_ += " and initial timestamps"; - return res; - } - } - - if (options_.mapping == sched_type_t::MAP_AS_PREVIOUSLY) { - live_replay_output_count_.store(static_cast(outputs_.size()), - std::memory_order_release); - if (options_.schedule_replay_istream == nullptr || - options_.schedule_record_ostream != nullptr) - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - scheduler_status_t status = read_recorded_schedule(); - if (status != sched_type_t::STATUS_SUCCESS) - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) { - // Match the ordinals from the original run by pre-reading the timestamps. - assert(gather_timestamps); - } - } else if (options_.schedule_replay_istream != nullptr) { - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - } else if (options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) { - // Assign the inputs up front to avoid locks once we're in parallel mode. - // We use a simple round-robin static assignment for now. - for (int i = 0; i < static_cast(inputs_.size()); ++i) { - size_t index = i % outputs_.size(); - if (outputs_[index].input_indices.empty()) - set_cur_input(static_cast(index), i); - outputs_[index].input_indices.push_back(i); - VPRINT(this, 2, "Assigning input #%d to output #%zd\n", i, index); - } - } else if (options_.mapping == sched_type_t::MAP_TO_RECORDED_OUTPUT) { - if (options_.replay_as_traced_istream != nullptr) { - // Even for just one output we honor a request to replay the schedule - // (although it should match the analyzer serial mode so there's no big - // benefit to reading the schedule file. The analyzer serial mode or other - // special cases of one output don't set the replay_as_traced_istream - // field.) - scheduler_status_t status = read_and_instantiate_traced_schedule(); - if (status != sched_type_t::STATUS_SUCCESS) - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - // Now leverage the regular replay code. - options_.mapping = sched_type_t::MAP_AS_PREVIOUSLY; - } else if (outputs_.size() > 1) { - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - } else if (inputs_.size() == 1) { - set_cur_input(0, 0); - } else { - // The old file_reader_t interleaving would output the top headers for every - // thread first and then pick the oldest timestamp once it reached a - // timestamp. We instead queue those headers so we can start directly with the - // oldest timestamp's thread. 
- assert(gather_timestamps); - uint64_t min_time = std::numeric_limits::max(); - input_ordinal_t min_input = -1; - for (int i = 0; i < static_cast(inputs_.size()); ++i) { - if (inputs_[i].next_timestamp < min_time) { - min_time = inputs_[i].next_timestamp; - min_input = i; - } - } - if (min_input < 0) - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - set_cur_input(0, static_cast(min_input)); - } - } else { - // Assign initial inputs. - if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) { - assert(gather_timestamps); - // Compute the min timestamp (==base_timestamp) per workload and sort - // all inputs by relative time from the base. - for (int workload_idx = 0; - workload_idx < static_cast(workload2inputs.size()); - ++workload_idx) { - uint64_t min_time = std::numeric_limits::max(); - input_ordinal_t min_input = -1; - for (int input_idx : workload2inputs[workload_idx]) { - if (inputs_[input_idx].next_timestamp < min_time) { - min_time = inputs_[input_idx].next_timestamp; - min_input = input_idx; - } - } - if (min_input < 0) - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - for (int input_idx : workload2inputs[workload_idx]) { - VPRINT(this, 4, - "workload %d: setting input %d base_timestamp to %" PRIu64 - " vs next_timestamp %zu\n", - workload_idx, input_idx, min_time, - inputs_[input_idx].next_timestamp); - inputs_[input_idx].base_timestamp = min_time; - inputs_[input_idx].order_by_timestamp = true; - } - } - // We'll pick the starting inputs below by sorting by relative time from - // each workload's base_timestamp, which our queue does for us. - } - // First, put all inputs into a temporary queue to sort by priority and - // time for us. - flexible_queue_t allq; - for (int i = 0; i < static_cast(inputs_.size()); ++i) { - inputs_[i].queue_counter = i; - allq.push(&inputs_[i]); - } - // Now assign round-robin to the outputs. We have to obey bindings here: we - // just take the first. This isn't guaranteed to be perfect if there are - // many bindings, but we run a rebalancing afterward. 
- output_ordinal_t output = 0; - while (!allq.empty()) { - input_info_t *input = allq.top(); - allq.pop(); - output_ordinal_t target = output; - if (!input->binding.empty()) - target = *input->binding.begin(); - else - output = (output + 1) % outputs_.size(); - add_to_ready_queue(target, input); - } - stream_status_t status = rebalance_queues(0, {}); - if (status != sched_type_t::STATUS_OK) { - VPRINT(this, 0, "Failed to rebalance with status %d\n", status); - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - } - for (int i = 0; i < static_cast(outputs_.size()); ++i) { - input_info_t *queue_next; -#ifndef NDEBUG - status = -#endif - pop_from_ready_queue(i, i, queue_next); - assert(status == sched_type_t::STATUS_OK || - status == sched_type_t::STATUS_IDLE); - if (queue_next == nullptr) - set_cur_input(i, sched_type_t::INVALID_INPUT_ORDINAL); - else - set_cur_input(i, queue_next->index); - } - VPRINT(this, 2, "Initial queues:\n"); - VDO(this, 2, { print_queue_stats(); }); - } - return sched_type_t::STATUS_SUCCESS; -} - template std::string scheduler_impl_tmpl_t::recorded_schedule_component_name( @@ -1149,211 +1008,6 @@ scheduler_impl_tmpl_t::write_recorded_schedule() return sched_type_t::STATUS_SUCCESS; } -template -typename scheduler_tmpl_t::scheduler_status_t -scheduler_impl_tmpl_t::read_recorded_schedule() -{ - if (options_.schedule_replay_istream == nullptr) - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - - schedule_record_t record; - // We assume we can easily fit the whole context switch sequence in memory. - // If that turns out not to be the case for very long traces, we deliberately - // used an archive format so we could do parallel incremental reads. - // (Conversely, if we want to commit to storing in memory, we could use a - // non-archive format and store the output ordinal in the version record.) - for (int i = 0; i < static_cast(outputs_.size()); ++i) { - std::string err = options_.schedule_replay_istream->open_component( - recorded_schedule_component_name(i)); - if (!err.empty()) { - error_string_ = "Failed to open schedule_replay_istream component " + - recorded_schedule_component_name(i) + ": " + err; - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - } - // XXX: This could be made more efficient if we stored the record count - // in the version field's stop_instruction field or something so we can - // size the vector up front. As this only happens once we do not bother - // and live with a few vector resizes. - bool saw_footer = false; - while (options_.schedule_replay_istream->read(reinterpret_cast(&record), - sizeof(record))) { - if (record.type == schedule_record_t::VERSION) { - if (record.key.version != schedule_record_t::VERSION_CURRENT) - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - } else if (record.type == schedule_record_t::FOOTER) { - saw_footer = true; - break; - } else - outputs_[i].record.push_back(record); - } - if (!saw_footer) { - error_string_ = "Record file missing footer"; - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - } - VPRINT(this, 1, "Read %zu recorded records for output #%d\n", - outputs_[i].record.size(), i); - } - // See if there was more data in the file (we do this after reading to not - // mis-report i/o or path errors as this error). 
- std::string err = options_.schedule_replay_istream->open_component( - recorded_schedule_component_name(static_cast(outputs_.size()))); - if (err.empty()) { - error_string_ = "Not enough output streams for recorded file"; - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - } - for (int i = 0; i < static_cast(outputs_.size()); ++i) { - if (outputs_[i].record.empty()) { - // XXX i#6630: We should auto-set the output count and avoid - // having extra outputs; these complicate idle computations, etc. - VPRINT(this, 1, "output %d empty: returning eof up front\n", i); - set_cur_input(i, sched_type_t::INVALID_INPUT_ORDINAL); - outputs_[i].at_eof = true; - } else if (outputs_[i].record[0].type == schedule_record_t::IDLE || - outputs_[i].record[0].type == schedule_record_t::IDLE_BY_COUNT) { - set_cur_input(i, sched_type_t::INVALID_INPUT_ORDINAL); - outputs_[i].waiting = true; - if (outputs_[i].record[0].type == schedule_record_t::IDLE) { - // Convert a legacy idle duration from microseconds to record counts. - outputs_[i].record[0].value.idle_duration = - static_cast(options_.time_units_per_us * - outputs_[i].record[0].value.idle_duration); - } - outputs_[i].idle_start_count = -1; // Updated on first next_record(). - VPRINT(this, 3, "output %d starting out idle\n", i); - } else { - assert(outputs_[i].record[0].type == schedule_record_t::DEFAULT); - set_cur_input(i, outputs_[i].record[0].key.input); - } - } - return sched_type_t::STATUS_SUCCESS; -} - -template -typename scheduler_tmpl_t::scheduler_status_t -scheduler_impl_tmpl_t::read_and_instantiate_traced_schedule() -{ - std::vector> start2stop(inputs_.size()); - // We also want to collapse same-cpu consecutive records so we start with - // a temporary local vector. - std::vector> all_sched(outputs_.size()); - // Work around i#6107 by tracking counts sorted by timestamp for each input. - std::vector> input_sched(inputs_.size()); - // These hold entries added in the on-disk (unsorted) order. - std::vector disk_ord2index; // Initially [i] holds i. - std::vector disk_ord2cpuid; // [i] holds cpuid for entry i. - scheduler_status_t res = read_traced_schedule(input_sched, start2stop, all_sched, - disk_ord2index, disk_ord2cpuid); - if (res != sched_type_t::STATUS_SUCCESS) - return res; - // Sort by cpuid to get a more natural ordering. - // Probably raw2trace should do this in the first place, but we have many - // schedule files already out there so we still need a sort here. - // If we didn't have cross-indices pointing at all_sched from input_sched, we - // would just sort all_sched: but instead we have to construct a separate - // ordering structure. - std::sort(disk_ord2index.begin(), disk_ord2index.end(), - [disk_ord2cpuid](const output_ordinal_t &l, const output_ordinal_t &r) { - return disk_ord2cpuid[l] < disk_ord2cpuid[r]; - }); - // disk_ord2index[i] used to hold i; now after sorting it holds the ordinal in - // the disk file that has the ith largest cpuid. We need to turn that into - // the output_idx ordinal for the cpu at ith ordinal in the disk file, for - // which we use a new vector disk_ord2output. - // E.g., if the original file was in this order disk_ord2cpuid = {6,2,3,7}, - // disk_ord2index after sorting would hold {1,2,0,3}, which we want to turn - // into disk_ord2output = {2,0,1,3}. 
- std::vector disk_ord2output(disk_ord2index.size()); - for (size_t i = 0; i < disk_ord2index.size(); ++i) { - disk_ord2output[disk_ord2index[i]] = static_cast(i); - } - for (int disk_idx = 0; disk_idx < static_cast(outputs_.size()); - ++disk_idx) { - if (disk_idx >= static_cast(disk_ord2index.size())) { - // XXX i#6630: We should auto-set the output count and avoid - // having extra ouputs; these complicate idle computations, etc. - VPRINT(this, 1, "Output %d empty: returning eof up front\n", disk_idx); - outputs_[disk_idx].at_eof = true; - set_cur_input(disk_idx, sched_type_t::INVALID_INPUT_ORDINAL); - continue; - } - output_ordinal_t output_idx = disk_ord2output[disk_idx]; - VPRINT(this, 1, "Read %zu as-traced records for output #%d\n", - all_sched[disk_idx].size(), output_idx); - outputs_[output_idx].as_traced_cpuid = disk_ord2cpuid[disk_idx]; - VPRINT(this, 1, "Output #%d is as-traced CPU #%" PRId64 "\n", output_idx, - outputs_[output_idx].as_traced_cpuid); - // Update the stop_instruction field and collapse consecutive entries while - // inserting into the final location. - int start_consec = -1; - for (int sched_idx = 0; sched_idx < static_cast(all_sched[disk_idx].size()); - ++sched_idx) { - auto &segment = all_sched[disk_idx][sched_idx]; - if (!segment.valid) - continue; - auto find = start2stop[segment.input].find(segment.start_instruction); - ++find; - if (find == start2stop[segment.input].end()) - segment.stop_instruction = std::numeric_limits::max(); - else - segment.stop_instruction = *find; - VPRINT(this, 4, - "as-read segment #%d: input=%d start=%" PRId64 " stop=%" PRId64 - " time=%" PRId64 "\n", - sched_idx, segment.input, segment.start_instruction, - segment.stop_instruction, segment.timestamp); - if (sched_idx + 1 < static_cast(all_sched[disk_idx].size()) && - segment.input == all_sched[disk_idx][sched_idx + 1].input && - segment.stop_instruction > - all_sched[disk_idx][sched_idx + 1].start_instruction) { - // A second sanity check. - error_string_ = "Invalid decreasing start field in schedule file"; - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - } else if (sched_idx + 1 < static_cast(all_sched[disk_idx].size()) && - segment.input == all_sched[disk_idx][sched_idx + 1].input && - segment.stop_instruction == - all_sched[disk_idx][sched_idx + 1].start_instruction) { - // Collapse into next. - if (start_consec == -1) - start_consec = sched_idx; - } else { - schedule_output_tracker_t &toadd = start_consec >= 0 - ? 
all_sched[disk_idx][start_consec] - : all_sched[disk_idx][sched_idx]; - outputs_[output_idx].record.emplace_back( - schedule_record_t::DEFAULT, toadd.input, toadd.start_instruction, - all_sched[disk_idx][sched_idx].stop_instruction, toadd.timestamp); - start_consec = -1; - VDO(this, 3, { - auto &added = outputs_[output_idx].record.back(); - VPRINT(this, 3, - "segment #%zu: input=%d start=%" PRId64 " stop=%" PRId64 - " time=%" PRId64 "\n", - outputs_[output_idx].record.size() - 1, added.key.input, - added.value.start_instruction, added.stop_instruction, - added.timestamp); - }); - } - } - VPRINT(this, 1, "Collapsed duplicates for %zu as-traced records for output #%d\n", - outputs_[output_idx].record.size(), output_idx); - if (outputs_[output_idx].record.empty()) { - error_string_ = "Empty as-traced schedule"; - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - } - if (outputs_[output_idx].record[0].value.start_instruction != 0) { - VPRINT(this, 1, "Initial input for output #%d is: wait state\n", output_idx); - set_cur_input(output_idx, sched_type_t::INVALID_INPUT_ORDINAL); - outputs_[output_idx].waiting = true; - outputs_[output_idx].record_index->store(-1, std::memory_order_release); - } else { - VPRINT(this, 1, "Initial input for output #%d is %d\n", output_idx, - outputs_[output_idx].record[0].key.input); - set_cur_input(output_idx, outputs_[output_idx].record[0].key.input); - } - } - return sched_type_t::STATUS_SUCCESS; -} - template typename scheduler_tmpl_t::scheduler_status_t scheduler_impl_tmpl_t::create_regions_from_times( @@ -3754,38 +3408,8 @@ typename scheduler_tmpl_t::stream_status_t scheduler_impl_tmpl_t::set_output_active(output_ordinal_t output, bool active) { - if (options_.mapping != sched_type_t::MAP_TO_ANY_OUTPUT) - return sched_type_t::STATUS_INVALID; - if (outputs_[output].active->load(std::memory_order_acquire) == active) - return sched_type_t::STATUS_OK; - outputs_[output].active->store(active, std::memory_order_release); - VPRINT(this, 2, "Output stream %d is now %s\n", output, - active ? "active" : "inactive"); - std::vector ordinals; - if (!active) { - // Make the now-inactive output's input available for other cores. - // This will reset its quantum too. - // We aren't switching on a just-read instruction not passed to the consumer, - // if the queue is empty. - input_ordinal_t cur_input = outputs_[output].cur_input; - if (cur_input != sched_type_t::INVALID_INPUT_ORDINAL) { - if (inputs_[cur_input].queue.empty()) - inputs_[cur_input].switching_pre_instruction = true; - set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL); - } - // Move the ready_queue to other outputs. - { - auto lock = acquire_scoped_output_lock_if_necessary(output); - while (!outputs_[output].ready_queue.queue.empty()) { - input_info_t *tomove = outputs_[output].ready_queue.queue.top(); - ordinals.push_back(tomove->index); - outputs_[output].ready_queue.queue.pop(); - } - } - } else { - outputs_[output].waiting = true; - } - return rebalance_queues(output, ordinals); + // Only supported in scheduler_dynamic_tmpl_t subclass. 
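    // The mode-specific behavior now lives in the scheduler_dynamic_tmpl_t
    // override of this virtual; callers dispatch through the base class,
    // roughly:
    //
    //   scheduler_impl_tmpl_t<RecordType, ReaderType> *sched = ...;
    //   // Reaches the dynamic override under MAP_TO_ANY_OUTPUT; fixed and
    //   // replay modes get this STATUS_INVALID stub.
    //   stream_status_t res = sched->set_output_active(output, /*active=*/false);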
+ return sched_type_t::STATUS_INVALID; } template @@ -3828,147 +3452,6 @@ scheduler_impl_tmpl_t::print_queue_stats() VPRINT(this, 0, "%s\n", ostr.str().c_str()); } -template -typename scheduler_tmpl_t::stream_status_t -scheduler_impl_tmpl_t::rebalance_queues( - output_ordinal_t triggering_output, std::vector inputs_to_add) -{ - std::thread::id nobody; - if (!rebalancer_.compare_exchange_weak(nobody, std::this_thread::get_id(), - std::memory_order_release, - std::memory_order_relaxed)) { - // Someone else is rebalancing. - return sched_type_t::STATUS_OK; - } - stream_status_t status = sched_type_t::STATUS_OK; - assert(options_.mapping == sched_type_t::MAP_TO_ANY_OUTPUT); - VPRINT(this, 1, "Output %d triggered a rebalance @%" PRIu64 ":\n", triggering_output, - get_output_time(triggering_output)); - // First, update the time to avoid more threads coming here. - last_rebalance_time_.store(get_output_time(triggering_output), - std::memory_order_release); - VPRINT(this, 2, "Before rebalance:\n"); - VDO(this, 2, { print_queue_stats(); }); - ++outputs_[triggering_output] - .stats[memtrace_stream_t::SCHED_STAT_RUNQUEUE_REBALANCES]; - - // Workaround to avoid hangs when _SCHEDULE and/or _DIRECT_THREAD_SWITCH - // directives miss their targets (due to running with a subset of the - // original threads, or other scenarios) and we end up with no scheduled - // inputs but a set of unscheduled inputs who will never be scheduled. - // TODO i#6959: Just exit early instead, maybe under a flag. - // It would help to see what % of total records we've processed. - size_t unsched_size = 0; - { - std::lock_guard unsched_lock(*unscheduled_priority_.lock); - unsched_size = unscheduled_priority_.queue.size(); - } - if (live_input_count_.load(std::memory_order_acquire) == - static_cast(unsched_size)) { - VPRINT( - this, 1, - "rebalancing moving entire unscheduled queue (%zu entries) to ready_queues\n", - unsched_size); - { - std::lock_guard unsched_lock(*unscheduled_priority_.lock); - while (!unscheduled_priority_.queue.empty()) { - input_info_t *tomove = unscheduled_priority_.queue.top(); - inputs_to_add.push_back(tomove->index); - unscheduled_priority_.queue.pop(); - } - } - for (input_ordinal_t input : inputs_to_add) { - std::lock_guard lock(*inputs_[input].lock); - inputs_[input].unscheduled = false; - } - } - - int live_inputs = live_input_count_.load(std::memory_order_acquire); - int live_outputs = 0; - for (unsigned int i = 0; i < outputs_.size(); ++i) { - if (outputs_[i].active->load(std::memory_order_acquire)) - ++live_outputs; - } - double avg_per_output = live_inputs / static_cast(live_outputs); - unsigned int avg_ceiling = static_cast(std::ceil(avg_per_output)); - unsigned int avg_floor = static_cast(std::floor(avg_per_output)); - int iteration = 0; - do { - // Walk the outputs, filling too-short queues from inputs_to_add and - // shrinking too-long queues into inputs_to_add. We may need a 2nd pass - // for this; and a 3rd pass if bindings prevent even splitting. - VPRINT( - this, 3, - "Rebalance iteration %d inputs_to_add size=%zu avg_per_output=%4.1f %d-%d\n", - iteration, inputs_to_add.size(), avg_per_output, avg_floor, avg_ceiling); - // We're giving up the output locks as we go, so there may be some stealing - // in the middle of our operation, but the rebalancing is approximate anyway. 
- for (unsigned int i = 0; i < outputs_.size(); ++i) { - if (!outputs_[i].active->load(std::memory_order_acquire)) - continue; - auto lock = acquire_scoped_output_lock_if_necessary(i); - // Only remove on the 1st iteration; later we can exceed due to binding - // constraints. - while (iteration == 0 && outputs_[i].ready_queue.queue.size() > avg_ceiling) { - input_info_t *queue_next = nullptr; - // We use our regular pop_from_ready_queue which means we leave - // blocked inputs on the queue: those do not get rebalanced. - // XXX: Should we revisit that? - // - // We remove from the back to avoid penalizing the next-to-run entries - // at the front of the queue by putting them at the back of another - // queue. - status = pop_from_ready_queue_hold_locks( - i, sched_type_t::INVALID_OUTPUT_ORDINAL, queue_next, - /*from_back=*/true); - if (status == sched_type_t::STATUS_OK && queue_next != nullptr) { - VPRINT(this, 3, - "Rebalance iteration %d: output %d giving up input %d\n", - iteration, i, queue_next->index); - inputs_to_add.push_back(queue_next->index); - } else - break; - } - std::vector incompatible_inputs; - // If we reach the 3rd iteration, we have fussy inputs with bindings. - // Try to add them to every output. - while ( - (outputs_[i].ready_queue.queue.size() < avg_ceiling || iteration > 1) && - !inputs_to_add.empty()) { - input_ordinal_t ordinal = inputs_to_add.back(); - inputs_to_add.pop_back(); - input_info_t &input = inputs_[ordinal]; - std::lock_guard input_lock(*input.lock); - if (input.binding.empty() || - input.binding.find(i) != input.binding.end()) { - VPRINT(this, 3, "Rebalance iteration %d: output %d taking input %d\n", - iteration, i, ordinal); - add_to_ready_queue_hold_locks(i, &input); - } else { - incompatible_inputs.push_back(ordinal); - } - } - inputs_to_add.insert(inputs_to_add.end(), incompatible_inputs.begin(), - incompatible_inputs.end()); - } - ++iteration; - if (iteration >= 3 && !inputs_to_add.empty()) { - // This is possible with bindings limited to inactive outputs. - // XXX: Rather than return an error, we could add to the unscheduled queue, - // but do not mark the input unscheduled. Then when an output is - // marked active, we could walk the unscheduled queue and take - // inputs not marked unscheduled. - VPRINT(this, 1, "Rebalance hit impossible binding\n"); - status = sched_type_t::STATUS_IMPOSSIBLE_BINDING; - break; - } - } while (!inputs_to_add.empty()); - VPRINT(this, 2, "After:\n"); - VDO(this, 2, { print_queue_stats(); }); - rebalancer_.store(std::thread::id(), std::memory_order_release); - return status; -} - template class scheduler_impl_tmpl_t; template class scheduler_impl_tmpl_t; diff --git a/clients/drcachesim/scheduler/scheduler_impl.h b/clients/drcachesim/scheduler/scheduler_impl.h index 087579e8aca..a8c84f7cc55 100644 --- a/clients/drcachesim/scheduler/scheduler_impl.h +++ b/clients/drcachesim/scheduler/scheduler_impl.h @@ -107,10 +107,7 @@ template class scheduler_impl_tmpl_t using switch_type_t = typename sched_type_t::switch_type_t; public: - scheduler_impl_tmpl_t() - { - last_rebalance_time_.store(0, std::memory_order_relaxed); - } + scheduler_impl_tmpl_t() = default; virtual ~scheduler_impl_tmpl_t(); virtual scheduler_status_t @@ -428,10 +425,6 @@ template class scheduler_impl_tmpl_t output_ordinal_t for_output, input_info_t *&new_input, bool from_back = false); - stream_status_t - rebalance_queues(output_ordinal_t triggering_output, - std::vector inputs_to_add); - // Up to the caller to check verbosity before calling. 
void print_queue_stats(); @@ -588,9 +581,9 @@ template class scheduler_impl_tmpl_t static constexpr uint64_t INSTRS_PER_US = 2000; // Called just once at initialization time to set the initial input-to-output - // mappings and state. - scheduler_status_t - set_initial_schedule(std::unordered_map> &workload2inputs); + // mappings and state for the particular mapping_t mode. + virtual scheduler_status_t + set_initial_schedule(std::unordered_map> &workload2inputs) = 0; // Assumed to only be called at initialization time. // Reads ahead in each input to find its filetype, and if "gather_timestamps" @@ -670,9 +663,6 @@ template class scheduler_impl_tmpl_t record_schedule_skip(output_ordinal_t output, input_ordinal_t input, uint64_t start_instruction, uint64_t stop_instruction); - scheduler_status_t - read_and_instantiate_traced_schedule(); - scheduler_status_t create_regions_from_times(const std::unordered_map &workload_tids, input_workload_t &workload); @@ -704,9 +694,6 @@ template class scheduler_impl_tmpl_t std::vector> &start2stop, std::vector> &all_sched); - scheduler_status_t - read_recorded_schedule(); - scheduler_status_t read_switch_sequences(); @@ -904,7 +891,7 @@ template class scheduler_impl_tmpl_t stream_status_t stop_speculation(output_ordinal_t output); - stream_status_t + virtual stream_status_t set_output_active(output_ordinal_t output, bool active); // Caller must hold the input's lock. @@ -950,9 +937,6 @@ template class scheduler_impl_tmpl_t mutex_dbg_owned unsched_lock_; // Inputs that are unscheduled indefinitely until directly targeted. input_queue_t unscheduled_priority_; - // Rebalancing coordination. - std::atomic rebalancer_; - std::atomic last_rebalance_time_; // Count of inputs not yet at eof. std::atomic live_input_count_; // In replay mode, count of outputs not yet at the end of the replay sequence. 
@@ -1010,26 +994,53 @@ typedef scheduler_impl_tmpl_t class scheduler_dynamic_tmpl_t : public scheduler_impl_tmpl_t { +public: + scheduler_dynamic_tmpl_t() + { + last_rebalance_time_.store(0, std::memory_order_relaxed); + } + private: using sched_type_t = scheduler_tmpl_t; using input_ordinal_t = typename sched_type_t::input_ordinal_t; using output_ordinal_t = typename sched_type_t::output_ordinal_t; + using scheduler_status_t = typename sched_type_t::scheduler_status_t; using stream_status_t = typename sched_type_t::stream_status_t; - using input_info_t = - typename scheduler_impl_tmpl_t::input_info_t; - using output_info_t = - typename scheduler_impl_tmpl_t::output_info_t; - using schedule_record_t = - typename scheduler_impl_tmpl_t::schedule_record_t; + using typename scheduler_impl_tmpl_t::input_info_t; + using typename scheduler_impl_tmpl_t::output_info_t; + using typename scheduler_impl_tmpl_t::schedule_record_t; + using + typename scheduler_impl_tmpl_t::InputTimestampComparator; + using scheduler_impl_tmpl_t::options_; + using scheduler_impl_tmpl_t::outputs_; + using scheduler_impl_tmpl_t::inputs_; + using scheduler_impl_tmpl_t::error_string_; + using scheduler_impl_tmpl_t::unscheduled_priority_; + using scheduler_impl_tmpl_t::set_cur_input; protected: + scheduler_status_t + set_initial_schedule( + std::unordered_map> &workload2inputs) override; + stream_status_t pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index, input_ordinal_t &index) override; + stream_status_t check_for_input_switch(output_ordinal_t output, RecordType &record, input_info_t *input, uint64_t cur_time, bool &need_new_input, bool &preempt, uint64_t &blocked_time) override; + stream_status_t + set_output_active(output_ordinal_t output, bool active) override; + + stream_status_t + rebalance_queues(output_ordinal_t triggering_output, + std::vector inputs_to_add); + + // Rebalancing coordination. 
+ std::atomic rebalancer_; + std::atomic last_rebalance_time_; }; // Specialized code for replaying schedules: either a recorded dynamic schedule @@ -1040,20 +1051,38 @@ class scheduler_replay_tmpl_t : public scheduler_impl_tmpl_t; using input_ordinal_t = typename sched_type_t::input_ordinal_t; using output_ordinal_t = typename sched_type_t::output_ordinal_t; + using scheduler_status_t = typename sched_type_t::scheduler_status_t; using stream_status_t = typename sched_type_t::stream_status_t; - using schedule_record_t = - typename scheduler_impl_tmpl_t::schedule_record_t; - using input_info_t = - typename scheduler_impl_tmpl_t::input_info_t; + using typename scheduler_impl_tmpl_t::schedule_record_t; + using typename scheduler_impl_tmpl_t::input_info_t; + using + typename scheduler_impl_tmpl_t::schedule_output_tracker_t; + using + typename scheduler_impl_tmpl_t::schedule_input_tracker_t; + using scheduler_impl_tmpl_t::options_; + using scheduler_impl_tmpl_t::outputs_; + using scheduler_impl_tmpl_t::inputs_; + using scheduler_impl_tmpl_t::error_string_; + using scheduler_impl_tmpl_t::set_cur_input; protected: + scheduler_status_t + set_initial_schedule( + std::unordered_map> &workload2inputs) override; + stream_status_t pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index, input_ordinal_t &index) override; + stream_status_t check_for_input_switch(output_ordinal_t output, RecordType &record, input_info_t *input, uint64_t cur_time, bool &need_new_input, bool &preempt, uint64_t &blocked_time) override; + scheduler_status_t + read_recorded_schedule(); + + scheduler_status_t + read_and_instantiate_traced_schedule(); }; // Specialized code for fixed "schedules": typically serial or parallel analyzer @@ -1064,14 +1093,24 @@ class scheduler_fixed_tmpl_t : public scheduler_impl_tmpl_t; using input_ordinal_t = typename sched_type_t::input_ordinal_t; using output_ordinal_t = typename sched_type_t::output_ordinal_t; + using scheduler_status_t = typename sched_type_t::scheduler_status_t; using stream_status_t = typename sched_type_t::stream_status_t; - using input_info_t = - typename scheduler_impl_tmpl_t::input_info_t; + using typename scheduler_impl_tmpl_t::input_info_t; + using scheduler_impl_tmpl_t::options_; + using scheduler_impl_tmpl_t::outputs_; + using scheduler_impl_tmpl_t::inputs_; + using scheduler_impl_tmpl_t::error_string_; + using scheduler_impl_tmpl_t::set_cur_input; protected: + scheduler_status_t + set_initial_schedule( + std::unordered_map> &workload2inputs) override; + stream_status_t pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index, input_ordinal_t &index) override; + stream_status_t check_for_input_switch(output_ordinal_t output, RecordType &record, input_info_t *input, uint64_t cur_time, bool &need_new_input, diff --git a/clients/drcachesim/scheduler/scheduler_replay.cpp b/clients/drcachesim/scheduler/scheduler_replay.cpp index dd3a3758672..63cccfdc7c6 100644 --- a/clients/drcachesim/scheduler/scheduler_replay.cpp +++ b/clients/drcachesim/scheduler/scheduler_replay.cpp @@ -35,11 +35,10 @@ #include "scheduler.h" #include "scheduler_impl.h" -#include +#include #include #include #include -#include #include "memref.h" #include "mutex_dbg_owned.h" @@ -50,6 +49,246 @@ namespace dynamorio { namespace drmemtrace { +template +typename scheduler_tmpl_t::scheduler_status_t +scheduler_replay_tmpl_t::set_initial_schedule( + std::unordered_map> &workload2inputs) +{ + if (options_.mapping == 
sched_type_t::MAP_AS_PREVIOUSLY) { + this->live_replay_output_count_.store(static_cast(outputs_.size()), + std::memory_order_release); + if (options_.schedule_replay_istream == nullptr || + options_.schedule_record_ostream != nullptr) + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + scheduler_status_t status = read_recorded_schedule(); + if (status != sched_type_t::STATUS_SUCCESS) + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } else if (options_.schedule_replay_istream != nullptr) { + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } else if (options_.mapping == sched_type_t::MAP_TO_RECORDED_OUTPUT && + options_.replay_as_traced_istream != nullptr) { + // Even for just one output we honor a request to replay the schedule + // (although it should match the analyzer serial mode so there's no big + // benefit to reading the schedule file. The analyzer serial mode or other + // special cases of one output don't set the replay_as_traced_istream + // field.) + scheduler_status_t status = read_and_instantiate_traced_schedule(); + if (status != sched_type_t::STATUS_SUCCESS) + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + // Now leverage the regular replay code. + options_.mapping = sched_type_t::MAP_AS_PREVIOUSLY; + } else { + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } + return sched_type_t::STATUS_SUCCESS; +} + +template +typename scheduler_tmpl_t::scheduler_status_t +scheduler_replay_tmpl_t::read_recorded_schedule() +{ + if (options_.schedule_replay_istream == nullptr) + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + + schedule_record_t record; + // We assume we can easily fit the whole context switch sequence in memory. + // If that turns out not to be the case for very long traces, we deliberately + // used an archive format so we could do parallel incremental reads. + // (Conversely, if we want to commit to storing in memory, we could use a + // non-archive format and store the output ordinal in the version record.) + for (int i = 0; i < static_cast(outputs_.size()); ++i) { + std::string err = options_.schedule_replay_istream->open_component( + this->recorded_schedule_component_name(i)); + if (!err.empty()) { + error_string_ = "Failed to open schedule_replay_istream component " + + this->recorded_schedule_component_name(i) + ": " + err; + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } + // XXX: This could be made more efficient if we stored the record count + // in the version field's stop_instruction field or something so we can + // size the vector up front. As this only happens once we do not bother + // and live with a few vector resizes. + bool saw_footer = false; + while (options_.schedule_replay_istream->read(reinterpret_cast(&record), + sizeof(record))) { + if (record.type == schedule_record_t::VERSION) { + if (record.key.version != schedule_record_t::VERSION_CURRENT) + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } else if (record.type == schedule_record_t::FOOTER) { + saw_footer = true; + break; + } else + outputs_[i].record.push_back(record); + } + if (!saw_footer) { + error_string_ = "Record file missing footer"; + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } + VPRINT(this, 1, "Read %zu recorded records for output #%d\n", + outputs_[i].record.size(), i); + } + // See if there was more data in the file (we do this after reading to not + // mis-report i/o or path errors as this error). 
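+    // I.e., we probe for one component beyond our own output count: if that
+    // open succeeds, the recorded run used more streams than we have outputs.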
+ std::string err = options_.schedule_replay_istream->open_component( + this->recorded_schedule_component_name( + static_cast(outputs_.size()))); + if (err.empty()) { + error_string_ = "Not enough output streams for recorded file"; + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } + for (int i = 0; i < static_cast(outputs_.size()); ++i) { + if (outputs_[i].record.empty()) { + // XXX i#6630: We should auto-set the output count and avoid + // having extra outputs; these complicate idle computations, etc. + VPRINT(this, 1, "output %d empty: returning eof up front\n", i); + set_cur_input(i, sched_type_t::INVALID_INPUT_ORDINAL); + outputs_[i].at_eof = true; + } else if (outputs_[i].record[0].type == schedule_record_t::IDLE || + outputs_[i].record[0].type == schedule_record_t::IDLE_BY_COUNT) { + set_cur_input(i, sched_type_t::INVALID_INPUT_ORDINAL); + outputs_[i].waiting = true; + if (outputs_[i].record[0].type == schedule_record_t::IDLE) { + // Convert a legacy idle duration from microseconds to record counts. + outputs_[i].record[0].value.idle_duration = + static_cast(options_.time_units_per_us * + outputs_[i].record[0].value.idle_duration); + } + outputs_[i].idle_start_count = -1; // Updated on first next_record(). + VPRINT(this, 3, "output %d starting out idle\n", i); + } else { + assert(outputs_[i].record[0].type == schedule_record_t::DEFAULT); + set_cur_input(i, outputs_[i].record[0].key.input); + } + } + return sched_type_t::STATUS_SUCCESS; +} + +template +typename scheduler_tmpl_t::scheduler_status_t +scheduler_replay_tmpl_t::read_and_instantiate_traced_schedule() +{ + std::vector> start2stop(inputs_.size()); + // We also want to collapse same-cpu consecutive records so we start with + // a temporary local vector. + std::vector> all_sched(outputs_.size()); + // Work around i#6107 by tracking counts sorted by timestamp for each input. + std::vector> input_sched(inputs_.size()); + // These hold entries added in the on-disk (unsorted) order. + std::vector disk_ord2index; // Initially [i] holds i. + std::vector disk_ord2cpuid; // [i] holds cpuid for entry i. + scheduler_status_t res = this->read_traced_schedule( + input_sched, start2stop, all_sched, disk_ord2index, disk_ord2cpuid); + if (res != sched_type_t::STATUS_SUCCESS) + return res; + // Sort by cpuid to get a more natural ordering. + // Probably raw2trace should do this in the first place, but we have many + // schedule files already out there so we still need a sort here. + // If we didn't have cross-indices pointing at all_sched from input_sched, we + // would just sort all_sched: but instead we have to construct a separate + // ordering structure. + std::sort(disk_ord2index.begin(), disk_ord2index.end(), + [disk_ord2cpuid](const output_ordinal_t &l, const output_ordinal_t &r) { + return disk_ord2cpuid[l] < disk_ord2cpuid[r]; + }); + // disk_ord2index[i] used to hold i; now after sorting it holds the ordinal in + // the disk file that has the ith largest cpuid. We need to turn that into + // the output_idx ordinal for the cpu at ith ordinal in the disk file, for + // which we use a new vector disk_ord2output. + // E.g., if the original file was in this order disk_ord2cpuid = {6,2,3,7}, + // disk_ord2index after sorting would hold {1,2,0,3}, which we want to turn + // into disk_ord2output = {2,0,1,3}. 
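+    // (Working the example: cpuids {6,2,3,7} sort into indices {1,2,0,3};
+    // inverting that permutation assigns output[1]=0, output[2]=1,
+    // output[0]=2, output[3]=3, giving {2,0,1,3} as stated above.)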
+    std::vector disk_ord2output(disk_ord2index.size());
+    for (size_t i = 0; i < disk_ord2index.size(); ++i) {
+        disk_ord2output[disk_ord2index[i]] = static_cast(i);
+    }
+    for (int disk_idx = 0; disk_idx < static_cast(outputs_.size());
+         ++disk_idx) {
+        if (disk_idx >= static_cast(disk_ord2index.size())) {
+            // XXX i#6630: We should auto-set the output count and avoid
+            // having extra outputs; these complicate idle computations, etc.
+            VPRINT(this, 1, "Output %d empty: returning eof up front\n", disk_idx);
+            outputs_[disk_idx].at_eof = true;
+            set_cur_input(disk_idx, sched_type_t::INVALID_INPUT_ORDINAL);
+            continue;
+        }
+        output_ordinal_t output_idx = disk_ord2output[disk_idx];
+        VPRINT(this, 1, "Read %zu as-traced records for output #%d\n",
+               all_sched[disk_idx].size(), output_idx);
+        outputs_[output_idx].as_traced_cpuid = disk_ord2cpuid[disk_idx];
+        VPRINT(this, 1, "Output #%d is as-traced CPU #%" PRId64 "\n", output_idx,
+               outputs_[output_idx].as_traced_cpuid);
+        // Update the stop_instruction field and collapse consecutive entries while
+        // inserting into the final location.
+        int start_consec = -1;
+        for (int sched_idx = 0; sched_idx < static_cast(all_sched[disk_idx].size());
+             ++sched_idx) {
+            auto &segment = all_sched[disk_idx][sched_idx];
+            if (!segment.valid)
+                continue;
+            auto find = start2stop[segment.input].find(segment.start_instruction);
+            ++find;
+            if (find == start2stop[segment.input].end())
+                segment.stop_instruction = std::numeric_limits::max();
+            else
+                segment.stop_instruction = *find;
+            VPRINT(this, 4,
+                   "as-read segment #%d: input=%d start=%" PRId64 " stop=%" PRId64
+                   " time=%" PRId64 "\n",
+                   sched_idx, segment.input, segment.start_instruction,
+                   segment.stop_instruction, segment.timestamp);
+            if (sched_idx + 1 < static_cast(all_sched[disk_idx].size()) &&
+                segment.input == all_sched[disk_idx][sched_idx + 1].input &&
+                segment.stop_instruction >
+                    all_sched[disk_idx][sched_idx + 1].start_instruction) {
+                // A second sanity check.
+                error_string_ = "Invalid decreasing start field in schedule file";
+                return sched_type_t::STATUS_ERROR_INVALID_PARAMETER;
+            } else if (sched_idx + 1 < static_cast(all_sched[disk_idx].size()) &&
+                       segment.input == all_sched[disk_idx][sched_idx + 1].input &&
+                       segment.stop_instruction ==
+                           all_sched[disk_idx][sched_idx + 1].start_instruction) {
+                // Collapse into next.
+                if (start_consec == -1)
+                    start_consec = sched_idx;
+            } else {
+                schedule_output_tracker_t &toadd = start_consec >= 0
+                    ?
all_sched[disk_idx][start_consec] + : all_sched[disk_idx][sched_idx]; + outputs_[output_idx].record.emplace_back( + schedule_record_t::DEFAULT, toadd.input, toadd.start_instruction, + all_sched[disk_idx][sched_idx].stop_instruction, toadd.timestamp); + start_consec = -1; + VDO(this, 3, { + auto &added = outputs_[output_idx].record.back(); + VPRINT(this, 3, + "segment #%zu: input=%d start=%" PRId64 " stop=%" PRId64 + " time=%" PRId64 "\n", + outputs_[output_idx].record.size() - 1, added.key.input, + added.value.start_instruction, added.stop_instruction, + added.timestamp); + }); + } + } + VPRINT(this, 1, "Collapsed duplicates for %zu as-traced records for output #%d\n", + outputs_[output_idx].record.size(), output_idx); + if (outputs_[output_idx].record.empty()) { + error_string_ = "Empty as-traced schedule"; + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } + if (outputs_[output_idx].record[0].value.start_instruction != 0) { + VPRINT(this, 1, "Initial input for output #%d is: wait state\n", output_idx); + set_cur_input(output_idx, sched_type_t::INVALID_INPUT_ORDINAL); + outputs_[output_idx].waiting = true; + outputs_[output_idx].record_index->store(-1, std::memory_order_release); + } else { + VPRINT(this, 1, "Initial input for output #%d is %d\n", output_idx, + outputs_[output_idx].record[0].key.input); + set_cur_input(output_idx, outputs_[output_idx].record[0].key.input); + } + } + return sched_type_t::STATUS_SUCCESS; +} + template typename scheduler_tmpl_t::stream_status_t scheduler_replay_tmpl_t::pick_next_input_for_mode( @@ -57,27 +296,26 @@ scheduler_replay_tmpl_t::pick_next_input_for_mode( input_ordinal_t &index) { // Our own index is only modified by us so we can cache it here. - int record_index = - this->outputs_[output].record_index->load(std::memory_order_acquire); - if (record_index + 1 >= static_cast(this->outputs_[output].record.size())) { - if (!this->outputs_[output].at_eof) { - this->outputs_[output].at_eof = true; + int record_index = outputs_[output].record_index->load(std::memory_order_acquire); + if (record_index + 1 >= static_cast(outputs_[output].record.size())) { + if (!outputs_[output].at_eof) { + outputs_[output].at_eof = true; this->live_replay_output_count_.fetch_add(-1, std::memory_order_release); } - return this->eof_or_idle(output, this->outputs_[output].cur_input); + return this->eof_or_idle(output, outputs_[output].cur_input); } - schedule_record_t &segment = this->outputs_[output].record[record_index + 1]; + schedule_record_t &segment = outputs_[output].record[record_index + 1]; if (segment.type == schedule_record_t::IDLE || segment.type == schedule_record_t::IDLE_BY_COUNT) { - this->outputs_[output].waiting = true; + outputs_[output].waiting = true; if (segment.type == schedule_record_t::IDLE) { // Convert a legacy idle duration from microseconds to record counts. 
segment.value.idle_duration = static_cast( - this->options_.time_units_per_us * segment.value.idle_duration); + options_.time_units_per_us * segment.value.idle_duration); } - this->outputs_[output].idle_start_count = this->outputs_[output].idle_count; - this->outputs_[output].record_index->fetch_add(1, std::memory_order_release); - ++this->outputs_[output].idle_count; + outputs_[output].idle_start_count = outputs_[output].idle_count; + outputs_[output].record_index->fetch_add(1, std::memory_order_release); + ++outputs_[output].idle_count; VPRINT(this, 5, "%s[%d]: next replay segment idle for %" PRIu64 "\n", __FUNCTION__, output, segment.value.idle_duration); return sched_type_t::STATUS_IDLE; @@ -86,28 +324,25 @@ scheduler_replay_tmpl_t::pick_next_input_for_mode( VPRINT(this, 5, "%s[%d]: next replay segment in=%d (@%" PRId64 ") type=%d start=%" PRId64 " end=%" PRId64 "\n", - __FUNCTION__, output, index, this->get_instr_ordinal(this->inputs_[index]), + __FUNCTION__, output, index, this->get_instr_ordinal(inputs_[index]), segment.type, segment.value.start_instruction, segment.stop_instruction); { - std::lock_guard lock(*this->inputs_[index].lock); - if (this->get_instr_ordinal(this->inputs_[index]) > - segment.value.start_instruction) { + std::lock_guard lock(*inputs_[index].lock); + if (this->get_instr_ordinal(inputs_[index]) > segment.value.start_instruction) { VPRINT(this, 1, "WARNING: next_record[%d]: input %d wants instr #%" PRId64 " but it is already at #%" PRId64 "\n", output, index, segment.value.start_instruction, - this->get_instr_ordinal(this->inputs_[index])); + this->get_instr_ordinal(inputs_[index])); } - if (this->get_instr_ordinal(this->inputs_[index]) < - segment.value.start_instruction && + if (this->get_instr_ordinal(inputs_[index]) < segment.value.start_instruction && // Don't wait for an ROI that starts at the beginning. segment.value.start_instruction > 1 && // The output may have begun in the wait state. (record_index == -1 || // When we skip our separator+timestamp markers are at the // prior instr ord so do not wait for that. - (this->outputs_[output].record[record_index].type != - schedule_record_t::SKIP && + (outputs_[output].record[record_index].type != schedule_record_t::SKIP && // Don't wait if we're at the end and just need the end record. segment.type != schedule_record_t::SYNTHETIC_END))) { // Some other output stream has not advanced far enough, and we do @@ -124,29 +359,28 @@ scheduler_replay_tmpl_t::pick_next_input_for_mode( output, index, segment.value.start_instruction); // Give up this input and go into a wait state. // We'll come back here on the next next_record() call. - this->set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL, - // Avoid livelock if prev input == cur input which happens - // with back-to-back segments with the same input. - index == this->outputs_[output].cur_input); - this->outputs_[output].waiting = true; + set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL, + // Avoid livelock if prev input == cur input which happens + // with back-to-back segments with the same input. + index == outputs_[output].cur_input); + outputs_[output].waiting = true; return sched_type_t::STATUS_WAIT; } } // Also wait if this segment is ahead of the next-up segment on another // output. We only have a timestamp per context switch so we can't // enforce finer-grained timing replay. 
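     // For example (hypothetical values): if our next segment carries
     // timestamp 5000 while another output's next-up segment carries 4800,
     // we wait so that the earlier context switch is replayed first.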
- if (this->options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) { - for (int i = 0; i < static_cast(this->outputs_.size()); ++i) { + if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) { + for (int i = 0; i < static_cast(outputs_.size()); ++i) { if (i == output) continue; // Do an atomic load once and use it to de-reference if it's not at the end. // This is safe because if the target advances to the end concurrently it // will only cause an extra wait that will just come back here and then // continue. - int other_index = - this->outputs_[i].record_index->load(std::memory_order_acquire); - if (other_index + 1 < static_cast(this->outputs_[i].record.size()) && - segment.timestamp > this->outputs_[i].record[other_index + 1].timestamp) { + int other_index = outputs_[i].record_index->load(std::memory_order_acquire); + if (other_index + 1 < static_cast(outputs_[i].record.size()) && + segment.timestamp > outputs_[i].record[other_index + 1].timestamp) { VPRINT(this, 3, "next_record[%d]: waiting because timestamp %" PRIu64 " is ahead of output %d\n", @@ -157,64 +391,60 @@ scheduler_replay_tmpl_t::pick_next_input_for_mode( // order due to using prior values, to avoid hanging. We try to avoid // this by using wall-clock time in record_schedule_segment() rather than // the stored output time. - this->set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL); - this->outputs_[output].waiting = true; + set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL); + outputs_[output].waiting = true; return sched_type_t::STATUS_WAIT; } } } if (segment.type == schedule_record_t::SYNTHETIC_END) { - std::lock_guard lock(*this->inputs_[index].lock); + std::lock_guard lock(*inputs_[index].lock); // We're past the final region of interest and we need to insert // a synthetic thread exit record. We need to first throw out the // queued candidate record, if any. - this->clear_input_queue(this->inputs_[index]); - this->inputs_[index].queue.push_back( - this->create_thread_exit(this->inputs_[index].tid)); + this->clear_input_queue(inputs_[index]); + inputs_[index].queue.push_back(this->create_thread_exit(inputs_[index].tid)); VPRINT(this, 2, "early end for input %d\n", index); // We're done with this entry but we need the queued record to be read, // so we do not move past the entry. - this->outputs_[output].record_index->fetch_add(1, std::memory_order_release); - stream_status_t status = this->mark_input_eof(this->inputs_[index]); + outputs_[output].record_index->fetch_add(1, std::memory_order_release); + stream_status_t status = this->mark_input_eof(inputs_[index]); if (status != sched_type_t::STATUS_OK) return status; return sched_type_t::STATUS_SKIPPED; } else if (segment.type == schedule_record_t::SKIP) { - std::lock_guard lock(*this->inputs_[index].lock); - uint64_t cur_reader_instr = - this->inputs_[index].reader->get_instruction_ordinal(); + std::lock_guard lock(*inputs_[index].lock); + uint64_t cur_reader_instr = inputs_[index].reader->get_instruction_ordinal(); VPRINT(this, 2, "next_record[%d]: skipping from %" PRId64 " to %" PRId64 " in %d for schedule\n", output, cur_reader_instr, segment.stop_instruction, index); - auto status = this->skip_instructions(this->inputs_[index], + auto status = this->skip_instructions(inputs_[index], segment.stop_instruction - cur_reader_instr - 1 /*exclusive*/); // Increment the region to get window id markers with ordinals. 
- this->inputs_[index].cur_region++; + inputs_[index].cur_region++; if (status != sched_type_t::STATUS_SKIPPED) return sched_type_t::STATUS_INVALID; // We're done with the skip so move to and past it. - this->outputs_[output].record_index->fetch_add(2, std::memory_order_release); + outputs_[output].record_index->fetch_add(2, std::memory_order_release); return sched_type_t::STATUS_SKIPPED; } else { VPRINT(this, 2, "next_record[%d]: advancing to input %d instr #%" PRId64 "\n", output, index, segment.value.start_instruction); } - this->outputs_[output].record_index->fetch_add(1, std::memory_order_release); + outputs_[output].record_index->fetch_add(1, std::memory_order_release); VDO(this, 2, { // Our own index is only modified by us so we can cache it here. - int local_index = - this->outputs_[output].record_index->load(std::memory_order_acquire); + int local_index = outputs_[output].record_index->load(std::memory_order_acquire); if (local_index >= 0 && - local_index < static_cast(this->outputs_[output].record.size())) { - const schedule_record_t &local_segment = - this->outputs_[output].record[local_index]; + local_index < static_cast(outputs_[output].record.size())) { + const schedule_record_t &local_segment = outputs_[output].record[local_index]; int input = local_segment.key.input; VPRINT(this, 2, "next_record[%d]: replay segment in=%d (@%" PRId64 ") type=%d start=%" PRId64 " end=%" PRId64 "\n", - output, input, this->get_instr_ordinal(this->inputs_[input]), + output, input, this->get_instr_ordinal(inputs_[input]), local_segment.type, local_segment.value.start_instruction, local_segment.stop_instruction); } @@ -229,21 +459,19 @@ scheduler_replay_tmpl_t::check_for_input_switch( bool &need_new_input, bool &preempt, uint64_t &blocked_time) { // Our own index is only modified by us so we can cache it here. - int record_index = - this->outputs_[output].record_index->load(std::memory_order_acquire); + int record_index = outputs_[output].record_index->load(std::memory_order_acquire); assert(record_index >= 0); - if (record_index >= static_cast(this->outputs_[output].record.size())) { + if (record_index >= static_cast(outputs_[output].record.size())) { // We're on the last record. VPRINT(this, 4, "next_record[%d]: on last record\n", output); - } else if (this->outputs_[output].record[record_index].type == - schedule_record_t::SKIP) { + } else if (outputs_[output].record[record_index].type == schedule_record_t::SKIP) { VPRINT(this, 5, "next_record[%d]: need new input after skip\n", output); need_new_input = true; - } else if (this->outputs_[output].record[record_index].type == + } else if (outputs_[output].record[record_index].type == schedule_record_t::SYNTHETIC_END) { VPRINT(this, 5, "next_record[%d]: at synthetic end\n", output); } else { - const schedule_record_t &segment = this->outputs_[output].record[record_index]; + const schedule_record_t &segment = outputs_[output].record[record_index]; assert(segment.type == schedule_record_t::DEFAULT); uint64_t start = segment.value.start_instruction; uint64_t stop = segment.stop_instruction; diff --git a/clients/drcachesim/tests/scheduler_unit_tests.cpp b/clients/drcachesim/tests/scheduler_unit_tests.cpp index b256fb60e33..586807c192d 100644 --- a/clients/drcachesim/tests/scheduler_unit_tests.cpp +++ b/clients/drcachesim/tests/scheduler_unit_tests.cpp @@ -2966,7 +2966,30 @@ test_replay_multi_threaded(const char *testdir) #ifdef HAS_ZIP // We subclass scheduler_impl_t to access its record struct and functions. 
-class test_scheduler_t : public scheduler_impl_t { +// First we fill in pure-virtuals to share with a similar class below: +class test_scheduler_base_t : public scheduler_impl_t { +public: + scheduler_status_t + set_initial_schedule( + std::unordered_map> &workload2inputs) override + { + return sched_type_t::STATUS_ERROR_NOT_IMPLEMENTED; + } + stream_status_t + pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, + input_ordinal_t prev_index, input_ordinal_t &index) override + { + return sched_type_t::STATUS_NOT_IMPLEMENTED; + } + stream_status_t + check_for_input_switch(output_ordinal_t output, memref_t &record, input_info_t *input, + uint64_t cur_time, bool &need_new_input, bool &preempt, + uint64_t &blocked_time) override + { + return sched_type_t::STATUS_NOT_IMPLEMENTED; + } +}; +class test_scheduler_t : public test_scheduler_base_t { public: void write_test_schedule(std::string record_fname) @@ -3012,19 +3035,6 @@ class test_scheduler_t : public scheduler_impl_t { sched1.size() * sizeof(sched1[0]))) assert(false); } - stream_status_t - pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, - input_ordinal_t prev_index, input_ordinal_t &index) override - { - return sched_type_t::STATUS_NOT_IMPLEMENTED; - } - stream_status_t - check_for_input_switch(output_ordinal_t output, memref_t &record, input_info_t *input, - uint64_t cur_time, bool &need_new_input, bool &preempt, - uint64_t &blocked_time) override - { - return sched_type_t::STATUS_NOT_IMPLEMENTED; - } }; #endif @@ -3090,7 +3100,8 @@ test_replay_timestamps() #ifdef HAS_ZIP // We subclass scheduler_impl_t to access its record struct and functions. -class test_noeof_scheduler_t : public scheduler_impl_t { +// We subclass scheduler_impl_t to access its record struct and functions. +class test_noeof_scheduler_t : public test_scheduler_base_t { public: void write_test_schedule(std::string record_fname) @@ -3137,19 +3148,6 @@ class test_noeof_scheduler_t : public scheduler_impl_t { sched1.size() * sizeof(sched1[0]))) assert(false); } - stream_status_t - pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, - input_ordinal_t prev_index, input_ordinal_t &index) override - { - return sched_type_t::STATUS_NOT_IMPLEMENTED; - } - stream_status_t - check_for_input_switch(output_ordinal_t output, memref_t &record, input_info_t *input, - uint64_t cur_time, bool &need_new_input, bool &preempt, - uint64_t &blocked_time) override - { - return sched_type_t::STATUS_NOT_IMPLEMENTED; - } }; #endif From ae960a043fe45033b60d8a9dadf7b27d10c8808c Mon Sep 17 00:00:00 2001 From: Derek Bruening Date: Wed, 13 Nov 2024 22:00:00 -0500 Subject: [PATCH 2/5] Fix Windows warning --- clients/drcachesim/scheduler/scheduler_impl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/drcachesim/scheduler/scheduler_impl.cpp b/clients/drcachesim/scheduler/scheduler_impl.cpp index 10e53782773..89f5d65c1a7 100644 --- a/clients/drcachesim/scheduler/scheduler_impl.cpp +++ b/clients/drcachesim/scheduler/scheduler_impl.cpp @@ -891,7 +891,7 @@ scheduler_impl_tmpl_t::init( // filetype we avoid going as far as the timestamp. 
bool gather_filetype = options_.read_inputs_in_init; if (gather_filetype || gather_timestamps) { - scheduler_status_t res = this->get_initial_input_content(gather_timestamps); + res = this->get_initial_input_content(gather_timestamps); if (res != sched_type_t::STATUS_SUCCESS) { error_string_ = "Failed to read initial input contents for filetype"; if (gather_timestamps) From 36971151c91ca74c18e625d765d32157d337b1f9 Mon Sep 17 00:00:00 2001 From: Derek Bruening Date: Wed, 13 Nov 2024 22:36:44 -0500 Subject: [PATCH 3/5] i#6831 sched refactor, step 7: Split eof_or_idle Splits eof_or_idle() into separate overrides in the 3 scheduler subclasses. This then allows moving pop_from_ready_queue*() into the dynamic subclass. Moves process_markers(), ready_queue_empty(), and syscall_incurs_switch() into the dynamic subclass. Issue: #6831 --- .../scheduler/scheduler_dynamic.cpp | 492 +++++++++++++++++- .../drcachesim/scheduler/scheduler_fixed.cpp | 12 + .../drcachesim/scheduler/scheduler_impl.cpp | 487 +---------------- clients/drcachesim/scheduler/scheduler_impl.h | 78 +-- .../drcachesim/scheduler/scheduler_replay.cpp | 18 + .../drcachesim/tests/scheduler_unit_tests.cpp | 5 + 6 files changed, 572 insertions(+), 520 deletions(-) diff --git a/clients/drcachesim/scheduler/scheduler_dynamic.cpp b/clients/drcachesim/scheduler/scheduler_dynamic.cpp index 9897d010f01..7867f811a4e 100644 --- a/clients/drcachesim/scheduler/scheduler_dynamic.cpp +++ b/clients/drcachesim/scheduler/scheduler_dynamic.cpp @@ -155,7 +155,7 @@ scheduler_dynamic_tmpl_t::set_output_active( } // Move the ready_queue to other outputs. { - auto lock = this->acquire_scoped_output_lock_if_necessary(output); + auto lock = acquire_scoped_output_lock_if_necessary(output); while (!outputs_[output].ready_queue.queue.empty()) { input_info_t *tomove = outputs_[output].ready_queue.queue.top(); ordinals.push_back(tomove->index); @@ -174,7 +174,7 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index, input_ordinal_t &index) { - uint64_t cur_time = this->get_output_time(output); + uint64_t cur_time = get_output_time(output); uint64_t last_time = last_rebalance_time_.load(std::memory_order_acquire); if (last_time == 0) { // Initialize. @@ -200,7 +200,7 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( VPRINT(this, 2, "next_record[%d]: blocked time %" PRIu64 "\n", output, blocked_time); inputs_[prev_index].blocked_time = blocked_time; - inputs_[prev_index].blocked_start_time = this->get_output_time(output); + inputs_[prev_index].blocked_start_time = get_output_time(output); } } if (prev_index != sched_type_t::INVALID_INPUT_ORDINAL && @@ -217,7 +217,7 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( target_input_lock.unlock(); { auto target_output_lock = - this->acquire_scoped_output_lock_if_necessary(target_output); + acquire_scoped_output_lock_if_necessary(target_output); target_input_lock.lock(); if (out.ready_queue.queue.find(target)) { VPRINT(this, 2, @@ -400,7 +400,7 @@ scheduler_dynamic_tmpl_t::check_for_input_switch( need_new_input = true; VPRINT(this, 3, "next_record[%d]: input %d going unscheduled\n", output, input->index); - } else if (this->syscall_incurs_switch(input, blocked_time)) { + } else if (syscall_incurs_switch(input, blocked_time)) { // Model as blocking and should switch to a different input. 
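             // Example with hypothetical numbers: timestamps bracketing a
             // maybe-blocking syscall 350us apart exceed a 100us
             // blocking_switch_threshold, so we switch away and charge the
             // input the scaled latency as blocked time.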
             need_new_input = true;
             VPRINT(this, 3, "next_record[%d]: hit blocking syscall in input %d\n",
@@ -475,6 +475,215 @@ scheduler_dynamic_tmpl_t::check_for_input_switch(
     return sched_type_t::STATUS_OK;
 }
 
+template 
+void
+scheduler_dynamic_tmpl_t::process_marker(
+    input_info_t &input, output_ordinal_t output, trace_marker_type_t marker_type,
+    uintptr_t marker_value)
+{
+    assert(input.lock->owned_by_cur_thread());
+    switch (marker_type) {
+    case TRACE_MARKER_TYPE_SYSCALL:
+        input.processing_syscall = true;
+        input.pre_syscall_timestamp = input.reader->get_last_timestamp();
+        break;
+    case TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL:
+        input.processing_maybe_blocking_syscall = true;
+        // Generally we should already have the timestamp from a just-prior
+        // syscall marker, but we support tests and other synthetic sequences
+        // with just a maybe-blocking.
+        input.pre_syscall_timestamp = input.reader->get_last_timestamp();
+        break;
+    case TRACE_MARKER_TYPE_CONTEXT_SWITCH_START:
+        outputs_[output].in_context_switch_code = true;
+        ANNOTATE_FALLTHROUGH;
+    case TRACE_MARKER_TYPE_SYSCALL_TRACE_START:
+        outputs_[output].in_kernel_code = true;
+        break;
+    case TRACE_MARKER_TYPE_CONTEXT_SWITCH_END:
+        // We have to delay until the next record.
+        outputs_[output].hit_switch_code_end = true;
+        ANNOTATE_FALLTHROUGH;
+    case TRACE_MARKER_TYPE_SYSCALL_TRACE_END:
+        outputs_[output].in_kernel_code = false;
+        break;
+    case TRACE_MARKER_TYPE_DIRECT_THREAD_SWITCH: {
+        if (!options_.honor_direct_switches)
+            break;
+        ++outputs_[output].stats[memtrace_stream_t::SCHED_STAT_DIRECT_SWITCH_ATTEMPTS];
+        memref_tid_t target_tid = marker_value;
+        auto it = this->tid2input_.find(workload_tid_t(input.workload, target_tid));
+        if (it == this->tid2input_.end()) {
+            VPRINT(this, 1, "Failed to find input for target switch thread %" PRId64 "\n",
+                   target_tid);
+        } else {
+            input.switch_to_input = it->second;
+        }
+        // Trigger a switch either indefinitely or until timeout.
+        if (input.skip_next_unscheduled) {
+            // The underlying kernel mechanism being modeled only supports a single
+            // request: they cannot accumulate. Timing differences in the trace could
+            // perhaps result in multiple lining up when they didn't in the real app;
+            // but changing the scheme here could also push representativeness in the
+            // other direction.
+            input.skip_next_unscheduled = false;
+            VPRINT(this, 3,
+                   "input %d unschedule request ignored due to prior schedule request "
+                   "@%" PRIu64 "\n",
+                   input.index, input.reader->get_last_timestamp());
+            break;
+        }
+        input.unscheduled = true;
+        if (!options_.honor_infinite_timeouts && input.syscall_timeout_arg == 0) {
+            // As our scheduling is imperfect we do not risk things being blocked
+            // indefinitely: we instead have a timeout, but the maximum value.
+            input.syscall_timeout_arg = options_.block_time_max_us;
+            if (input.syscall_timeout_arg == 0)
+                input.syscall_timeout_arg = 1;
+        }
+        if (input.syscall_timeout_arg > 0) {
+            input.blocked_time = scale_blocked_time(input.syscall_timeout_arg);
+            // Clamp at 1 since 0 means an infinite timeout for unscheduled=true.
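+            // Illustration (hypothetical values): with block_time_multiplier
+            // 0.1 and 2000 time units per microsecond, a 50us timeout arg
+            // scales to 50 * 0.1 * 2000 = 10000 units, while a result that
+            // rounds down to 0 is clamped to 1 so it is not read as infinite.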
+ if (input.blocked_time == 0) + input.blocked_time = 1; + input.blocked_start_time = get_output_time(output); + VPRINT(this, 3, "input %d unscheduled for %" PRIu64 " @%" PRIu64 "\n", + input.index, input.blocked_time, input.reader->get_last_timestamp()); + } else { + VPRINT(this, 3, "input %d unscheduled indefinitely @%" PRIu64 "\n", + input.index, input.reader->get_last_timestamp()); + } + break; + } + case TRACE_MARKER_TYPE_SYSCALL_ARG_TIMEOUT: + // This is cleared at the post-syscall instr. + input.syscall_timeout_arg = static_cast(marker_value); + break; + case TRACE_MARKER_TYPE_SYSCALL_UNSCHEDULE: + if (!options_.honor_direct_switches) + break; + if (input.skip_next_unscheduled) { + input.skip_next_unscheduled = false; + VPRINT(this, 3, + "input %d unschedule request ignored due to prior schedule request " + "@%" PRIu64 "\n", + input.index, input.reader->get_last_timestamp()); + break; + } + // Trigger a switch either indefinitely or until timeout. + input.unscheduled = true; + if (!options_.honor_infinite_timeouts && input.syscall_timeout_arg == 0) { + // As our scheduling is imperfect we do not risk things being blocked + // indefinitely: we instead have a timeout, but the maximum value. + input.syscall_timeout_arg = options_.block_time_max_us; + if (input.syscall_timeout_arg == 0) + input.syscall_timeout_arg = 1; + } + if (input.syscall_timeout_arg > 0) { + input.blocked_time = scale_blocked_time(input.syscall_timeout_arg); + // Clamp at 1 since 0 means an infinite timeout for unscheduled=true. + if (input.blocked_time == 0) + input.blocked_time = 1; + input.blocked_start_time = get_output_time(output); + VPRINT(this, 3, "input %d unscheduled for %" PRIu64 " @%" PRIu64 "\n", + input.index, input.blocked_time, input.reader->get_last_timestamp()); + } else { + VPRINT(this, 3, "input %d unscheduled indefinitely @%" PRIu64 "\n", + input.index, input.reader->get_last_timestamp()); + } + break; + case TRACE_MARKER_TYPE_SYSCALL_SCHEDULE: { + if (!options_.honor_direct_switches) + break; + memref_tid_t target_tid = marker_value; + auto it = this->tid2input_.find(workload_tid_t(input.workload, target_tid)); + if (it == this->tid2input_.end()) { + VPRINT(this, 1, + "Failed to find input for switchto::resume target tid %" PRId64 "\n", + target_tid); + return; + } + input_ordinal_t target_idx = it->second; + VPRINT(this, 3, "input %d re-scheduling input %d @%" PRIu64 "\n", input.index, + target_idx, input.reader->get_last_timestamp()); + // Release the input lock before acquiring more input locks + input.lock->unlock(); + { + input_info_t *target = &inputs_[target_idx]; + std::unique_lock target_lock(*target->lock); + if (target->at_eof) { + VPRINT(this, 3, "input %d at eof ignoring re-schedule\n", target_idx); + } else if (target->unscheduled) { + target->unscheduled = false; + bool on_unsched_queue = false; + { + std::lock_guard unsched_lock( + *unscheduled_priority_.lock); + if (unscheduled_priority_.queue.find(target)) { + unscheduled_priority_.queue.erase(target); + on_unsched_queue = true; + } + } + // We have to give up the unsched lock before calling add_to_ready_queue + // as it acquires the output lock. + if (on_unsched_queue) { + output_ordinal_t resume_output = target->prev_output; + if (resume_output == sched_type_t::INVALID_OUTPUT_ORDINAL) + resume_output = output; + // We can't hold any locks when calling add_to_ready_queue. 
+ // This input is no longer on any queue, so few things can happen + // while we don't hold the input lock: a competing _SCHEDULE will + // not find the output and it can't have blocked_time>0 (invariant + // for things on unsched q); once it's on the new queue we don't + // do anything further here so we're good to go. + target_lock.unlock(); + this->add_to_ready_queue(resume_output, target); + target_lock.lock(); + } else { + // We assume blocked_time is from _ARG_TIMEOUT and is not from + // regularly-blocking i/o. We assume i/o getting into the mix is + // rare enough or does not matter enough to try to have separate + // timeouts. + if (target->blocked_time > 0) { + VPRINT(this, 3, + "switchto::resume erasing blocked time for target " + "input %d\n", + target->index); + output_ordinal_t target_output = target->containing_output; + // There could be no output owner if we're mid-rebalance. + if (target_output != sched_type_t::INVALID_OUTPUT_ORDINAL) { + // We can't hold the input lock to acquire the output lock. + target_lock.unlock(); + { + auto scoped_output_lock = + acquire_scoped_output_lock_if_necessary( + target_output); + output_info_t &out = outputs_[target_output]; + if (out.ready_queue.queue.find(target)) { + --out.ready_queue.num_blocked; + } + // Decrement this holding the lock to synch with + // pop_from_ready_queue(). + target->blocked_time = 0; + } + target_lock.lock(); + } else + target->blocked_time = 0; + } + } + } else { + VPRINT(this, 3, "input %d will skip next unschedule\n", target_idx); + target->skip_next_unscheduled = true; + } + } + input.lock->lock(); + break; + } + default: // Nothing to do. + break; + } +} + template typename scheduler_tmpl_t::stream_status_t scheduler_dynamic_tmpl_t::rebalance_queues( @@ -490,9 +699,9 @@ scheduler_dynamic_tmpl_t::rebalance_queues( stream_status_t status = sched_type_t::STATUS_OK; assert(options_.mapping == sched_type_t::MAP_TO_ANY_OUTPUT); VPRINT(this, 1, "Output %d triggered a rebalance @%" PRIu64 ":\n", triggering_output, - this->get_output_time(triggering_output)); + get_output_time(triggering_output)); // First, update the time to avoid more threads coming here. - last_rebalance_time_.store(this->get_output_time(triggering_output), + last_rebalance_time_.store(get_output_time(triggering_output), std::memory_order_release); VPRINT(this, 2, "Before rebalance:\n"); VDO(this, 2, { this->print_queue_stats(); }); @@ -553,7 +762,7 @@ scheduler_dynamic_tmpl_t::rebalance_queues( for (unsigned int i = 0; i < outputs_.size(); ++i) { if (!outputs_[i].active->load(std::memory_order_acquire)) continue; - auto lock = this->acquire_scoped_output_lock_if_necessary(i); + auto lock = acquire_scoped_output_lock_if_necessary(i); // Only remove on the 1st iteration; later we can exceed due to binding // constraints. 
         while (iteration == 0 && outputs_[i].ready_queue.queue.size() > avg_ceiling) {
@@ -616,6 +825,273 @@ scheduler_dynamic_tmpl_t::rebalance_queues(
     return status;
 }
 
+template 
+typename scheduler_tmpl_t::stream_status_t
+scheduler_dynamic_tmpl_t::eof_or_idle_for_mode(
+    output_ordinal_t output, input_ordinal_t prev_input)
+{
+    int live_inputs = this->live_input_count_.load(std::memory_order_acquire);
+    if (live_inputs == 0) {
+        return sched_type_t::STATUS_EOF;
+    }
+    if (live_inputs <=
+        static_cast(inputs_.size() * options_.exit_if_fraction_inputs_left)) {
+        VPRINT(this, 1, "output %d exiting early with %d live inputs left\n", output,
+               live_inputs);
+        return sched_type_t::STATUS_EOF;
+    }
+    // Before going idle, try to steal work from another output.
+    // We start with us+1 to avoid everyone stealing from the low-numbered outputs.
+    // We only try when we first transition to idle; we rely on rebalancing after
+    // that, to avoid repeatedly grabbing other outputs' locks over and over.
+    if (!outputs_[output].tried_to_steal_on_idle) {
+        outputs_[output].tried_to_steal_on_idle = true;
+        for (unsigned int i = 1; i < outputs_.size(); ++i) {
+            output_ordinal_t target = (output + i) % outputs_.size();
+            assert(target != output); // Sanity check (we won't reach "output").
+            input_info_t *queue_next = nullptr;
+            stream_status_t status = pop_from_ready_queue(target, output, queue_next);
+            if (status == sched_type_t::STATUS_OK && queue_next != nullptr) {
+                set_cur_input(output, queue_next->index);
+                ++outputs_[output].stats[memtrace_stream_t::SCHED_STAT_RUNQUEUE_STEALS];
+                VPRINT(this, 2,
+                       "eof_or_idle: output %d stole input %d from %d's ready_queue\n",
+                       output, queue_next->index, target);
+                return sched_type_t::STATUS_STOLE;
+            }
+            // We didn't find anything; loop and check another output.
+        }
+        VPRINT(this, 3, "eof_or_idle: output %d failed to steal from anyone\n", output);
+    }
+    return sched_type_t::STATUS_IDLE;
+}
+
+template 
+bool
+scheduler_dynamic_tmpl_t::syscall_incurs_switch(
+    input_info_t *input, uint64_t &blocked_time)
+{
+    assert(input->lock->owned_by_cur_thread());
+    uint64_t post_time = input->reader->get_last_timestamp();
+    assert(input->processing_syscall || input->processing_maybe_blocking_syscall);
+    if (input->reader->get_version() < TRACE_ENTRY_VERSION_FREQUENT_TIMESTAMPS) {
+        // This is a legacy trace that does not have timestamps bracketing syscalls.
+        // We switch on every maybe-blocking syscall in this case and have a simplified
+        // blocking model.
+        blocked_time = options_.blocking_switch_threshold;
+        return input->processing_maybe_blocking_syscall;
+    }
+    assert(input->pre_syscall_timestamp > 0);
+    assert(input->pre_syscall_timestamp <= post_time);
+    uint64_t latency = post_time - input->pre_syscall_timestamp;
+    uint64_t threshold = input->processing_maybe_blocking_syscall
+        ? options_.blocking_switch_threshold
+        : options_.syscall_switch_threshold;
+    blocked_time = scale_blocked_time(latency);
+    VPRINT(this, 3,
+           "input %d %ssyscall latency %" PRIu64 " * scale %6.3f => blocked time %" PRIu64
+           "\n",
+           input->index,
+           input->processing_maybe_blocking_syscall ?
"maybe-blocking " : "", latency, + options_.block_time_multiplier, blocked_time); + return latency >= threshold; +} + +template +bool +scheduler_dynamic_tmpl_t::ready_queue_empty( + output_ordinal_t output) +{ + auto lock = acquire_scoped_output_lock_if_necessary(output); + return outputs_[output].ready_queue.queue.empty(); +} + +template +typename scheduler_tmpl_t::stream_status_t +scheduler_dynamic_tmpl_t::pop_from_ready_queue_hold_locks( + output_ordinal_t from_output, output_ordinal_t for_output, input_info_t *&new_input, + bool from_back) +{ + assert(!this->need_output_lock() || + (outputs_[from_output].ready_queue.lock->owned_by_cur_thread() && + (from_output == for_output || + for_output == sched_type_t::INVALID_OUTPUT_ORDINAL || + outputs_[for_output].ready_queue.lock->owned_by_cur_thread()))); + std::set skipped; + std::set blocked; + input_info_t *res = nullptr; + stream_status_t status = sched_type_t::STATUS_OK; + uint64_t cur_time = get_output_time(from_output); + while (!outputs_[from_output].ready_queue.queue.empty()) { + if (from_back) { + res = outputs_[from_output].ready_queue.queue.back(); + outputs_[from_output].ready_queue.queue.erase(res); + } else if (options_.randomize_next_input) { + res = outputs_[from_output].ready_queue.queue.get_random_entry(); + outputs_[from_output].ready_queue.queue.erase(res); + } else { + res = outputs_[from_output].ready_queue.queue.top(); + outputs_[from_output].ready_queue.queue.pop(); + } + std::lock_guard input_lock(*res->lock); + assert(!res->unscheduled || + res->blocked_time > 0); // Should be in unscheduled_priority_. + if (res->binding.empty() || for_output == sched_type_t::INVALID_OUTPUT_ORDINAL || + res->binding.find(for_output) != res->binding.end()) { + // For blocked inputs, as we don't have interrupts or other regular + // control points we only check for being unblocked when an input + // would be chosen to run. We thus keep blocked inputs in the ready queue. + if (res->blocked_time > 0) { + --outputs_[from_output].ready_queue.num_blocked; + if (!options_.honor_infinite_timeouts) { + // cur_time can be 0 at initialization time. + if (res->blocked_start_time == 0 && cur_time > 0) { + // This was a start-unscheduled input: we didn't have a valid + // time at initialization. + res->blocked_start_time = cur_time; + } + } else + assert(cur_time > 0); + } + if (res->blocked_time > 0 && + // cur_time can be 0 at initialization time. + (cur_time == 0 || + // Guard against time going backward (happens for wall-clock: i#6966). + cur_time < res->blocked_start_time || + cur_time - res->blocked_start_time < res->blocked_time)) { + VPRINT(this, 4, "pop queue: %d still blocked for %" PRIu64 "\n", + res->index, + res->blocked_time - (cur_time - res->blocked_start_time)); + // We keep searching for a suitable input. + blocked.insert(res); + } else { + // This input is no longer blocked. + res->blocked_time = 0; + res->unscheduled = false; + VPRINT(this, 4, "pop queue: %d @ %" PRIu64 " no longer blocked\n", + res->index, cur_time); + // We've found a candidate. One final check if this is a migration. + bool found_candidate = false; + if (from_output == for_output) + found_candidate = true; + else { + assert(cur_time > 0 || res->last_run_time == 0); + if (res->last_run_time == 0) { + // For never-executed inputs we consider their last execution + // to be the very first simulation time, which we can't + // easily initialize until here. 
+ res->last_run_time = outputs_[from_output].initial_cur_time->load( + std::memory_order_acquire); + } + VPRINT(this, 5, + "migration check %d to %d: cur=%" PRIu64 " last=%" PRIu64 + " delta=%" PRId64 " vs thresh %" PRIu64 "\n", + from_output, for_output, cur_time, res->last_run_time, + cur_time - res->last_run_time, + options_.migration_threshold_us); + // Guard against time going backward (happens for wall-clock: i#6966). + if (options_.migration_threshold_us == 0 || + // Allow free movement for the initial load balance at init time. + cur_time == 0 || + (cur_time > res->last_run_time && + cur_time - res->last_run_time >= + static_cast(options_.migration_threshold_us * + options_.time_units_per_us))) { + VPRINT(this, 2, "migrating %d to %d\n", from_output, for_output); + found_candidate = true; + // Do not count an initial rebalance as a migration. + if (cur_time > 0) { + ++outputs_[from_output] + .stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS]; + } + } + } + if (found_candidate) + break; + else + skipped.insert(res); + } + } else { + // We keep searching for a suitable input. + skipped.insert(res); + } + res = nullptr; + } + if (res == nullptr && !blocked.empty()) { + // Do not hand out EOF thinking we're done: we still have inputs blocked + // on i/o, so just wait and retry. + if (for_output != sched_type_t::INVALID_OUTPUT_ORDINAL) + ++outputs_[for_output].idle_count; + status = sched_type_t::STATUS_IDLE; + } + // Re-add the ones we skipped, but without changing their counters so we preserve + // the prior FIFO order. + for (input_info_t *save : skipped) + outputs_[from_output].ready_queue.queue.push(save); + // Re-add the blocked ones to the back. + for (input_info_t *save : blocked) { + std::lock_guard input_lock(*save->lock); + this->add_to_ready_queue_hold_locks(from_output, save); + } + auto res_lock = (res == nullptr) ? std::unique_lock() + : std::unique_lock(*res->lock); + VDO(this, 1, { + static int output_heartbeat; + // We are ok with races as the cadence is approximate. + if (++output_heartbeat % 2000 == 0) { + size_t unsched_size = 0; + { + std::lock_guard unsched_lock( + *unscheduled_priority_.lock); + unsched_size = unscheduled_priority_.queue.size(); + } + VPRINT(this, 1, + "heartbeat[%d] %zd in queue; %d blocked; %zd unscheduled => %d %d\n", + from_output, outputs_[from_output].ready_queue.queue.size(), + outputs_[from_output].ready_queue.num_blocked, unsched_size, + res == nullptr ? -1 : res->index, status); + } + }); + if (res != nullptr) { + VPRINT(this, 4, + "pop_from_ready_queue[%d] (post-size %zu): input %d priority %d timestamp " + "delta %" PRIu64 "\n", + from_output, outputs_[from_output].ready_queue.queue.size(), res->index, + res->priority, res->reader->get_last_timestamp() - res->base_timestamp); + res->unscheduled = false; + res->prev_output = res->containing_output; + res->containing_output = for_output; + } + new_input = res; + return status; +} + +template +typename scheduler_tmpl_t::stream_status_t +scheduler_dynamic_tmpl_t::pop_from_ready_queue( + output_ordinal_t from_output, output_ordinal_t for_output, input_info_t *&new_input) +{ + stream_status_t status = sched_type_t::STATUS_OK; + { + std::unique_lock from_lock; + std::unique_lock for_lock; + // If we need both locks, acquire in increasing output order to avoid deadlocks if + // two outputs try to steal from each other. 
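+        // (A classic total-order discipline: any thread that needs two
+        // ready-queue locks always takes the lower-numbered output's lock
+        // first, so a cycle of waiters cannot form.)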
+ if (from_output == for_output || + for_output == sched_type_t::INVALID_OUTPUT_ORDINAL) { + from_lock = acquire_scoped_output_lock_if_necessary(from_output); + } else if (from_output < for_output) { + from_lock = acquire_scoped_output_lock_if_necessary(from_output); + for_lock = acquire_scoped_output_lock_if_necessary(for_output); + } else { + for_lock = acquire_scoped_output_lock_if_necessary(for_output); + from_lock = acquire_scoped_output_lock_if_necessary(from_output); + } + status = pop_from_ready_queue_hold_locks(from_output, for_output, new_input); + } + return status; +} + template class scheduler_dynamic_tmpl_t; template class scheduler_dynamic_tmpl_t; diff --git a/clients/drcachesim/scheduler/scheduler_fixed.cpp b/clients/drcachesim/scheduler/scheduler_fixed.cpp index 2b7adee2be7..93227e62ee0 100644 --- a/clients/drcachesim/scheduler/scheduler_fixed.cpp +++ b/clients/drcachesim/scheduler/scheduler_fixed.cpp @@ -151,6 +151,18 @@ scheduler_fixed_tmpl_t::check_for_input_switch( return sched_type_t::STATUS_OK; } +template +typename scheduler_tmpl_t::stream_status_t +scheduler_fixed_tmpl_t::eof_or_idle_for_mode( + output_ordinal_t output, input_ordinal_t prev_input) +{ + if (options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT || + this->live_input_count_.load(std::memory_order_acquire) == 0) { + return sched_type_t::STATUS_EOF; + } + return sched_type_t::STATUS_IDLE; +} + template class scheduler_fixed_tmpl_t; template class scheduler_fixed_tmpl_t; diff --git a/clients/drcachesim/scheduler/scheduler_impl.cpp b/clients/drcachesim/scheduler/scheduler_impl.cpp index 89f5d65c1a7..77e845e9c7d 100644 --- a/clients/drcachesim/scheduler/scheduler_impl.cpp +++ b/clients/drcachesim/scheduler/scheduler_impl.cpp @@ -2184,14 +2184,6 @@ scheduler_impl_tmpl_t::close_schedule_segment( return sched_type_t::STATUS_OK; } -template -bool -scheduler_impl_tmpl_t::ready_queue_empty(output_ordinal_t output) -{ - auto lock = acquire_scoped_output_lock_if_necessary(output); - return outputs_[output].ready_queue.queue.empty(); -} - template void scheduler_impl_tmpl_t::add_to_unscheduled_queue( @@ -2249,192 +2241,6 @@ scheduler_impl_tmpl_t::add_to_ready_queue(output_ordinal add_to_ready_queue_hold_locks(output, input); } -template -typename scheduler_tmpl_t::stream_status_t -scheduler_impl_tmpl_t::pop_from_ready_queue_hold_locks( - output_ordinal_t from_output, output_ordinal_t for_output, input_info_t *&new_input, - bool from_back) -{ - assert(!need_output_lock() || - (outputs_[from_output].ready_queue.lock->owned_by_cur_thread() && - (from_output == for_output || - for_output == sched_type_t::INVALID_OUTPUT_ORDINAL || - outputs_[for_output].ready_queue.lock->owned_by_cur_thread()))); - std::set skipped; - std::set blocked; - input_info_t *res = nullptr; - stream_status_t status = sched_type_t::STATUS_OK; - uint64_t cur_time = get_output_time(from_output); - while (!outputs_[from_output].ready_queue.queue.empty()) { - if (from_back) { - res = outputs_[from_output].ready_queue.queue.back(); - outputs_[from_output].ready_queue.queue.erase(res); - } else if (options_.randomize_next_input) { - res = outputs_[from_output].ready_queue.queue.get_random_entry(); - outputs_[from_output].ready_queue.queue.erase(res); - } else { - res = outputs_[from_output].ready_queue.queue.top(); - outputs_[from_output].ready_queue.queue.pop(); - } - std::lock_guard input_lock(*res->lock); - assert(!res->unscheduled || - res->blocked_time > 0); // Should be in unscheduled_priority_. 
- if (res->binding.empty() || for_output == sched_type_t::INVALID_OUTPUT_ORDINAL || - res->binding.find(for_output) != res->binding.end()) { - // For blocked inputs, as we don't have interrupts or other regular - // control points we only check for being unblocked when an input - // would be chosen to run. We thus keep blocked inputs in the ready queue. - if (res->blocked_time > 0) { - --outputs_[from_output].ready_queue.num_blocked; - if (!options_.honor_infinite_timeouts) { - // cur_time can be 0 at initialization time. - if (res->blocked_start_time == 0 && cur_time > 0) { - // This was a start-unscheduled input: we didn't have a valid - // time at initialization. - res->blocked_start_time = cur_time; - } - } else - assert(cur_time > 0); - } - if (res->blocked_time > 0 && - // cur_time can be 0 at initialization time. - (cur_time == 0 || - // Guard against time going backward (happens for wall-clock: i#6966). - cur_time < res->blocked_start_time || - cur_time - res->blocked_start_time < res->blocked_time)) { - VPRINT(this, 4, "pop queue: %d still blocked for %" PRIu64 "\n", - res->index, - res->blocked_time - (cur_time - res->blocked_start_time)); - // We keep searching for a suitable input. - blocked.insert(res); - } else { - // This input is no longer blocked. - res->blocked_time = 0; - res->unscheduled = false; - VPRINT(this, 4, "pop queue: %d @ %" PRIu64 " no longer blocked\n", - res->index, cur_time); - // We've found a candidate. One final check if this is a migration. - bool found_candidate = false; - if (from_output == for_output) - found_candidate = true; - else { - assert(cur_time > 0 || res->last_run_time == 0); - if (res->last_run_time == 0) { - // For never-executed inputs we consider their last execution - // to be the very first simulation time, which we can't - // easily initialize until here. - res->last_run_time = outputs_[from_output].initial_cur_time->load( - std::memory_order_acquire); - } - VPRINT(this, 5, - "migration check %d to %d: cur=%" PRIu64 " last=%" PRIu64 - " delta=%" PRId64 " vs thresh %" PRIu64 "\n", - from_output, for_output, cur_time, res->last_run_time, - cur_time - res->last_run_time, - options_.migration_threshold_us); - // Guard against time going backward (happens for wall-clock: i#6966). - if (options_.migration_threshold_us == 0 || - // Allow free movement for the initial load balance at init time. - cur_time == 0 || - (cur_time > res->last_run_time && - cur_time - res->last_run_time >= - static_cast(options_.migration_threshold_us * - options_.time_units_per_us))) { - VPRINT(this, 2, "migrating %d to %d\n", from_output, for_output); - found_candidate = true; - // Do not count an initial rebalance as a migration. - if (cur_time > 0) { - ++outputs_[from_output] - .stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS]; - } - } - } - if (found_candidate) - break; - else - skipped.insert(res); - } - } else { - // We keep searching for a suitable input. - skipped.insert(res); - } - res = nullptr; - } - if (res == nullptr && !blocked.empty()) { - // Do not hand out EOF thinking we're done: we still have inputs blocked - // on i/o, so just wait and retry. - if (for_output != sched_type_t::INVALID_OUTPUT_ORDINAL) - ++outputs_[for_output].idle_count; - status = sched_type_t::STATUS_IDLE; - } - // Re-add the ones we skipped, but without changing their counters so we preserve - // the prior FIFO order. - for (input_info_t *save : skipped) - outputs_[from_output].ready_queue.queue.push(save); - // Re-add the blocked ones to the back. 
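Stripped of the blocked-time arithmetic and the priority-queue machinery, the scan-and-restore shape of this loop (both in the relocated copy and in this removed original) reduces to the following sketch, with a plain deque and a boolean standing in for the timeout bookkeeping:

#include <deque>
#include <vector>

struct item_t {
    int index;
    bool blocked; // Stand-in for the blocked_time/blocked_start_time checks.
};

// Pop until a runnable item is found; set aside unsuitable entries and
// re-queue them afterward so their relative order is preserved.
item_t *
pop_runnable(std::deque<item_t *> &queue)
{
    std::vector<item_t *> set_aside;
    item_t *found = nullptr;
    while (!queue.empty()) {
        item_t *candidate = queue.front();
        queue.pop_front();
        if (candidate->blocked) {
            set_aside.push_back(candidate); // Keep searching.
        } else {
            found = candidate;
            break;
        }
    }
    for (item_t *save : set_aside)
        queue.push_back(save);
    // Returning nullptr while set_aside is non-empty corresponds to the
    // STATUS_IDLE result above: there is work, it just cannot run yet.
    return found;
}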
- for (input_info_t *save : blocked) { - std::lock_guard input_lock(*save->lock); - add_to_ready_queue_hold_locks(from_output, save); - } - auto res_lock = (res == nullptr) ? std::unique_lock() - : std::unique_lock(*res->lock); - VDO(this, 1, { - static int output_heartbeat; - // We are ok with races as the cadence is approximate. - if (++output_heartbeat % 2000 == 0) { - size_t unsched_size = 0; - { - std::lock_guard unsched_lock( - *unscheduled_priority_.lock); - unsched_size = unscheduled_priority_.queue.size(); - } - VPRINT(this, 1, - "heartbeat[%d] %zd in queue; %d blocked; %zd unscheduled => %d %d\n", - from_output, outputs_[from_output].ready_queue.queue.size(), - outputs_[from_output].ready_queue.num_blocked, unsched_size, - res == nullptr ? -1 : res->index, status); - } - }); - if (res != nullptr) { - VPRINT(this, 4, - "pop_from_ready_queue[%d] (post-size %zu): input %d priority %d timestamp " - "delta %" PRIu64 "\n", - from_output, outputs_[from_output].ready_queue.queue.size(), res->index, - res->priority, res->reader->get_last_timestamp() - res->base_timestamp); - res->unscheduled = false; - res->prev_output = res->containing_output; - res->containing_output = for_output; - } - new_input = res; - return status; -} - -template -typename scheduler_tmpl_t::stream_status_t -scheduler_impl_tmpl_t::pop_from_ready_queue( - output_ordinal_t from_output, output_ordinal_t for_output, input_info_t *&new_input) -{ - stream_status_t status = sched_type_t::STATUS_OK; - { - std::unique_lock from_lock; - std::unique_lock for_lock; - // If we need both locks, acquire in increasing output order to avoid deadlocks if - // two outputs try to steal from each other. - if (from_output == for_output || - for_output == sched_type_t::INVALID_OUTPUT_ORDINAL) { - from_lock = acquire_scoped_output_lock_if_necessary(from_output); - } else if (from_output < for_output) { - from_lock = acquire_scoped_output_lock_if_necessary(from_output); - for_lock = acquire_scoped_output_lock_if_necessary(for_output); - } else { - for_lock = acquire_scoped_output_lock_if_necessary(for_output); - from_lock = acquire_scoped_output_lock_if_necessary(from_output); - } - status = pop_from_ready_queue_hold_locks(from_output, for_output, new_input); - } - return status; -} - template uint64_t scheduler_impl_tmpl_t::scale_blocked_time( @@ -2453,37 +2259,6 @@ scheduler_impl_tmpl_t::scale_blocked_time( return static_cast(scaled_us * options_.time_units_per_us); } -template -bool -scheduler_impl_tmpl_t::syscall_incurs_switch( - input_info_t *input, uint64_t &blocked_time) -{ - assert(input->lock->owned_by_cur_thread()); - uint64_t post_time = input->reader->get_last_timestamp(); - assert(input->processing_syscall || input->processing_maybe_blocking_syscall); - if (input->reader->get_version() < TRACE_ENTRY_VERSION_FREQUENT_TIMESTAMPS) { - // This is a legacy trace that does not have timestamps bracketing syscalls. - // We switch on every maybe-blocking syscall in this case and have a simplified - // blocking model. - blocked_time = options_.blocking_switch_threshold; - return input->processing_maybe_blocking_syscall; - } - assert(input->pre_syscall_timestamp > 0); - assert(input->pre_syscall_timestamp <= post_time); - uint64_t latency = post_time - input->pre_syscall_timestamp; - uint64_t threshold = input->processing_maybe_blocking_syscall - ? 
options_.blocking_switch_threshold - : options_.syscall_switch_threshold; - blocked_time = scale_blocked_time(latency); - VPRINT(this, 3, - "input %d %ssyscall latency %" PRIu64 " * scale %6.3f => blocked time %" PRIu64 - "\n", - input->index, - input->processing_maybe_blocking_syscall ? "maybe-blocking " : "", latency, - options_.block_time_multiplier, blocked_time); - return latency >= threshold; -} - template typename scheduler_tmpl_t::stream_status_t scheduler_impl_tmpl_t::set_cur_input( @@ -2716,215 +2491,6 @@ scheduler_impl_tmpl_t::update_switch_stats( } } -template -void -scheduler_impl_tmpl_t::process_marker( - input_info_t &input, output_ordinal_t output, trace_marker_type_t marker_type, - uintptr_t marker_value) -{ - assert(input.lock->owned_by_cur_thread()); - switch (marker_type) { - case TRACE_MARKER_TYPE_SYSCALL: - input.processing_syscall = true; - input.pre_syscall_timestamp = input.reader->get_last_timestamp(); - break; - case TRACE_MARKER_TYPE_MAYBE_BLOCKING_SYSCALL: - input.processing_maybe_blocking_syscall = true; - // Generally we should already have the timestamp from a just-prior - // syscall marker, but we support tests and other synthetic sequences - // with just a maybe-blocking. - input.pre_syscall_timestamp = input.reader->get_last_timestamp(); - break; - case TRACE_MARKER_TYPE_CONTEXT_SWITCH_START: - outputs_[output].in_context_switch_code = true; - ANNOTATE_FALLTHROUGH; - case TRACE_MARKER_TYPE_SYSCALL_TRACE_START: - outputs_[output].in_kernel_code = true; - break; - case TRACE_MARKER_TYPE_CONTEXT_SWITCH_END: - // We have to delay until the next record. - outputs_[output].hit_switch_code_end = true; - ANNOTATE_FALLTHROUGH; - case TRACE_MARKER_TYPE_SYSCALL_TRACE_END: - outputs_[output].in_kernel_code = false; - break; - case TRACE_MARKER_TYPE_DIRECT_THREAD_SWITCH: { - if (!options_.honor_direct_switches) - break; - ++outputs_[output].stats[memtrace_stream_t::SCHED_STAT_DIRECT_SWITCH_ATTEMPTS]; - memref_tid_t target_tid = marker_value; - auto it = tid2input_.find(workload_tid_t(input.workload, target_tid)); - if (it == tid2input_.end()) { - VPRINT(this, 1, "Failed to find input for target switch thread %" PRId64 "\n", - target_tid); - } else { - input.switch_to_input = it->second; - } - // Trigger a switch either indefinitely or until timeout. - if (input.skip_next_unscheduled) { - // The underlying kernel mechanism being modeled only supports a single - // request: they cannot accumulate. Timing differences in the trace could - // perhaps result in multiple lining up when the didn't in the real app; - // but changing the scheme here could also push representatives in the - // other direction. - input.skip_next_unscheduled = false; - VPRINT(this, 3, - "input %d unschedule request ignored due to prior schedule request " - "@%" PRIu64 "\n", - input.index, input.reader->get_last_timestamp()); - break; - } - input.unscheduled = true; - if (!options_.honor_infinite_timeouts && input.syscall_timeout_arg == 0) { - // As our scheduling is imperfect we do not risk things being blocked - // indefinitely: we instead have a timeout, but the maximum value. - input.syscall_timeout_arg = options_.block_time_max_us; - if (input.syscall_timeout_arg == 0) - input.syscall_timeout_arg = 1; - } - if (input.syscall_timeout_arg > 0) { - input.blocked_time = scale_blocked_time(input.syscall_timeout_arg); - // Clamp at 1 since 0 means an infinite timeout for unscheduled=true. 
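The clamp applied in the next lines guards a reserved sentinel value. As a sketch of the conversion, under the simplifying assumption that scaling is just timeout times multiplier times time units (the relocated scale_blocked_time() helper owns the exact formula):

#include <cstdint>

// Convert a syscall timeout argument into simulated blocked time. Zero is
// reserved to mean an infinite timeout for an unscheduled input, so a
// nonzero timeout that scales down to zero is rounded up to 1.
uint64_t
timeout_to_blocked_time(uint64_t timeout_us, double block_time_multiplier,
                        double time_units_per_us)
{
    uint64_t scaled = static_cast<uint64_t>(timeout_us * block_time_multiplier *
                                            time_units_per_us);
    if (scaled == 0)
        scaled = 1; // 0 would read as "block forever" when unscheduled=true.
    return scaled;
}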
- if (input.blocked_time == 0) - input.blocked_time = 1; - input.blocked_start_time = get_output_time(output); - VPRINT(this, 3, "input %d unscheduled for %" PRIu64 " @%" PRIu64 "\n", - input.index, input.blocked_time, input.reader->get_last_timestamp()); - } else { - VPRINT(this, 3, "input %d unscheduled indefinitely @%" PRIu64 "\n", - input.index, input.reader->get_last_timestamp()); - } - break; - } - case TRACE_MARKER_TYPE_SYSCALL_ARG_TIMEOUT: - // This is cleared at the post-syscall instr. - input.syscall_timeout_arg = static_cast(marker_value); - break; - case TRACE_MARKER_TYPE_SYSCALL_UNSCHEDULE: - if (!options_.honor_direct_switches) - break; - if (input.skip_next_unscheduled) { - input.skip_next_unscheduled = false; - VPRINT(this, 3, - "input %d unschedule request ignored due to prior schedule request " - "@%" PRIu64 "\n", - input.index, input.reader->get_last_timestamp()); - break; - } - // Trigger a switch either indefinitely or until timeout. - input.unscheduled = true; - if (!options_.honor_infinite_timeouts && input.syscall_timeout_arg == 0) { - // As our scheduling is imperfect we do not risk things being blocked - // indefinitely: we instead have a timeout, but the maximum value. - input.syscall_timeout_arg = options_.block_time_max_us; - if (input.syscall_timeout_arg == 0) - input.syscall_timeout_arg = 1; - } - if (input.syscall_timeout_arg > 0) { - input.blocked_time = scale_blocked_time(input.syscall_timeout_arg); - // Clamp at 1 since 0 means an infinite timeout for unscheduled=true. - if (input.blocked_time == 0) - input.blocked_time = 1; - input.blocked_start_time = get_output_time(output); - VPRINT(this, 3, "input %d unscheduled for %" PRIu64 " @%" PRIu64 "\n", - input.index, input.blocked_time, input.reader->get_last_timestamp()); - } else { - VPRINT(this, 3, "input %d unscheduled indefinitely @%" PRIu64 "\n", - input.index, input.reader->get_last_timestamp()); - } - break; - case TRACE_MARKER_TYPE_SYSCALL_SCHEDULE: { - if (!options_.honor_direct_switches) - break; - memref_tid_t target_tid = marker_value; - auto it = tid2input_.find(workload_tid_t(input.workload, target_tid)); - if (it == tid2input_.end()) { - VPRINT(this, 1, - "Failed to find input for switchto::resume target tid %" PRId64 "\n", - target_tid); - return; - } - input_ordinal_t target_idx = it->second; - VPRINT(this, 3, "input %d re-scheduling input %d @%" PRIu64 "\n", input.index, - target_idx, input.reader->get_last_timestamp()); - // Release the input lock before acquiring more input locks - input.lock->unlock(); - { - input_info_t *target = &inputs_[target_idx]; - std::unique_lock target_lock(*target->lock); - if (target->at_eof) { - VPRINT(this, 3, "input %d at eof ignoring re-schedule\n", target_idx); - } else if (target->unscheduled) { - target->unscheduled = false; - bool on_unsched_queue = false; - { - std::lock_guard unsched_lock( - *unscheduled_priority_.lock); - if (unscheduled_priority_.queue.find(target)) { - unscheduled_priority_.queue.erase(target); - on_unsched_queue = true; - } - } - // We have to give up the unsched lock before calling add_to_ready_queue - // as it acquires the output lock. - if (on_unsched_queue) { - output_ordinal_t resume_output = target->prev_output; - if (resume_output == sched_type_t::INVALID_OUTPUT_ORDINAL) - resume_output = output; - // We can't hold any locks when calling add_to_ready_queue. 
- // This input is no longer on any queue, so few things can happen - // while we don't hold the input lock: a competing _SCHEDULE will - // not find the output and it can't have blocked_time>0 (invariant - // for things on unsched q); once it's on the new queue we don't - // do anything further here so we're good to go. - target_lock.unlock(); - add_to_ready_queue(resume_output, target); - target_lock.lock(); - } else { - // We assume blocked_time is from _ARG_TIMEOUT and is not from - // regularly-blocking i/o. We assume i/o getting into the mix is - // rare enough or does not matter enough to try to have separate - // timeouts. - if (target->blocked_time > 0) { - VPRINT(this, 3, - "switchto::resume erasing blocked time for target " - "input %d\n", - target->index); - output_ordinal_t target_output = target->containing_output; - // There could be no output owner if we're mid-rebalance. - if (target_output != sched_type_t::INVALID_OUTPUT_ORDINAL) { - // We can't hold the input lock to acquire the output lock. - target_lock.unlock(); - { - auto scoped_output_lock = - acquire_scoped_output_lock_if_necessary( - target_output); - output_info_t &out = outputs_[target_output]; - if (out.ready_queue.queue.find(target)) { - --out.ready_queue.num_blocked; - } - // Decrement this holding the lock to synch with - // pop_from_ready_queue(). - target->blocked_time = 0; - } - target_lock.lock(); - } else - target->blocked_time = 0; - } - } - } else { - VPRINT(this, 3, "input %d will skip next unschedule\n", target_idx); - target->skip_next_unscheduled = true; - } - } - input.lock->lock(); - break; - } - default: // Nothing to do. - break; - } -} - template typename scheduler_tmpl_t::stream_status_t scheduler_impl_tmpl_t::next_record(output_ordinal_t output, @@ -3325,55 +2891,10 @@ typename scheduler_tmpl_t::stream_status_t scheduler_impl_tmpl_t::eof_or_idle(output_ordinal_t output, input_ordinal_t prev_input) { - // XXX i#6831: Refactor to use subclasses or templates to specialize - // scheduler code based on mapping options, to avoid these top-level - // conditionals in many functions? - int live_inputs = live_input_count_.load(std::memory_order_acquire); - if (options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT || live_inputs == 0 || - // While a full schedule recorded should have each input hit either its - // EOF or ROI end, we have a fallback to avoid hangs for possible recorded - // schedules that end an input early deliberately without an ROI. - (options_.mapping == sched_type_t::MAP_AS_PREVIOUSLY && - live_replay_output_count_.load(std::memory_order_acquire) == 0)) { - assert(options_.mapping != sched_type_t::MAP_AS_PREVIOUSLY || - outputs_[output].at_eof); - return sched_type_t::STATUS_EOF; - } - if (options_.mapping == sched_type_t::MAP_TO_ANY_OUTPUT && - live_inputs <= - static_cast(inputs_.size() * options_.exit_if_fraction_inputs_left)) { - VPRINT(this, 1, "output %d exiting early with %d live inputs left\n", output, - live_inputs); - return sched_type_t::STATUS_EOF; - } - if (options_.mapping == sched_type_t::MAP_TO_ANY_OUTPUT) { - // Before going idle, try to steal work from another output. - // We start with us+1 to avoid everyone stealing from the low-numbered outputs. - // We only try when we first transition to idle; we rely on rebalancing after - // that, to avoid repeatededly grabbing other output's locks over and over. 
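The us+1 starting point in that comment is what spreads stealing across the outputs. In isolation (illustrative names, with a vector of queue sizes standing in for peeking at the other outputs' ready queues):

#include <cstddef>
#include <vector>

// Scan steal victims round-robin starting at our successor so that several
// simultaneously idle outputs do not all contend for output 0's lock.
int
pick_steal_victim(int my_output, const std::vector<size_t> &queue_sizes)
{
    int num_outputs = static_cast<int>(queue_sizes.size());
    for (int i = 1; i < num_outputs; ++i) {
        int target = (my_output + i) % num_outputs; // Never equals my_output.
        if (queue_sizes[target] > 0)
            return target;
    }
    return -1; // Nothing to steal anywhere: fall through to idle.
}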
- if (!outputs_[output].tried_to_steal_on_idle) { - outputs_[output].tried_to_steal_on_idle = true; - for (unsigned int i = 1; i < outputs_.size(); ++i) { - output_ordinal_t target = (output + i) % outputs_.size(); - assert(target != output); // Sanity check (we won't reach "output"). - input_info_t *queue_next = nullptr; - stream_status_t status = pop_from_ready_queue(target, output, queue_next); - if (status == sched_type_t::STATUS_OK && queue_next != nullptr) { - set_cur_input(output, queue_next->index); - ++outputs_[output] - .stats[memtrace_stream_t::SCHED_STAT_RUNQUEUE_STEALS]; - VPRINT( - this, 2, - "eof_or_idle: output %d stole input %d from %d's ready_queue\n", - output, queue_next->index, target); - return sched_type_t::STATUS_STOLE; - } - // We didn't find anything; loop and check another output. - } - VPRINT(this, 3, "eof_or_idle: output %d failed to steal from anyone\n", - output); - } - } + stream_status_t res = eof_or_idle_for_mode(output, prev_input); + assert(res != sched_type_t::STATUS_OK); + if (res != sched_type_t::STATUS_IDLE) + return res; // We rely on rebalancing to handle the case of every input being unscheduled. outputs_[output].waiting = true; if (prev_input != sched_type_t::INVALID_INPUT_ORDINAL) diff --git a/clients/drcachesim/scheduler/scheduler_impl.h b/clients/drcachesim/scheduler/scheduler_impl.h index a8c84f7cc55..eb7e51ffe11 100644 --- a/clients/drcachesim/scheduler/scheduler_impl.h +++ b/clients/drcachesim/scheduler/scheduler_impl.h @@ -383,9 +383,6 @@ template class scheduler_impl_tmpl_t std::unique_lock acquire_scoped_output_lock_if_necessary(output_ordinal_t output); - bool - ready_queue_empty(output_ordinal_t output); - // If input->unscheduled is true and input->blocked_time is 0, input // is placed on the unscheduled_priority_ queue instead. // The caller cannot hold the input's lock: this routine will acquire it. @@ -405,26 +402,6 @@ template class scheduler_impl_tmpl_t uint64_t scale_blocked_time(uint64_t initial_time) const; - // The input's lock must be held by the caller. - // Returns a multiplier for how long the input should be considered blocked. - bool - syscall_incurs_switch(input_info_t *input, uint64_t &blocked_time); - - // "for_output" is which output stream is looking for a new input; only an - // input which is able to run on that output will be selected. - // for_output can be INVALID_OUTPUT_ORDINAL, which will ignore bindings. - // If from_output != for_output (including for_output == INVALID_OUTPUT_ORDINAL) - // this is a migration and only migration-ready inputs will be picked. - stream_status_t - pop_from_ready_queue(output_ordinal_t from_output, output_ordinal_t for_output, - input_info_t *&new_input); - - // Identical to pop_from_ready_queue but the caller must hold both output locks. - stream_status_t - pop_from_ready_queue_hold_locks(output_ordinal_t from_output, - output_ordinal_t for_output, input_info_t *&new_input, - bool from_back = false); - // Up to the caller to check verbosity before calling. void print_queue_stats(); @@ -821,12 +798,6 @@ template class scheduler_impl_tmpl_t void print_record(const RecordType &record); - // Process each marker seen for MAP_TO_ANY_OUTPUT during next_record(). - // The input's lock must be held by the caller. - virtual void - process_marker(input_info_t &input, output_ordinal_t output, - trace_marker_type_t marker_type, uintptr_t marker_value); - // Returns the get_stream_name() value for the current input stream scheduled on // the 'output_ordinal'-th output stream. 
std::string @@ -909,6 +880,11 @@ template class scheduler_impl_tmpl_t stream_status_t eof_or_idle(output_ordinal_t output, input_ordinal_t prev_input); + // mapping_t-mode specific actions when one output runs out of things to do. + // Success return values are either STATUS_IDLE or STATUS_EOF. + virtual stream_status_t + eof_or_idle_for_mode(output_ordinal_t output, input_ordinal_t prev_input) = 0; + // Returns whether the current record for the current input stream scheduled on // the 'output_ordinal'-th output stream is from a part of the trace corresponding // to kernel execution. @@ -1011,12 +987,17 @@ class scheduler_dynamic_tmpl_t : public scheduler_impl_tmpl_t::schedule_record_t; using typename scheduler_impl_tmpl_t::InputTimestampComparator; + using typename scheduler_impl_tmpl_t::workload_tid_t; using scheduler_impl_tmpl_t::options_; using scheduler_impl_tmpl_t::outputs_; using scheduler_impl_tmpl_t::inputs_; using scheduler_impl_tmpl_t::error_string_; using scheduler_impl_tmpl_t::unscheduled_priority_; using scheduler_impl_tmpl_t::set_cur_input; + using scheduler_impl_tmpl_t::acquire_scoped_output_lock_if_necessary; + using scheduler_impl_tmpl_t::get_output_time; + using scheduler_impl_tmpl_t::scale_blocked_time; protected: scheduler_status_t @@ -1031,13 +1012,46 @@ class scheduler_dynamic_tmpl_t : public scheduler_impl_tmpl_t inputs_to_add); + bool + ready_queue_empty(output_ordinal_t output); + + // "for_output" is which output stream is looking for a new input; only an + // input which is able to run on that output will be selected. + // for_output can be INVALID_OUTPUT_ORDINAL, which will ignore bindings. + // If from_output != for_output (including for_output == INVALID_OUTPUT_ORDINAL) + // this is a migration and only migration-ready inputs will be picked. + stream_status_t + pop_from_ready_queue(output_ordinal_t from_output, output_ordinal_t for_output, + input_info_t *&new_input); + + // Identical to pop_from_ready_queue but the caller must hold both output locks. + stream_status_t + pop_from_ready_queue_hold_locks(output_ordinal_t from_output, + output_ordinal_t for_output, input_info_t *&new_input, + bool from_back = false); + // Rebalancing coordination. std::atomic rebalancer_; std::atomic last_rebalance_time_; @@ -1078,6 +1092,10 @@ class scheduler_replay_tmpl_t : public scheduler_impl_tmpl_t::check_for_input_switch( return sched_type_t::STATUS_OK; } +template +typename scheduler_tmpl_t::stream_status_t +scheduler_replay_tmpl_t::eof_or_idle_for_mode( + output_ordinal_t output, input_ordinal_t prev_input) +{ + if (this->live_input_count_.load(std::memory_order_acquire) == 0 || + // While a full schedule recorded should have each input hit either its + // EOF or ROI end, we have a fallback to avoid hangs for possible recorded + // schedules that end an input early deliberately without an ROI. 
+ (options_.mapping == sched_type_t::MAP_AS_PREVIOUSLY && + this->live_replay_output_count_.load(std::memory_order_acquire) == 0)) { + assert(options_.mapping != sched_type_t::MAP_AS_PREVIOUSLY || + outputs_[output].at_eof); + return sched_type_t::STATUS_EOF; + } + return sched_type_t::STATUS_IDLE; +} + template class scheduler_replay_tmpl_t; template class scheduler_replay_tmpl_t; diff --git a/clients/drcachesim/tests/scheduler_unit_tests.cpp b/clients/drcachesim/tests/scheduler_unit_tests.cpp index 586807c192d..082f64b89b3 100644 --- a/clients/drcachesim/tests/scheduler_unit_tests.cpp +++ b/clients/drcachesim/tests/scheduler_unit_tests.cpp @@ -2988,6 +2988,11 @@ class test_scheduler_base_t : public scheduler_impl_t { { return sched_type_t::STATUS_NOT_IMPLEMENTED; } + stream_status_t + eof_or_idle_for_mode(output_ordinal_t output, input_ordinal_t prev_input) override + { + return sched_type_t::STATUS_NOT_IMPLEMENTED; + } }; class test_scheduler_t : public test_scheduler_base_t { public: From ed1841728b3644ed2a5198f0062faeab77396ce4 Mon Sep 17 00:00:00 2001 From: Derek Bruening Date: Thu, 14 Nov 2024 15:27:45 -0500 Subject: [PATCH 4/5] Add missing headers --- clients/drcachesim/scheduler/scheduler_dynamic.cpp | 5 +++++ clients/drcachesim/scheduler/scheduler_fixed.cpp | 4 ++++ clients/drcachesim/scheduler/scheduler_replay.cpp | 7 +++++++ 3 files changed, 16 insertions(+) diff --git a/clients/drcachesim/scheduler/scheduler_dynamic.cpp b/clients/drcachesim/scheduler/scheduler_dynamic.cpp index 91aca6431d5..b2a63a6ed90 100644 --- a/clients/drcachesim/scheduler/scheduler_dynamic.cpp +++ b/clients/drcachesim/scheduler/scheduler_dynamic.cpp @@ -37,9 +37,14 @@ #include #include +#include +#include #include +#include #include #include +#include +#include #include "flexible_queue.h" #include "memref.h" diff --git a/clients/drcachesim/scheduler/scheduler_fixed.cpp b/clients/drcachesim/scheduler/scheduler_fixed.cpp index 93227e62ee0..c8f4a53b222 100644 --- a/clients/drcachesim/scheduler/scheduler_fixed.cpp +++ b/clients/drcachesim/scheduler/scheduler_fixed.cpp @@ -36,8 +36,12 @@ #include "scheduler_impl.h" #include +#include #include +#include #include +#include +#include #include "memref.h" #include "mutex_dbg_owned.h" diff --git a/clients/drcachesim/scheduler/scheduler_replay.cpp b/clients/drcachesim/scheduler/scheduler_replay.cpp index 61301400806..b27a2aa6d41 100644 --- a/clients/drcachesim/scheduler/scheduler_replay.cpp +++ b/clients/drcachesim/scheduler/scheduler_replay.cpp @@ -36,9 +36,16 @@ #include "scheduler_impl.h" #include +#include #include +#include #include +#include #include +#include +#include +#include +#include #include "memref.h" #include "mutex_dbg_owned.h" From be55d5be76ca258ca9c0fa6c546a37bb44652bec Mon Sep 17 00:00:00 2001 From: Derek Bruening Date: Thu, 14 Nov 2024 18:30:13 -0500 Subject: [PATCH 5/5] Review requests: added several comments --- clients/drcachesim/scheduler/scheduler_impl.cpp | 3 +++ clients/drcachesim/scheduler/scheduler_impl.h | 7 +++++-- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/clients/drcachesim/scheduler/scheduler_impl.cpp b/clients/drcachesim/scheduler/scheduler_impl.cpp index 4096a65fbde..88c9d1eb3ae 100644 --- a/clients/drcachesim/scheduler/scheduler_impl.cpp +++ b/clients/drcachesim/scheduler/scheduler_impl.cpp @@ -2892,6 +2892,9 @@ scheduler_impl_tmpl_t::eof_or_idle(output_ordinal_t outp input_ordinal_t prev_input) { stream_status_t res = eof_or_idle_for_mode(output, prev_input); + // We should either get STATUS_IDLE (success, and 
we continue below) or
+    // STATUS_EOF (success, and we exit this function) or some error (and we exit).
+    // A return value of STATUS_OK is not allowed, as documented.
     assert(res != sched_type_t::STATUS_OK);
     if (res != sched_type_t::STATUS_IDLE)
         return res;
diff --git a/clients/drcachesim/scheduler/scheduler_impl.h b/clients/drcachesim/scheduler/scheduler_impl.h
index dc1aa187de9..a65dc9923c7 100644
--- a/clients/drcachesim/scheduler/scheduler_impl.h
+++ b/clients/drcachesim/scheduler/scheduler_impl.h
@@ -606,8 +606,10 @@ template <typename RecordType, typename ReaderType> class scheduler_impl_tmpl_t
         return sched_type_t::STATUS_INVALID;
     }

-    // mapping_t-mode specific actions when one output runs out of things to do.
-    // Success return values are either STATUS_IDLE or STATUS_EOF.
+    // mapping_t-mode specific actions when one output runs out of things to do
+    // (pick_next_input_for_mode() has nothing left in this output's queue).
+    // Success return values are either STATUS_IDLE (asking the user to keep
+    // polling as more work may show up in the future) or STATUS_EOF.
     virtual stream_status_t
     eof_or_idle_for_mode(output_ordinal_t output, input_ordinal_t prev_input) = 0;

@@ -1043,6 +1045,7 @@ class scheduler_dynamic_tmpl_t : public scheduler_impl_tmpl_t<RecordType, ReaderType>
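Taken together, these comments pin down the template-method contract between the shared wrapper and the per-mode hook. A compressed sketch of that contract with simplified types (the real signatures carry output and input ordinals):

#include <cassert>

enum class status_t { OK, IDLE, AT_EOF, ERROR_INVALID };

struct scheduler_base_t {
    virtual ~scheduler_base_t() = default;

    // Per-mode policy: must return IDLE, AT_EOF, or an error, never OK.
    virtual status_t
    eof_or_idle_for_mode() = 0;

    // Shared wrapper: only an IDLE verdict falls through to the common
    // waiting/rebalance bookkeeping; EOF and errors propagate unchanged.
    status_t
    eof_or_idle()
    {
        status_t res = eof_or_idle_for_mode();
        assert(res != status_t::OK);
        if (res != status_t::IDLE)
            return res;
        // ... shared idle bookkeeping (waiting flag, idle counters) ...
        return status_t::IDLE;
    }
};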