Adaptive scheduler #85

Merged
merged 50 commits into from
Jun 20, 2022
a78e735
[scheduler] 25/28 tests passing for NP2.
petervdonovan Mar 5, 2022
505c5eb
[scheduler] Tests passing for NP2.
petervdonovan Mar 6, 2022
086e7d0
[scheduler] First possible performance improvement.
petervdonovan Mar 6, 2022
9a40c6e
[scheduler] Include assert.h.
petervdonovan Mar 6, 2022
5885ec8
[scheduler] SortedLinkList: Make C++ compiler happy.
petervdonovan Mar 6, 2022
746af9a
[scheduler] Make compiler happy.
petervdonovan Mar 6, 2022
a65531d
[scheduler] Reduce expensive calls to get_physical_time.
petervdonovan Mar 7, 2022
022b874
[scheduler] Do not use more workers than there are reactions.
petervdonovan Mar 7, 2022
ece6875
[scheduler] Try again to keep C++ and C compilers both happy.
petervdonovan Mar 7, 2022
ae97fbb
[scheduler] Small adjustments.
petervdonovan Mar 7, 2022
c34ef01
[scheduler] Try to dynamically optimize num workers.
petervdonovan Mar 7, 2022
10736a0
[scheduler] First possible improvement due to runtime analysis.
petervdonovan Mar 8, 2022
bb37043
[scheduler] Dirty patch on the heuristic.
petervdonovan Mar 9, 2022
1466416
[scheduler] Advance level as far as necessary at once.
petervdonovan Mar 19, 2022
09b18cc
[scheduler] If everyone else is sleeping, do not acquire the mutex.
petervdonovan Mar 20, 2022
34c1334
[scheduler] Fix bug caused by previous commit.
petervdonovan Mar 25, 2022
c03077b
[scheduler] Adjust data_collection.h.
petervdonovan Mar 25, 2022
e58497d
[scheduler] Adjust data_collection.h.
petervdonovan Mar 30, 2022
3294609
[scheduler] Add worker affinity.
petervdonovan Mar 30, 2022
4fdcc09
[scheduler] Add work stealing.
petervdonovan Mar 30, 2022
5b972f0
[scheduler] Fix the race condition.*
petervdonovan Mar 31, 2022
7e0e489
[scheduler] Address a rare concurrency bug in SleepingBarber.
petervdonovan Mar 31, 2022
59afce7
[scheduler] Fix another race condition.
petervdonovan Mar 31, 2022
4830702
[scheduler] Adjust data_collection.h.
petervdonovan Mar 31, 2022
17120ad
[scheduler] Bugfix.
petervdonovan Apr 1, 2022
c6303d1
[scheduler] Fix another race condition.
petervdonovan Apr 2, 2022
0625c7c
[scheduler] Minor cleanup.
petervdonovan Apr 2, 2022
3be18ed
[scheduler] Check number of workers required more dynamically.
petervdonovan Apr 2, 2022
e0adf73
[scheduler] Let there be no hypergalactic state transitions.
petervdonovan Apr 2, 2022
0aa1104
[scheduler] More relatively minor tweaks.
petervdonovan Apr 3, 2022
c8a3425
[scheduler] Bugfix.
petervdonovan Apr 6, 2022
606cab9
Compute a hash to assign a reaction to a worker.
petervdonovan Apr 13, 2022
a5b8780
[scheduler] Update CI.
petervdonovan May 31, 2022
ae3444c
[scheduler] Update lingua-franca-ref.txt.
petervdonovan May 31, 2022
80a90f0
[scheduler] Comments and superficial style changes.
petervdonovan May 31, 2022
c020c6e
Update lingua-franca-ref.txt.
petervdonovan Jun 1, 2022
85e45fb
Update lingua-franca-ref.txt.
petervdonovan Jun 1, 2022
c8d697a
[scheduler] Rename "heuristic" -> "adaptive"
petervdonovan Jun 1, 2022
701cfad
Update lingua-franca-ref.txt.
petervdonovan Jun 1, 2022
29cf529
[scheduler] Remove a special case.
petervdonovan Jun 1, 2022
eb202c9
[scheduler] Superficial commenting/renaming.
petervdonovan Jun 1, 2022
e09ad62
[scheduler] Adjust assert.
petervdonovan Jun 1, 2022
d682724
[scheduler] Account for insertion into the current level.
petervdonovan Jun 3, 2022
510cd4c
[scheduler] Unrelated bugfix.
petervdonovan Jun 4, 2022
ea3011b
[scheduler] Update assertion.
petervdonovan Jun 5, 2022
c19bb3d
[scheduler] Fix deadlock.
petervdonovan Jun 7, 2022
f1e20d5
[scheduler] Clean up after previous commit.
petervdonovan Jun 7, 2022
d23d17f
[scheduler] Do not directly use compiler builtins.
petervdonovan Jun 7, 2022
3e1ab3e
[scheduler] Minor cleanups.
petervdonovan Jun 9, 2022
4d300d9
Merge branch 'main' into scaling-wrt-threads-runtime-experiments
petervdonovan Jun 20, 2022
13 changes: 10 additions & 3 deletions .github/workflows/ci.yml
@@ -23,7 +23,7 @@ jobs:
uses: lf-lang/lingua-franca/.github/workflows/extract-ref.yml@master
with:
file: 'lingua-franca-ref.txt'

lf-default:
needs: fetch-lf
uses: lf-lang/lingua-franca/.github/workflows/c-tests.yml@master
@@ -38,12 +38,19 @@ jobs:
runtime-ref: ${{ github.ref }}
compiler-ref: ${{ needs.fetch-lf.outputs.ref }}
scheduler: GEDF_NP

lf-gedf-np-ci:
needs: fetch-lf
uses: lf-lang/lingua-franca/.github/workflows/c-tests.yml@master
with:
runtime-ref: ${{ github.ref }}
compiler-ref: ${{ needs.fetch-lf.outputs.ref }}
scheduler: GEDF_NP_CI


lf-adaptive:
needs: fetch-lf
uses: lf-lang/lingua-franca/.github/workflows/c-tests.yml@master
with:
runtime-ref: ${{ github.ref }}
compiler-ref: ${{ needs.fetch-lf.outputs.ref }}
scheduler: adaptive
20 changes: 18 additions & 2 deletions core/platform.h
@@ -210,17 +210,33 @@ extern int lf_cond_timedwait(lf_cond_t* cond, lf_mutex_t* mutex, instant_t absol
* @param ptr A pointer to a variable.
* @param oldval The value to compare against.
* @param newval The value to assign to *ptr if comparison is successful.
- * @return The true if comparison was successful. False otherwise.
+ * @return True if comparison was successful. False otherwise.
*/
#if defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__)
- // Assume that an integer is 32 bits.
+ // Assume that a boolean is represented with a 32-bit integer.
#define lf_bool_compare_and_swap(ptr, oldval, newval) (InterlockedCompareExchange(ptr, newval, oldval) == oldval)
#elif defined(__GNUC__) || defined(__clang__)
#define lf_bool_compare_and_swap(ptr, oldval, newval) __sync_bool_compare_and_swap(ptr, oldval, newval)
#else
#error "Compiler not supported"
#endif

/*
* Atomically compare the 32-bit value that ptr points to against oldval. If the
* current value is oldval, then write newval into *ptr.
* @param ptr A pointer to a variable.
* @param oldval The value to compare against.
* @param newval The value to assign to *ptr if comparison is successful.
* @return The initial value of *ptr.
*/
#if defined(WIN32) || defined(_WIN32) || defined(__WIN32__) || defined(__NT__)
#define lf_val_compare_and_swap(ptr, oldval, newval) InterlockedCompareExchange(ptr, newval, oldval)
#elif defined(__GNUC__) || defined(__clang__)
#define lf_val_compare_and_swap(ptr, oldval, newval) __sync_val_compare_and_swap(ptr, oldval, newval)
#else
#error "Compiler not supported"
#endif

#endif

/**
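For readers comparing the two compare-and-swap flavors in `core/platform.h`, here is a minimal sketch of how each might be used. It repeats only the GCC/Clang branch of the macros so that it compiles on its own. `try_claim` and `atomic_max` are hypothetical helpers written for illustration and are not part of this PR.

```c
#include <assert.h>
#include <stdint.h>

// GCC/Clang branch of the macros from the diff; the Windows branch is omitted in this sketch.
#define lf_bool_compare_and_swap(ptr, oldval, newval) __sync_bool_compare_and_swap(ptr, oldval, newval)
#define lf_val_compare_and_swap(ptr, oldval, newval) __sync_val_compare_and_swap(ptr, oldval, newval)

// Hypothetical helper: flip *flag from 0 to 1. Only the caller that performs
// the flip gets a nonzero result; everyone else gets 0.
static int try_claim(volatile int32_t* flag) {
    return lf_bool_compare_and_swap(flag, 0, 1);
}

// Hypothetical helper: raise *value to at least candidate. The value-returning
// flavor reports what was actually stored, which lets the loop retry against
// the fresh value without rereading memory. Returns the last value observed
// before any update by this call.
static int32_t atomic_max(volatile int32_t* value, int32_t candidate) {
    int32_t seen = *value;
    while (seen < candidate) {
        int32_t prior = lf_val_compare_and_swap(value, seen, candidate);
        if (prior == seen) break;  // The swap succeeded.
        seen = prior;              // Another thread changed *value; retry.
    }
    return seen;
}
```

The boolean flavor suits one-shot claims where only success or failure matters, while the value flavor suits retry loops that need the conflicting value.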
253 changes: 253 additions & 0 deletions core/threaded/data_collection.h
@@ -0,0 +1,253 @@
/*************
Copyright (c) 2022, The University of California at Berkeley.

Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:

1. Redistributions of source code must retain the above copyright notice,
this list of conditions and the following disclaimer.

2. Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
***************/

/**
* Scheduling-related data collection and analysis that is performed at run-time.
* @author{Peter Donovan <peterdonovan@berkeley.edu>}
*/

#ifndef DATA_COLLECTION
#define DATA_COLLECTION

#ifndef NUMBER_OF_WORKERS
#define NUMBER_OF_WORKERS 1
#endif // NUMBER_OF_WORKERS

#include <assert.h>
#include "scheduler.h"

static interval_t* start_times_by_level;
static interval_t** execution_times_by_num_workers_by_level;
static interval_t* execution_times_mins;
static size_t* execution_times_argmins;
static size_t data_collection_counter = 0;
static bool collecting_data = false;

/**
* A monotonically increasing sequence of numbers of workers. Its first element (0) is too small and
* its last elements exceed max_num_workers, so they do not represent valid states of the system;
* a state transition to one of them is immediately clamped back into the valid range.
*/
static size_t* possible_nums_workers;

extern size_t num_levels;
extern size_t max_num_workers;
instant_t lf_time_physical(void);

#define START_EXPERIMENTS 8
#define SLOW_EXPERIMENTS 256
#define EXECUTION_TIME_MEMORY 15

/** @brief Initialize the possible_nums_workers array. */
static void possible_nums_workers_init() {
// The array starts with 0 and ends with two numbers strictly greater than max_num_workers.
// pnw_length must start at 4 because the loop below does not count the first two entries
// (0 and 1) or the last two entries.
size_t pnw_length = 4;
size_t temp = max_num_workers;
while ((temp >>= 1)) pnw_length++;
possible_nums_workers = (size_t*) malloc(pnw_length * sizeof(size_t));
temp = 1;
possible_nums_workers[0] = 0;
for (size_t i = 1; i < pnw_length; i++) {
possible_nums_workers[i] = temp;
temp *= 2;
}
assert(temp > max_num_workers);
}

/**
* @brief Return a random integer in the interval [-1, +1] representing whether the number of
* workers used on a certain level should increase, decrease, or remain the same, with a probability
* distribution possibly dependent on the parameters.
* @param current_state The number of workers currently used by this level (a value, not an index
* into the possible_nums_workers array).
* @param execution_time An estimate of the execution time of the level in the case for which we
* would like to optimize.
*/
static int get_jitter(size_t current_state, interval_t execution_time) {
static const size_t parallelism_cost_max = 114688;
// The following handles the case where the current level really is just fluff:
// No parallelism needed, no work to be done.
if (execution_time < 16384 && current_state == 1) return 0;
int left_score = 16384; // Want: For execution time = 65536, p(try left) = p(try right)
int middle_score = 65536;
int right_score = 65536;
if (execution_time < parallelism_cost_max) left_score += parallelism_cost_max - execution_time;
int result = rand() % (left_score + middle_score + right_score);
if (result < left_score) return -1;
if (result < left_score + middle_score) return 0;
return 1;
}

/** @brief Get the number of workers resulting from a random state transition. */
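static size_t get_nums_workers_neighboring_state(size_t current_state, interval_t execution_time) {
// Keep the jitter signed so that -1 is not silently converted to SIZE_MAX.
int jitter = get_jitter(current_state, execution_time);
if (!jitter) return current_state;
size_t i = 1;
// Find the first entry that is at least current_state.
// TODO: There are more efficient ways to do this.
while (possible_nums_workers[i] < current_state) i++;
// i >= 1 here, and the array ends with two entries above max_num_workers, so both
// neighbors are in bounds.
return possible_nums_workers[jitter < 0 ? i - 1 : i + 1];
}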

static void data_collection_init(sched_params_t* params) {
size_t num_levels = params->num_reactions_per_level_size;
start_times_by_level = (interval_t*) calloc(num_levels, sizeof(interval_t));
execution_times_by_num_workers_by_level = (interval_t**) calloc(
num_levels, sizeof(interval_t*)
);
execution_times_mins = (interval_t*) calloc(num_levels, sizeof(interval_t));
execution_times_argmins = (size_t*) calloc(num_levels, sizeof(size_t));
for (size_t i = 0; i < num_levels; i++) {
execution_times_argmins[i] = max_num_workers;
execution_times_by_num_workers_by_level[i] = (interval_t*) calloc(
max_num_workers + 1, // Add 1 for 1-based indexing
sizeof(interval_t)
);
}
possible_nums_workers_init();
}

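static void data_collection_free() {
free(start_times_by_level);
for (size_t i = 0; i < num_levels; i++) {
free(execution_times_by_num_workers_by_level[i]);
}
free(execution_times_by_num_workers_by_level);
// Also release the arrays allocated in data_collection_init.
free(execution_times_mins);
free(execution_times_argmins);
free(possible_nums_workers);
}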

/** @brief Record that the execution of the given level is beginning. */
static void data_collection_start_level(size_t level) {
if (collecting_data) start_times_by_level[level] = lf_time_physical();
}

/** @brief Record that the execution of the given level has completed. */
static void data_collection_end_level(size_t level, size_t num_workers) {
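if (collecting_data && start_times_by_level[level]) {
interval_t dt = lf_time_physical() - start_times_by_level[level];
// First sample for this number of workers: seed the estimate pessimistically, with at least
// twice the execution time recorded for the best-known number of workers.
if (!execution_times_by_num_workers_by_level[level][num_workers]) {
execution_times_by_num_workers_by_level[level][num_workers] = MAX(
dt,
2 * execution_times_by_num_workers_by_level[level][execution_times_argmins[level]]
);
}
// Exponential moving average: the new sample gets weight 1 / (EXECUTION_TIME_MEMORY + 1).
interval_t* prior_et = &execution_times_by_num_workers_by_level[level][num_workers];
*prior_et = (*prior_et * EXECUTION_TIME_MEMORY + dt) / (EXECUTION_TIME_MEMORY + 1);
}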
}

static size_t restrict_to_range(size_t start_inclusive, size_t end_inclusive, size_t value) {
if (value < start_inclusive) return start_inclusive;
if (value > end_inclusive) return end_inclusive;
return value;
}

/**
* @brief Update num_workers_by_level in-place.
* @param num_workers_by_level The number of workers that should be used to execute each level.
* @param max_num_workers_by_level The maximum possible number of workers that could reasonably be
* assigned to each level.
* @param jitter Whether the possibility of state transitions to numbers of workers that are not
* (yet) empirically optimal is desired.
*/
static void compute_number_of_workers(
size_t* num_workers_by_level,
size_t* max_num_workers_by_level,
bool jitter
) {
for (size_t level = 0; level < num_levels; level++) {
interval_t this_execution_time = execution_times_by_num_workers_by_level[level][
num_workers_by_level[level]
];
size_t ideal_number_of_workers;
size_t max_reasonable_num_workers = max_num_workers_by_level[level];
ideal_number_of_workers = execution_times_argmins[level];
if (jitter) {
ideal_number_of_workers = get_nums_workers_neighboring_state(
ideal_number_of_workers, this_execution_time
);
}
int minimum_workers = 1;
#ifdef WORKERS_NEEDED_FOR_FEDERATE
// TODO: only apply this constraint on levels containing control reactions
minimum_workers = WORKERS_NEEDED_FOR_FEDERATE > max_reasonable_num_workers ?
max_reasonable_num_workers : WORKERS_NEEDED_FOR_FEDERATE;
#endif
num_workers_by_level[level] = restrict_to_range(
minimum_workers, max_reasonable_num_workers, ideal_number_of_workers
);
}
}

/**
* @brief Update minimum and argmin (wrt number of workers used) execution times according to the
* most recent execution times recorded.
* @param num_workers_by_level The number of workers most recently used to execute each level.
*/
static void compute_costs(size_t* num_workers_by_level) {
for (size_t level = 0; level < num_levels; level++) {
interval_t score = execution_times_by_num_workers_by_level[level][
num_workers_by_level[level]
];
if (
!execution_times_mins[level]
| (score < execution_times_mins[level])
| (num_workers_by_level[level] == execution_times_argmins[level])
) {
execution_times_mins[level] = score;
execution_times_argmins[level] = num_workers_by_level[level];
}
}
}

/**
* @brief Record that the execution of a tag has completed.
* @param num_workers_by_level The number of workers used to execute each level of the tag.
* @param max_num_workers_by_level The maximum number of workers that could reasonably be used to
* execute each level, for any tag.
*/
static void data_collection_end_tag(
size_t* num_workers_by_level,
size_t* max_num_workers_by_level
) {
if (collecting_data && start_times_by_level[0]) {
compute_costs(num_workers_by_level);
}
data_collection_counter++;
size_t period = 2 + 128 * (data_collection_counter > SLOW_EXPERIMENTS);
size_t state = data_collection_counter % period;
if (state == 0) {
compute_number_of_workers(
num_workers_by_level,
max_num_workers_by_level,
data_collection_counter > START_EXPERIMENTS
);
collecting_data = true;
} else if (state == 1) {
compute_number_of_workers(num_workers_by_level, max_num_workers_by_level, false);
collecting_data = false;
}
}

#endif
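To make the probability distribution in `get_jitter` easier to check by hand, the following sketch separates the scoring from the random draw. It assumes `interval_t` is a 64-bit integer count of nanoseconds, and it leaves out the early return for levels that are already cheap and single-worker. `left_score_for` and `jitter_from_draw` are hypothetical names used only here; the constants are copied from the diff.

```c
#include <assert.h>
#include <stdint.h>

typedef int64_t interval_t;  // Assumption: intervals are 64-bit nanosecond counts.

// Constants copied from get_jitter in the diff.
#define PARALLELISM_COST_MAX 114688
#define BASE_LEFT_SCORE 16384
#define MIDDLE_SCORE 65536
#define RIGHT_SCORE 65536

// Weight for "try fewer workers". It grows as the level gets cheaper, because
// parallelism is less likely to pay off for short levels.
static int64_t left_score_for(interval_t execution_time) {
    int64_t left = BASE_LEFT_SCORE;
    if (execution_time < PARALLELISM_COST_MAX) {
        left += PARALLELISM_COST_MAX - execution_time;
    }
    return left;
}

// Map a draw in [0, left + MIDDLE_SCORE + RIGHT_SCORE) to -1, 0, or +1.
// In the diff, the draw comes from rand() modulo the total score.
static int jitter_from_draw(interval_t execution_time, int64_t draw) {
    int64_t left = left_score_for(execution_time);
    if (draw < left) return -1;
    if (draw < left + MIDDLE_SCORE) return 0;
    return 1;
}
```

For an execution time of 65536 the left score is 16384 + (114688 - 65536) = 65536, which equals the right score, matching the comment in `get_jitter` that trying fewer and trying more workers are equally likely at that point.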
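The smoothing performed in `data_collection_end_level` can be tried in isolation with the sketch below. It assumes `interval_t` is a 64-bit integer, and the names `smooth` and `record_sample` are hypothetical; only `EXECUTION_TIME_MEMORY` and the arithmetic are taken from the diff.

```c
#include <assert.h>
#include <stdint.h>

typedef int64_t interval_t;  // Assumption: intervals are 64-bit nanosecond counts.

#define EXECUTION_TIME_MEMORY 15  // Same value as in the diff.

// Exponential moving average with integer division: the new sample dt gets
// weight 1/16 and the prior estimate keeps weight 15/16.
static interval_t smooth(interval_t prior, interval_t dt) {
    return (prior * EXECUTION_TIME_MEMORY + dt) / (EXECUTION_TIME_MEMORY + 1);
}

// Fold one measurement into the estimate for a given number of workers.
// A prior of 0 means "never measured": the estimate is then seeded with the
// larger of dt and twice the best-known time before smoothing is applied.
static interval_t record_sample(interval_t prior, interval_t best, interval_t dt) {
    if (!prior) {
        interval_t pessimistic = 2 * best;
        prior = dt > pessimistic ? dt : pessimistic;
    }
    return smooth(prior, dt);
}
```

Seeding with a pessimistic value means a single fast measurement cannot immediately displace the current best number of workers; several consistent samples are needed before the estimate drops.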