Skip to content

Commit

Permalink
Merge pull request #244 from lf-lang/enclave-request-stop
Browse files Browse the repository at this point in the history
Enclave request stop
  • Loading branch information
lhstrh authored Jun 30, 2023
2 parents 12e2924 + 0b76e73 commit f05fb5d
Show file tree
Hide file tree
Showing 12 changed files with 370 additions and 407 deletions.
1 change: 0 additions & 1 deletion core/federated/RTI/enclave.c
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ void initialize_enclave(enclave_t* e, uint16_t id) {
e->downstream = NULL;
e->num_downstream = 0;
e->mode = REALTIME;
e->requested_stop = false;

// Initialize the next event condition variable.
lf_cond_init(&e->next_event_condition, &rti_mutex);
Expand Down
16 changes: 13 additions & 3 deletions core/federated/RTI/enclave.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
/**
* @file
* @author Edward A. Lee (eal@berkeley.edu)
* @author Soroush Bateni (soroush@utdallas.edu)
* @author Erling Jellum (erling.r.jellum@ntnu.no)
* @author Chadlia Jerad (chadlia.jerad@ensi-uma.tn)
* @copyright (c) 2020-2023, The University of California at Berkeley
* License in [BSD 2-clause](https://github.com/lf-lang/reactor-c/blob/main/LICENSE.md)
* @brief Declarations for runtime infrastructure (RTI) for scheduling enclaves and distributed Lingua Franca programs.
* This file declares RTI features that are used by scheduling enclaves as well as federated
* LF programs.
*/

#ifndef ENCLAVE_H
#define ENCLAVE_H

Expand Down Expand Up @@ -46,9 +59,6 @@ typedef struct enclave_t {
int* downstream; // Array of downstream federate ids.
int num_downstream; // Size of the array of downstream federates.
execution_mode_t mode; // FAST or REALTIME.
bool requested_stop; // Indicates that the federate has requested stop or has replied
// to a request for stop from the RTI. Used to prevent double-counting
// a federate when handling lf_request_stop().
lf_cond_t next_event_condition; // Condition variable used by enclaves to notify an enclave
// that its call to next_event_tag() should unblock.
} enclave_t;
Expand Down
58 changes: 32 additions & 26 deletions core/federated/RTI/rti_lib.c
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,6 @@ void notify_tag_advance_grant(enclave_t* e, tag_t tag) {
if (bytes_written < 0) {
e->state = NOT_CONNECTED;
// FIXME: We need better error handling, but don't stop other execution here.
// mark_federate_requesting_stop(fed);
}
} else {
e->last_granted = tag;
Expand Down Expand Up @@ -245,7 +244,6 @@ void notify_provisional_tag_advance_grant(enclave_t* e, tag_t tag) {
if (bytes_written < 0) {
e->state = NOT_CONNECTED;
// FIXME: We need better error handling, but don't stop other execution here.
// mark_federate_requesting_stop(fed);
}
} else {
e->last_provisionally_granted = tag;
Expand Down Expand Up @@ -560,7 +558,15 @@ void handle_next_event_tag(federate_t* fed) {
*/
bool _lf_rti_stop_granted_already_sent_to_federates = false;

void _lf_rti_broadcast_stop_time_to_federates_already_locked() {
/**
* Once the RTI has seen proposed tags from all connected federates,
* it will broadcast a MSG_TYPE_STOP_GRANTED carrying the _RTI.max_stop_tag.
* This function also checks the most recently received NET from
* each federate and resets it to be no greater than the _RTI.max_stop_tag.
*
* This function assumes the caller holds the _RTI.rti_mutex lock.
*/
void _lf_rti_broadcast_stop_time_to_federates_locked() {
if (_lf_rti_stop_granted_already_sent_to_federates == true) {
return;
}
Expand Down Expand Up @@ -592,16 +598,16 @@ void _lf_rti_broadcast_stop_time_to_federates_already_locked() {
}

/**
 * Mark a federate as requesting stop.
 *
 * If the federate has not been counted yet, increment
 * _f_rti->num_enclaves_handling_stop and set its requested_stop flag, so
 * that a given federate is counted at most once. When the count equals
 * _f_rti->number_of_enclaves, broadcast the stop time to the federates.
 *
 * NOTE(review): the broadcast helper is documented as assuming that the
 * caller holds the RTI mutex, so presumably this function must also be
 * called with that mutex held -- confirm against the callers.
 *
 * NOTE(review): this listing appears to be a rendered diff. The adjacent
 * `fed->enclave.requested_stop` / `fed->requested_stop` lines, and the
 * `..._already_locked()` / `..._locked()` calls, are presumably the removed
 * and added versions of the same statement -- confirm against the
 * repository before treating this text as compilable code.
 *
 * @param fed The federate to mark as requesting stop.
 */
void mark_federate_requesting_stop(federate_t* fed) {
if (!fed->enclave.requested_stop) {
if (!fed->requested_stop) {
// Assume that the federate
// has requested stop
_f_rti->num_enclaves_handling_stop++;
fed->enclave.requested_stop = true;
fed->requested_stop = true;
}
// Every enclave has now been counted as handling the stop.
if (_f_rti->num_enclaves_handling_stop == _f_rti->number_of_enclaves) {
// We now have information about the stop time of all
// federates.
_lf_rti_broadcast_stop_time_to_federates_already_locked();
_lf_rti_broadcast_stop_time_to_federates_locked();
}
}

Expand All @@ -619,7 +625,7 @@ void handle_stop_request_message(federate_t* fed) {

// Check whether we have already received a stop_tag
// from this federate
if (fed->enclave.requested_stop) {
if (fed->requested_stop) {
// Ignore this request
lf_mutex_unlock(&rti_mutex);
return;
Expand Down Expand Up @@ -657,10 +663,15 @@ void handle_stop_request_message(federate_t* fed) {
ENCODE_STOP_REQUEST(stop_request_buffer, _f_rti->max_stop_tag.time, _f_rti->max_stop_tag.microstep);

// Iterate over federates and send each the MSG_TYPE_STOP_REQUEST message
// if we do not have a stop_time already for them.
// if we do not have a stop_time already for them. Do not do this more than once.
if (_f_rti->stop_in_progress) {
lf_mutex_unlock(&rti_mutex);
return;
}
_f_rti->stop_in_progress = true;
for (int i = 0; i < _f_rti->number_of_enclaves; i++) {
federate_t *f = _f_rti->enclaves[i];
if (f->enclave.id != fed->enclave.id && f->enclave.requested_stop == false) {
if (f->enclave.id != fed->enclave.id && f->requested_stop == false) {
if (f->enclave.state == NOT_CONNECTED) {
mark_federate_requesting_stop(f);
continue;
Expand Down Expand Up @@ -788,8 +799,7 @@ void handle_timestamp(federate_t *my_fed) {
tag_t tag = {.time = timestamp, .microstep = 0};
tracepoint_rti_from_federate(_f_rti->trace, receive_TIMESTAMP, my_fed->enclave.id, &tag);
}
LF_PRINT_LOG("RTI received timestamp message: %ld.", timestamp);
LF_PRINT_LOG("RTI received timestamp message: " PRINTF_TIME ".", timestamp);
LF_PRINT_DEBUG("RTI received timestamp message with time: " PRINTF_TIME ".", timestamp);

lf_mutex_lock(&rti_mutex);
_f_rti->num_feds_proposed_start++;
Expand Down Expand Up @@ -924,7 +934,6 @@ void* clock_synchronization_thread(void* noargs) {
// FIXME: We need better error handling here, but clock sync failure
// should not stop execution.
lf_print_error("Clock sync failed with federate %d. Not connected.", fed_id);
// mark_federate_requesting_stop(&fed);
continue;
} else if (!fed->clock_synchronization_enabled) {
continue;
Expand Down Expand Up @@ -1002,8 +1011,6 @@ void handle_federate_resign(federate_t *my_fed) {
}

my_fed->enclave.state = NOT_CONNECTED;
// FIXME: The following results in spurious error messages.
// mark_federate_requesting_stop(my_fed);

// Indicate that there will no further events from this federate.
my_fed->enclave.next_event = FOREVER_TAG;
Expand Down Expand Up @@ -1046,8 +1053,7 @@ void* federate_thread_TCP(void* fed) {
lf_print_warning("RTI: Socket to federate %d is closed. Exiting the thread.", my_fed->enclave.id);
my_fed->enclave.state = NOT_CONNECTED;
my_fed->socket = -1;
// FIXME: We need better error handling here, but this is probably not the right thing to do.
// mark_federate_requesting_stop(my_fed);
// FIXME: We need better error handling here, but do not stop execution here.
break;
}
LF_PRINT_DEBUG("RTI: Received message type %u from federate %d.", buffer[0], my_fed->enclave.id);
Expand Down Expand Up @@ -1528,6 +1534,7 @@ void* respond_to_erroneous_connections(void* nothing) {

void initialize_federate(federate_t* fed, uint16_t id) {
initialize_enclave(&(fed->enclave), id);
fed->requested_stop = false;
fed->socket = -1; // No socket.
fed->clock_synchronization_enabled = true;
fed->in_transit_message_tags = initialize_in_transit_message_q();
Expand Down Expand Up @@ -1586,12 +1593,17 @@ void wait_for_federates(int socket_descriptor) {
if (shutdown(socket_descriptor, SHUT_RDWR)) {
LF_PRINT_LOG("On shut down TCP socket, received reply: %s", strerror(errno));
}
// NOTE: In all common TCP/IP stacks, there is a time period,
// typically between 30 and 120 seconds, called the TIME_WAIT period,
// before the port is released after this close. This is because
// the OS is preventing another program from accidentally receiving
// duplicated packets intended for this program.
close(socket_descriptor);

/************** FIXME: The following is probably not needed.
The above shutdown and close should do the job.
/* NOTE: Below is a song and dance that is apparently not needed.
The above shutdown and close appear to do the job.
// NOTE: Apparently, closing the socket will not necessarily
// Apparently, closing the socket will not necessarily
// cause the respond_to_erroneous_connections accept() call to return,
// so instead, we connect here so that it can check the _f_rti->all_federates_exited
// variable.
Expand Down Expand Up @@ -1620,13 +1632,6 @@ void wait_for_federates(int socket_descriptor) {
close(tmp_socket);
}
}
// NOTE: In all common TCP/IP stacks, there is a time period,
// typically between 30 and 120 seconds, called the TIME_WAIT period,
// before the port is released after this close. This is because
// the OS is preventing another program from accidentally receiving
// duplicated packets intended for this program.
close(socket_descriptor);
*/

if (_f_rti->socket_descriptor_UDP > 0) {
Expand Down Expand Up @@ -1827,4 +1832,5 @@ void initialize_RTI(){
_f_rti->clock_sync_exchanges_per_interval = 10,
_f_rti->authentication_enabled = false,
_f_rti->tracing_enabled = false;
_f_rti->stop_in_progress = false;
}
22 changes: 12 additions & 10 deletions core/federated/RTI/rti_lib.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@
* @file
* @author Edward A. Lee (eal@berkeley.edu)
* @author Soroush Bateni (soroush@utdallas.edu)
* @author Erling Jellum (erling.r.jellum@ntnu.no)
* @author Chadlia Jerad (chadlia.jerad@ensi-uma.tn)
* @copyright (c) 2020-2023, The University of California at Berkeley
* License in [BSD 2-clause](https://github.com/lf-lang/reactor-c/blob/main/LICENSE.md)
* @brief Declarations for runtime infrastructure (RTI) for distributed Lingua Franca programs.
*
* This file extends enclave.h with RTI features that are specific to federations and are not
* used by scheduling enclaves.
*/

#ifndef RTI_LIB_H
Expand Down Expand Up @@ -46,6 +49,9 @@ typedef enum socket_type_t {
*/
typedef struct federate_t {
enclave_t enclave;
bool requested_stop; // Indicates that the federate has requested stop or has replied
// to a request for stop from the RTI. Used to prevent double-counting
// a federate when handling lf_request_stop().
lf_thread_t thread_id; // The ID of the thread handling communication with this federate.
int socket; // The TCP socket descriptor for communicating with this federate.
struct sockaddr_in UDP_addr; // The UDP address for the federate.
Expand Down Expand Up @@ -171,6 +177,11 @@ typedef struct federation_rti_t {
* Boolean indicating that authentication is enabled.
*/
bool authentication_enabled;

/**
* Boolean indicating that a stop request is already in progress.
*/
bool stop_in_progress;
} federation_rti_t;

/**
Expand Down Expand Up @@ -255,15 +266,6 @@ void handle_logical_tag_complete(federate_t* fed);
void handle_next_event_tag(federate_t* fed);

/////////////////// STOP functions ////////////////////
/**
* Once the RTI has seen proposed tags from all connected federates,
* it will broadcast a MSG_TYPE_STOP_GRANTED carrying the _RTI.max_stop_tag.
* This function also checks the most recently received NET from
* each federate and resets that be no greater than the _RTI.max_stop_tag.
*
* This function assumes the caller holds the _RTI.rti_mutex lock.
*/
void _lf_rti_broadcast_stop_time_to_federates_already_locked();

/**
* Mark a federate requesting stop.
Expand Down
Loading

0 comments on commit f05fb5d

Please sign in to comment.