diff --git a/core/federated/RTI/main.c b/core/federated/RTI/main.c index fdc234ced..847b2eace 100644 --- a/core/federated/RTI/main.c +++ b/core/federated/RTI/main.c @@ -67,16 +67,26 @@ static rti_remote_t rti; */ const char *rti_trace_file_name = "rti.lft"; +/** Indicator that normal termination has occurred. */ +bool normal_termination = false; + /** - * @brief A clean termination of the RTI will write the trace file, if tracing is - * enabled, before exiting. + * @brief Function to run upon termination. + * This function will be invoked both after main() returns and when a signal + * that results in terminating the process, such as SIGINT. In the former + * case, it should do nothing. In the latter case, it will attempt to write + * the trace file, but without acquiring a mutex lock, so the resulting files + * may be incomplete or even corrupted. But this is better than just failing + * to write the data we have collected so far. */ void termination() { - if (rti.base.tracing_enabled) { - stop_trace(rti.base.trace); - lf_print("RTI trace file saved."); + if (!normal_termination) { + if (rti.base.tracing_enabled) { + stop_trace_locked(rti.base.trace); + lf_print("RTI trace file saved."); + } + lf_print("RTI is exiting abnormally."); } - lf_print("RTI is exiting."); } void usage(int argc, const char* argv[]) { @@ -86,7 +96,7 @@ void usage(int argc, const char* argv[]) { lf_print(" -n, --number_of_federates "); lf_print(" The number of federates in the federation that this RTI will control.\n"); lf_print(" -p, --port "); - lf_print(" The port number to use for the RTI. Must be larger than 0 and smaller than %d. Default is %d.\n", UINT16_MAX, STARTING_PORT); + lf_print(" The port number to use for the RTI. Must be larger than 0 and smaller than %d. Default is %d.\n", UINT16_MAX, DEFAULT_PORT); lf_print(" -c, --clock_sync [off|init|on] [period ] [exchanges-per-interval ]"); lf_print(" The status of clock synchronization for this federate."); lf_print(" - off: Clock synchronization is off."); @@ -254,6 +264,12 @@ int main(int argc, const char* argv[]) { // Catch the Ctrl-C signal, for a clean exit that does not lose the trace information signal(SIGINT, exit); +#ifdef SIGPIPE + // Ignore SIGPIPE errors, which terminate the entire application if + // socket write() fails because the reader has closed the socket. + // Instead, cause an EPIPE error to be set when write() fails. + signal(SIGPIPE, SIG_IGN); +#endif // SIGPIPE if (atexit(termination) != 0) { lf_print_warning("Failed to register termination function!"); } @@ -277,13 +293,20 @@ int main(int argc, const char* argv[]) { // Allocate memory for the federates rti.base.scheduling_nodes = (scheduling_node_t**)calloc(rti.base.number_of_scheduling_nodes, sizeof(scheduling_node_t*)); for (uint16_t i = 0; i < rti.base.number_of_scheduling_nodes; i++) { - federate_info_t *fed_info = (federate_info_t *) malloc(sizeof(federate_info_t)); + federate_info_t *fed_info = (federate_info_t *) calloc(1, sizeof(federate_info_t)); initialize_federate(fed_info, i); rti.base.scheduling_nodes[i] = (scheduling_node_t *) fed_info; } int socket_descriptor = start_rti_server(rti.user_specified_port); wait_for_federates(socket_descriptor); + normal_termination = true; + if (rti.base.tracing_enabled) { + // No need for a mutex lock because all threads have exited. + stop_trace_locked(rti.base.trace); + lf_print("RTI trace file saved."); + } + free_scheduling_nodes(rti.base.scheduling_nodes, rti.base.number_of_scheduling_nodes); lf_print("RTI is exiting."); return 0; diff --git a/core/federated/RTI/rti_common.c b/core/federated/RTI/rti_common.c index a6554195e..ed0408f80 100644 --- a/core/federated/RTI/rti_common.c +++ b/core/federated/RTI/rti_common.c @@ -317,7 +317,7 @@ void update_min_delays_upstream(scheduling_node_t* node) { // Put the results onto the node's struct. node->num_min_delays = count; - node->min_delays = (minimum_delay_t*)malloc(count * sizeof(minimum_delay_t)); + node->min_delays = (minimum_delay_t*)calloc(count, sizeof(minimum_delay_t)); LF_PRINT_DEBUG("++++ Node %hu(is in ZDC: %d\n", node->id, node->flags & IS_IN_ZERO_DELAY_CYCLE); int k = 0; for (int i = 0; i < rti_common->number_of_scheduling_nodes; i++) { diff --git a/core/federated/RTI/rti_local.c b/core/federated/RTI/rti_local.c index 1f6cc0928..57af1047d 100644 --- a/core/federated/RTI/rti_local.c +++ b/core/federated/RTI/rti_local.c @@ -35,7 +35,7 @@ static rti_local_t * rti_local; lf_mutex_t rti_mutex; void initialize_local_rti(environment_t *envs, int num_envs) { - rti_local = (rti_local_t*)malloc(sizeof(rti_local_t)); + rti_local = (rti_local_t*)calloc(1, sizeof(rti_local_t)); LF_ASSERT(rti_local, "Out of memory"); initialize_rti_common(&rti_local->base); @@ -47,7 +47,7 @@ void initialize_local_rti(environment_t *envs, int num_envs) { // Allocate memory for the enclave_info objects rti_local->base.scheduling_nodes = (scheduling_node_t**)calloc(num_envs, sizeof(scheduling_node_t*)); for (int i = 0; i < num_envs; i++) { - enclave_info_t *enclave_info = (enclave_info_t *) malloc(sizeof(enclave_info_t)); + enclave_info_t *enclave_info = (enclave_info_t *) calloc(1, sizeof(enclave_info_t)); initialize_enclave_info(enclave_info, i, &envs[i]); rti_local->base.scheduling_nodes[i] = (scheduling_node_t *) enclave_info; diff --git a/core/federated/RTI/rti_remote.c b/core/federated/RTI/rti_remote.c index 2fce8b1bf..05d07b5b6 100644 --- a/core/federated/RTI/rti_remote.c +++ b/core/federated/RTI/rti_remote.c @@ -86,8 +86,8 @@ int create_server(int32_t specified_port, uint16_t port, socket_type_t socket_ty /* * The following used to permit reuse of a port that an RTI has previously - * used that has not been released. We no longer do this, but instead - * increment the port number until an available port is found. + * used that has not been released. We no longer do this, and instead retry + * some number of times after waiting. // SO_REUSEPORT (since Linux 3.9) // Permits multiple AF_INET or AF_INET6 sockets to be bound to an @@ -127,14 +127,11 @@ int create_server(int32_t specified_port, uint16_t port, socket_type_t socket_ty (struct sockaddr *) &server_fd, sizeof(server_fd)); - // If the binding fails with this port and no particular port was specified - // in the LF program, then try the next few ports in sequence. - while (result != 0 - && specified_port == 0 - && port >= STARTING_PORT - && port <= STARTING_PORT + PORT_RANGE_LIMIT) { - lf_print("RTI failed to get port %d. Trying %d.", port, port + 1); - port++; + // Try repeatedly to bind to the specified port. + int count = 1; + while (result != 0 && count++ < PORT_BIND_RETRY_LIMIT) { + lf_print("RTI failed to get port %d. Will try again.", port); + lf_sleep(PORT_BIND_RETRY_INTERVAL); server_fd.sin_port = htons(port); result = bind( socket_descriptor, @@ -142,13 +139,7 @@ int create_server(int32_t specified_port, uint16_t port, socket_type_t socket_ty sizeof(server_fd)); } if (result != 0) { - if (specified_port == 0) { - lf_print_error_and_exit("Failed to bind the RTI socket. Cannot find a usable port. " - "Consider increasing PORT_RANGE_LIMIT in net_common.h."); - } else { - lf_print_error_and_exit("Failed to bind the RTI socket. Specified port is not available. " - "Consider leaving the port unspecified"); - } + lf_print_error_and_exit("Failed to bind the RTI socket. Port %d is not available. ", port); } char* type = "TCP"; if (socket_type == UDP) { @@ -1016,7 +1007,7 @@ void handle_federate_resign(federate_info_t *my_fed) { // an orderly shutdown. // close(my_fed->socket); // from unistd.h - lf_print("Federate %d has resigned.", my_fed->enclave.id); + lf_print("RTI: Federate %d has resigned.", my_fed->enclave.id); // Check downstream federates to see whether they should now be granted a TAG. // To handle cycles, need to create a boolean array to keep @@ -1558,8 +1549,8 @@ void initialize_federate(federate_info_t* fed, uint16_t id) { int32_t start_rti_server(uint16_t port) { int32_t specified_port = port; if (port == 0) { - // Use the default starting port. - port = STARTING_PORT; + // Use the default port. + port = DEFAULT_PORT; } _lf_initialize_clock(); // Create the TCP socket server diff --git a/core/federated/RTI/rti_remote.h b/core/federated/RTI/rti_remote.h index b3249ec30..21264e76a 100644 --- a/core/federated/RTI/rti_remote.h +++ b/core/federated/RTI/rti_remote.h @@ -187,9 +187,7 @@ extern int lf_critical_section_exit(environment_t* env); /** * Create a server and enable listening for socket connections. * - * @note This function is similar to create_server(...) in - * federate.c. However, it contains logs that are specific - * to the RTI. + * @note This function is different from create_server(...) in federate.c. * * @param port The port number to use. * @param socket_type The type of the socket for the server (TCP or UDP). diff --git a/core/federated/federate.c b/core/federated/federate.c index c57553464..e791728ab 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -115,48 +115,10 @@ federation_metadata_t federation_metadata = { .rti_user = NULL }; - -/** - * Create a server to listen to incoming physical - * connections from remote federates. This function - * only handles the creation of the server socket. - * The reserved port for the server socket is then - * sent to the RTI by sending an MSG_TYPE_ADDRESS_ADVERTISEMENT message - * (@see net_common.h). This function expects no response - * from the RTI. - * - * If a port is specified by the user, that will be used - * as the only possibility for the server. This function - * will fail if that port is not available. If a port is not - * specified, the STARTING_PORT (@see net_common.h) will be used. - * The function will keep incrementing the port in this case - * until the number of tries reaches PORT_RANGE_LIMIT. - * - * @note This function is similar to create_server(...) in rti.c. - * However, it contains specific log messages for the peer to - * peer connections between federates. It also additionally - * sends an address advertisement (MSG_TYPE_ADDRESS_ADVERTISEMENT) message to the - * RTI informing it of the port. - * - * @param specified_port The specified port by the user. - */ void create_server(int specified_port) { - if (specified_port > UINT16_MAX || - specified_port < 0) { - lf_print_error( - "create_server(): The specified port (%d) is out of range." - " Starting with %d instead.", - specified_port, - STARTING_PORT - ); - specified_port = 0; - } + assert(specified_port <= UINT16_MAX && specified_port >= 0); uint16_t port = (uint16_t)specified_port; - if (specified_port == 0) { - // Use the default starting port. - port = STARTING_PORT; - } - LF_PRINT_DEBUG("Creating a socket server on port %d.", port); + LF_PRINT_LOG("Creating a socket server on port %d.", port); // Create an IPv4 socket for TCP (not UDP) communication over IP (0). int socket_descriptor = create_real_time_tcp_socket_errexit(); @@ -174,38 +136,37 @@ void create_server(int specified_port) { socket_descriptor, (struct sockaddr *) &server_fd, sizeof(server_fd)); - // If the binding fails with this port and no particular port was specified - // in the LF program, then try the next few ports in sequence. - while (result != 0 - && specified_port == 0 - && port >= STARTING_PORT - && port <= STARTING_PORT + PORT_RANGE_LIMIT) { - LF_PRINT_DEBUG("Failed to get port %d. Trying %d.", port, port + 1); - port++; - server_fd.sin_port = htons(port); + int count = 0; + while (result < 0 && count++ < PORT_BIND_RETRY_LIMIT) { + lf_sleep(PORT_BIND_RETRY_INTERVAL); result = bind( socket_descriptor, (struct sockaddr *) &server_fd, sizeof(server_fd)); } - if (result != 0) { - if (specified_port == 0) { - lf_print_error_and_exit("Failed to bind socket. Cannot find a usable port. \ - Consider increasing PORT_RANGE_LIMIT in federate.c"); - } else { - lf_print_error_and_exit("Failed to bind socket. Specified port is not available. \ - Consider leaving the port unspecified"); + if (result < 0) { + lf_print_error_and_exit("Failed to bind socket on port %d.", port); + } + + // Set the global server port. + if (specified_port == 0) { + // Need to retrieve the port number assigned by the OS. + struct sockaddr_in assigned; + socklen_t addr_len = sizeof(assigned); + if (getsockname(socket_descriptor, (struct sockaddr *) &assigned, &addr_len) < 0) { + lf_print_error_and_exit("Failed to retrieve assigned port number."); } + _fed.server_port = ntohs(assigned.sin_port); + } else { + _fed.server_port = port; } - LF_PRINT_LOG("Server for communicating with other federates started using port %d.", port); // Enable listening for socket connections. // The second argument is the maximum number of queued socket requests, // which according to the Mac man page is limited to 128. listen(socket_descriptor, 128); - // Set the global server port - _fed.server_port = port; + LF_PRINT_LOG("Server for communicating with other federates started using port %d.", _fed.server_port); // Send the server port number to the RTI // on an MSG_TYPE_ADDRESS_ADVERTISEMENT message (@see net_common.h). @@ -767,7 +728,7 @@ void connect_to_federate(uint16_t remote_federate_id) { // remote federate has not yet sent an MSG_TYPE_ADDRESS_ADVERTISEMENT message to the RTI. // Sleep for some time before retrying. if (port == -1) { - if (count_tries++ >= CONNECT_NUM_RETRIES) { + if (count_tries++ >= CONNECT_MAX_RETRIES) { lf_print_error_and_exit("TIMEOUT obtaining IP/port for federate %d from the RTI.", remote_federate_id); } @@ -794,7 +755,7 @@ void connect_to_federate(uint16_t remote_federate_id) { #endif // Iterate until we either successfully connect or exceed the number of - // attempts given by CONNECT_NUM_RETRIES. + // attempts given by CONNECT_MAX_RETRIES. int socket_id = -1; while (result < 0) { // Create an IPv4 socket for TCP (not UDP) communication over IP (0). @@ -824,11 +785,11 @@ void connect_to_federate(uint16_t remote_federate_id) { // accepting socket connections. But possibly it will be busy (in process of accepting // another socket connection?). Hence, we retry. count_retries++; - if (count_retries > CONNECT_NUM_RETRIES) { - // If the remote federate is not accepting the connection after CONNECT_NUM_RETRIES + if (count_retries > CONNECT_MAX_RETRIES) { + // If the remote federate is not accepting the connection after CONNECT_MAX_RETRIES // treat it as a soft error condition and return. lf_print_error("Failed to connect to federate %d after %d retries. Giving up.", - remote_federate_id, CONNECT_NUM_RETRIES); + remote_federate_id, CONNECT_MAX_RETRIES); return; } lf_print_warning("Could not connect to federate %d. Will try again every %lld nanoseconds.\n", @@ -969,18 +930,10 @@ void perform_hmac_authentication(int rti_socket) { } #endif -/** - * Connect to the RTI at the specified host and port and return - * the socket descriptor for the connection. If this fails, the - * program exits. If it succeeds, it sets the _fed.socket_TCP_RTI global - * variable to refer to the socket for communicating with the RTI. - * @param hostname A hostname, such as "localhost". - * @param port_number A port number. - */ void connect_to_rti(const char* hostname, int port) { LF_PRINT_LOG("Connecting to the RTI."); - // override passed hostname and port if passed as runtime arguments + // Override passed hostname and port if passed as runtime arguments. hostname = federation_metadata.rti_host ? federation_metadata.rti_host : hostname; port = federation_metadata.rti_port >= 0 ? federation_metadata.rti_port : port; @@ -989,25 +942,16 @@ void connect_to_rti(const char* hostname, int port) { port > INT16_MAX) { lf_print_error( "connect_to_rti(): Specified port (%d) is out of range," - " using zero instead.", - port + " using the default port %d instead.", + port, DEFAULT_PORT ); + uport = DEFAULT_PORT; } else { uport = (uint16_t)port; } - - // Repeatedly try to connect, one attempt every 2 seconds, until - // either the program is killed, the sleep is interrupted, - // or the connection succeeds. - // If the specified port is 0, set it instead to the start of the - // port range. - bool specific_port_given = true; if (uport == 0) { - uport = STARTING_PORT; - specific_port_given = false; + uport = DEFAULT_PORT; } - int result = -1; - int count_retries = 0; struct addrinfo hints; struct addrinfo *res; @@ -1020,144 +964,114 @@ void connect_to_rti(const char* hostname, int port) { hints.ai_next = NULL; hints.ai_flags = AI_NUMERICSERV; /* Allow only numeric port numbers */ - while (result < 0) { - // Convert port number to string - char str[6]; - sprintf(str,"%u",uport); - - // Get address structure matching hostname and hints criteria, and - // set port to the port number provided in str. There should only - // ever be one matching address structure, and we connect to that. - int server = getaddrinfo(hostname, (const char*)&str, &hints, &res); - if (server != 0) { - lf_print_error_and_exit("No host for RTI matching given hostname: %s", hostname); - } + // Convert port number to string + char str[6]; + sprintf(str,"%u",uport); + + // Get address structure matching hostname and hints criteria, and + // set port to the port number provided in str. There should only + // ever be one matching address structure, and we connect to that. + int server = getaddrinfo(hostname, (const char*)&str, &hints, &res); + if (server != 0) { + lf_print_error_and_exit("No host for RTI matching given hostname: %s", hostname); + } - // Create a socket - _fed.socket_TCP_RTI = create_real_time_tcp_socket_errexit(); + // Create a socket + _fed.socket_TCP_RTI = create_real_time_tcp_socket_errexit(); + int result = connect(_fed.socket_TCP_RTI, res->ai_addr, res->ai_addrlen);; + int count_retries = 1; + + while (result < 0 && count_retries++ < CONNECT_MAX_RETRIES) { + lf_print("Failed to connect to RTI on port %d. Will try again.", uport); + lf_sleep(CONNECT_RETRY_INTERVAL); result = connect(_fed.socket_TCP_RTI, res->ai_addr, res->ai_addrlen); - if (result == 0) { - lf_print("Successfully connected to RTI."); - } + } + freeaddrinfo(res); /* No longer needed */ - freeaddrinfo(res); /* No longer needed */ + if (result != 0) { + lf_print_error_and_exit("Failed to connect to RTI on port %d after %d tries.", uport, CONNECT_MAX_RETRIES); + } + lf_print("Successfully connected to an RTI."); - // If this failed, try more ports, unless a specific port was given. - if (result != 0 - && !specific_port_given - && uport >= STARTING_PORT - && uport <= STARTING_PORT + PORT_RANGE_LIMIT - ) { - lf_print("Failed to connect to RTI on port %d. Trying %d.", uport, uport + 1); - uport++; - // Wait PORT_KNOCKING_RETRY_INTERVAL seconds. - if (lf_sleep(PORT_KNOCKING_RETRY_INTERVAL) != 0) { - // Sleep was interrupted. - continue; - } - } - // If this still failed, try again with the original port after some time. - if (result < 0) { - if (!specific_port_given && uport == STARTING_PORT + PORT_RANGE_LIMIT + 1) { - uport = STARTING_PORT; - } - count_retries++; - if (count_retries > CONNECT_NUM_RETRIES) { - lf_print_error_and_exit("Failed to connect to the RTI after %d retries. Giving up.", - CONNECT_NUM_RETRIES); - } - lf_print("Could not connect to RTI at %s. Will try again every %lld seconds.", - hostname, CONNECT_RETRY_INTERVAL / BILLION); - // Wait CONNECT_RETRY_INTERVAL nanoseconds. - if (lf_sleep(CONNECT_RETRY_INTERVAL) != 0) { - // Sleep was interrupted. - continue; - } - } else { - // Have connected to an RTI, but not sure it's the right RTI. - // Send a MSG_TYPE_FED_IDS message and wait for a reply. - // Notify the RTI of the ID of this federate and its federation. - unsigned char buffer[4]; + // Have connected to an RTI, but not sure it's the right RTI. + // Send a MSG_TYPE_FED_IDS message and wait for a reply. + // Notify the RTI of the ID of this federate and its federation. + unsigned char buffer[4]; #ifdef FEDERATED_AUTHENTICATED - LF_PRINT_LOG("Connected to an RTI. Performing HMAC-based authentication using federation ID."); - perform_hmac_authentication(_fed.socket_TCP_RTI); + LF_PRINT_LOG("Connected to an RTI. Performing HMAC-based authentication using federation ID."); + perform_hmac_authentication(_fed.socket_TCP_RTI); #else - LF_PRINT_LOG("Connected to an RTI. Sending federation ID for authentication."); + LF_PRINT_LOG("Connected to an RTI. Sending federation ID for authentication."); #endif - // Send the message type first. - buffer[0] = MSG_TYPE_FED_IDS; - // Next send the federate ID. - if (_lf_my_fed_id > UINT16_MAX) { - lf_print_error_and_exit("Too many federates! More than %d.", UINT16_MAX); - } - encode_uint16((uint16_t)_lf_my_fed_id, &buffer[1]); - // Next send the federation ID length. - // The federation ID is limited to 255 bytes. - size_t federation_id_length = strnlen(federation_metadata.federation_id, 255); - buffer[1 + sizeof(uint16_t)] = (unsigned char)(federation_id_length & 0xff); + // Send the message type first. + buffer[0] = MSG_TYPE_FED_IDS; + // Next send the federate ID. + if (_lf_my_fed_id > UINT16_MAX) { + lf_print_error_and_exit("Too many federates! More than %d.", UINT16_MAX); + } + encode_uint16((uint16_t)_lf_my_fed_id, &buffer[1]); + // Next send the federation ID length. + // The federation ID is limited to 255 bytes. + size_t federation_id_length = strnlen(federation_metadata.federation_id, 255); + buffer[1 + sizeof(uint16_t)] = (unsigned char)(federation_id_length & 0xff); - // Trace the event when tracing is enabled - tracepoint_federate_to_rti(_fed.trace, send_FED_ID, _lf_my_fed_id, NULL); + // Trace the event when tracing is enabled + tracepoint_federate_to_rti(_fed.trace, send_FED_ID, _lf_my_fed_id, NULL); - write_to_socket_errexit(_fed.socket_TCP_RTI, 2 + sizeof(uint16_t), buffer, - "Failed to send federate ID to RTI."); + write_to_socket_errexit(_fed.socket_TCP_RTI, 2 + sizeof(uint16_t), buffer, + "Failed to send federate ID to RTI."); - // Next send the federation ID itself. - write_to_socket_errexit(_fed.socket_TCP_RTI, federation_id_length, (unsigned char*)federation_metadata.federation_id, - "Failed to send federation ID to RTI."); + // Next send the federation ID itself. + write_to_socket_errexit(_fed.socket_TCP_RTI, federation_id_length, (unsigned char*)federation_metadata.federation_id, + "Failed to send federation ID to RTI."); - // Wait for a response. - // The response will be MSG_TYPE_REJECT if the federation ID doesn't match. - // Otherwise, it will be either MSG_TYPE_ACK or MSG_TYPE_UDP_PORT, where the latter - // is used if clock synchronization will be performed. - unsigned char response; + // Wait for a response. + // The response will be MSG_TYPE_REJECT if the federation ID doesn't match. + // Otherwise, it will be either MSG_TYPE_ACK or MSG_TYPE_UDP_PORT, where the latter + // is used if clock synchronization will be performed. + unsigned char response; - LF_PRINT_DEBUG("Waiting for response to federation ID from the RTI."); + LF_PRINT_DEBUG("Waiting for response to federation ID from the RTI."); - read_from_socket_errexit(_fed.socket_TCP_RTI, 1, &response, "Failed to read response from RTI."); - if (response == MSG_TYPE_REJECT) { - // Trace the event when tracing is enabled - tracepoint_federate_from_rti(_fed.trace, receive_REJECT, _lf_my_fed_id, NULL); - // Read one more byte to determine the cause of rejection. - unsigned char cause; - read_from_socket_errexit(_fed.socket_TCP_RTI, 1, &cause, "Failed to read the cause of rejection by the RTI."); - if (cause == FEDERATION_ID_DOES_NOT_MATCH || cause == WRONG_SERVER) { - lf_print("Connected to the wrong RTI on port %d. Trying %d.", uport, uport + 1); - uport++; - result = -1; - continue; - } - lf_print_error_and_exit("RTI Rejected MSG_TYPE_FED_IDS message with response (see net_common.h): " - "%d. Error code: %d. Federate quits.\n", response, cause); - } else if (response == MSG_TYPE_ACK) { - // Trace the event when tracing is enabled - tracepoint_federate_from_rti(_fed.trace, receive_ACK, _lf_my_fed_id, NULL); - LF_PRINT_LOG("Received acknowledgment from the RTI."); - - // Call a generated (external) function that sends information - // about connections between this federate and other federates - // where messages are routed through the RTI. - // @see MSG_TYPE_NEIGHBOR_STRUCTURE in net_common.h - send_neighbor_structure_to_RTI(_fed.socket_TCP_RTI); - - uint16_t udp_port = setup_clock_synchronization_with_rti(); - - // Write the returned port number to the RTI - unsigned char UDP_port_number[1 + sizeof(uint16_t)]; - UDP_port_number[0] = MSG_TYPE_UDP_PORT; - encode_uint16(udp_port, &(UDP_port_number[1])); - write_to_socket_errexit(_fed.socket_TCP_RTI, 1 + sizeof(uint16_t), UDP_port_number, - "Failed to send the UDP port number to the RTI."); - } else { - lf_print_error_and_exit("Received unexpected response %u from the RTI (see net_common.h).", - response); - } - lf_print("Connected to RTI at %s:%d.", hostname, uport); + read_from_socket_errexit(_fed.socket_TCP_RTI, 1, &response, "Failed to read response from RTI."); + if (response == MSG_TYPE_REJECT) { + // Trace the event when tracing is enabled + tracepoint_federate_from_rti(_fed.trace, receive_REJECT, _lf_my_fed_id, NULL); + // Read one more byte to determine the cause of rejection. + unsigned char cause; + read_from_socket_errexit(_fed.socket_TCP_RTI, 1, &cause, "Failed to read the cause of rejection by the RTI."); + if (cause == FEDERATION_ID_DOES_NOT_MATCH || cause == WRONG_SERVER) { + lf_print_error_and_exit("Connected to the wrong RTI on port %d.", uport); } + lf_print_error_and_exit("RTI Rejected MSG_TYPE_FED_IDS message with response (see net_common.h): " + "%d. Error code: %d. Federate quits.\n", response, cause); + } else if (response == MSG_TYPE_ACK) { + // Trace the event when tracing is enabled + tracepoint_federate_from_rti(_fed.trace, receive_ACK, _lf_my_fed_id, NULL); + LF_PRINT_LOG("Received acknowledgment from the RTI."); + + // Call a generated (external) function that sends information + // about connections between this federate and other federates + // where messages are routed through the RTI. + // @see MSG_TYPE_NEIGHBOR_STRUCTURE in net_common.h + send_neighbor_structure_to_RTI(_fed.socket_TCP_RTI); + + uint16_t udp_port = setup_clock_synchronization_with_rti(); + + // Write the returned port number to the RTI + unsigned char UDP_port_number[1 + sizeof(uint16_t)]; + UDP_port_number[0] = MSG_TYPE_UDP_PORT; + encode_uint16(udp_port, &(UDP_port_number[1])); + write_to_socket_errexit(_fed.socket_TCP_RTI, 1 + sizeof(uint16_t), UDP_port_number, + "Failed to send the UDP port number to the RTI."); + } else { + lf_print_error_and_exit("Received unexpected response %u from the RTI (see net_common.h).", + response); } + lf_print("Connected to RTI at %s:%d.", hostname, uport); } /** @@ -1520,7 +1434,7 @@ static trigger_handle_t schedule_message_received_from_network_locked( * * @return 1 if the MSG_TYPE_CLOSE_REQUEST message is sent successfully, 0 otherwise. */ -int _lf_request_close_inbound_socket(int fed_id) { +static int _lf_request_close_inbound_socket(int fed_id) { assert(fed_id >= 0 && fed_id < NUMBER_OF_FEDERATES); if (_fed.sockets_for_inbound_p2p_connections[fed_id] < 1) return 0; @@ -2337,6 +2251,23 @@ void handle_stop_request_message() { lf_mutex_unlock(&outbound_socket_mutex); } +/** + * Send a resign signal to the RTI. + */ +static void send_resign_signal(environment_t* env) { + size_t bytes_to_write = 1 + sizeof(tag_t); + unsigned char buffer[bytes_to_write]; + buffer[0] = MSG_TYPE_RESIGN; + tag_t tag = env->current_tag; + encode_tag(&(buffer[1]), tag); + ssize_t written = write_to_socket(_fed.socket_TCP_RTI, bytes_to_write, &(buffer[0])); + if (written == bytes_to_write) { + LF_PRINT_LOG("Resigned."); + } + // Trace the event when tracing is enabled + tracepoint_federate_to_rti(_fed.trace, send_RESIGN, _lf_my_fed_id, &tag); +} + /** * Close sockets used to communicate with other federates, if they are open, * and send a MSG_TYPE_RESIGN message to the RTI. This implements the function @@ -2347,52 +2278,49 @@ void handle_stop_request_message() { void terminate_execution(environment_t* env) { assert(env != GLOBAL_ENVIRONMENT); + // For an abnormal termination (e.g. a SIGINT), we need to send a + // MSG_TYPE_RESIGN message to the RTI, but we should not acquire a mutex. + if (_fed.socket_TCP_RTI >= 0) { + if (_lf_normal_termination) { + lf_mutex_lock(&outbound_socket_mutex); + send_resign_signal(env); + lf_mutex_unlock(&outbound_socket_mutex); + } else { + // Do not acquire mutex and do not trace. + send_resign_signal(env); + } + } + + LF_PRINT_DEBUG("Requesting closing of incoming P2P sockets."); + // Request closing the incoming P2P sockets. + for (int i=0; i < NUMBER_OF_FEDERATES; i++) { + _lf_request_close_inbound_socket(i); + // Ignore errors. Mark the socket closed. + _fed.sockets_for_inbound_p2p_connections[i] = -1; + } + + // For abnormal termination, skip the rest, letting the threads be terminated + // and sockets be closed by the OS. + if (!_lf_normal_termination) return; + // Check for all outgoing physical connections in // _fed.sockets_for_outbound_p2p_connections and // if the socket ID is not -1, the connection is still open. // Send an EOF by closing the socket here. - // NOTE: It is dangerous to acquire a mutex in a termination - // process because it can block program exit if a deadlock occurs. - // Hence, it is paramount that these mutexes not allow for any - // possibility of deadlock. To ensure this, this - // function should NEVER be called while holding any mutex lock. - lf_mutex_lock(&outbound_socket_mutex); for (int i=0; i < NUMBER_OF_FEDERATES; i++) { + // Close outbound connections, in case they have not closed themselves. // This will result in EOF being sent to the remote federate, I think. _lf_close_outbound_socket(i); } - // Resign the federation, which will close the socket to the RTI. - if (_fed.socket_TCP_RTI >= 0) { - size_t bytes_to_write = 1 + sizeof(tag_t); - unsigned char buffer[bytes_to_write]; - buffer[0] = MSG_TYPE_RESIGN; - tag_t tag = env->current_tag; - encode_tag(&(buffer[1]), tag); - // Trace the event when tracing is enabled - tracepoint_federate_to_rti(_fed.trace, send_RESIGN, _lf_my_fed_id, &tag); - ssize_t written = write_to_socket(_fed.socket_TCP_RTI, bytes_to_write, &(buffer[0])); - if (written == bytes_to_write) { - LF_PRINT_LOG("Resigned."); - } - } - lf_mutex_unlock(&outbound_socket_mutex); - - LF_PRINT_DEBUG("Requesting closing of incoming P2P sockets."); - // Request closing the incoming P2P sockets. - for (int i=0; i < NUMBER_OF_FEDERATES; i++) { - if (_lf_request_close_inbound_socket(i) == 0) { - // Sending the close request failed. Mark the socket closed. - _fed.sockets_for_inbound_p2p_connections[i] = -1; - } - } LF_PRINT_DEBUG("Waiting for inbound p2p socket listener threads."); // Wait for each inbound socket listener thread to close. - if (_fed.number_of_inbound_p2p_connections > 0) { + if (_fed.number_of_inbound_p2p_connections > 0 && _fed.inbound_socket_listeners != NULL) { LF_PRINT_LOG("Waiting for %zu threads listening for incoming messages to exit.", _fed.number_of_inbound_p2p_connections); for (int i=0; i < _fed.number_of_inbound_p2p_connections; i++) { + if (_fed.inbound_socket_listeners[i] == NULL) continue; // Ignoring errors here. lf_thread_join(_fed.inbound_socket_listeners[i], NULL); } diff --git a/core/reactor.c b/core/reactor.c index ce95b057d..7f5c3e344 100644 --- a/core/reactor.c +++ b/core/reactor.c @@ -397,6 +397,7 @@ int lf_reactor_c_main(int argc, const char* argv[]) { if (_lf_do_step(env)) { while (next(env) != 0); } + _lf_normal_termination = true; return 0; } else { return -1; diff --git a/core/reactor_common.c b/core/reactor_common.c index 6984d920b..ff64f8d77 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -265,6 +265,11 @@ void _lf_trigger_reaction(environment_t* env, reaction_t* reaction, int worker_n * counts between time steps and at the end of execution. */ void _lf_start_time_step(environment_t *env) { + if (_lf_execution_started == false) { + // Execution hasn't started, so this is probably being invoked in termination + // due to an error. + return; + } assert(env != GLOBAL_ENVIRONMENT); LF_PRINT_LOG("--------- Start time step at tag " PRINTF_TAG ".", env->current_tag.time - start_time, env->current_tag.microstep); // Handle dynamically created tokens for mutable inputs. @@ -1714,80 +1719,95 @@ void initialize_global(void) { _lf_initialize_trigger_objects() ; } +/** Flag to prevent termination function from executing twice. */ +bool _lf_termination_executed = false; + +/** Flag used to disable cleanup operations on normal termination. */ +bool _lf_normal_termination = false; + /** * Report elapsed logical and physical times and report if any * memory allocated by set_new, set_new_array, or lf_writable_copy * has not been freed. */ void termination(void) { + if (_lf_termination_executed) return; + _lf_termination_executed = true; + environment_t *env; int num_envs = _lf_get_environments(&env); // Invoke the code generated termination function. It terminates the federated related services. - // It should only be called for the top-level environment, which, after convention, is the first environment. + // It should only be called for the top-level environment, which, by convention, is the first environment. terminate_execution(env); - // In order to free tokens, we perform the same actions we would have for a new time step. for (int i = 0; iid); - if (!env->initialized) { + if (env == NULL || !env->initialized) { lf_print_warning("---- Environment %u was never initialized", env->id); continue; } + lf_print("---- Terminating environment %u", env->id); // Stop any tracing, if it is running. + // No need to acquire a mutex because if this is normal termination, all + // other threads have stopped, and if it's not, then acquiring a mutex could + // lead to a deadlock. stop_trace_locked(env->trace); - _lf_start_time_step(env); + // Skip most cleanup on abnormal termination. + if (_lf_normal_termination) { + _lf_start_time_step(env); #ifdef MODAL_REACTORS - // Free events and tokens suspended by modal reactors. - _lf_terminate_modal_reactors(env); + // Free events and tokens suspended by modal reactors. + _lf_terminate_modal_reactors(env); #endif - - // If the event queue still has events on it, report that. - if (env->event_q != NULL && pqueue_size(env->event_q) > 0) { - lf_print_warning("---- There are %zu unprocessed future events on the event queue.", pqueue_size(env->event_q)); - event_t* event = (event_t*)pqueue_peek(env->event_q); - interval_t event_time = event->time - start_time; - lf_print_warning("---- The first future event has timestamp " PRINTF_TIME " after start time.", event_time); - } - // Print elapsed times. - // If these are negative, then the program failed to start up. - interval_t elapsed_time = lf_time_logical_elapsed(env); - if (elapsed_time >= 0LL) { - char time_buffer[29]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 - lf_comma_separated_time(time_buffer, elapsed_time); - printf("---- Elapsed logical time (in nsec): %s\n", time_buffer); - - // If start_time is 0, then execution didn't get far enough along - // to initialize this. - if (start_time > 0LL) { - lf_comma_separated_time(time_buffer, lf_time_physical_elapsed()); - printf("---- Elapsed physical time (in nsec): %s\n", time_buffer); + // If the event queue still has events on it, report that. + if (env->event_q != NULL && pqueue_size(env->event_q) > 0) { + lf_print_warning("---- There are %zu unprocessed future events on the event queue.", pqueue_size(env->event_q)); + event_t* event = (event_t*)pqueue_peek(env->event_q); + interval_t event_time = event->time - start_time; + lf_print_warning("---- The first future event has timestamp " PRINTF_TIME " after start time.", event_time); + } + // Print elapsed times. + // If these are negative, then the program failed to start up. + interval_t elapsed_time = lf_time_logical_elapsed(env); + if (elapsed_time >= 0LL) { + char time_buffer[29]; // 28 bytes is enough for the largest 64 bit number: 9,223,372,036,854,775,807 + lf_comma_separated_time(time_buffer, elapsed_time); + printf("---- Elapsed logical time (in nsec): %s\n", time_buffer); + + // If start_time is 0, then execution didn't get far enough along + // to initialize this. + if (start_time > 0LL) { + lf_comma_separated_time(time_buffer, lf_time_physical_elapsed()); + printf("---- Elapsed physical time (in nsec): %s\n", time_buffer); + } } - } - // Free up memory associated with environment - environment_free(env); - + // Free up memory associated with environment + environment_free(env); + } env++; } - _lf_free_all_tokens(); // Must be done before freeing reactors. - // Issue a warning if a memory leak has been detected. - if (_lf_count_payload_allocations > 0) { - lf_print_warning("Memory allocated for messages has not been freed."); - lf_print_warning("Number of unfreed messages: %d.", _lf_count_payload_allocations); - } - if (_lf_count_token_allocations > 0) { - lf_print_warning("Memory allocated for tokens has not been freed!"); - lf_print_warning("Number of unfreed tokens: %d.", _lf_count_token_allocations); - } + // Skip most cleanup on abnormal termination. + if (_lf_normal_termination) { + _lf_free_all_tokens(); // Must be done before freeing reactors. + // Issue a warning if a memory leak has been detected. + if (_lf_count_payload_allocations > 0) { + lf_print_warning("Memory allocated for messages has not been freed."); + lf_print_warning("Number of unfreed messages: %d.", _lf_count_payload_allocations); + } + if (_lf_count_token_allocations > 0) { + lf_print_warning("Memory allocated for tokens has not been freed!"); + lf_print_warning("Number of unfreed tokens: %d.", _lf_count_token_allocations); + } #if !defined(LF_SINGLE_THREADED) - for (int i = 0; i < _lf_watchdog_count; i++) { - if (_lf_watchdogs[i].base->reactor_mutex != NULL) { - free(_lf_watchdogs[i].base->reactor_mutex); + for (int i = 0; i < _lf_watchdog_count; i++) { + if (_lf_watchdogs[i].base->reactor_mutex != NULL) { + free(_lf_watchdogs[i].base->reactor_mutex); + } } - } #endif - _lf_free_all_reactors(); + _lf_free_all_reactors(); + } } diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index a462dd77c..08921399f 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -1239,6 +1239,10 @@ int lf_reactor_c_main(int argc, const char* argv[]) { LF_PRINT_LOG("---- All worker threads exited successfully."); } } + _lf_normal_termination = true; + // Invoke termination function here before freeing the local RTI. + termination(); + #if defined LF_ENCLAVES free_local_rti(); #endif diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index 880408ec6..7250ed29f 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -273,11 +273,12 @@ void _lf_logical_tag_complete(tag_t); /** * Connect to the RTI at the specified host and port and return - * the socket descriptor for the connection. If this fails, the + * the socket descriptor for the connection. If this fails, wait CONNECT_RETRY_INTERVAL + * and try again. If it fails after CONNECT_MAX_RETRIES, the * program exits. If it succeeds, it sets the _fed.socket_TCP_RTI global * variable to refer to the socket for communicating with the RTI. * @param hostname A hostname, such as "localhost". - * @param port_number A port number. + * @param port_number A port number, or 0 to use the default port. */ void connect_to_rti(const char*, int); @@ -296,28 +297,22 @@ void connect_to_rti(const char*, int); void* listen_to_federates(void*); /** - * Create a server to listen to incoming physical - * connections from remote federates. This function + * Create a server to listen to incoming p2p connection (physical + * connections or decentralized connections) from remote federates. This function * only handles the creation of the server socket. - * The reserved port for the server socket is then + * The bound port for the server socket is then * sent to the RTI by sending an MSG_TYPE_ADDRESS_ADVERTISEMENT message * (@see net_common.h). This function expects no response * from the RTI. * - * If a port is specified by the user, that will be used - * as the only possibility for the server. This function - * will fail if that port is not available. If a port is not - * specified, the STARTING_PORT (@see net_common.h) will be used. - * The function will keep incrementing the port in this case - * until the number of tries reaches PORT_RANGE_LIMIT. + * If a port is specified by the user, that will be used. + * Otherwise, a random port will be assigned. If the bind fails, + * it will retry after PORT_BIND_RETRY_INTERVAL until it has tried + * PORT_BIND_RETRY_LIMIT times. Then it will fail. * - * @note This function is similar to create_server(...) in rti.c. - * However, it contains specific log messages for the peer to - * peer connections between federates. It also additionally - * sends an address advertisement (MSG_TYPE_ADDRESS_ADVERTISEMENT) message to the - * RTI informing it of the port. + * @note This function is different from create_server(...) in rti.c. * - * @param specified_port The specified port by the user. + * @param specified_port The specified port by the user or 0 to use a random port. */ void create_server(int specified_port); diff --git a/include/core/federated/network/net_common.h b/include/core/federated/network/net_common.h index 38001cc0b..47bb644e8 100644 --- a/include/core/federated/network/net_common.h +++ b/include/core/federated/network/net_common.h @@ -37,12 +37,9 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * * Each federate attempts to connect with an RTI at the IP address * put into its code by the code generator (i.e., it attempts to - * open a TCP connection). It starts by trying the - * port number given by STARTING_PORT and increments the port number - * from there until it successfully connects. The maximum port number - * it will try before giving up is STARTING_PORT + PORT_RANGE_LIMIT. - * - * FIXME: What if a port is specified in the "at" of the federated statement? + * open a TCP connection). If an explicit port is given in the `at` clause + * on the `federated reactor` statement, it will use that port. Otherwise, it will + * use DEFAULT_PORT. * * When it has successfully opened a TCP connection, the first message it sends * to the RTI is a MSG_TYPE_FED_IDS message, which contains the ID of this federate @@ -137,9 +134,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * parameter of the target is "decentralized" and the federate has * inbound connections from other federates, then it starts a socket * server to listen for incoming connections from those federates. - * It attempts to create the server at the port given by STARTING_PORT, - * and if this fails, increments the port number from there until a - * port is available. It then sends to the RTI an MSG_TYPE_ADDRESS_ADVERTISEMENT message + * It then sends to the RTI an MSG_TYPE_ADDRESS_ADVERTISEMENT message * with the port number as a payload. The federate then creates a thread * to listen for incoming socket connections and messages. * @@ -208,7 +203,6 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ #define UDP_TIMEOUT_TIME SEC(1) - /** * Size of the buffer used for messages sent between federates. * This is used by both the federates and the rti, so message lengths @@ -217,63 +211,52 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #define FED_COM_BUFFER_SIZE 256u /** - * Number of nanoseconds that elapse between a federate's attempts - * to connect to the RTI. + * Time between a federate's attempts to connect to the RTI. */ -#define CONNECT_RETRY_INTERVAL 2000000000LL +#define CONNECT_RETRY_INTERVAL SEC(1) /** * Bound on the number of retries to connect to the RTI. * A federate will retry every CONNECT_RETRY_INTERVAL seconds - * this many times before giving up. E.g., 500 retries every - * 2 seconds results in retrying for about 16 minutes. + * this many times before giving up. E.g., 600 retries every + * 1 seconds results in retrying for about 10 minutes. + * This allows time to start federates before the RTI. */ -#define CONNECT_NUM_RETRIES 500 +#define CONNECT_MAX_RETRIES 600 /** - * Number of nanoseconds that a federate waits before asking + * Time that a federate waits before asking * the RTI again for the port and IP address of a federate * (an MSG_TYPE_ADDRESS_QUERY message) after the RTI responds that it - * does not know. + * does not know. This allows time for federates to start separately. */ -#define ADDRESS_QUERY_RETRY_INTERVAL 100000000LL +#define ADDRESS_QUERY_RETRY_INTERVAL SEC(1) /** - * Number of nanoseconds that a federate waits before trying - * another port for the RTI. This is to avoid overwhelming - * the OS and the socket with too many calls. - * FIXME: Is this too small? + * Time to wait before re-attempting to bind to a port. */ -#define PORT_KNOCKING_RETRY_INTERVAL 10000LL +#define PORT_BIND_RETRY_INTERVAL MSEC(10) /** - * Default starting port number for the RTI and federates' socket server. - * Unless a specific port has been specified by the LF program in the "at" - * for the RTI, when the federates start up, they will attempt - * to open a socket server - * on this port, and, if this fails, increment the port number and - * try again. The number of increments is limited by PORT_RANGE_LIMIT. - * FIXME: Clarify what happens if a specific port has been given in "at". + * Number of attempts to bind to a port before giving up. */ -#define STARTING_PORT 15045u +#define PORT_BIND_RETRY_LIMIT 100 /** - * Number of ports to try to connect to. Unless the LF program specifies - * a specific port number to use, the RTI or federates will attempt to start - * a socket server on port STARTING_PORT. If that port is not available (e.g., - * another RTI is running or has recently exited), then it will try the - * next port, STARTING_PORT+1, and keep incrementing the port number up to this - * limit. If no port between STARTING_PORT and STARTING_PORT + PORT_RANGE_LIMIT - * is available, then the RTI or the federate will fail to start. This number, therefore, - * limits the number of RTIs and federates that can be simultaneously - * running on any given machine without assigning specific port numbers. + * Default port number for the RTI. + * Unless a specific port has been specified by the LF program in the "at" + * for the RTI or on the command line, when the RTI starts up, it will attempt + * to open a socket server on this port. */ -#define PORT_RANGE_LIMIT 1024 +#define DEFAULT_PORT 15045u /** * Delay the start of all federates by this amount. - * FIXME: More. - * FIXME: Should use the latency estimates that were + * This helps ensure that the federates do not start at the same time. + * Each federate has provided its current physical time to the RTI, and + * the RTI has picked the largest of these. It will add this quantity + * and declare that to be the start time. + * FIXME: This could use the latency estimates that were * acquired during initial clock synchronization. */ #define DELAY_START SEC(1) diff --git a/include/core/reactor_common.h b/include/core/reactor_common.h index be74165b7..1ec8082b6 100644 --- a/include/core/reactor_common.h +++ b/include/core/reactor_common.h @@ -19,6 +19,9 @@ extern bool _lf_execution_started; extern bool keepalive_specified; extern interval_t _lf_fed_STA_offset; +/** Flag used to disable cleanup operations on normal termination. */ +extern bool _lf_normal_termination; + extern int default_argc; extern const char** default_argv; diff --git a/include/core/trace.h b/include/core/trace.h index 598c669bf..5946e16dc 100644 --- a/include/core/trace.h +++ b/include/core/trace.h @@ -435,6 +435,10 @@ void tracepoint_reaction_deadline_missed(trace_t* trace, reaction_t *reaction, i * close the files. */ void stop_trace(trace_t* trace); + +/** + * Version of stop_trace() that does not lock the trace mutex. + */ void stop_trace_locked(trace_t* trace); //////////////////////////////////////////////////////////// diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index 1f7391f92..2d73ce2c4 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -master +port-search-removal