Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix adaptive scheduler #463

Merged
merged 3 commits into from
Jul 2, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions core/reactor_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -1166,6 +1166,7 @@ void termination(void) {
}
}
}
lf_tracing_global_shutdown();
// Skip most cleanup on abnormal termination.
if (_lf_normal_termination) {
_lf_free_all_tokens(); // Must be done before freeing reactors.
Expand All @@ -1189,8 +1190,6 @@ void termination(void) {
#endif
lf_free_all_reactors();

lf_tracing_global_shutdown();

// Free up memory associated with environment.
// Do this last so that printed warnings don't access freed memory.
for (int i = 0; i < num_envs; i++) {
Expand Down
21 changes: 13 additions & 8 deletions core/threaded/scheduler_adaptive.c
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ static size_t cond_of(size_t worker) {
static void set_level(lf_scheduler_t* scheduler, size_t level) {
worker_assignments_t* worker_assignments = scheduler->custom_data->worker_assignments;
assert(level < worker_assignments->num_levels);
assert(0 <= level);
assert(0 <= (long long) level);
data_collection_end_level(scheduler, worker_assignments->current_level, worker_assignments->num_workers);
worker_assignments->current_level = level;
worker_assignments->num_reactions_by_worker = worker_assignments->num_reactions_by_worker_by_level[level];
Expand Down Expand Up @@ -224,7 +224,7 @@ static reaction_t* get_reaction(lf_scheduler_t* scheduler, size_t worker) {
if (old_num_reactions <= 0)
return NULL;
} while ((current_num_reactions = lf_atomic_val_compare_and_swap32(
((int32_t*)worker_assignments->num_reactions_by_worker + worker), old_num_reactions,
(int32_t*) (worker_assignments->num_reactions_by_worker + worker), old_num_reactions,
(index = old_num_reactions - 1))) != old_num_reactions);
return worker_assignments->reactions_by_worker[worker][index];
#endif
Expand All @@ -238,9 +238,9 @@ static reaction_t* get_reaction(lf_scheduler_t* scheduler, size_t worker) {
*/
static reaction_t* worker_assignments_get_or_lock(lf_scheduler_t* scheduler, size_t worker) {
worker_assignments_t* worker_assignments = scheduler->custom_data->worker_assignments;
assert(worker >= 0);
assert((long long) worker >= 0);
// assert(worker < num_workers); // There are edge cases where this doesn't hold.
assert(worker_assignments->num_reactions_by_worker[worker] >= 0);
assert((long long) worker_assignments->num_reactions_by_worker[worker] >= 0);
reaction_t* ret;
while (true) {
if ((ret = get_reaction(scheduler, worker)))
Expand Down Expand Up @@ -425,6 +425,7 @@ static void worker_states_sleep_and_unlock(lf_scheduler_t* scheduler, size_t wor
static void advance_level_and_unlock(lf_scheduler_t* scheduler, size_t worker) {
worker_assignments_t* worker_assignments = scheduler->custom_data->worker_assignments;
size_t max_level = worker_assignments->num_levels - 1;
size_t total_num_reactions;
while (true) {
if (worker_assignments->current_level == max_level) {
data_collection_end_tag(scheduler, worker_assignments->num_workers_by_level,
Expand All @@ -438,12 +439,15 @@ static void advance_level_and_unlock(lf_scheduler_t* scheduler, size_t worker) {
}
} else {
#ifdef FEDERATED
lf_stall_advance_level_federation(scheduler->env, worker_assignments->current_level);
lf_stall_advance_level_federation_locked(worker_assignments->current_level);
#endif
worker_assignments->current_level++;
set_level(scheduler, worker_assignments->current_level);
total_num_reactions = get_num_reactions(scheduler);
if (!total_num_reactions) {
worker_assignments->current_level++;
set_level(scheduler, worker_assignments->current_level);
}
}
size_t total_num_reactions = get_num_reactions(scheduler);
total_num_reactions = get_num_reactions(scheduler);
if (total_num_reactions) {
size_t num_workers_to_awaken = LF_MIN(total_num_reactions, worker_assignments->num_workers);
LF_ASSERT(num_workers_to_awaken > 0, "");
Expand Down Expand Up @@ -598,6 +602,7 @@ static size_t restrict_to_range(size_t start_inclusive, size_t end_inclusive, si
*/
static void compute_number_of_workers(lf_scheduler_t* scheduler, size_t* num_workers_by_level,
size_t* max_num_workers_by_level, bool jitter) {

data_collection_t* data_collection = scheduler->custom_data->data_collection;
for (size_t level = 0; level < data_collection->num_levels; level++) {
interval_t this_execution_time =
Expand Down
Loading