diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 871dc1b9f..88659485a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -23,7 +23,7 @@ jobs: uses: lf-lang/lingua-franca/.github/workflows/extract-ref.yml@master with: file: 'lingua-franca-ref.txt' - + lf-default: needs: fetch-lf uses: lf-lang/lingua-franca/.github/workflows/c-tests.yml@master @@ -38,7 +38,7 @@ jobs: runtime-ref: ${{ github.ref }} compiler-ref: ${{ needs.fetch-lf.outputs.ref }} scheduler: GEDF_NP - + lf-gedf-np-ci: needs: fetch-lf uses: lf-lang/lingua-franca/.github/workflows/c-tests.yml@master @@ -46,4 +46,11 @@ jobs: runtime-ref: ${{ github.ref }} compiler-ref: ${{ needs.fetch-lf.outputs.ref }} scheduler: GEDF_NP_CI - + + lf-adaptive: + needs: fetch-lf + uses: lf-lang/lingua-franca/.github/workflows/c-tests.yml@master + with: + runtime-ref: ${{ github.ref }} + compiler-ref: ${{ needs.fetch-lf.outputs.ref }} + scheduler: adaptive diff --git a/core/platform.h b/core/platform.h index 941cce803..3b00f440f 100644 --- a/core/platform.h +++ b/core/platform.h @@ -210,10 +210,10 @@ extern int lf_cond_timedwait(lf_cond_t* cond, lf_mutex_t* mutex, instant_t absol * @param ptr A pointer to a variable. * @param oldval The value to compare against. * @param newval The value to assign to *ptr if comparison is successful. - * @return The true if comparison was successful. False otherwise. + * @return True if comparison was successful. False otherwise. */ #if defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) -// Assume that an integer is 32 bits. +// Assume that a boolean is represented with a 32-bit integer. #define lf_bool_compare_and_swap(ptr, oldval, newval) (InterlockedCompareExchange(ptr, newval, oldval) == oldval) #elif defined(__GNUC__) || defined(__clang__) #define lf_bool_compare_and_swap(ptr, oldval, newval) __sync_bool_compare_and_swap(ptr, oldval, newval) @@ -221,6 +221,22 @@ extern int lf_cond_timedwait(lf_cond_t* cond, lf_mutex_t* mutex, instant_t absol #error "Compiler not supported" #endif +/* + * Atomically compare the 32-bit value that ptr points to against oldval. If the + * current value is oldval, then write newval into *ptr. + * @param ptr A pointer to a variable. + * @param oldval The value to compare against. + * @param newval The value to assign to *ptr if comparison is successful. + * @return The initial value of *ptr. + */ +#if defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__) +#define lf_val_compare_and_swap(ptr, oldval, newval) InterlockedCompareExchange(ptr, newval, oldval) +#elif defined(__GNUC__) || defined(__clang__) +#define lf_val_compare_and_swap(ptr, oldval, newval) __sync_val_compare_and_swap(ptr, oldval, newval) +#else +#error "Compiler not supported" +#endif + #endif /** diff --git a/core/threaded/data_collection.h b/core/threaded/data_collection.h new file mode 100644 index 000000000..036c78467 --- /dev/null +++ b/core/threaded/data_collection.h @@ -0,0 +1,253 @@ +/************* +Copyright (c) 2022, The University of California at Berkeley. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + +2. 
Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +***************/ + +/** + * Scheduling-related data collection and analysis that is performed at run-time. + * @author{Peter Donovan } + */ + +#ifndef DATA_COLLECTION +#define DATA_COLLECTION + +#ifndef NUMBER_OF_WORKERS +#define NUMBER_OF_WORKERS 1 +#endif // NUMBER_OF_WORKERS + +#include +#include "scheduler.h" + +static interval_t* start_times_by_level; +static interval_t** execution_times_by_num_workers_by_level; +static interval_t* execution_times_mins; +static size_t* execution_times_argmins; +static size_t data_collection_counter = 0; +static bool collecting_data = false; + +/** + * A monotonically increasing sequence of numbers of workers, the first and last elements of which + * are too large or small to represent valid states of the system (i.e., state transitions to them + * are instantaneously reflected). + */ +static size_t* possible_nums_workers; + +extern size_t num_levels; +extern size_t max_num_workers; +instant_t lf_time_physical(void); + +#define START_EXPERIMENTS 8 +#define SLOW_EXPERIMENTS 256 +#define EXECUTION_TIME_MEMORY 15 + +/** @brief Initialize the possible_nums_workers array. */ +static void possible_nums_workers_init() { + // Start with 0 and end with two numbers strictly greater than max_num_workers. This must start + // at 4 because the first two and last two entries are not counted. + size_t pnw_length = 4; + size_t temp = max_num_workers; + while ((temp >>= 1)) pnw_length++; + possible_nums_workers = (size_t*) malloc(pnw_length * sizeof(size_t)); + temp = 1; + possible_nums_workers[0] = 0; + for (int i = 1; i < pnw_length; i++) { + possible_nums_workers[i] = temp; + temp *= 2; + } + assert(temp > max_num_workers); +} + +/** + * @brief Return a random integer in the interval [-1, +1] representing whether the number of + * workers used on a certain level should increase, decrease, or remain the same, with a probability + * distribution possibly dependent on the parameters. + * @param current_state The index currently used by this level in the possible_nums_workers array. + * @param execution_time An estimate of the execution time of the level in the case for which we + * would like to optimize. + */ +static int get_jitter(size_t current_state, interval_t execution_time) { + static const size_t parallelism_cost_max = 114688; + // The following handles the case where the current level really is just fluff: + // No parallelism needed, no work to be done. 
+ if (execution_time < 16384 && current_state == 1) return 0; + int left_score = 16384; // Want: For execution time = 65536, p(try left) = p(try right) + int middle_score = 65536; + int right_score = 65536; + if (execution_time < parallelism_cost_max) left_score += parallelism_cost_max - execution_time; + int result = rand() % (left_score + middle_score + right_score); + if (result < left_score) return -1; + if (result < left_score + middle_score) return 0; + return 1; +} + +/** @brief Get the number of workers resulting from a random state transition. */ +static size_t get_nums_workers_neighboring_state(size_t current_state, interval_t execution_time) { + size_t jitter = get_jitter(current_state, execution_time); + if (!jitter) return current_state; + size_t i = 1; + // TODO: There are more efficient ways to do this. + while (possible_nums_workers[i] < current_state) i++; + return possible_nums_workers[i + jitter]; +} + +static void data_collection_init(sched_params_t* params) { + size_t num_levels = params->num_reactions_per_level_size; + start_times_by_level = (interval_t*) calloc(num_levels, sizeof(interval_t)); + execution_times_by_num_workers_by_level = (interval_t**) calloc( + num_levels, sizeof(interval_t*) + ); + execution_times_mins = (interval_t*) calloc(num_levels, sizeof(interval_t)); + execution_times_argmins = (size_t*) calloc(num_levels, sizeof(size_t)); + for (size_t i = 0; i < num_levels; i++) { + execution_times_argmins[i] = max_num_workers; + execution_times_by_num_workers_by_level[i] = (interval_t*) calloc( + max_num_workers + 1, // Add 1 for 1-based indexing + sizeof(interval_t) + ); + } + possible_nums_workers_init(); +} + +static void data_collection_free() { + free(start_times_by_level); + for (size_t i = 0; i < num_levels; i++) { + free(execution_times_by_num_workers_by_level[i]); + } + free(execution_times_by_num_workers_by_level); + free(possible_nums_workers); +} + +/** @brief Record that the execution of the given level is beginning. */ +static void data_collection_start_level(size_t level) { + if (collecting_data) start_times_by_level[level] = lf_time_physical(); +} + +/** @brief Record that the execution of the given level has completed. */ +static void data_collection_end_level(size_t level, size_t num_workers) { + if (collecting_data && start_times_by_level[level]) { + interval_t dt = lf_time_physical() - start_times_by_level[level]; + if (!execution_times_by_num_workers_by_level[level][num_workers]) { + execution_times_by_num_workers_by_level[level][num_workers] = MAX( + dt, + 2 * execution_times_by_num_workers_by_level[level][execution_times_argmins[level]] + ); + } + interval_t* prior_et = &execution_times_by_num_workers_by_level[level][num_workers]; + *prior_et = (*prior_et * EXECUTION_TIME_MEMORY + dt) / (EXECUTION_TIME_MEMORY + 1); + } +} + +static size_t restrict_to_range(size_t start_inclusive, size_t end_inclusive, size_t value) { + if (value < start_inclusive) return start_inclusive; + if (value > end_inclusive) return end_inclusive; + return value; +} + +/** + * @brief Update num_workers_by_level in-place. + * @param num_workers_by_level The number of workers that should be used to execute each level. + * @param max_num_workers_by_level The maximum possible number of workers that could reasonably be + * assigned to each level. + * @param jitter Whether the possibility of state transitions to numbers of workers that are not + * (yet) empirically optimal is desired. 
+ */ +static void compute_number_of_workers( + size_t* num_workers_by_level, + size_t* max_num_workers_by_level, + bool jitter +) { + for (size_t level = 0; level < num_levels; level++) { + interval_t this_execution_time = execution_times_by_num_workers_by_level[level][ + num_workers_by_level[level] + ]; + size_t ideal_number_of_workers; + size_t max_reasonable_num_workers = max_num_workers_by_level[level]; + ideal_number_of_workers = execution_times_argmins[level]; + int range = 1; + if (jitter) { + ideal_number_of_workers = get_nums_workers_neighboring_state( + ideal_number_of_workers, this_execution_time + ); + } + int minimum_workers = 1; +#ifdef WORKERS_NEEDED_FOR_FEDERATE + // TODO: only apply this constraint on levels containing control reactions + minimum_workers = WORKERS_NEEDED_FOR_FEDERATE > max_reasonable_num_workers ? + max_reasonable_num_workers : WORKERS_NEEDED_FOR_FEDERATE; +#endif + num_workers_by_level[level] = restrict_to_range( + minimum_workers, max_reasonable_num_workers, ideal_number_of_workers + ); + } +} + +/** + * @brief Update minimum and argmin (wrt number of workers used) execution times according the most + * recent execution times recorded. + * @param num_workers_by_level The number of workers most recently used to execute each level. + */ +static void compute_costs(size_t* num_workers_by_level) { + for (size_t level = 0; level < num_levels; level++) { + interval_t score = execution_times_by_num_workers_by_level[level][ + num_workers_by_level[level] + ]; + if ( + !execution_times_mins[level] + | (score < execution_times_mins[level]) + | (num_workers_by_level[level] == execution_times_argmins[level]) + ) { + execution_times_mins[level] = score; + execution_times_argmins[level] = num_workers_by_level[level]; + } + } +} + +/** + * @brief Record that the execution of a tag has completed. + * @param num_workers_by_level The number of workers used to execute each level of the tag. + * @param max_num_workers_by_level The maximum number of workers that could reasonably be used to + * execute each level, for any tag. + */ +static void data_collection_end_tag( + size_t* num_workers_by_level, + size_t* max_num_workers_by_level +) { + if (collecting_data && start_times_by_level[0]) { + compute_costs(num_workers_by_level); + } + data_collection_counter++; + size_t period = 2 + 128 * (data_collection_counter > SLOW_EXPERIMENTS); + size_t state = data_collection_counter % period; + if (state == 0) { + compute_number_of_workers( + num_workers_by_level, + max_num_workers_by_level, + data_collection_counter > START_EXPERIMENTS + ); + collecting_data = true; + } else if (state == 1) { + compute_number_of_workers(num_workers_by_level, max_num_workers_by_level, false); + collecting_data = false; + } +} + +#endif diff --git a/core/threaded/scheduler_adaptive.c b/core/threaded/scheduler_adaptive.c new file mode 100644 index 000000000..8b951fe44 --- /dev/null +++ b/core/threaded/scheduler_adaptive.c @@ -0,0 +1,129 @@ +/************* +Copyright (c) 2022, The University of California at Berkeley. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. 
+ +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR +ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON +ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +***************/ + +/** + * This is a non-priority-driven scheduler. See scheduler.h for documentation. + * @author{Peter Donovan } + */ + +#ifndef NUMBER_OF_WORKERS +#define NUMBER_OF_WORKERS 1 +#endif // NUMBER_OF_WORKERS + +#include + +#include "scheduler.h" +#include "../utils/pqueue_support.h" +#include "scheduler_sync_tag_advance.c" +#include "worker_assignments.h" +#include "worker_states.h" + +#ifndef MAX_REACTION_LEVEL +#define MAX_REACTION_LEVEL INITIAL_REACT_QUEUE_SIZE +#endif + +static bool init_called = false; +static bool should_stop = false; + +///////////////////////// Scheduler Private Functions /////////////////////////// + +/** + * @brief Increment the level currently being executed, and the tag if necessary. + * @param worker The number of the calling worker. + */ +static void advance_level_and_unlock(size_t worker) { + size_t max_level = num_levels - 1; + while (true) { + if (current_level == max_level) { + data_collection_end_tag(num_workers_by_level, max_num_workers_by_level); + set_level(0); + if (_lf_sched_advance_tag_locked()) { + should_stop = true; + worker_states_awaken_locked(worker, max_num_workers); + worker_states_unlock(worker); + return; + } + } else { + set_level(current_level + 1); + } + size_t total_num_reactions = get_num_reactions(); + if (total_num_reactions) { + size_t num_workers_to_awaken = MIN(total_num_reactions, num_workers); + assert(num_workers_to_awaken > 0); + worker_states_awaken_locked(worker, num_workers_to_awaken); + worker_states_unlock(worker); + return; + } + } +} + +///////////////////// Scheduler Init and Destroy API ///////////////////////// + +void lf_sched_init(size_t number_of_workers, sched_params_t* params) { + // TODO: Instead of making this a no-op, crash the program. If this gets called twice, then that + // is a bug that should be fixed. 
+ if (init_called) return; + worker_states_init(number_of_workers); + worker_assignments_init(number_of_workers, params); + init_called = true; +} + +void lf_sched_free() { + worker_states_free(); + worker_assignments_free(); +} + +///////////////////////// Scheduler Worker API /////////////////////////////// + +reaction_t* lf_sched_get_ready_reaction(int worker_number) { + assert(worker_number >= 0); + reaction_t* ret; + while (true) { + size_t level_counter_snapshot = level_counter; + ret = worker_assignments_get_or_lock(worker_number); + if (ret) return ret; + if (worker_states_finished_with_level_locked(worker_number)) { + advance_level_and_unlock(worker_number); + } else { + worker_states_sleep_and_unlock(worker_number, level_counter_snapshot); + } + if (should_stop) { + return NULL; + } + } + return (reaction_t*) ret; +} + +void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction) { + assert(worker_number >= 0); + assert(done_reaction->status != inactive); + done_reaction->status = inactive; +} + +void lf_sched_trigger_reaction(reaction_t* reaction, int worker_number) { + assert(worker_number >= -1); + if (!lf_bool_compare_and_swap(&reaction->status, inactive, queued)) return; + worker_assignments_put(reaction); +} diff --git a/core/threaded/worker_assignments.h b/core/threaded/worker_assignments.h new file mode 100644 index 000000000..b934c30b5 --- /dev/null +++ b/core/threaded/worker_assignments.h @@ -0,0 +1,226 @@ +/************* +Copyright (c) 2022, The University of California at Berkeley. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +***************/ + +/** + * Assign reactions to workers. + * @author{Peter Donovan } + */ + +#ifndef WORKER_ASSIGNMENTS +#define WORKER_ASSIGNMENTS + +#ifndef NUMBER_OF_WORKERS +#define NUMBER_OF_WORKERS 1 +#endif // NUMBER_OF_WORKERS + +#include +#include "scheduler.h" + +/** The queued reactions. */ +static reaction_t**** reactions_by_worker_by_level; +/** The number of queued reactions currently assigned to each worker at each level. */ +static size_t** num_reactions_by_worker_by_level; +/** The maximum number of workers that could possibly be kept simultaneously busy at each level. */ +static size_t* max_num_workers_by_level; +/** The number of workers that will be used to execute each level. 
*/ +static size_t* num_workers_by_level; +/** The number of levels. */ +static size_t num_levels; +/** The maximum number of workers that can be used to execute any level. */ +static size_t max_num_workers; + +/** The following values apply to the current level. */ +static size_t current_level; +/** The number of reactions each worker still has to execute, indexed by worker. */ +static size_t* num_reactions_by_worker; +/** The reactions to be executed, indexed by assigned worker. */ +static reaction_t*** reactions_by_worker; +/** The total number of workers active, including those who have finished their work. */ +static size_t num_workers; + +#include "data_collection.h" + +static void worker_states_lock(size_t worker); +static void worker_states_unlock(size_t worker); + +/** + * @brief Set the level to be executed now. This function assumes that concurrent calls to it are + * impossible. + * @param level The new current level. + */ +static void set_level(size_t level) { + assert(level < num_levels); + assert(0 <= level); + data_collection_end_level(current_level, num_workers); + current_level = level; + num_reactions_by_worker = num_reactions_by_worker_by_level[level]; + reactions_by_worker = reactions_by_worker_by_level[level]; + num_workers = num_workers_by_level[level]; + // TODO: Experiment with not recording that the level is starting in the case that there is + // nothing to execute. We need not optimize for the case when there is nothing to execute + // because that case is not merely optimized, but is optimized out (we do not bother with + // executing nothing). + data_collection_start_level(current_level); +} + +/** @brief Return the total number of reactions enqueued on the current level. */ +static size_t get_num_reactions() { + size_t total_num_reactions = 0; + for (size_t i = 0; i < num_workers; i++) { + total_num_reactions += num_reactions_by_worker[i]; + } + // TODO: if num_workers was > total_num_reactions, report this to data_collection? + return total_num_reactions; +} + +static void worker_assignments_init(size_t number_of_workers, sched_params_t* params) { + num_levels = params->num_reactions_per_level_size; + max_num_workers = number_of_workers; + reactions_by_worker_by_level = (reaction_t****) malloc(sizeof(reaction_t***) * num_levels); + num_reactions_by_worker_by_level = (size_t**) malloc(sizeof(size_t*) * num_levels); + num_workers_by_level = (size_t*) malloc(sizeof(size_t) * num_levels); + max_num_workers_by_level = (size_t*) malloc(sizeof(size_t) * num_levels); + for (size_t level = 0; level < num_levels; level++) { + size_t num_reactions = params->num_reactions_per_level[level]; + size_t num_workers = num_reactions < max_num_workers ? num_reactions : max_num_workers; + max_num_workers_by_level[level] = num_workers; + num_workers_by_level[level] = max_num_workers_by_level[level]; + reactions_by_worker_by_level[level] = (reaction_t***) malloc( + sizeof(reaction_t**) * max_num_workers + ); + num_reactions_by_worker_by_level[level] = (size_t*) calloc(max_num_workers, sizeof(size_t)); + for (size_t worker = 0; worker < max_num_workers_by_level[level]; worker++) { + reactions_by_worker_by_level[level][worker] = (reaction_t**) malloc( + sizeof(reaction_t*) * num_reactions + ); // Warning: This wastes space. 
+ } + } + data_collection_init(params); + set_level(0); +} + +static void worker_assignments_free() { + for (size_t level = 0; level < num_levels; level++) { + for (size_t worker = 0; worker < max_num_workers_by_level[level]; worker++) { + free(reactions_by_worker_by_level[level][worker]); + } + free(reactions_by_worker_by_level[level]); + free(num_reactions_by_worker_by_level[level]); + } + free(max_num_workers_by_level); + free(num_workers_by_level); + data_collection_free(); +} + +/** + * @brief Return a reaction that has been assigned to the given worker, or NULL if no such reaction + * exists. + * @param worker The number of a worker needing work. + */ +static reaction_t* get_reaction(size_t worker) { +#ifndef FEDERATED + int index = lf_atomic_add_fetch(num_reactions_by_worker + worker, -1); + if (index >= 0) { + return reactions_by_worker[worker][index]; + } + num_reactions_by_worker[worker] = 0; + return NULL; +#else + // This is necessary for federated programs because reactions may be inserted into the current + // level. + int old_num_reactions; + int current_num_reactions = num_reactions_by_worker[worker]; + int index; + do { + old_num_reactions = current_num_reactions; + if (old_num_reactions <= 0) return NULL; + } while ( + (current_num_reactions = lf_val_compare_and_swap( + num_reactions_by_worker + worker, + old_num_reactions, + (index = old_num_reactions - 1) + )) != old_num_reactions + ); + return reactions_by_worker[worker][index]; +#endif +} + +/** + * @brief Get a reaction for the given worker to execute. If no such reaction exists, claim the + * mutex. + * @param worker A worker requesting work. + * @return reaction_t* A reaction to execute, or NULL if no such reaction exists. + */ +static reaction_t* worker_assignments_get_or_lock(size_t worker) { + assert(worker >= 0); + // assert(worker < num_workers); // There are edge cases where this doesn't hold. + assert(num_reactions_by_worker[worker] >= 0); + reaction_t* ret; + while (true) { + if ((ret = get_reaction(worker))) return ret; + if (worker < num_workers) { + for ( + size_t victim = (worker + 1) % num_workers; + victim != worker; + victim = (victim + 1) % num_workers + ) { + if ((ret = get_reaction(victim))) return ret; + } + } + worker_states_lock(worker); + if (!num_reactions_by_worker[worker]) { + return NULL; + } + worker_states_unlock(worker); + } +} + +/** + * @brief Trigger the given reaction. + * @param reaction A reaction to be executed in the current tag. + */ +static void worker_assignments_put(reaction_t* reaction) { + size_t level = LEVEL(reaction->index); + assert(reaction != NULL); +#ifndef FEDERATED + assert(level > current_level || current_level == 0); +#endif + assert(level < num_levels); + // Source: https://xorshift.di.unimi.it/splitmix64.c + // TODO: This is probably not the most efficient way to get the randomness that we need because + // it is designed to give an entire word of randomness, whereas we only need + // ~log2(num_workers_by_level[level]) bits of randomness. 
+ uint64_t hash = (uint64_t) reaction; + hash = (hash ^ (hash >> 30)) * 0xbf58476d1ce4e5b9; + hash = (hash ^ (hash >> 27)) * 0x94d049bb133111eb; + hash = hash ^ (hash >> 31); + size_t worker = hash % num_workers_by_level[level]; + size_t num_preceding_reactions = lf_atomic_fetch_add( + &num_reactions_by_worker_by_level[level][worker], + 1 + ); + reactions_by_worker_by_level[level][worker][num_preceding_reactions] = reaction; +} + +#endif diff --git a/core/threaded/worker_states.h b/core/threaded/worker_states.h new file mode 100644 index 000000000..6029a3321 --- /dev/null +++ b/core/threaded/worker_states.h @@ -0,0 +1,211 @@ +/************* +Copyright (c) 2022, The University of California at Berkeley. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +***************/ + +/** + * Management of worker wakefulness and locking. + * @author{Peter Donovan } + */ + +#ifndef WORKER_STATES +#define WORKER_STATES + +#ifndef NUMBER_OF_WORKERS +#define NUMBER_OF_WORKERS 1 +#endif // NUMBER_OF_WORKERS + +#include +#include "scheduler.h" +#include "../platform.h" + +/** An array of condition variables, each corresponding to a group of workers. */ +static lf_cond_t* worker_conds; +/** The cumsum of the sizes of the groups of workers corresponding to each successive cond. */ +static size_t* cumsum_of_worker_group_sizes; +/** The number of non-waiting threads. */ +static volatile size_t num_loose_threads; +/** The number of threads that were awakened for the purpose of executing the current level. */ +static volatile size_t num_awakened; +/** Whether the mutex is held by each worker via this module's API. */ +static bool* mutex_held; + +/** See worker_assignments.h for documentation. */ +extern size_t current_level; +extern size_t** num_reactions_by_worker_by_level; +extern size_t max_num_workers; + +/** See reactor_threaded.c for documentation. */ +extern lf_mutex_t mutex; + +/** See reactor_common.c for documentation. */ +extern bool fast; + +/** + * The level counter is a number that changes whenever the current level changes. + * + * This number must have a very long period in the sense that if it changes and is checked at a time + * in the future that is selected from some "reasonable" distribution, the probability that it will + * have returned to the same value must be negligible. 
+ */ +static size_t level_counter = 0; + +/** + * @brief Return the index of the condition variable used by worker. + * + * This function is nondecreasing, and the least element of its image is zero. + * + * @param worker A worker number. + * @return size_t The index of the condition variable used by worker. + */ +static size_t cond_of(size_t worker) { + // Note: __builtin_clz with GCC might be preferred, or fls (?). + int ret = 0; + while (worker) { + ret++; + worker >>= 1; + } + return ret; +} + +static void worker_states_init(size_t number_of_workers) { + size_t greatest_worker_number = number_of_workers - 1; + size_t num_conds = cond_of(greatest_worker_number) + 1; + worker_conds = (lf_cond_t*) malloc(sizeof(lf_cond_t) * num_conds); + cumsum_of_worker_group_sizes = (size_t*) calloc(num_conds, sizeof(size_t)); + mutex_held = (bool*) calloc(number_of_workers, sizeof(bool)); + for (int i = 0; i < number_of_workers; i++) { + cumsum_of_worker_group_sizes[cond_of(i)]++; + } + for (int i = 1; i < num_conds; i++) { + cumsum_of_worker_group_sizes[i] += cumsum_of_worker_group_sizes[i - 1]; + } + for (int i = 0; i < num_conds; i++) { + lf_cond_init(worker_conds + i); + } + num_loose_threads = number_of_workers; +} + +static void worker_states_free() { + // FIXME: Why do the condition variables and mutexes not need to be freed? + free(worker_conds); + free(mutex_held); +} + +/** + * @brief Awaken the workers scheduled to work on the current level. + * + * @param worker The calling worker. + * @param num_to_awaken The number of workers to awaken. + * @return A snapshot of the level counter after awakening the workers. + */ +static void worker_states_awaken_locked(size_t worker, size_t num_to_awaken) { + assert(num_to_awaken <= max_num_workers); + if ((worker == 0) && (num_to_awaken <= 1)) { + num_loose_threads = 1; + return; + } + size_t greatest_worker_number_to_awaken = num_to_awaken - 1; + size_t max_cond = cond_of(greatest_worker_number_to_awaken); + if (!mutex_held[worker]) { + mutex_held[worker] = true; + lf_mutex_lock(&mutex); + } + // The predicate of the condition variable depends on num_awakened and level_counter, so + // this is a critical section. + num_loose_threads = cumsum_of_worker_group_sizes[max_cond]; + num_loose_threads += worker >= num_loose_threads; + num_awakened = num_loose_threads; + level_counter++; + for (int cond = 0; cond <= max_cond; cond++) { + lf_cond_broadcast(worker_conds + cond); + } +} + +/** Lock the global mutex if needed. */ +static void worker_states_lock(size_t worker) { + assert(num_loose_threads > 0); + assert(num_loose_threads <= max_num_workers); + size_t lt = num_loose_threads; + if (lt > 1 || !fast) { // FIXME: Lock should be partially optimized out even when !fast + lf_mutex_lock(&mutex); + assert(!mutex_held[worker]); + mutex_held[worker] = true; + } +} + +/** Unlock the global mutex if needed. */ +static void worker_states_unlock(size_t worker) { + if (!mutex_held[worker]) return; + mutex_held[worker] = false; + lf_mutex_unlock(&mutex); +} + +/** + * @brief Record that worker is finished working on the current level. + * + * @param worker The number of a worker. + * @return true If this is the last worker to finish working on the current level. + * @return false If at least one other worker is still working on the current level. 
+ */ +static bool worker_states_finished_with_level_locked(size_t worker) { + assert(worker >= 0); + assert(num_loose_threads > 0); + assert(num_reactions_by_worker[worker] != 1); + assert(((int64_t) num_reactions_by_worker[worker]) <= 0); + // Why use an atomic operation when we are supposed to be "as good as locked"? Because I took a + // shortcut, and the shortcut was imperfect. + size_t ret = lf_atomic_add_fetch(&num_loose_threads, -1); + assert(ret <= max_num_workers); // Check for underflow + return !ret; +} + +/** + * @brief Make the given worker go to sleep. + * + * This should be called by the given worker when the worker will do nothing for the remainder of + * the execution of the current level. + * + * @param worker The number of the calling worker. + * @param level_counter_snapshot The value of the level counter at the time of the decision to + * sleep. + */ +static void worker_states_sleep_and_unlock(size_t worker, size_t level_counter_snapshot) { + assert(worker < max_num_workers); + assert(num_loose_threads <= max_num_workers); + if (!mutex_held[worker]) { + lf_mutex_lock(&mutex); + } + mutex_held[worker] = false; // This will be true soon, upon call to lf_cond_wait. + size_t cond = cond_of(worker); + if ( + ((level_counter_snapshot == level_counter) || worker >= num_awakened) + ) { + do { + lf_cond_wait(worker_conds + cond, &mutex); + } while (level_counter_snapshot == level_counter || worker >= num_awakened); + } + assert(!mutex_held[worker]); // This thread holds the mutex, but it did not report that. + lf_mutex_unlock(&mutex); +} + +#endif diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index d71afe82b..1a62c2c9f 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -25e67092a31bb30811a7907bc2b8644b9a2d2b43 +36c2e76fdb75717a825ab4217fe80b74076d0cd0
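
A few standalone sketches follow. They are editorial additions, not part of the patch, and any identifier that does not appear in the diff above is illustrative only.

First, the new lf_val_compare_and_swap macro in core/platform.h wraps a value-returning compare-and-swap (InterlockedCompareExchange on Windows, __sync_val_compare_and_swap on GCC/Clang), and the FEDERATED branch of get_reaction in worker_assignments.h uses it in a retry loop to claim the last slot of a worker's queue. A minimal sketch of that retry pattern, assuming a GCC/Clang toolchain so the builtin can be called directly:

#include <stdio.h>

// Number of queued items still unclaimed; shared among threads in the real scheduler.
static int remaining = 3;

// Atomically claim one item: decrement `remaining` and return the claimed index,
// or return -1 if nothing is left. Mirrors the retry loop in get_reaction (FEDERATED case).
static int claim_index(void) {
    int expected = remaining;
    while (1) {
        if (expected <= 0) return -1;  // Nothing left to claim.
        int observed = __sync_val_compare_and_swap(&remaining, expected, expected - 1);
        if (observed == expected) return expected - 1;  // The swap succeeded.
        expected = observed;  // Another thread got there first; retry with the observed value.
    }
}

int main(void) {
    int index;
    while ((index = claim_index()) >= 0) {
        printf("claimed slot %d\n", index);
    }
    return 0;
}

The value-returning form lets the loop reuse the observed value as the next expected value instead of re-reading the shared variable, which is why the patch adds it alongside the existing boolean form.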
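
data_collection_end_level smooths the measured execution time of each level with a weighted running average controlled by EXECUTION_TIME_MEMORY, so a single slow tag cannot immediately change the worker assignment. A sketch of just that update rule, assuming interval_t is a signed 64-bit nanosecond count as elsewhere in the runtime; the sample values are made up:

#include <stdint.h>
#include <stdio.h>

typedef int64_t interval_t;

#define EXECUTION_TIME_MEMORY 15  // Weight given to the previous estimate.

// Fold a new sample into the estimate: (previous * 15 + sample) / 16.
static interval_t update_execution_time(interval_t previous, interval_t sample) {
    return (previous * EXECUTION_TIME_MEMORY + sample) / (EXECUTION_TIME_MEMORY + 1);
}

int main(void) {
    interval_t estimate = 64000;  // Hypothetical initial estimate, in ns.
    interval_t samples[] = {80000, 72000, 60000, 61000};
    for (size_t i = 0; i < sizeof(samples) / sizeof(samples[0]); i++) {
        estimate = update_execution_time(estimate, samples[i]);
        printf("after sample %zu: %lld ns\n", i, (long long) estimate);
    }
    return 0;
}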
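
possible_nums_workers_init builds a ladder of candidate worker counts (zero, then successive powers of two, ending above max_num_workers), and get_nums_workers_neighboring_state moves one rung up or down it at random. A sketch of the ladder construction for a hypothetical maximum of 8 workers, where the expected contents are 0, 1, 2, 4, 8, 16, 32:

#include <assert.h>
#include <stdio.h>
#include <stdlib.h>

int main(void) {
    size_t max_num_workers = 8;  // Hypothetical; normally the number of worker threads.
    // Length: one entry for 0, entries for 1, 2, 4, ..., plus rungs strictly above the maximum.
    size_t pnw_length = 4;
    size_t temp = max_num_workers;
    while ((temp >>= 1)) pnw_length++;
    size_t* possible_nums_workers = (size_t*) malloc(pnw_length * sizeof(size_t));
    possible_nums_workers[0] = 0;
    temp = 1;
    for (size_t i = 1; i < pnw_length; i++) {
        possible_nums_workers[i] = temp;
        temp *= 2;
    }
    assert(temp > max_num_workers);  // The ladder must overshoot the real worker count.
    for (size_t i = 0; i < pnw_length; i++) {
        printf("possible_nums_workers[%zu] = %zu\n", i, possible_nums_workers[i]);
    }
    free(possible_nums_workers);
    return 0;
}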
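
worker_assignments_put assigns each newly triggered reaction to a worker by hashing the reaction pointer with the splitmix64 finalizer and reducing the hash modulo the number of workers active on the reaction's level. A self-contained sketch of that mapping; the worker count and the hashed addresses here are arbitrary:

#include <stdint.h>
#include <stdio.h>

// splitmix64 finalizer (https://xorshift.di.unimi.it/splitmix64.c).
static uint64_t splitmix64_hash(uint64_t x) {
    x = (x ^ (x >> 30)) * 0xbf58476d1ce4e5b9ULL;
    x = (x ^ (x >> 27)) * 0x94d049bb133111ebULL;
    return x ^ (x >> 31);
}

int main(void) {
    int dummy[4];                 // Stand-ins for reaction structs; only their addresses matter.
    size_t num_workers = 3;       // Hypothetical number of workers on the current level.
    for (int i = 0; i < 4; i++) {
        uint64_t h = splitmix64_hash((uint64_t) (uintptr_t) &dummy[i]);
        printf("address %p -> worker %zu\n", (void*) &dummy[i], (size_t) (h % num_workers));
    }
    return 0;
}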
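
worker_states.h groups workers onto condition variables in powers of two via cond_of, so awakening the first k workers only requires broadcasting on roughly log2(k) condition variables: worker 0 and worker 1 each get their own, workers 2 and 3 share one, workers 4 through 7 share one, and so on. A sketch that prints the mapping for the first 16 workers:

#include <stddef.h>
#include <stdio.h>

// Same bit-length computation as cond_of in worker_states.h.
static size_t cond_of(size_t worker) {
    size_t ret = 0;
    while (worker) {
        ret++;
        worker >>= 1;
    }
    return ret;
}

int main(void) {
    for (size_t worker = 0; worker < 16; worker++) {
        printf("worker %2zu -> condition variable %zu\n", worker, cond_of(worker));
    }
    return 0;
}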