From 388ffb025f8d4d21ac08422c27f85e34a0d1d567 Mon Sep 17 00:00:00 2001 From: Derek Bruening Date: Wed, 13 Nov 2024 10:19:19 -0500 Subject: [PATCH 1/3] i#6831 sched refactor, step 6: Split set_initial_schedule() Splits set_initial_schedule() into separate overrides in the 3 scheduler subclasses, sharing the initial content reading in init() prior to calling set_initial_schedule(). Splits set_output_active(): it is only valid in dynamic. Moves rebalance_queues() and two rebalance vars to dynamic. Moves read_recorded_schedule() and read_and_instantiate_traced_schedule() to replay. Adds more using statements to remove the need for "this->" prefixes on frequent base class member field references. Issue: #6831 --- clients/drcachesim/scheduler/scheduler.cpp | 3 +- .../scheduler/scheduler_dynamic.cpp | 356 +++++++++-- .../drcachesim/scheduler/scheduler_fixed.cpp | 72 ++- .../drcachesim/scheduler/scheduler_impl.cpp | 579 +----------------- clients/drcachesim/scheduler/scheduler_impl.h | 105 +++- .../drcachesim/scheduler/scheduler_replay.cpp | 352 +++++++++-- .../drcachesim/tests/scheduler_unit_tests.cpp | 54 +- 7 files changed, 781 insertions(+), 740 deletions(-) diff --git a/clients/drcachesim/scheduler/scheduler.cpp b/clients/drcachesim/scheduler/scheduler.cpp index cc1c6ddfd72..828c59be514 100644 --- a/clients/drcachesim/scheduler/scheduler.cpp +++ b/clients/drcachesim/scheduler/scheduler.cpp @@ -60,7 +60,8 @@ scheduler_tmpl_t::init( new scheduler_replay_tmpl_t); } else { // Non-dynamic and non-replay fixed modes such as analyzer serial and - // parallel modes. + // parallel modes. Although serial does do interleaving by timestamp + // it is closer to parallel mode than the file-based replay modes. impl_ = std::unique_ptr, scheduler_impl_deleter_t>( new scheduler_fixed_tmpl_t); diff --git a/clients/drcachesim/scheduler/scheduler_dynamic.cpp b/clients/drcachesim/scheduler/scheduler_dynamic.cpp index 92d47b1591d..9897d010f01 100644 --- a/clients/drcachesim/scheduler/scheduler_dynamic.cpp +++ b/clients/drcachesim/scheduler/scheduler_dynamic.cpp @@ -51,6 +51,123 @@ namespace dynamorio { namespace drmemtrace { +template +typename scheduler_tmpl_t::scheduler_status_t +scheduler_dynamic_tmpl_t::set_initial_schedule( + std::unordered_map> &workload2inputs) +{ + if (options_.mapping != sched_type_t::MAP_TO_ANY_OUTPUT) + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + // Assign initial inputs. + if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) { + // Compute the min timestamp (==base_timestamp) per workload and sort + // all inputs by relative time from the base. + for (int workload_idx = 0; + workload_idx < static_cast(workload2inputs.size()); ++workload_idx) { + uint64_t min_time = std::numeric_limits::max(); + input_ordinal_t min_input = -1; + for (int input_idx : workload2inputs[workload_idx]) { + if (inputs_[input_idx].next_timestamp < min_time) { + min_time = inputs_[input_idx].next_timestamp; + min_input = input_idx; + } + } + if (min_input < 0) + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + for (int input_idx : workload2inputs[workload_idx]) { + VPRINT(this, 4, + "workload %d: setting input %d base_timestamp to %" PRIu64 + " vs next_timestamp %zu\n", + workload_idx, input_idx, min_time, + inputs_[input_idx].next_timestamp); + inputs_[input_idx].base_timestamp = min_time; + inputs_[input_idx].order_by_timestamp = true; + } + } + // We'll pick the starting inputs below by sorting by relative time from + // each workload's base_timestamp, which our queue does for us. + } + // First, put all inputs into a temporary queue to sort by priority and + // time for us. + flexible_queue_t allq; + for (int i = 0; i < static_cast(inputs_.size()); ++i) { + inputs_[i].queue_counter = i; + allq.push(&inputs_[i]); + } + // Now assign round-robin to the outputs. We have to obey bindings here: we + // just take the first. This isn't guaranteed to be perfect if there are + // many bindings, but we run a rebalancing afterward. + output_ordinal_t output = 0; + while (!allq.empty()) { + input_info_t *input = allq.top(); + allq.pop(); + output_ordinal_t target = output; + if (!input->binding.empty()) + target = *input->binding.begin(); + else + output = (output + 1) % outputs_.size(); + this->add_to_ready_queue(target, input); + } + stream_status_t status = rebalance_queues(0, {}); + if (status != sched_type_t::STATUS_OK) { + VPRINT(this, 0, "Failed to rebalance with status %d\n", status); + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } + for (int i = 0; i < static_cast(outputs_.size()); ++i) { + input_info_t *queue_next; +#ifndef NDEBUG + status = +#endif + this->pop_from_ready_queue(i, i, queue_next); + assert(status == sched_type_t::STATUS_OK || status == sched_type_t::STATUS_IDLE); + if (queue_next == nullptr) + set_cur_input(i, sched_type_t::INVALID_INPUT_ORDINAL); + else + set_cur_input(i, queue_next->index); + } + VPRINT(this, 2, "Initial queues:\n"); + VDO(this, 2, { this->print_queue_stats(); }); + + return sched_type_t::STATUS_SUCCESS; +} + +template +typename scheduler_tmpl_t::stream_status_t +scheduler_dynamic_tmpl_t::set_output_active( + output_ordinal_t output, bool active) +{ + if (outputs_[output].active->load(std::memory_order_acquire) == active) + return sched_type_t::STATUS_OK; + outputs_[output].active->store(active, std::memory_order_release); + VPRINT(this, 2, "Output stream %d is now %s\n", output, + active ? "active" : "inactive"); + std::vector ordinals; + if (!active) { + // Make the now-inactive output's input available for other cores. + // This will reset its quantum too. + // We aren't switching on a just-read instruction not passed to the consumer, + // if the queue is empty. + input_ordinal_t cur_input = outputs_[output].cur_input; + if (cur_input != sched_type_t::INVALID_INPUT_ORDINAL) { + if (inputs_[cur_input].queue.empty()) + inputs_[cur_input].switching_pre_instruction = true; + set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL); + } + // Move the ready_queue to other outputs. + { + auto lock = this->acquire_scoped_output_lock_if_necessary(output); + while (!outputs_[output].ready_queue.queue.empty()) { + input_info_t *tomove = outputs_[output].ready_queue.queue.top(); + ordinals.push_back(tomove->index); + outputs_[output].ready_queue.queue.pop(); + } + } + } else { + outputs_[output].waiting = true; + } + return rebalance_queues(output, ordinals); +} + template typename scheduler_tmpl_t::stream_status_t scheduler_dynamic_tmpl_t::pick_next_input_for_mode( @@ -58,17 +175,16 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( input_ordinal_t &index) { uint64_t cur_time = this->get_output_time(output); - uint64_t last_time = this->last_rebalance_time_.load(std::memory_order_acquire); + uint64_t last_time = last_rebalance_time_.load(std::memory_order_acquire); if (last_time == 0) { // Initialize. - this->last_rebalance_time_.store(cur_time, std::memory_order_release); + last_rebalance_time_.store(cur_time, std::memory_order_release); } else { // Guard against time going backward, which happens: i#6966. if (cur_time > last_time && - cur_time - last_time >= - static_cast(this->options_.rebalance_period_us * - this->options_.time_units_per_us) && - this->rebalancer_.load(std::memory_order_acquire) == std::thread::id()) { + cur_time - last_time >= static_cast(options_.rebalance_period_us * + options_.time_units_per_us) && + rebalancer_.load(std::memory_order_acquire) == std::thread::id()) { VPRINT(this, 2, "Output %d hit rebalance period @%" PRIu64 " (last rebalance @%" PRIu64 ")\n", @@ -79,25 +195,24 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( } } if (blocked_time > 0 && prev_index != sched_type_t::INVALID_INPUT_ORDINAL) { - std::lock_guard lock(*this->inputs_[prev_index].lock); - if (this->inputs_[prev_index].blocked_time == 0) { + std::lock_guard lock(*inputs_[prev_index].lock); + if (inputs_[prev_index].blocked_time == 0) { VPRINT(this, 2, "next_record[%d]: blocked time %" PRIu64 "\n", output, blocked_time); - this->inputs_[prev_index].blocked_time = blocked_time; - this->inputs_[prev_index].blocked_start_time = this->get_output_time(output); + inputs_[prev_index].blocked_time = blocked_time; + inputs_[prev_index].blocked_start_time = this->get_output_time(output); } } if (prev_index != sched_type_t::INVALID_INPUT_ORDINAL && - this->inputs_[prev_index].switch_to_input != - sched_type_t::INVALID_INPUT_ORDINAL) { - input_info_t *target = &this->inputs_[this->inputs_[prev_index].switch_to_input]; - this->inputs_[prev_index].switch_to_input = sched_type_t::INVALID_INPUT_ORDINAL; + inputs_[prev_index].switch_to_input != sched_type_t::INVALID_INPUT_ORDINAL) { + input_info_t *target = &inputs_[inputs_[prev_index].switch_to_input]; + inputs_[prev_index].switch_to_input = sched_type_t::INVALID_INPUT_ORDINAL; std::unique_lock target_input_lock(*target->lock); // XXX i#5843: Add an invariant check that the next timestamp of the // target is later than the pre-switch-syscall timestamp? if (target->containing_output != sched_type_t::INVALID_OUTPUT_ORDINAL) { output_ordinal_t target_output = target->containing_output; - output_info_t &out = this->outputs_[target->containing_output]; + output_info_t &out = outputs_[target->containing_output]; // We cannot hold an input lock when we acquire an output lock. target_input_lock.unlock(); { @@ -110,7 +225,7 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( "input %d " "@%" PRIu64 "\n", output, prev_index, target->index, - this->inputs_[prev_index].reader->get_last_timestamp()); + inputs_[prev_index].reader->get_last_timestamp()); out.ready_queue.queue.erase(target); index = target->index; // Erase any remaining wait time for the target. @@ -125,10 +240,10 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( target->unscheduled = false; } if (target->containing_output != output) { - ++this->outputs_[output] + ++outputs_[output] .stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS]; } - ++this->outputs_[output] + ++outputs_[output] .stats[memtrace_stream_t::SCHED_STAT_DIRECT_SWITCH_SUCCESSES]; } // Else, actively running. target_input_lock.unlock(); @@ -146,12 +261,12 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( "was-unscheduled input %d " "@%" PRIu64 "\n", output, prev_index, target->index, - this->inputs_[prev_index].reader->get_last_timestamp()); + inputs_[prev_index].reader->get_last_timestamp()); if (target->prev_output != sched_type_t::INVALID_OUTPUT_ORDINAL && target->prev_output != output) { - ++this->outputs_[output].stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS]; + ++outputs_[output].stats[memtrace_stream_t::SCHED_STAT_MIGRATIONS]; } - ++this->outputs_[output] + ++outputs_[output] .stats[memtrace_stream_t::SCHED_STAT_DIRECT_SWITCH_SUCCESSES]; } if (index == sched_type_t::INVALID_INPUT_ORDINAL) { @@ -166,7 +281,7 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( "Direct switch (from %d) target input #%d is running " "elsewhere; picking a different target @%" PRIu64 "\n", prev_index, target->index, - this->inputs_[prev_index].reader->get_last_timestamp()); + inputs_[prev_index].reader->get_last_timestamp()); // We do ensure the missed target doesn't wait indefinitely. // XXX i#6822: It's not clear this is always the right thing to // do. @@ -189,21 +304,19 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( if (status != sched_type_t::STATUS_STOLE) return status; // eof_or_idle stole an input for us, now in .cur_input. - index = this->outputs_[output].cur_input; + index = outputs_[output].cur_input; return sched_type_t::STATUS_OK; } else { - auto lock = - std::unique_lock(*this->inputs_[prev_index].lock); + auto lock = std::unique_lock(*inputs_[prev_index].lock); // If we can't go back to the current input because it's EOF // or unscheduled indefinitely (we already checked blocked_time // above: it's 0 here), this output is either idle or EOF. - if (this->inputs_[prev_index].at_eof || - this->inputs_[prev_index].unscheduled) { + if (inputs_[prev_index].at_eof || inputs_[prev_index].unscheduled) { lock.unlock(); stream_status_t status = this->eof_or_idle(output, prev_index); if (status != sched_type_t::STATUS_STOLE) return status; - index = this->outputs_[output].cur_input; + index = outputs_[output].cur_input; return sched_type_t::STATUS_OK; } else index = prev_index; // Go back to prior. @@ -216,22 +329,22 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( // shouldn't switch. The queue preserves FIFO for same-priority // cases so we will switch if someone of equal priority is // waiting. - this->set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL); + set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL); input_info_t *queue_next = nullptr; stream_status_t status = this->pop_from_ready_queue(output, output, queue_next); if (status != sched_type_t::STATUS_OK) { if (status == sched_type_t::STATUS_IDLE) { - this->outputs_[output].waiting = true; - if (this->options_.schedule_record_ostream != nullptr) { + outputs_[output].waiting = true; + if (options_.schedule_record_ostream != nullptr) { stream_status_t record_status = this->record_schedule_segment( output, schedule_record_t::IDLE_BY_COUNT, 0, // Start prior to this idle. - this->outputs_[output].idle_count - 1, 0); + outputs_[output].idle_count - 1, 0); if (record_status != sched_type_t::STATUS_OK) return record_status; } if (prev_index != sched_type_t::INVALID_INPUT_ORDINAL) { - ++this->outputs_[output] + ++outputs_[output] .stats[memtrace_stream_t::SCHED_STAT_SWITCH_INPUT_TO_IDLE]; } } @@ -241,7 +354,7 @@ scheduler_dynamic_tmpl_t::pick_next_input_for_mode( status = this->eof_or_idle(output, prev_index); if (status != sched_type_t::STATUS_STOLE) return status; - index = this->outputs_[output].cur_input; + index = outputs_[output].cur_input; return sched_type_t::STATUS_OK; } else index = queue_next->index; @@ -267,8 +380,7 @@ scheduler_dynamic_tmpl_t::check_for_input_switch( // boundaries so we live with those being before the switch. // XXX: Once we insert kernel traces, we may have to try harder // to stop before the post-syscall records. - if (this->record_type_is_instr_boundary(record, - this->outputs_[output].last_record)) { + if (this->record_type_is_instr_boundary(record, outputs_[output].last_record)) { if (input->switch_to_input != sched_type_t::INVALID_INPUT_ORDINAL) { // The switch request overrides any latency threshold. need_new_input = true; @@ -300,12 +412,12 @@ scheduler_dynamic_tmpl_t::check_for_input_switch( input->syscall_timeout_arg = 0; } } - if (this->outputs_[output].hit_switch_code_end) { + if (outputs_[output].hit_switch_code_end) { // We have to delay so the end marker is still in_context_switch_code. - this->outputs_[output].in_context_switch_code = false; - this->outputs_[output].hit_switch_code_end = false; + outputs_[output].in_context_switch_code = false; + outputs_[output].hit_switch_code_end = false; // We're now back "on the clock". - if (this->options_.quantum_unit == sched_type_t::QUANTUM_TIME) + if (options_.quantum_unit == sched_type_t::QUANTUM_TIME) input->prev_time_in_quantum = cur_time; // XXX: If we add a skip feature triggered on the output stream, // we'll want to make sure skipping while in these switch and kernel @@ -314,11 +426,11 @@ scheduler_dynamic_tmpl_t::check_for_input_switch( if (this->record_type_is_marker(record, marker_type, marker_value)) { this->process_marker(*input, output, marker_type, marker_value); } - if (this->options_.quantum_unit == sched_type_t::QUANTUM_INSTRUCTIONS && - this->record_type_is_instr_boundary(record, this->outputs_[output].last_record) && - !this->outputs_[output].in_kernel_code) { + if (options_.quantum_unit == sched_type_t::QUANTUM_INSTRUCTIONS && + this->record_type_is_instr_boundary(record, outputs_[output].last_record) && + !outputs_[output].in_kernel_code) { ++input->instrs_in_quantum; - if (input->instrs_in_quantum > this->options_.quantum_duration_instrs) { + if (input->instrs_in_quantum > options_.quantum_duration_instrs) { // We again prefer to switch to another input even if the current // input has the oldest timestamp, prioritizing context switches // over timestamp ordering. @@ -327,10 +439,9 @@ scheduler_dynamic_tmpl_t::check_for_input_switch( preempt = true; need_new_input = true; input->instrs_in_quantum = 0; - ++this->outputs_[output] - .stats[memtrace_stream_t::SCHED_STAT_QUANTUM_PREEMPTS]; + ++outputs_[output].stats[memtrace_stream_t::SCHED_STAT_QUANTUM_PREEMPTS]; } - } else if (this->options_.quantum_unit == sched_type_t::QUANTUM_TIME) { + } else if (options_.quantum_unit == sched_type_t::QUANTUM_TIME) { if (cur_time == 0 || cur_time < input->prev_time_in_quantum) { VPRINT(this, 1, "next_record[%d]: invalid time %" PRIu64 " vs start %" PRIu64 "\n", @@ -340,13 +451,12 @@ scheduler_dynamic_tmpl_t::check_for_input_switch( input->time_spent_in_quantum += cur_time - input->prev_time_in_quantum; input->prev_time_in_quantum = cur_time; double elapsed_micros = static_cast(input->time_spent_in_quantum) / - this->options_.time_units_per_us; - if (elapsed_micros >= this->options_.quantum_duration_us && + options_.time_units_per_us; + if (elapsed_micros >= options_.quantum_duration_us && // We only switch on instruction boundaries. We could possibly switch // in between (e.g., scatter/gather long sequence of reads/writes) by // setting input->switching_pre_instruction. - this->record_type_is_instr_boundary(record, - this->outputs_[output].last_record)) { + this->record_type_is_instr_boundary(record, outputs_[output].last_record)) { VPRINT(this, 4, "next_record[%d]: input %d hit end of time quantum after %" PRIu64 "\n", @@ -354,8 +464,7 @@ scheduler_dynamic_tmpl_t::check_for_input_switch( preempt = true; need_new_input = true; input->time_spent_in_quantum = 0; - ++this->outputs_[output] - .stats[memtrace_stream_t::SCHED_STAT_QUANTUM_PREEMPTS]; + ++outputs_[output].stats[memtrace_stream_t::SCHED_STAT_QUANTUM_PREEMPTS]; } } // For sched_type_t::DEPENDENCY_TIMESTAMPS: enforcing asked-for @@ -366,6 +475,147 @@ scheduler_dynamic_tmpl_t::check_for_input_switch( return sched_type_t::STATUS_OK; } +template +typename scheduler_tmpl_t::stream_status_t +scheduler_dynamic_tmpl_t::rebalance_queues( + output_ordinal_t triggering_output, std::vector inputs_to_add) +{ + std::thread::id nobody; + if (!rebalancer_.compare_exchange_weak(nobody, std::this_thread::get_id(), + std::memory_order_release, + std::memory_order_relaxed)) { + // Someone else is rebalancing. + return sched_type_t::STATUS_OK; + } + stream_status_t status = sched_type_t::STATUS_OK; + assert(options_.mapping == sched_type_t::MAP_TO_ANY_OUTPUT); + VPRINT(this, 1, "Output %d triggered a rebalance @%" PRIu64 ":\n", triggering_output, + this->get_output_time(triggering_output)); + // First, update the time to avoid more threads coming here. + last_rebalance_time_.store(this->get_output_time(triggering_output), + std::memory_order_release); + VPRINT(this, 2, "Before rebalance:\n"); + VDO(this, 2, { this->print_queue_stats(); }); + ++outputs_[triggering_output] + .stats[memtrace_stream_t::SCHED_STAT_RUNQUEUE_REBALANCES]; + + // Workaround to avoid hangs when _SCHEDULE and/or _DIRECT_THREAD_SWITCH + // directives miss their targets (due to running with a subset of the + // original threads, or other scenarios) and we end up with no scheduled + // inputs but a set of unscheduled inputs who will never be scheduled. + // TODO i#6959: Just exit early instead, maybe under a flag. + // It would help to see what % of total records we've processed. + size_t unsched_size = 0; + { + std::lock_guard unsched_lock(*unscheduled_priority_.lock); + unsched_size = unscheduled_priority_.queue.size(); + } + if (this->live_input_count_.load(std::memory_order_acquire) == + static_cast(unsched_size)) { + VPRINT( + this, 1, + "rebalancing moving entire unscheduled queue (%zu entries) to ready_queues\n", + unsched_size); + { + std::lock_guard unsched_lock(*unscheduled_priority_.lock); + while (!unscheduled_priority_.queue.empty()) { + input_info_t *tomove = unscheduled_priority_.queue.top(); + inputs_to_add.push_back(tomove->index); + unscheduled_priority_.queue.pop(); + } + } + for (input_ordinal_t input : inputs_to_add) { + std::lock_guard lock(*inputs_[input].lock); + inputs_[input].unscheduled = false; + } + } + + int live_inputs = this->live_input_count_.load(std::memory_order_acquire); + int live_outputs = 0; + for (unsigned int i = 0; i < outputs_.size(); ++i) { + if (outputs_[i].active->load(std::memory_order_acquire)) + ++live_outputs; + } + double avg_per_output = live_inputs / static_cast(live_outputs); + unsigned int avg_ceiling = static_cast(std::ceil(avg_per_output)); + unsigned int avg_floor = static_cast(std::floor(avg_per_output)); + int iteration = 0; + do { + // Walk the outputs, filling too-short queues from inputs_to_add and + // shrinking too-long queues into inputs_to_add. We may need a 2nd pass + // for this; and a 3rd pass if bindings prevent even splitting. + VPRINT( + this, 3, + "Rebalance iteration %d inputs_to_add size=%zu avg_per_output=%4.1f %d-%d\n", + iteration, inputs_to_add.size(), avg_per_output, avg_floor, avg_ceiling); + // We're giving up the output locks as we go, so there may be some stealing + // in the middle of our operation, but the rebalancing is approximate anyway. + for (unsigned int i = 0; i < outputs_.size(); ++i) { + if (!outputs_[i].active->load(std::memory_order_acquire)) + continue; + auto lock = this->acquire_scoped_output_lock_if_necessary(i); + // Only remove on the 1st iteration; later we can exceed due to binding + // constraints. + while (iteration == 0 && outputs_[i].ready_queue.queue.size() > avg_ceiling) { + input_info_t *queue_next = nullptr; + // We use our regular pop_from_ready_queue which means we leave + // blocked inputs on the queue: those do not get rebalanced. + // XXX: Should we revisit that? + // + // We remove from the back to avoid penalizing the next-to-run entries + // at the front of the queue by putting them at the back of another + // queue. + status = this->pop_from_ready_queue_hold_locks( + i, sched_type_t::INVALID_OUTPUT_ORDINAL, queue_next, + /*from_back=*/true); + if (status == sched_type_t::STATUS_OK && queue_next != nullptr) { + VPRINT(this, 3, + "Rebalance iteration %d: output %d giving up input %d\n", + iteration, i, queue_next->index); + inputs_to_add.push_back(queue_next->index); + } else + break; + } + std::vector incompatible_inputs; + // If we reach the 3rd iteration, we have fussy inputs with bindings. + // Try to add them to every output. + while ( + (outputs_[i].ready_queue.queue.size() < avg_ceiling || iteration > 1) && + !inputs_to_add.empty()) { + input_ordinal_t ordinal = inputs_to_add.back(); + inputs_to_add.pop_back(); + input_info_t &input = inputs_[ordinal]; + std::lock_guard input_lock(*input.lock); + if (input.binding.empty() || + input.binding.find(i) != input.binding.end()) { + VPRINT(this, 3, "Rebalance iteration %d: output %d taking input %d\n", + iteration, i, ordinal); + this->add_to_ready_queue_hold_locks(i, &input); + } else { + incompatible_inputs.push_back(ordinal); + } + } + inputs_to_add.insert(inputs_to_add.end(), incompatible_inputs.begin(), + incompatible_inputs.end()); + } + ++iteration; + if (iteration >= 3 && !inputs_to_add.empty()) { + // This is possible with bindings limited to inactive outputs. + // XXX: Rather than return an error, we could add to the unscheduled queue, + // but do not mark the input unscheduled. Then when an output is + // marked active, we could walk the unscheduled queue and take + // inputs not marked unscheduled. + VPRINT(this, 1, "Rebalance hit impossible binding\n"); + status = sched_type_t::STATUS_IMPOSSIBLE_BINDING; + break; + } + } while (!inputs_to_add.empty()); + VPRINT(this, 2, "After:\n"); + VDO(this, 2, { this->print_queue_stats(); }); + rebalancer_.store(std::thread::id(), std::memory_order_release); + return status; +} + template class scheduler_dynamic_tmpl_t; template class scheduler_dynamic_tmpl_t; diff --git a/clients/drcachesim/scheduler/scheduler_fixed.cpp b/clients/drcachesim/scheduler/scheduler_fixed.cpp index e9c1ed9f9e7..2b7adee2be7 100644 --- a/clients/drcachesim/scheduler/scheduler_fixed.cpp +++ b/clients/drcachesim/scheduler/scheduler_fixed.cpp @@ -35,11 +35,9 @@ #include "scheduler.h" #include "scheduler_impl.h" -#include #include #include #include -#include #include "memref.h" #include "mutex_dbg_owned.h" @@ -50,19 +48,64 @@ namespace dynamorio { namespace drmemtrace { +template +typename scheduler_tmpl_t::scheduler_status_t +scheduler_fixed_tmpl_t::set_initial_schedule( + std::unordered_map> &workload2inputs) +{ + if (options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) { + // Assign the inputs up front to avoid locks once we're in parallel mode. + // We use a simple round-robin static assignment for now. + for (int i = 0; i < static_cast(inputs_.size()); ++i) { + size_t index = i % outputs_.size(); + if (outputs_[index].input_indices.empty()) + set_cur_input(static_cast(index), i); + outputs_[index].input_indices.push_back(i); + VPRINT(this, 2, "Assigning input #%d to output #%zd\n", i, index); + } + } else if (options_.mapping == sched_type_t::MAP_TO_RECORDED_OUTPUT) { + if (options_.replay_as_traced_istream != nullptr) { + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } else if (outputs_.size() > 1) { + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } else if (inputs_.size() == 1) { + set_cur_input(0, 0); + } else { + // The old file_reader_t interleaving would output the top headers for every + // thread first and then pick the oldest timestamp once it reached a + // timestamp. We instead queue those headers so we can start directly with the + // oldest timestamp's thread. + uint64_t min_time = std::numeric_limits::max(); + input_ordinal_t min_input = -1; + for (int i = 0; i < static_cast(inputs_.size()); ++i) { + if (inputs_[i].next_timestamp < min_time) { + min_time = inputs_[i].next_timestamp; + min_input = i; + } + } + if (min_input < 0) + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + set_cur_input(0, static_cast(min_input)); + } + } else { + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } + return sched_type_t::STATUS_SUCCESS; +} + template typename scheduler_tmpl_t::stream_status_t scheduler_fixed_tmpl_t::pick_next_input_for_mode( output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index, input_ordinal_t &index) { - if (this->options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) { + if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) { uint64_t min_time = std::numeric_limits::max(); - for (size_t i = 0; i < this->inputs_.size(); ++i) { - std::lock_guard lock(*this->inputs_[i].lock); - if (!this->inputs_[i].at_eof && this->inputs_[i].next_timestamp > 0 && - this->inputs_[i].next_timestamp < min_time) { - min_time = this->inputs_[i].next_timestamp; + for (size_t i = 0; i < inputs_.size(); ++i) { + std::lock_guard lock(*inputs_[i].lock); + if (!inputs_[i].at_eof && inputs_[i].next_timestamp > 0 && + inputs_[i].next_timestamp < min_time) { + min_time = inputs_[i].next_timestamp; index = static_cast(i); } } @@ -70,25 +113,24 @@ scheduler_fixed_tmpl_t::pick_next_input_for_mode( stream_status_t status = this->eof_or_idle(output, prev_index); if (status != sched_type_t::STATUS_STOLE) return status; - index = this->outputs_[output].cur_input; + index = outputs_[output].cur_input; return sched_type_t::STATUS_OK; } VPRINT(this, 2, "next_record[%d]: advancing to timestamp %" PRIu64 " == input #%d\n", output, min_time, index); - } else if (this->options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) { + } else if (options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) { // We're done with the prior thread; take the next one that was // pre-allocated to this output (pre-allocated to avoid locks). Invariant: // the same output will not be accessed by two different threads // simultaneously in this mode, allowing us to support a lock-free // parallel-friendly increment here. - int indices_index = ++this->outputs_[output].input_indices_index; - if (indices_index >= - static_cast(this->outputs_[output].input_indices.size())) { + int indices_index = ++outputs_[output].input_indices_index; + if (indices_index >= static_cast(outputs_[output].input_indices.size())) { VPRINT(this, 2, "next_record[%d]: all at eof\n", output); return sched_type_t::STATUS_EOF; } - index = this->outputs_[output].input_indices[indices_index]; + index = outputs_[output].input_indices[indices_index]; VPRINT(this, 2, "next_record[%d]: advancing to local index %d == input #%d\n", output, indices_index, index); } else @@ -103,7 +145,7 @@ scheduler_fixed_tmpl_t::check_for_input_switch( output_ordinal_t output, RecordType &record, input_info_t *input, uint64_t cur_time, bool &need_new_input, bool &preempt, uint64_t &blocked_time) { - if (this->options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS && + if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS && this->record_type_is_timestamp(record, input->next_timestamp)) need_new_input = true; return sched_type_t::STATUS_OK; diff --git a/clients/drcachesim/scheduler/scheduler_impl.cpp b/clients/drcachesim/scheduler/scheduler_impl.cpp index a5a553a3e67..10e53782773 100644 --- a/clients/drcachesim/scheduler/scheduler_impl.cpp +++ b/clients/drcachesim/scheduler/scheduler_impl.cpp @@ -871,6 +871,35 @@ scheduler_impl_tmpl_t::init( if (res != sched_type_t::STATUS_SUCCESS) return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + // Determine whether we need to read ahead in the inputs. There are cases where we + // do not want to do that as it would block forever if the inputs are not available + // (e.g., online analysis IPC readers); it also complicates ordinals so we avoid it + // if we can and enumerate all the cases that do need it. + bool gather_timestamps = false; + if (((options_.mapping == sched_type_t::MAP_AS_PREVIOUSLY || + options_.mapping == sched_type_t::MAP_TO_ANY_OUTPUT) && + options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) || + (options_.mapping == sched_type_t::MAP_TO_RECORDED_OUTPUT && + options_.replay_as_traced_istream == nullptr && inputs_.size() > 1)) { + gather_timestamps = true; + if (!options_.read_inputs_in_init) { + error_string_ = "Timestamp dependencies require read_inputs_in_init"; + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } + } + // The filetype, if present, is before the first timestamp. If we only need the + // filetype we avoid going as far as the timestamp. + bool gather_filetype = options_.read_inputs_in_init; + if (gather_filetype || gather_timestamps) { + scheduler_status_t res = this->get_initial_input_content(gather_timestamps); + if (res != sched_type_t::STATUS_SUCCESS) { + error_string_ = "Failed to read initial input contents for filetype"; + if (gather_timestamps) + error_string_ += " and initial timestamps"; + return res; + } + } + return set_initial_schedule(workload2inputs); } @@ -941,176 +970,6 @@ scheduler_impl_tmpl_t::legacy_field_support() return sched_type_t::STATUS_SUCCESS; } -template -typename scheduler_tmpl_t::scheduler_status_t -scheduler_impl_tmpl_t::set_initial_schedule( - std::unordered_map> &workload2inputs) -{ - // Determine whether we need to read ahead in the inputs. There are cases where we - // do not want to do that as it would block forever if the inputs are not available - // (e.g., online analysis IPC readers); it also complicates ordinals so we avoid it - // if we can and enumerate all the cases that do need it. - bool gather_timestamps = false; - if (((options_.mapping == sched_type_t::MAP_AS_PREVIOUSLY || - options_.mapping == sched_type_t::MAP_TO_ANY_OUTPUT) && - options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) || - (options_.mapping == sched_type_t::MAP_TO_RECORDED_OUTPUT && - options_.replay_as_traced_istream == nullptr && inputs_.size() > 1)) { - gather_timestamps = true; - if (!options_.read_inputs_in_init) { - error_string_ = "Timestamp dependencies require read_inputs_in_init"; - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - } - } - // The filetype, if present, is before the first timestamp. If we only need the - // filetype we avoid going as far as the timestamp. - bool gather_filetype = options_.read_inputs_in_init; - if (gather_filetype || gather_timestamps) { - scheduler_status_t res = get_initial_input_content(gather_timestamps); - if (res != sched_type_t::STATUS_SUCCESS) { - error_string_ = "Failed to read initial input contents for filetype"; - if (gather_timestamps) - error_string_ += " and initial timestamps"; - return res; - } - } - - if (options_.mapping == sched_type_t::MAP_AS_PREVIOUSLY) { - live_replay_output_count_.store(static_cast(outputs_.size()), - std::memory_order_release); - if (options_.schedule_replay_istream == nullptr || - options_.schedule_record_ostream != nullptr) - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - scheduler_status_t status = read_recorded_schedule(); - if (status != sched_type_t::STATUS_SUCCESS) - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) { - // Match the ordinals from the original run by pre-reading the timestamps. - assert(gather_timestamps); - } - } else if (options_.schedule_replay_istream != nullptr) { - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - } else if (options_.mapping == sched_type_t::MAP_TO_CONSISTENT_OUTPUT) { - // Assign the inputs up front to avoid locks once we're in parallel mode. - // We use a simple round-robin static assignment for now. - for (int i = 0; i < static_cast(inputs_.size()); ++i) { - size_t index = i % outputs_.size(); - if (outputs_[index].input_indices.empty()) - set_cur_input(static_cast(index), i); - outputs_[index].input_indices.push_back(i); - VPRINT(this, 2, "Assigning input #%d to output #%zd\n", i, index); - } - } else if (options_.mapping == sched_type_t::MAP_TO_RECORDED_OUTPUT) { - if (options_.replay_as_traced_istream != nullptr) { - // Even for just one output we honor a request to replay the schedule - // (although it should match the analyzer serial mode so there's no big - // benefit to reading the schedule file. The analyzer serial mode or other - // special cases of one output don't set the replay_as_traced_istream - // field.) - scheduler_status_t status = read_and_instantiate_traced_schedule(); - if (status != sched_type_t::STATUS_SUCCESS) - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - // Now leverage the regular replay code. - options_.mapping = sched_type_t::MAP_AS_PREVIOUSLY; - } else if (outputs_.size() > 1) { - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - } else if (inputs_.size() == 1) { - set_cur_input(0, 0); - } else { - // The old file_reader_t interleaving would output the top headers for every - // thread first and then pick the oldest timestamp once it reached a - // timestamp. We instead queue those headers so we can start directly with the - // oldest timestamp's thread. - assert(gather_timestamps); - uint64_t min_time = std::numeric_limits::max(); - input_ordinal_t min_input = -1; - for (int i = 0; i < static_cast(inputs_.size()); ++i) { - if (inputs_[i].next_timestamp < min_time) { - min_time = inputs_[i].next_timestamp; - min_input = i; - } - } - if (min_input < 0) - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - set_cur_input(0, static_cast(min_input)); - } - } else { - // Assign initial inputs. - if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) { - assert(gather_timestamps); - // Compute the min timestamp (==base_timestamp) per workload and sort - // all inputs by relative time from the base. - for (int workload_idx = 0; - workload_idx < static_cast(workload2inputs.size()); - ++workload_idx) { - uint64_t min_time = std::numeric_limits::max(); - input_ordinal_t min_input = -1; - for (int input_idx : workload2inputs[workload_idx]) { - if (inputs_[input_idx].next_timestamp < min_time) { - min_time = inputs_[input_idx].next_timestamp; - min_input = input_idx; - } - } - if (min_input < 0) - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - for (int input_idx : workload2inputs[workload_idx]) { - VPRINT(this, 4, - "workload %d: setting input %d base_timestamp to %" PRIu64 - " vs next_timestamp %zu\n", - workload_idx, input_idx, min_time, - inputs_[input_idx].next_timestamp); - inputs_[input_idx].base_timestamp = min_time; - inputs_[input_idx].order_by_timestamp = true; - } - } - // We'll pick the starting inputs below by sorting by relative time from - // each workload's base_timestamp, which our queue does for us. - } - // First, put all inputs into a temporary queue to sort by priority and - // time for us. - flexible_queue_t allq; - for (int i = 0; i < static_cast(inputs_.size()); ++i) { - inputs_[i].queue_counter = i; - allq.push(&inputs_[i]); - } - // Now assign round-robin to the outputs. We have to obey bindings here: we - // just take the first. This isn't guaranteed to be perfect if there are - // many bindings, but we run a rebalancing afterward. - output_ordinal_t output = 0; - while (!allq.empty()) { - input_info_t *input = allq.top(); - allq.pop(); - output_ordinal_t target = output; - if (!input->binding.empty()) - target = *input->binding.begin(); - else - output = (output + 1) % outputs_.size(); - add_to_ready_queue(target, input); - } - stream_status_t status = rebalance_queues(0, {}); - if (status != sched_type_t::STATUS_OK) { - VPRINT(this, 0, "Failed to rebalance with status %d\n", status); - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - } - for (int i = 0; i < static_cast(outputs_.size()); ++i) { - input_info_t *queue_next; -#ifndef NDEBUG - status = -#endif - pop_from_ready_queue(i, i, queue_next); - assert(status == sched_type_t::STATUS_OK || - status == sched_type_t::STATUS_IDLE); - if (queue_next == nullptr) - set_cur_input(i, sched_type_t::INVALID_INPUT_ORDINAL); - else - set_cur_input(i, queue_next->index); - } - VPRINT(this, 2, "Initial queues:\n"); - VDO(this, 2, { print_queue_stats(); }); - } - return sched_type_t::STATUS_SUCCESS; -} - template std::string scheduler_impl_tmpl_t::recorded_schedule_component_name( @@ -1149,211 +1008,6 @@ scheduler_impl_tmpl_t::write_recorded_schedule() return sched_type_t::STATUS_SUCCESS; } -template -typename scheduler_tmpl_t::scheduler_status_t -scheduler_impl_tmpl_t::read_recorded_schedule() -{ - if (options_.schedule_replay_istream == nullptr) - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - - schedule_record_t record; - // We assume we can easily fit the whole context switch sequence in memory. - // If that turns out not to be the case for very long traces, we deliberately - // used an archive format so we could do parallel incremental reads. - // (Conversely, if we want to commit to storing in memory, we could use a - // non-archive format and store the output ordinal in the version record.) - for (int i = 0; i < static_cast(outputs_.size()); ++i) { - std::string err = options_.schedule_replay_istream->open_component( - recorded_schedule_component_name(i)); - if (!err.empty()) { - error_string_ = "Failed to open schedule_replay_istream component " + - recorded_schedule_component_name(i) + ": " + err; - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - } - // XXX: This could be made more efficient if we stored the record count - // in the version field's stop_instruction field or something so we can - // size the vector up front. As this only happens once we do not bother - // and live with a few vector resizes. - bool saw_footer = false; - while (options_.schedule_replay_istream->read(reinterpret_cast(&record), - sizeof(record))) { - if (record.type == schedule_record_t::VERSION) { - if (record.key.version != schedule_record_t::VERSION_CURRENT) - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - } else if (record.type == schedule_record_t::FOOTER) { - saw_footer = true; - break; - } else - outputs_[i].record.push_back(record); - } - if (!saw_footer) { - error_string_ = "Record file missing footer"; - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - } - VPRINT(this, 1, "Read %zu recorded records for output #%d\n", - outputs_[i].record.size(), i); - } - // See if there was more data in the file (we do this after reading to not - // mis-report i/o or path errors as this error). - std::string err = options_.schedule_replay_istream->open_component( - recorded_schedule_component_name(static_cast(outputs_.size()))); - if (err.empty()) { - error_string_ = "Not enough output streams for recorded file"; - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - } - for (int i = 0; i < static_cast(outputs_.size()); ++i) { - if (outputs_[i].record.empty()) { - // XXX i#6630: We should auto-set the output count and avoid - // having extra outputs; these complicate idle computations, etc. - VPRINT(this, 1, "output %d empty: returning eof up front\n", i); - set_cur_input(i, sched_type_t::INVALID_INPUT_ORDINAL); - outputs_[i].at_eof = true; - } else if (outputs_[i].record[0].type == schedule_record_t::IDLE || - outputs_[i].record[0].type == schedule_record_t::IDLE_BY_COUNT) { - set_cur_input(i, sched_type_t::INVALID_INPUT_ORDINAL); - outputs_[i].waiting = true; - if (outputs_[i].record[0].type == schedule_record_t::IDLE) { - // Convert a legacy idle duration from microseconds to record counts. - outputs_[i].record[0].value.idle_duration = - static_cast(options_.time_units_per_us * - outputs_[i].record[0].value.idle_duration); - } - outputs_[i].idle_start_count = -1; // Updated on first next_record(). - VPRINT(this, 3, "output %d starting out idle\n", i); - } else { - assert(outputs_[i].record[0].type == schedule_record_t::DEFAULT); - set_cur_input(i, outputs_[i].record[0].key.input); - } - } - return sched_type_t::STATUS_SUCCESS; -} - -template -typename scheduler_tmpl_t::scheduler_status_t -scheduler_impl_tmpl_t::read_and_instantiate_traced_schedule() -{ - std::vector> start2stop(inputs_.size()); - // We also want to collapse same-cpu consecutive records so we start with - // a temporary local vector. - std::vector> all_sched(outputs_.size()); - // Work around i#6107 by tracking counts sorted by timestamp for each input. - std::vector> input_sched(inputs_.size()); - // These hold entries added in the on-disk (unsorted) order. - std::vector disk_ord2index; // Initially [i] holds i. - std::vector disk_ord2cpuid; // [i] holds cpuid for entry i. - scheduler_status_t res = read_traced_schedule(input_sched, start2stop, all_sched, - disk_ord2index, disk_ord2cpuid); - if (res != sched_type_t::STATUS_SUCCESS) - return res; - // Sort by cpuid to get a more natural ordering. - // Probably raw2trace should do this in the first place, but we have many - // schedule files already out there so we still need a sort here. - // If we didn't have cross-indices pointing at all_sched from input_sched, we - // would just sort all_sched: but instead we have to construct a separate - // ordering structure. - std::sort(disk_ord2index.begin(), disk_ord2index.end(), - [disk_ord2cpuid](const output_ordinal_t &l, const output_ordinal_t &r) { - return disk_ord2cpuid[l] < disk_ord2cpuid[r]; - }); - // disk_ord2index[i] used to hold i; now after sorting it holds the ordinal in - // the disk file that has the ith largest cpuid. We need to turn that into - // the output_idx ordinal for the cpu at ith ordinal in the disk file, for - // which we use a new vector disk_ord2output. - // E.g., if the original file was in this order disk_ord2cpuid = {6,2,3,7}, - // disk_ord2index after sorting would hold {1,2,0,3}, which we want to turn - // into disk_ord2output = {2,0,1,3}. - std::vector disk_ord2output(disk_ord2index.size()); - for (size_t i = 0; i < disk_ord2index.size(); ++i) { - disk_ord2output[disk_ord2index[i]] = static_cast(i); - } - for (int disk_idx = 0; disk_idx < static_cast(outputs_.size()); - ++disk_idx) { - if (disk_idx >= static_cast(disk_ord2index.size())) { - // XXX i#6630: We should auto-set the output count and avoid - // having extra ouputs; these complicate idle computations, etc. - VPRINT(this, 1, "Output %d empty: returning eof up front\n", disk_idx); - outputs_[disk_idx].at_eof = true; - set_cur_input(disk_idx, sched_type_t::INVALID_INPUT_ORDINAL); - continue; - } - output_ordinal_t output_idx = disk_ord2output[disk_idx]; - VPRINT(this, 1, "Read %zu as-traced records for output #%d\n", - all_sched[disk_idx].size(), output_idx); - outputs_[output_idx].as_traced_cpuid = disk_ord2cpuid[disk_idx]; - VPRINT(this, 1, "Output #%d is as-traced CPU #%" PRId64 "\n", output_idx, - outputs_[output_idx].as_traced_cpuid); - // Update the stop_instruction field and collapse consecutive entries while - // inserting into the final location. - int start_consec = -1; - for (int sched_idx = 0; sched_idx < static_cast(all_sched[disk_idx].size()); - ++sched_idx) { - auto &segment = all_sched[disk_idx][sched_idx]; - if (!segment.valid) - continue; - auto find = start2stop[segment.input].find(segment.start_instruction); - ++find; - if (find == start2stop[segment.input].end()) - segment.stop_instruction = std::numeric_limits::max(); - else - segment.stop_instruction = *find; - VPRINT(this, 4, - "as-read segment #%d: input=%d start=%" PRId64 " stop=%" PRId64 - " time=%" PRId64 "\n", - sched_idx, segment.input, segment.start_instruction, - segment.stop_instruction, segment.timestamp); - if (sched_idx + 1 < static_cast(all_sched[disk_idx].size()) && - segment.input == all_sched[disk_idx][sched_idx + 1].input && - segment.stop_instruction > - all_sched[disk_idx][sched_idx + 1].start_instruction) { - // A second sanity check. - error_string_ = "Invalid decreasing start field in schedule file"; - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - } else if (sched_idx + 1 < static_cast(all_sched[disk_idx].size()) && - segment.input == all_sched[disk_idx][sched_idx + 1].input && - segment.stop_instruction == - all_sched[disk_idx][sched_idx + 1].start_instruction) { - // Collapse into next. - if (start_consec == -1) - start_consec = sched_idx; - } else { - schedule_output_tracker_t &toadd = start_consec >= 0 - ? all_sched[disk_idx][start_consec] - : all_sched[disk_idx][sched_idx]; - outputs_[output_idx].record.emplace_back( - schedule_record_t::DEFAULT, toadd.input, toadd.start_instruction, - all_sched[disk_idx][sched_idx].stop_instruction, toadd.timestamp); - start_consec = -1; - VDO(this, 3, { - auto &added = outputs_[output_idx].record.back(); - VPRINT(this, 3, - "segment #%zu: input=%d start=%" PRId64 " stop=%" PRId64 - " time=%" PRId64 "\n", - outputs_[output_idx].record.size() - 1, added.key.input, - added.value.start_instruction, added.stop_instruction, - added.timestamp); - }); - } - } - VPRINT(this, 1, "Collapsed duplicates for %zu as-traced records for output #%d\n", - outputs_[output_idx].record.size(), output_idx); - if (outputs_[output_idx].record.empty()) { - error_string_ = "Empty as-traced schedule"; - return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; - } - if (outputs_[output_idx].record[0].value.start_instruction != 0) { - VPRINT(this, 1, "Initial input for output #%d is: wait state\n", output_idx); - set_cur_input(output_idx, sched_type_t::INVALID_INPUT_ORDINAL); - outputs_[output_idx].waiting = true; - outputs_[output_idx].record_index->store(-1, std::memory_order_release); - } else { - VPRINT(this, 1, "Initial input for output #%d is %d\n", output_idx, - outputs_[output_idx].record[0].key.input); - set_cur_input(output_idx, outputs_[output_idx].record[0].key.input); - } - } - return sched_type_t::STATUS_SUCCESS; -} - template typename scheduler_tmpl_t::scheduler_status_t scheduler_impl_tmpl_t::create_regions_from_times( @@ -3754,38 +3408,8 @@ typename scheduler_tmpl_t::stream_status_t scheduler_impl_tmpl_t::set_output_active(output_ordinal_t output, bool active) { - if (options_.mapping != sched_type_t::MAP_TO_ANY_OUTPUT) - return sched_type_t::STATUS_INVALID; - if (outputs_[output].active->load(std::memory_order_acquire) == active) - return sched_type_t::STATUS_OK; - outputs_[output].active->store(active, std::memory_order_release); - VPRINT(this, 2, "Output stream %d is now %s\n", output, - active ? "active" : "inactive"); - std::vector ordinals; - if (!active) { - // Make the now-inactive output's input available for other cores. - // This will reset its quantum too. - // We aren't switching on a just-read instruction not passed to the consumer, - // if the queue is empty. - input_ordinal_t cur_input = outputs_[output].cur_input; - if (cur_input != sched_type_t::INVALID_INPUT_ORDINAL) { - if (inputs_[cur_input].queue.empty()) - inputs_[cur_input].switching_pre_instruction = true; - set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL); - } - // Move the ready_queue to other outputs. - { - auto lock = acquire_scoped_output_lock_if_necessary(output); - while (!outputs_[output].ready_queue.queue.empty()) { - input_info_t *tomove = outputs_[output].ready_queue.queue.top(); - ordinals.push_back(tomove->index); - outputs_[output].ready_queue.queue.pop(); - } - } - } else { - outputs_[output].waiting = true; - } - return rebalance_queues(output, ordinals); + // Only supported in scheduler_dynamic_tmpl_t subclass. + return sched_type_t::STATUS_INVALID; } template @@ -3828,147 +3452,6 @@ scheduler_impl_tmpl_t::print_queue_stats() VPRINT(this, 0, "%s\n", ostr.str().c_str()); } -template -typename scheduler_tmpl_t::stream_status_t -scheduler_impl_tmpl_t::rebalance_queues( - output_ordinal_t triggering_output, std::vector inputs_to_add) -{ - std::thread::id nobody; - if (!rebalancer_.compare_exchange_weak(nobody, std::this_thread::get_id(), - std::memory_order_release, - std::memory_order_relaxed)) { - // Someone else is rebalancing. - return sched_type_t::STATUS_OK; - } - stream_status_t status = sched_type_t::STATUS_OK; - assert(options_.mapping == sched_type_t::MAP_TO_ANY_OUTPUT); - VPRINT(this, 1, "Output %d triggered a rebalance @%" PRIu64 ":\n", triggering_output, - get_output_time(triggering_output)); - // First, update the time to avoid more threads coming here. - last_rebalance_time_.store(get_output_time(triggering_output), - std::memory_order_release); - VPRINT(this, 2, "Before rebalance:\n"); - VDO(this, 2, { print_queue_stats(); }); - ++outputs_[triggering_output] - .stats[memtrace_stream_t::SCHED_STAT_RUNQUEUE_REBALANCES]; - - // Workaround to avoid hangs when _SCHEDULE and/or _DIRECT_THREAD_SWITCH - // directives miss their targets (due to running with a subset of the - // original threads, or other scenarios) and we end up with no scheduled - // inputs but a set of unscheduled inputs who will never be scheduled. - // TODO i#6959: Just exit early instead, maybe under a flag. - // It would help to see what % of total records we've processed. - size_t unsched_size = 0; - { - std::lock_guard unsched_lock(*unscheduled_priority_.lock); - unsched_size = unscheduled_priority_.queue.size(); - } - if (live_input_count_.load(std::memory_order_acquire) == - static_cast(unsched_size)) { - VPRINT( - this, 1, - "rebalancing moving entire unscheduled queue (%zu entries) to ready_queues\n", - unsched_size); - { - std::lock_guard unsched_lock(*unscheduled_priority_.lock); - while (!unscheduled_priority_.queue.empty()) { - input_info_t *tomove = unscheduled_priority_.queue.top(); - inputs_to_add.push_back(tomove->index); - unscheduled_priority_.queue.pop(); - } - } - for (input_ordinal_t input : inputs_to_add) { - std::lock_guard lock(*inputs_[input].lock); - inputs_[input].unscheduled = false; - } - } - - int live_inputs = live_input_count_.load(std::memory_order_acquire); - int live_outputs = 0; - for (unsigned int i = 0; i < outputs_.size(); ++i) { - if (outputs_[i].active->load(std::memory_order_acquire)) - ++live_outputs; - } - double avg_per_output = live_inputs / static_cast(live_outputs); - unsigned int avg_ceiling = static_cast(std::ceil(avg_per_output)); - unsigned int avg_floor = static_cast(std::floor(avg_per_output)); - int iteration = 0; - do { - // Walk the outputs, filling too-short queues from inputs_to_add and - // shrinking too-long queues into inputs_to_add. We may need a 2nd pass - // for this; and a 3rd pass if bindings prevent even splitting. - VPRINT( - this, 3, - "Rebalance iteration %d inputs_to_add size=%zu avg_per_output=%4.1f %d-%d\n", - iteration, inputs_to_add.size(), avg_per_output, avg_floor, avg_ceiling); - // We're giving up the output locks as we go, so there may be some stealing - // in the middle of our operation, but the rebalancing is approximate anyway. - for (unsigned int i = 0; i < outputs_.size(); ++i) { - if (!outputs_[i].active->load(std::memory_order_acquire)) - continue; - auto lock = acquire_scoped_output_lock_if_necessary(i); - // Only remove on the 1st iteration; later we can exceed due to binding - // constraints. - while (iteration == 0 && outputs_[i].ready_queue.queue.size() > avg_ceiling) { - input_info_t *queue_next = nullptr; - // We use our regular pop_from_ready_queue which means we leave - // blocked inputs on the queue: those do not get rebalanced. - // XXX: Should we revisit that? - // - // We remove from the back to avoid penalizing the next-to-run entries - // at the front of the queue by putting them at the back of another - // queue. - status = pop_from_ready_queue_hold_locks( - i, sched_type_t::INVALID_OUTPUT_ORDINAL, queue_next, - /*from_back=*/true); - if (status == sched_type_t::STATUS_OK && queue_next != nullptr) { - VPRINT(this, 3, - "Rebalance iteration %d: output %d giving up input %d\n", - iteration, i, queue_next->index); - inputs_to_add.push_back(queue_next->index); - } else - break; - } - std::vector incompatible_inputs; - // If we reach the 3rd iteration, we have fussy inputs with bindings. - // Try to add them to every output. - while ( - (outputs_[i].ready_queue.queue.size() < avg_ceiling || iteration > 1) && - !inputs_to_add.empty()) { - input_ordinal_t ordinal = inputs_to_add.back(); - inputs_to_add.pop_back(); - input_info_t &input = inputs_[ordinal]; - std::lock_guard input_lock(*input.lock); - if (input.binding.empty() || - input.binding.find(i) != input.binding.end()) { - VPRINT(this, 3, "Rebalance iteration %d: output %d taking input %d\n", - iteration, i, ordinal); - add_to_ready_queue_hold_locks(i, &input); - } else { - incompatible_inputs.push_back(ordinal); - } - } - inputs_to_add.insert(inputs_to_add.end(), incompatible_inputs.begin(), - incompatible_inputs.end()); - } - ++iteration; - if (iteration >= 3 && !inputs_to_add.empty()) { - // This is possible with bindings limited to inactive outputs. - // XXX: Rather than return an error, we could add to the unscheduled queue, - // but do not mark the input unscheduled. Then when an output is - // marked active, we could walk the unscheduled queue and take - // inputs not marked unscheduled. - VPRINT(this, 1, "Rebalance hit impossible binding\n"); - status = sched_type_t::STATUS_IMPOSSIBLE_BINDING; - break; - } - } while (!inputs_to_add.empty()); - VPRINT(this, 2, "After:\n"); - VDO(this, 2, { print_queue_stats(); }); - rebalancer_.store(std::thread::id(), std::memory_order_release); - return status; -} - template class scheduler_impl_tmpl_t; template class scheduler_impl_tmpl_t; diff --git a/clients/drcachesim/scheduler/scheduler_impl.h b/clients/drcachesim/scheduler/scheduler_impl.h index 087579e8aca..a8c84f7cc55 100644 --- a/clients/drcachesim/scheduler/scheduler_impl.h +++ b/clients/drcachesim/scheduler/scheduler_impl.h @@ -107,10 +107,7 @@ template class scheduler_impl_tmpl_t using switch_type_t = typename sched_type_t::switch_type_t; public: - scheduler_impl_tmpl_t() - { - last_rebalance_time_.store(0, std::memory_order_relaxed); - } + scheduler_impl_tmpl_t() = default; virtual ~scheduler_impl_tmpl_t(); virtual scheduler_status_t @@ -428,10 +425,6 @@ template class scheduler_impl_tmpl_t output_ordinal_t for_output, input_info_t *&new_input, bool from_back = false); - stream_status_t - rebalance_queues(output_ordinal_t triggering_output, - std::vector inputs_to_add); - // Up to the caller to check verbosity before calling. void print_queue_stats(); @@ -588,9 +581,9 @@ template class scheduler_impl_tmpl_t static constexpr uint64_t INSTRS_PER_US = 2000; // Called just once at initialization time to set the initial input-to-output - // mappings and state. - scheduler_status_t - set_initial_schedule(std::unordered_map> &workload2inputs); + // mappings and state for the particular mapping_t mode. + virtual scheduler_status_t + set_initial_schedule(std::unordered_map> &workload2inputs) = 0; // Assumed to only be called at initialization time. // Reads ahead in each input to find its filetype, and if "gather_timestamps" @@ -670,9 +663,6 @@ template class scheduler_impl_tmpl_t record_schedule_skip(output_ordinal_t output, input_ordinal_t input, uint64_t start_instruction, uint64_t stop_instruction); - scheduler_status_t - read_and_instantiate_traced_schedule(); - scheduler_status_t create_regions_from_times(const std::unordered_map &workload_tids, input_workload_t &workload); @@ -704,9 +694,6 @@ template class scheduler_impl_tmpl_t std::vector> &start2stop, std::vector> &all_sched); - scheduler_status_t - read_recorded_schedule(); - scheduler_status_t read_switch_sequences(); @@ -904,7 +891,7 @@ template class scheduler_impl_tmpl_t stream_status_t stop_speculation(output_ordinal_t output); - stream_status_t + virtual stream_status_t set_output_active(output_ordinal_t output, bool active); // Caller must hold the input's lock. @@ -950,9 +937,6 @@ template class scheduler_impl_tmpl_t mutex_dbg_owned unsched_lock_; // Inputs that are unscheduled indefinitely until directly targeted. input_queue_t unscheduled_priority_; - // Rebalancing coordination. - std::atomic rebalancer_; - std::atomic last_rebalance_time_; // Count of inputs not yet at eof. std::atomic live_input_count_; // In replay mode, count of outputs not yet at the end of the replay sequence. @@ -1010,26 +994,53 @@ typedef scheduler_impl_tmpl_t class scheduler_dynamic_tmpl_t : public scheduler_impl_tmpl_t { +public: + scheduler_dynamic_tmpl_t() + { + last_rebalance_time_.store(0, std::memory_order_relaxed); + } + private: using sched_type_t = scheduler_tmpl_t; using input_ordinal_t = typename sched_type_t::input_ordinal_t; using output_ordinal_t = typename sched_type_t::output_ordinal_t; + using scheduler_status_t = typename sched_type_t::scheduler_status_t; using stream_status_t = typename sched_type_t::stream_status_t; - using input_info_t = - typename scheduler_impl_tmpl_t::input_info_t; - using output_info_t = - typename scheduler_impl_tmpl_t::output_info_t; - using schedule_record_t = - typename scheduler_impl_tmpl_t::schedule_record_t; + using typename scheduler_impl_tmpl_t::input_info_t; + using typename scheduler_impl_tmpl_t::output_info_t; + using typename scheduler_impl_tmpl_t::schedule_record_t; + using + typename scheduler_impl_tmpl_t::InputTimestampComparator; + using scheduler_impl_tmpl_t::options_; + using scheduler_impl_tmpl_t::outputs_; + using scheduler_impl_tmpl_t::inputs_; + using scheduler_impl_tmpl_t::error_string_; + using scheduler_impl_tmpl_t::unscheduled_priority_; + using scheduler_impl_tmpl_t::set_cur_input; protected: + scheduler_status_t + set_initial_schedule( + std::unordered_map> &workload2inputs) override; + stream_status_t pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index, input_ordinal_t &index) override; + stream_status_t check_for_input_switch(output_ordinal_t output, RecordType &record, input_info_t *input, uint64_t cur_time, bool &need_new_input, bool &preempt, uint64_t &blocked_time) override; + stream_status_t + set_output_active(output_ordinal_t output, bool active) override; + + stream_status_t + rebalance_queues(output_ordinal_t triggering_output, + std::vector inputs_to_add); + + // Rebalancing coordination. + std::atomic rebalancer_; + std::atomic last_rebalance_time_; }; // Specialized code for replaying schedules: either a recorded dynamic schedule @@ -1040,20 +1051,38 @@ class scheduler_replay_tmpl_t : public scheduler_impl_tmpl_t; using input_ordinal_t = typename sched_type_t::input_ordinal_t; using output_ordinal_t = typename sched_type_t::output_ordinal_t; + using scheduler_status_t = typename sched_type_t::scheduler_status_t; using stream_status_t = typename sched_type_t::stream_status_t; - using schedule_record_t = - typename scheduler_impl_tmpl_t::schedule_record_t; - using input_info_t = - typename scheduler_impl_tmpl_t::input_info_t; + using typename scheduler_impl_tmpl_t::schedule_record_t; + using typename scheduler_impl_tmpl_t::input_info_t; + using + typename scheduler_impl_tmpl_t::schedule_output_tracker_t; + using + typename scheduler_impl_tmpl_t::schedule_input_tracker_t; + using scheduler_impl_tmpl_t::options_; + using scheduler_impl_tmpl_t::outputs_; + using scheduler_impl_tmpl_t::inputs_; + using scheduler_impl_tmpl_t::error_string_; + using scheduler_impl_tmpl_t::set_cur_input; protected: + scheduler_status_t + set_initial_schedule( + std::unordered_map> &workload2inputs) override; + stream_status_t pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index, input_ordinal_t &index) override; + stream_status_t check_for_input_switch(output_ordinal_t output, RecordType &record, input_info_t *input, uint64_t cur_time, bool &need_new_input, bool &preempt, uint64_t &blocked_time) override; + scheduler_status_t + read_recorded_schedule(); + + scheduler_status_t + read_and_instantiate_traced_schedule(); }; // Specialized code for fixed "schedules": typically serial or parallel analyzer @@ -1064,14 +1093,24 @@ class scheduler_fixed_tmpl_t : public scheduler_impl_tmpl_t; using input_ordinal_t = typename sched_type_t::input_ordinal_t; using output_ordinal_t = typename sched_type_t::output_ordinal_t; + using scheduler_status_t = typename sched_type_t::scheduler_status_t; using stream_status_t = typename sched_type_t::stream_status_t; - using input_info_t = - typename scheduler_impl_tmpl_t::input_info_t; + using typename scheduler_impl_tmpl_t::input_info_t; + using scheduler_impl_tmpl_t::options_; + using scheduler_impl_tmpl_t::outputs_; + using scheduler_impl_tmpl_t::inputs_; + using scheduler_impl_tmpl_t::error_string_; + using scheduler_impl_tmpl_t::set_cur_input; protected: + scheduler_status_t + set_initial_schedule( + std::unordered_map> &workload2inputs) override; + stream_status_t pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, input_ordinal_t prev_index, input_ordinal_t &index) override; + stream_status_t check_for_input_switch(output_ordinal_t output, RecordType &record, input_info_t *input, uint64_t cur_time, bool &need_new_input, diff --git a/clients/drcachesim/scheduler/scheduler_replay.cpp b/clients/drcachesim/scheduler/scheduler_replay.cpp index dd3a3758672..63cccfdc7c6 100644 --- a/clients/drcachesim/scheduler/scheduler_replay.cpp +++ b/clients/drcachesim/scheduler/scheduler_replay.cpp @@ -35,11 +35,10 @@ #include "scheduler.h" #include "scheduler_impl.h" -#include +#include #include #include #include -#include #include "memref.h" #include "mutex_dbg_owned.h" @@ -50,6 +49,246 @@ namespace dynamorio { namespace drmemtrace { +template +typename scheduler_tmpl_t::scheduler_status_t +scheduler_replay_tmpl_t::set_initial_schedule( + std::unordered_map> &workload2inputs) +{ + if (options_.mapping == sched_type_t::MAP_AS_PREVIOUSLY) { + this->live_replay_output_count_.store(static_cast(outputs_.size()), + std::memory_order_release); + if (options_.schedule_replay_istream == nullptr || + options_.schedule_record_ostream != nullptr) + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + scheduler_status_t status = read_recorded_schedule(); + if (status != sched_type_t::STATUS_SUCCESS) + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } else if (options_.schedule_replay_istream != nullptr) { + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } else if (options_.mapping == sched_type_t::MAP_TO_RECORDED_OUTPUT && + options_.replay_as_traced_istream != nullptr) { + // Even for just one output we honor a request to replay the schedule + // (although it should match the analyzer serial mode so there's no big + // benefit to reading the schedule file. The analyzer serial mode or other + // special cases of one output don't set the replay_as_traced_istream + // field.) + scheduler_status_t status = read_and_instantiate_traced_schedule(); + if (status != sched_type_t::STATUS_SUCCESS) + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + // Now leverage the regular replay code. + options_.mapping = sched_type_t::MAP_AS_PREVIOUSLY; + } else { + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } + return sched_type_t::STATUS_SUCCESS; +} + +template +typename scheduler_tmpl_t::scheduler_status_t +scheduler_replay_tmpl_t::read_recorded_schedule() +{ + if (options_.schedule_replay_istream == nullptr) + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + + schedule_record_t record; + // We assume we can easily fit the whole context switch sequence in memory. + // If that turns out not to be the case for very long traces, we deliberately + // used an archive format so we could do parallel incremental reads. + // (Conversely, if we want to commit to storing in memory, we could use a + // non-archive format and store the output ordinal in the version record.) + for (int i = 0; i < static_cast(outputs_.size()); ++i) { + std::string err = options_.schedule_replay_istream->open_component( + this->recorded_schedule_component_name(i)); + if (!err.empty()) { + error_string_ = "Failed to open schedule_replay_istream component " + + this->recorded_schedule_component_name(i) + ": " + err; + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } + // XXX: This could be made more efficient if we stored the record count + // in the version field's stop_instruction field or something so we can + // size the vector up front. As this only happens once we do not bother + // and live with a few vector resizes. + bool saw_footer = false; + while (options_.schedule_replay_istream->read(reinterpret_cast(&record), + sizeof(record))) { + if (record.type == schedule_record_t::VERSION) { + if (record.key.version != schedule_record_t::VERSION_CURRENT) + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } else if (record.type == schedule_record_t::FOOTER) { + saw_footer = true; + break; + } else + outputs_[i].record.push_back(record); + } + if (!saw_footer) { + error_string_ = "Record file missing footer"; + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } + VPRINT(this, 1, "Read %zu recorded records for output #%d\n", + outputs_[i].record.size(), i); + } + // See if there was more data in the file (we do this after reading to not + // mis-report i/o or path errors as this error). + std::string err = options_.schedule_replay_istream->open_component( + this->recorded_schedule_component_name( + static_cast(outputs_.size()))); + if (err.empty()) { + error_string_ = "Not enough output streams for recorded file"; + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } + for (int i = 0; i < static_cast(outputs_.size()); ++i) { + if (outputs_[i].record.empty()) { + // XXX i#6630: We should auto-set the output count and avoid + // having extra outputs; these complicate idle computations, etc. + VPRINT(this, 1, "output %d empty: returning eof up front\n", i); + set_cur_input(i, sched_type_t::INVALID_INPUT_ORDINAL); + outputs_[i].at_eof = true; + } else if (outputs_[i].record[0].type == schedule_record_t::IDLE || + outputs_[i].record[0].type == schedule_record_t::IDLE_BY_COUNT) { + set_cur_input(i, sched_type_t::INVALID_INPUT_ORDINAL); + outputs_[i].waiting = true; + if (outputs_[i].record[0].type == schedule_record_t::IDLE) { + // Convert a legacy idle duration from microseconds to record counts. + outputs_[i].record[0].value.idle_duration = + static_cast(options_.time_units_per_us * + outputs_[i].record[0].value.idle_duration); + } + outputs_[i].idle_start_count = -1; // Updated on first next_record(). + VPRINT(this, 3, "output %d starting out idle\n", i); + } else { + assert(outputs_[i].record[0].type == schedule_record_t::DEFAULT); + set_cur_input(i, outputs_[i].record[0].key.input); + } + } + return sched_type_t::STATUS_SUCCESS; +} + +template +typename scheduler_tmpl_t::scheduler_status_t +scheduler_replay_tmpl_t::read_and_instantiate_traced_schedule() +{ + std::vector> start2stop(inputs_.size()); + // We also want to collapse same-cpu consecutive records so we start with + // a temporary local vector. + std::vector> all_sched(outputs_.size()); + // Work around i#6107 by tracking counts sorted by timestamp for each input. + std::vector> input_sched(inputs_.size()); + // These hold entries added in the on-disk (unsorted) order. + std::vector disk_ord2index; // Initially [i] holds i. + std::vector disk_ord2cpuid; // [i] holds cpuid for entry i. + scheduler_status_t res = this->read_traced_schedule( + input_sched, start2stop, all_sched, disk_ord2index, disk_ord2cpuid); + if (res != sched_type_t::STATUS_SUCCESS) + return res; + // Sort by cpuid to get a more natural ordering. + // Probably raw2trace should do this in the first place, but we have many + // schedule files already out there so we still need a sort here. + // If we didn't have cross-indices pointing at all_sched from input_sched, we + // would just sort all_sched: but instead we have to construct a separate + // ordering structure. + std::sort(disk_ord2index.begin(), disk_ord2index.end(), + [disk_ord2cpuid](const output_ordinal_t &l, const output_ordinal_t &r) { + return disk_ord2cpuid[l] < disk_ord2cpuid[r]; + }); + // disk_ord2index[i] used to hold i; now after sorting it holds the ordinal in + // the disk file that has the ith largest cpuid. We need to turn that into + // the output_idx ordinal for the cpu at ith ordinal in the disk file, for + // which we use a new vector disk_ord2output. + // E.g., if the original file was in this order disk_ord2cpuid = {6,2,3,7}, + // disk_ord2index after sorting would hold {1,2,0,3}, which we want to turn + // into disk_ord2output = {2,0,1,3}. + std::vector disk_ord2output(disk_ord2index.size()); + for (size_t i = 0; i < disk_ord2index.size(); ++i) { + disk_ord2output[disk_ord2index[i]] = static_cast(i); + } + for (int disk_idx = 0; disk_idx < static_cast(outputs_.size()); + ++disk_idx) { + if (disk_idx >= static_cast(disk_ord2index.size())) { + // XXX i#6630: We should auto-set the output count and avoid + // having extra ouputs; these complicate idle computations, etc. + VPRINT(this, 1, "Output %d empty: returning eof up front\n", disk_idx); + outputs_[disk_idx].at_eof = true; + set_cur_input(disk_idx, sched_type_t::INVALID_INPUT_ORDINAL); + continue; + } + output_ordinal_t output_idx = disk_ord2output[disk_idx]; + VPRINT(this, 1, "Read %zu as-traced records for output #%d\n", + all_sched[disk_idx].size(), output_idx); + outputs_[output_idx].as_traced_cpuid = disk_ord2cpuid[disk_idx]; + VPRINT(this, 1, "Output #%d is as-traced CPU #%" PRId64 "\n", output_idx, + outputs_[output_idx].as_traced_cpuid); + // Update the stop_instruction field and collapse consecutive entries while + // inserting into the final location. + int start_consec = -1; + for (int sched_idx = 0; sched_idx < static_cast(all_sched[disk_idx].size()); + ++sched_idx) { + auto &segment = all_sched[disk_idx][sched_idx]; + if (!segment.valid) + continue; + auto find = start2stop[segment.input].find(segment.start_instruction); + ++find; + if (find == start2stop[segment.input].end()) + segment.stop_instruction = std::numeric_limits::max(); + else + segment.stop_instruction = *find; + VPRINT(this, 4, + "as-read segment #%d: input=%d start=%" PRId64 " stop=%" PRId64 + " time=%" PRId64 "\n", + sched_idx, segment.input, segment.start_instruction, + segment.stop_instruction, segment.timestamp); + if (sched_idx + 1 < static_cast(all_sched[disk_idx].size()) && + segment.input == all_sched[disk_idx][sched_idx + 1].input && + segment.stop_instruction > + all_sched[disk_idx][sched_idx + 1].start_instruction) { + // A second sanity check. + error_string_ = "Invalid decreasing start field in schedule file"; + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } else if (sched_idx + 1 < static_cast(all_sched[disk_idx].size()) && + segment.input == all_sched[disk_idx][sched_idx + 1].input && + segment.stop_instruction == + all_sched[disk_idx][sched_idx + 1].start_instruction) { + // Collapse into next. + if (start_consec == -1) + start_consec = sched_idx; + } else { + schedule_output_tracker_t &toadd = start_consec >= 0 + ? all_sched[disk_idx][start_consec] + : all_sched[disk_idx][sched_idx]; + outputs_[output_idx].record.emplace_back( + schedule_record_t::DEFAULT, toadd.input, toadd.start_instruction, + all_sched[disk_idx][sched_idx].stop_instruction, toadd.timestamp); + start_consec = -1; + VDO(this, 3, { + auto &added = outputs_[output_idx].record.back(); + VPRINT(this, 3, + "segment #%zu: input=%d start=%" PRId64 " stop=%" PRId64 + " time=%" PRId64 "\n", + outputs_[output_idx].record.size() - 1, added.key.input, + added.value.start_instruction, added.stop_instruction, + added.timestamp); + }); + } + } + VPRINT(this, 1, "Collapsed duplicates for %zu as-traced records for output #%d\n", + outputs_[output_idx].record.size(), output_idx); + if (outputs_[output_idx].record.empty()) { + error_string_ = "Empty as-traced schedule"; + return sched_type_t::STATUS_ERROR_INVALID_PARAMETER; + } + if (outputs_[output_idx].record[0].value.start_instruction != 0) { + VPRINT(this, 1, "Initial input for output #%d is: wait state\n", output_idx); + set_cur_input(output_idx, sched_type_t::INVALID_INPUT_ORDINAL); + outputs_[output_idx].waiting = true; + outputs_[output_idx].record_index->store(-1, std::memory_order_release); + } else { + VPRINT(this, 1, "Initial input for output #%d is %d\n", output_idx, + outputs_[output_idx].record[0].key.input); + set_cur_input(output_idx, outputs_[output_idx].record[0].key.input); + } + } + return sched_type_t::STATUS_SUCCESS; +} + template typename scheduler_tmpl_t::stream_status_t scheduler_replay_tmpl_t::pick_next_input_for_mode( @@ -57,27 +296,26 @@ scheduler_replay_tmpl_t::pick_next_input_for_mode( input_ordinal_t &index) { // Our own index is only modified by us so we can cache it here. - int record_index = - this->outputs_[output].record_index->load(std::memory_order_acquire); - if (record_index + 1 >= static_cast(this->outputs_[output].record.size())) { - if (!this->outputs_[output].at_eof) { - this->outputs_[output].at_eof = true; + int record_index = outputs_[output].record_index->load(std::memory_order_acquire); + if (record_index + 1 >= static_cast(outputs_[output].record.size())) { + if (!outputs_[output].at_eof) { + outputs_[output].at_eof = true; this->live_replay_output_count_.fetch_add(-1, std::memory_order_release); } - return this->eof_or_idle(output, this->outputs_[output].cur_input); + return this->eof_or_idle(output, outputs_[output].cur_input); } - schedule_record_t &segment = this->outputs_[output].record[record_index + 1]; + schedule_record_t &segment = outputs_[output].record[record_index + 1]; if (segment.type == schedule_record_t::IDLE || segment.type == schedule_record_t::IDLE_BY_COUNT) { - this->outputs_[output].waiting = true; + outputs_[output].waiting = true; if (segment.type == schedule_record_t::IDLE) { // Convert a legacy idle duration from microseconds to record counts. segment.value.idle_duration = static_cast( - this->options_.time_units_per_us * segment.value.idle_duration); + options_.time_units_per_us * segment.value.idle_duration); } - this->outputs_[output].idle_start_count = this->outputs_[output].idle_count; - this->outputs_[output].record_index->fetch_add(1, std::memory_order_release); - ++this->outputs_[output].idle_count; + outputs_[output].idle_start_count = outputs_[output].idle_count; + outputs_[output].record_index->fetch_add(1, std::memory_order_release); + ++outputs_[output].idle_count; VPRINT(this, 5, "%s[%d]: next replay segment idle for %" PRIu64 "\n", __FUNCTION__, output, segment.value.idle_duration); return sched_type_t::STATUS_IDLE; @@ -86,28 +324,25 @@ scheduler_replay_tmpl_t::pick_next_input_for_mode( VPRINT(this, 5, "%s[%d]: next replay segment in=%d (@%" PRId64 ") type=%d start=%" PRId64 " end=%" PRId64 "\n", - __FUNCTION__, output, index, this->get_instr_ordinal(this->inputs_[index]), + __FUNCTION__, output, index, this->get_instr_ordinal(inputs_[index]), segment.type, segment.value.start_instruction, segment.stop_instruction); { - std::lock_guard lock(*this->inputs_[index].lock); - if (this->get_instr_ordinal(this->inputs_[index]) > - segment.value.start_instruction) { + std::lock_guard lock(*inputs_[index].lock); + if (this->get_instr_ordinal(inputs_[index]) > segment.value.start_instruction) { VPRINT(this, 1, "WARNING: next_record[%d]: input %d wants instr #%" PRId64 " but it is already at #%" PRId64 "\n", output, index, segment.value.start_instruction, - this->get_instr_ordinal(this->inputs_[index])); + this->get_instr_ordinal(inputs_[index])); } - if (this->get_instr_ordinal(this->inputs_[index]) < - segment.value.start_instruction && + if (this->get_instr_ordinal(inputs_[index]) < segment.value.start_instruction && // Don't wait for an ROI that starts at the beginning. segment.value.start_instruction > 1 && // The output may have begun in the wait state. (record_index == -1 || // When we skip our separator+timestamp markers are at the // prior instr ord so do not wait for that. - (this->outputs_[output].record[record_index].type != - schedule_record_t::SKIP && + (outputs_[output].record[record_index].type != schedule_record_t::SKIP && // Don't wait if we're at the end and just need the end record. segment.type != schedule_record_t::SYNTHETIC_END))) { // Some other output stream has not advanced far enough, and we do @@ -124,29 +359,28 @@ scheduler_replay_tmpl_t::pick_next_input_for_mode( output, index, segment.value.start_instruction); // Give up this input and go into a wait state. // We'll come back here on the next next_record() call. - this->set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL, - // Avoid livelock if prev input == cur input which happens - // with back-to-back segments with the same input. - index == this->outputs_[output].cur_input); - this->outputs_[output].waiting = true; + set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL, + // Avoid livelock if prev input == cur input which happens + // with back-to-back segments with the same input. + index == outputs_[output].cur_input); + outputs_[output].waiting = true; return sched_type_t::STATUS_WAIT; } } // Also wait if this segment is ahead of the next-up segment on another // output. We only have a timestamp per context switch so we can't // enforce finer-grained timing replay. - if (this->options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) { - for (int i = 0; i < static_cast(this->outputs_.size()); ++i) { + if (options_.deps == sched_type_t::DEPENDENCY_TIMESTAMPS) { + for (int i = 0; i < static_cast(outputs_.size()); ++i) { if (i == output) continue; // Do an atomic load once and use it to de-reference if it's not at the end. // This is safe because if the target advances to the end concurrently it // will only cause an extra wait that will just come back here and then // continue. - int other_index = - this->outputs_[i].record_index->load(std::memory_order_acquire); - if (other_index + 1 < static_cast(this->outputs_[i].record.size()) && - segment.timestamp > this->outputs_[i].record[other_index + 1].timestamp) { + int other_index = outputs_[i].record_index->load(std::memory_order_acquire); + if (other_index + 1 < static_cast(outputs_[i].record.size()) && + segment.timestamp > outputs_[i].record[other_index + 1].timestamp) { VPRINT(this, 3, "next_record[%d]: waiting because timestamp %" PRIu64 " is ahead of output %d\n", @@ -157,64 +391,60 @@ scheduler_replay_tmpl_t::pick_next_input_for_mode( // order due to using prior values, to avoid hanging. We try to avoid // this by using wall-clock time in record_schedule_segment() rather than // the stored output time. - this->set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL); - this->outputs_[output].waiting = true; + set_cur_input(output, sched_type_t::INVALID_INPUT_ORDINAL); + outputs_[output].waiting = true; return sched_type_t::STATUS_WAIT; } } } if (segment.type == schedule_record_t::SYNTHETIC_END) { - std::lock_guard lock(*this->inputs_[index].lock); + std::lock_guard lock(*inputs_[index].lock); // We're past the final region of interest and we need to insert // a synthetic thread exit record. We need to first throw out the // queued candidate record, if any. - this->clear_input_queue(this->inputs_[index]); - this->inputs_[index].queue.push_back( - this->create_thread_exit(this->inputs_[index].tid)); + this->clear_input_queue(inputs_[index]); + inputs_[index].queue.push_back(this->create_thread_exit(inputs_[index].tid)); VPRINT(this, 2, "early end for input %d\n", index); // We're done with this entry but we need the queued record to be read, // so we do not move past the entry. - this->outputs_[output].record_index->fetch_add(1, std::memory_order_release); - stream_status_t status = this->mark_input_eof(this->inputs_[index]); + outputs_[output].record_index->fetch_add(1, std::memory_order_release); + stream_status_t status = this->mark_input_eof(inputs_[index]); if (status != sched_type_t::STATUS_OK) return status; return sched_type_t::STATUS_SKIPPED; } else if (segment.type == schedule_record_t::SKIP) { - std::lock_guard lock(*this->inputs_[index].lock); - uint64_t cur_reader_instr = - this->inputs_[index].reader->get_instruction_ordinal(); + std::lock_guard lock(*inputs_[index].lock); + uint64_t cur_reader_instr = inputs_[index].reader->get_instruction_ordinal(); VPRINT(this, 2, "next_record[%d]: skipping from %" PRId64 " to %" PRId64 " in %d for schedule\n", output, cur_reader_instr, segment.stop_instruction, index); - auto status = this->skip_instructions(this->inputs_[index], + auto status = this->skip_instructions(inputs_[index], segment.stop_instruction - cur_reader_instr - 1 /*exclusive*/); // Increment the region to get window id markers with ordinals. - this->inputs_[index].cur_region++; + inputs_[index].cur_region++; if (status != sched_type_t::STATUS_SKIPPED) return sched_type_t::STATUS_INVALID; // We're done with the skip so move to and past it. - this->outputs_[output].record_index->fetch_add(2, std::memory_order_release); + outputs_[output].record_index->fetch_add(2, std::memory_order_release); return sched_type_t::STATUS_SKIPPED; } else { VPRINT(this, 2, "next_record[%d]: advancing to input %d instr #%" PRId64 "\n", output, index, segment.value.start_instruction); } - this->outputs_[output].record_index->fetch_add(1, std::memory_order_release); + outputs_[output].record_index->fetch_add(1, std::memory_order_release); VDO(this, 2, { // Our own index is only modified by us so we can cache it here. - int local_index = - this->outputs_[output].record_index->load(std::memory_order_acquire); + int local_index = outputs_[output].record_index->load(std::memory_order_acquire); if (local_index >= 0 && - local_index < static_cast(this->outputs_[output].record.size())) { - const schedule_record_t &local_segment = - this->outputs_[output].record[local_index]; + local_index < static_cast(outputs_[output].record.size())) { + const schedule_record_t &local_segment = outputs_[output].record[local_index]; int input = local_segment.key.input; VPRINT(this, 2, "next_record[%d]: replay segment in=%d (@%" PRId64 ") type=%d start=%" PRId64 " end=%" PRId64 "\n", - output, input, this->get_instr_ordinal(this->inputs_[input]), + output, input, this->get_instr_ordinal(inputs_[input]), local_segment.type, local_segment.value.start_instruction, local_segment.stop_instruction); } @@ -229,21 +459,19 @@ scheduler_replay_tmpl_t::check_for_input_switch( bool &need_new_input, bool &preempt, uint64_t &blocked_time) { // Our own index is only modified by us so we can cache it here. - int record_index = - this->outputs_[output].record_index->load(std::memory_order_acquire); + int record_index = outputs_[output].record_index->load(std::memory_order_acquire); assert(record_index >= 0); - if (record_index >= static_cast(this->outputs_[output].record.size())) { + if (record_index >= static_cast(outputs_[output].record.size())) { // We're on the last record. VPRINT(this, 4, "next_record[%d]: on last record\n", output); - } else if (this->outputs_[output].record[record_index].type == - schedule_record_t::SKIP) { + } else if (outputs_[output].record[record_index].type == schedule_record_t::SKIP) { VPRINT(this, 5, "next_record[%d]: need new input after skip\n", output); need_new_input = true; - } else if (this->outputs_[output].record[record_index].type == + } else if (outputs_[output].record[record_index].type == schedule_record_t::SYNTHETIC_END) { VPRINT(this, 5, "next_record[%d]: at synthetic end\n", output); } else { - const schedule_record_t &segment = this->outputs_[output].record[record_index]; + const schedule_record_t &segment = outputs_[output].record[record_index]; assert(segment.type == schedule_record_t::DEFAULT); uint64_t start = segment.value.start_instruction; uint64_t stop = segment.stop_instruction; diff --git a/clients/drcachesim/tests/scheduler_unit_tests.cpp b/clients/drcachesim/tests/scheduler_unit_tests.cpp index b256fb60e33..586807c192d 100644 --- a/clients/drcachesim/tests/scheduler_unit_tests.cpp +++ b/clients/drcachesim/tests/scheduler_unit_tests.cpp @@ -2966,7 +2966,30 @@ test_replay_multi_threaded(const char *testdir) #ifdef HAS_ZIP // We subclass scheduler_impl_t to access its record struct and functions. -class test_scheduler_t : public scheduler_impl_t { +// First we fill in pure-virtuals to share with a similar class below: +class test_scheduler_base_t : public scheduler_impl_t { +public: + scheduler_status_t + set_initial_schedule( + std::unordered_map> &workload2inputs) override + { + return sched_type_t::STATUS_ERROR_NOT_IMPLEMENTED; + } + stream_status_t + pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, + input_ordinal_t prev_index, input_ordinal_t &index) override + { + return sched_type_t::STATUS_NOT_IMPLEMENTED; + } + stream_status_t + check_for_input_switch(output_ordinal_t output, memref_t &record, input_info_t *input, + uint64_t cur_time, bool &need_new_input, bool &preempt, + uint64_t &blocked_time) override + { + return sched_type_t::STATUS_NOT_IMPLEMENTED; + } +}; +class test_scheduler_t : public test_scheduler_base_t { public: void write_test_schedule(std::string record_fname) @@ -3012,19 +3035,6 @@ class test_scheduler_t : public scheduler_impl_t { sched1.size() * sizeof(sched1[0]))) assert(false); } - stream_status_t - pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, - input_ordinal_t prev_index, input_ordinal_t &index) override - { - return sched_type_t::STATUS_NOT_IMPLEMENTED; - } - stream_status_t - check_for_input_switch(output_ordinal_t output, memref_t &record, input_info_t *input, - uint64_t cur_time, bool &need_new_input, bool &preempt, - uint64_t &blocked_time) override - { - return sched_type_t::STATUS_NOT_IMPLEMENTED; - } }; #endif @@ -3090,7 +3100,8 @@ test_replay_timestamps() #ifdef HAS_ZIP // We subclass scheduler_impl_t to access its record struct and functions. -class test_noeof_scheduler_t : public scheduler_impl_t { +// We subclass scheduler_impl_t to access its record struct and functions. +class test_noeof_scheduler_t : public test_scheduler_base_t { public: void write_test_schedule(std::string record_fname) @@ -3137,19 +3148,6 @@ class test_noeof_scheduler_t : public scheduler_impl_t { sched1.size() * sizeof(sched1[0]))) assert(false); } - stream_status_t - pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, - input_ordinal_t prev_index, input_ordinal_t &index) override - { - return sched_type_t::STATUS_NOT_IMPLEMENTED; - } - stream_status_t - check_for_input_switch(output_ordinal_t output, memref_t &record, input_info_t *input, - uint64_t cur_time, bool &need_new_input, bool &preempt, - uint64_t &blocked_time) override - { - return sched_type_t::STATUS_NOT_IMPLEMENTED; - } }; #endif From ae960a043fe45033b60d8a9dadf7b27d10c8808c Mon Sep 17 00:00:00 2001 From: Derek Bruening Date: Wed, 13 Nov 2024 22:00:00 -0500 Subject: [PATCH 2/3] Fix Windows warning --- clients/drcachesim/scheduler/scheduler_impl.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clients/drcachesim/scheduler/scheduler_impl.cpp b/clients/drcachesim/scheduler/scheduler_impl.cpp index 10e53782773..89f5d65c1a7 100644 --- a/clients/drcachesim/scheduler/scheduler_impl.cpp +++ b/clients/drcachesim/scheduler/scheduler_impl.cpp @@ -891,7 +891,7 @@ scheduler_impl_tmpl_t::init( // filetype we avoid going as far as the timestamp. bool gather_filetype = options_.read_inputs_in_init; if (gather_filetype || gather_timestamps) { - scheduler_status_t res = this->get_initial_input_content(gather_timestamps); + res = this->get_initial_input_content(gather_timestamps); if (res != sched_type_t::STATUS_SUCCESS) { error_string_ = "Failed to read initial input contents for filetype"; if (gather_timestamps) From 12816f740db5e0b06def2f745ad88e9e2a084645 Mon Sep 17 00:00:00 2001 From: Derek Bruening Date: Thu, 14 Nov 2024 14:17:27 -0500 Subject: [PATCH 3/3] Review requests: group virtual; add comments; set_output_active default in header --- clients/drcachesim/scheduler/scheduler.cpp | 6 +- .../drcachesim/scheduler/scheduler_impl.cpp | 9 -- clients/drcachesim/scheduler/scheduler_impl.h | 89 +++++++++++-------- .../drcachesim/tests/scheduler_unit_tests.cpp | 1 - 4 files changed, 56 insertions(+), 49 deletions(-) diff --git a/clients/drcachesim/scheduler/scheduler.cpp b/clients/drcachesim/scheduler/scheduler.cpp index 828c59be514..aea76d706c2 100644 --- a/clients/drcachesim/scheduler/scheduler.cpp +++ b/clients/drcachesim/scheduler/scheduler.cpp @@ -59,9 +59,9 @@ scheduler_tmpl_t::init( scheduler_impl_deleter_t>( new scheduler_replay_tmpl_t); } else { - // Non-dynamic and non-replay fixed modes such as analyzer serial and - // parallel modes. Although serial does do interleaving by timestamp - // it is closer to parallel mode than the file-based replay modes. + // Non-dynamic and non-replay fixed modes such as analyzer + // parallel mode with a static mapping of inputs to outputs and analyzer + // serial mode with a simple time interleaving of all inputs onto one output. impl_ = std::unique_ptr, scheduler_impl_deleter_t>( new scheduler_fixed_tmpl_t); diff --git a/clients/drcachesim/scheduler/scheduler_impl.cpp b/clients/drcachesim/scheduler/scheduler_impl.cpp index 89f5d65c1a7..7a871b0c211 100644 --- a/clients/drcachesim/scheduler/scheduler_impl.cpp +++ b/clients/drcachesim/scheduler/scheduler_impl.cpp @@ -3403,15 +3403,6 @@ scheduler_impl_tmpl_t::get_statistic( return static_cast(outputs_[output].stats[stat]); } -template -typename scheduler_tmpl_t::stream_status_t -scheduler_impl_tmpl_t::set_output_active(output_ordinal_t output, - bool active) -{ - // Only supported in scheduler_dynamic_tmpl_t subclass. - return sched_type_t::STATUS_INVALID; -} - template void scheduler_impl_tmpl_t::print_queue_stats() diff --git a/clients/drcachesim/scheduler/scheduler_impl.h b/clients/drcachesim/scheduler/scheduler_impl.h index a8c84f7cc55..4399fa00677 100644 --- a/clients/drcachesim/scheduler/scheduler_impl.h +++ b/clients/drcachesim/scheduler/scheduler_impl.h @@ -580,11 +580,64 @@ template class scheduler_impl_tmpl_t // We assume a 2GHz clock and IPC=1. static constexpr uint64_t INSTRS_PER_US = 2000; + /////////////////////////////////////////////////////////////////////////// + /// Virtual methods. + /// XXX i#6831: These interfaces between the main class the subclasses could be + /// more clearly separated and crystalized. One goal is to avoid conditionals + // in scheduler_impl_tmpl_t based on options_mapping or possibly on options_ + // at all (possibly only storing options_ in the subclasses). + // Called just once at initialization time to set the initial input-to-output // mappings and state for the particular mapping_t mode. + // Should call set_cur_input() for all outputs with initial inputs. virtual scheduler_status_t set_initial_schedule(std::unordered_map> &workload2inputs) = 0; + // Allow subclasses to perform custom initial marker processing during + // get_initial_input_content(). Returns whether to keep reading. + // The caller will stop calling when an instruction record is reached. + // The 'record' may have TRACE_TYPE_INVALID in some calls in which case + // the two bool parameters are what the return value should be based on. + virtual bool + process_next_initial_record(input_info_t &input, RecordType record, + bool &found_filetype, bool &found_timestamp); + + // Helper for pick_next_input() specialized by mapping_t mode. + // This is called when check_for_input_switch() indicates a switch is needed. + // No input_info_t lock can be held on entry. + virtual stream_status_t + pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, + input_ordinal_t prev_index, input_ordinal_t &index) = 0; + + // Helper for next_record() specialized by mapping_t mode: called on every record + // before it's passed to the user. Determines whether to switch to a new input + // (returned in "need_new_input"; if so, whether it's a preempt is in "preempt" + // and if this current input should be blocked then that time should be set in + // "blocked_time"). If this returns true for "need_new_input", + // pick_next_input_for_mode() is called. + virtual stream_status_t + check_for_input_switch(output_ordinal_t output, RecordType &record, + input_info_t *input, uint64_t cur_time, bool &need_new_input, + bool &preempt, uint64_t &blocked_time) = 0; + + // Process each marker seen for MAP_TO_ANY_OUTPUT during next_record(). + // The input's lock must be held by the caller. + virtual void + process_marker(input_info_t &input, output_ordinal_t output, + trace_marker_type_t marker_type, uintptr_t marker_value); + + // The external interface lets a user request that an output go inactive when + // doing dynamic scheduling. + virtual stream_status_t + set_output_active(output_ordinal_t output, bool active) + { + // Only supported in scheduler_dynamic_tmpl_t subclass. + return sched_type_t::STATUS_INVALID; + } + + /// + /////////////////////////////////////////////////////////////////////////// + // Assumed to only be called at initialization time. // Reads ahead in each input to find its filetype, and if "gather_timestamps" // is set, to find its first timestamp, queuing all records @@ -596,15 +649,6 @@ template class scheduler_impl_tmpl_t void print_configuration(); - // Allow subclasses to perform custom initial marker processing during - // get_initial_input_content(). Returns whether to keep reading. - // The caller will stop calling when an instruction record is reached. - // The 'record' may have TRACE_TYPE_INVALID in some calls in which case - // the two bool parameters are what the return value should be based on. - virtual bool - process_next_initial_record(input_info_t &input, RecordType record, - bool &found_filetype, bool &found_timestamp); - scheduler_status_t legacy_field_support(); @@ -739,24 +783,6 @@ template class scheduler_impl_tmpl_t stream_status_t pick_next_input(output_ordinal_t output, uint64_t blocked_time); - // Helper for pick_next_input() specialized by mapping_t mode. - // This is called when check_for_input_switch() indicates a switch is needed. - // No input_info_t lock can be held on entry. - virtual stream_status_t - pick_next_input_for_mode(output_ordinal_t output, uint64_t blocked_time, - input_ordinal_t prev_index, input_ordinal_t &index) = 0; - - // Helper for next_record() specialized by mapping_t mode: called on every record - // before it's passed to the user. Determines whether to switch to a new input - // (returned in "need_new_input"; if so, whether it's a preempt is in "preempt" - // and if this current input should be blocked then that time should be set in - // "blocked_time"). If this returns true for "need_new_input", - // pick_next_input_for_mode() is called. - virtual stream_status_t - check_for_input_switch(output_ordinal_t output, RecordType &record, - input_info_t *input, uint64_t cur_time, bool &need_new_input, - bool &preempt, uint64_t &blocked_time) = 0; - // If the given record has a thread id field, returns true and the value. bool record_type_has_tid(RecordType record, memref_tid_t &tid); @@ -821,12 +847,6 @@ template class scheduler_impl_tmpl_t void print_record(const RecordType &record); - // Process each marker seen for MAP_TO_ANY_OUTPUT during next_record(). - // The input's lock must be held by the caller. - virtual void - process_marker(input_info_t &input, output_ordinal_t output, - trace_marker_type_t marker_type, uintptr_t marker_value); - // Returns the get_stream_name() value for the current input stream scheduled on // the 'output_ordinal'-th output stream. std::string @@ -891,9 +911,6 @@ template class scheduler_impl_tmpl_t stream_status_t stop_speculation(output_ordinal_t output); - virtual stream_status_t - set_output_active(output_ordinal_t output, bool active); - // Caller must hold the input's lock. // The return value is STATUS_EOF if a global exit is now happening (an // early exit); otherwise STATUS_OK is returned on success but only a diff --git a/clients/drcachesim/tests/scheduler_unit_tests.cpp b/clients/drcachesim/tests/scheduler_unit_tests.cpp index 586807c192d..0e613c90aa2 100644 --- a/clients/drcachesim/tests/scheduler_unit_tests.cpp +++ b/clients/drcachesim/tests/scheduler_unit_tests.cpp @@ -3100,7 +3100,6 @@ test_replay_timestamps() #ifdef HAS_ZIP // We subclass scheduler_impl_t to access its record struct and functions. -// We subclass scheduler_impl_t to access its record struct and functions. class test_noeof_scheduler_t : public test_scheduler_base_t { public: void