Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port search removal #319

Closed
wants to merge 14 commits into from
Closed
39 changes: 31 additions & 8 deletions core/federated/RTI/main.c
Original file line number Diff line number Diff line change
Expand Up @@ -67,16 +67,26 @@ static rti_remote_t rti;
*/
const char *rti_trace_file_name = "rti.lft";

/** Indicator that normal termination has occurred. */
bool normal_termination = false;

/**
* @brief A clean termination of the RTI will write the trace file, if tracing is
* enabled, before exiting.
* @brief Function to run upon termination.
* This function will be invoked both after main() returns and when a signal
* that results in terminating the process, such as SIGINT. In the former
* case, it should do nothing. In the latter case, it will attempt to write
* the trace file, but without acquiring a mutex lock, so the resulting files
* may be incomplete or even corrupted. But this is better than just failing
* to write the data we have collected so far.
*/
void termination() {
if (rti.base.tracing_enabled) {
stop_trace(rti.base.trace);
lf_print("RTI trace file saved.");
if (!normal_termination) {
if (rti.base.tracing_enabled) {
stop_trace_locked(rti.base.trace);
lf_print("RTI trace file saved.");
}
lf_print("RTI is exiting abnormally.");
}
lf_print("RTI is exiting.");
}

void usage(int argc, const char* argv[]) {
Expand All @@ -86,7 +96,7 @@ void usage(int argc, const char* argv[]) {
lf_print(" -n, --number_of_federates <n>");
lf_print(" The number of federates in the federation that this RTI will control.\n");
lf_print(" -p, --port <n>");
lf_print(" The port number to use for the RTI. Must be larger than 0 and smaller than %d. Default is %d.\n", UINT16_MAX, STARTING_PORT);
lf_print(" The port number to use for the RTI. Must be larger than 0 and smaller than %d. Default is %d.\n", UINT16_MAX, DEFAULT_PORT);
lf_print(" -c, --clock_sync [off|init|on] [period <n>] [exchanges-per-interval <n>]");
lf_print(" The status of clock synchronization for this federate.");
lf_print(" - off: Clock synchronization is off.");
Expand Down Expand Up @@ -254,6 +264,12 @@ int main(int argc, const char* argv[]) {

// Catch the Ctrl-C signal, for a clean exit that does not lose the trace information
signal(SIGINT, exit);
#ifdef SIGPIPE
// Ignore SIGPIPE errors, which terminate the entire application if
// socket write() fails because the reader has closed the socket.
// Instead, cause an EPIPE error to be set when write() fails.
signal(SIGPIPE, SIG_IGN);
#endif // SIGPIPE
if (atexit(termination) != 0) {
lf_print_warning("Failed to register termination function!");
}
Expand All @@ -277,13 +293,20 @@ int main(int argc, const char* argv[]) {
// Allocate memory for the federates
rti.base.scheduling_nodes = (scheduling_node_t**)calloc(rti.base.number_of_scheduling_nodes, sizeof(scheduling_node_t*));
for (uint16_t i = 0; i < rti.base.number_of_scheduling_nodes; i++) {
federate_info_t *fed_info = (federate_info_t *) malloc(sizeof(federate_info_t));
federate_info_t *fed_info = (federate_info_t *) calloc(1, sizeof(federate_info_t));
initialize_federate(fed_info, i);
rti.base.scheduling_nodes[i] = (scheduling_node_t *) fed_info;
}

int socket_descriptor = start_rti_server(rti.user_specified_port);
wait_for_federates(socket_descriptor);
normal_termination = true;
if (rti.base.tracing_enabled) {
// No need for a mutex lock because all threads have exited.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way to put this in an assertion. I'm weary of assumptions stated in comments that may become false or misleading when the code changes...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know of any way to put this in an assertion.

stop_trace_locked(rti.base.trace);
lf_print("RTI trace file saved.");
}

free_scheduling_nodes(rti.base.scheduling_nodes, rti.base.number_of_scheduling_nodes);
lf_print("RTI is exiting.");
return 0;
Expand Down
2 changes: 1 addition & 1 deletion core/federated/RTI/rti_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -317,7 +317,7 @@ void update_min_delays_upstream(scheduling_node_t* node) {

// Put the results onto the node's struct.
node->num_min_delays = count;
node->min_delays = (minimum_delay_t*)malloc(count * sizeof(minimum_delay_t));
node->min_delays = (minimum_delay_t*)calloc(count, sizeof(minimum_delay_t));
LF_PRINT_DEBUG("++++ Node %hu(is in ZDC: %d\n", node->id, node->flags & IS_IN_ZERO_DELAY_CYCLE);
int k = 0;
for (int i = 0; i < rti_common->number_of_scheduling_nodes; i++) {
Expand Down
4 changes: 2 additions & 2 deletions core/federated/RTI/rti_local.c
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ static rti_local_t * rti_local;
lf_mutex_t rti_mutex;

void initialize_local_rti(environment_t *envs, int num_envs) {
rti_local = (rti_local_t*)malloc(sizeof(rti_local_t));
rti_local = (rti_local_t*)calloc(1, sizeof(rti_local_t));
LF_ASSERT(rti_local, "Out of memory");

initialize_rti_common(&rti_local->base);
Expand All @@ -47,7 +47,7 @@ void initialize_local_rti(environment_t *envs, int num_envs) {
// Allocate memory for the enclave_info objects
rti_local->base.scheduling_nodes = (scheduling_node_t**)calloc(num_envs, sizeof(scheduling_node_t*));
for (int i = 0; i < num_envs; i++) {
enclave_info_t *enclave_info = (enclave_info_t *) malloc(sizeof(enclave_info_t));
enclave_info_t *enclave_info = (enclave_info_t *) calloc(1, sizeof(enclave_info_t));
initialize_enclave_info(enclave_info, i, &envs[i]);
rti_local->base.scheduling_nodes[i] = (scheduling_node_t *) enclave_info;

Expand Down
31 changes: 11 additions & 20 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ int create_server(int32_t specified_port, uint16_t port, socket_type_t socket_ty

/*
* The following used to permit reuse of a port that an RTI has previously
* used that has not been released. We no longer do this, but instead
* increment the port number until an available port is found.
* used that has not been released. We no longer do this, and instead retry
* some number of times after waiting.

// SO_REUSEPORT (since Linux 3.9)
// Permits multiple AF_INET or AF_INET6 sockets to be bound to an
Expand Down Expand Up @@ -127,28 +127,19 @@ int create_server(int32_t specified_port, uint16_t port, socket_type_t socket_ty
(struct sockaddr *) &server_fd,
sizeof(server_fd));

// If the binding fails with this port and no particular port was specified
// in the LF program, then try the next few ports in sequence.
while (result != 0
&& specified_port == 0
&& port >= STARTING_PORT
&& port <= STARTING_PORT + PORT_RANGE_LIMIT) {
lf_print("RTI failed to get port %d. Trying %d.", port, port + 1);
port++;
// Try repeatedly to bind to the specified port.
int count = 1;
while (result != 0 && count++ < PORT_BIND_RETRY_LIMIT) {
lf_print("RTI failed to get port %d. Will try again.", port);
lf_sleep(PORT_BIND_RETRY_INTERVAL);
server_fd.sin_port = htons(port);
result = bind(
socket_descriptor,
(struct sockaddr *) &server_fd,
sizeof(server_fd));
}
if (result != 0) {
if (specified_port == 0) {
lf_print_error_and_exit("Failed to bind the RTI socket. Cannot find a usable port. "
"Consider increasing PORT_RANGE_LIMIT in net_common.h.");
} else {
lf_print_error_and_exit("Failed to bind the RTI socket. Specified port is not available. "
"Consider leaving the port unspecified");
}
lf_print_error_and_exit("Failed to bind the RTI socket. Port %d is not available. ", port);
}
char* type = "TCP";
if (socket_type == UDP) {
Expand Down Expand Up @@ -1016,7 +1007,7 @@ void handle_federate_resign(federate_info_t *my_fed) {
// an orderly shutdown.
// close(my_fed->socket); // from unistd.h

lf_print("Federate %d has resigned.", my_fed->enclave.id);
lf_print("RTI: Federate %d has resigned.", my_fed->enclave.id);

// Check downstream federates to see whether they should now be granted a TAG.
// To handle cycles, need to create a boolean array to keep
Expand Down Expand Up @@ -1558,8 +1549,8 @@ void initialize_federate(federate_info_t* fed, uint16_t id) {
int32_t start_rti_server(uint16_t port) {
int32_t specified_port = port;
if (port == 0) {
// Use the default starting port.
port = STARTING_PORT;
// Use the default port.
port = DEFAULT_PORT;
}
_lf_initialize_clock();
// Create the TCP socket server
Expand Down
4 changes: 1 addition & 3 deletions core/federated/RTI/rti_remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,7 @@ extern int lf_critical_section_exit(environment_t* env);
/**
* Create a server and enable listening for socket connections.
*
* @note This function is similar to create_server(...) in
* federate.c. However, it contains logs that are specific
* to the RTI.
* @note This function is different from create_server(...) in federate.c.
*
* @param port The port number to use.
* @param socket_type The type of the socket for the server (TCP or UDP).
Expand Down
Loading
Loading