Various bugfixes and cleanups in the support for federated programs #323

Merged
merged 86 commits into from
Jan 22, 2024
Changes from 4 commits
86 commits
5ad7f92
Clean up RTI
edwardalee Dec 20, 2023
e41eb63
Use lf_print_error_system_failure
edwardalee Dec 20, 2023
028e15a
Clean up federates
edwardalee Dec 20, 2023
fb47b2b
Point to lingua-franca/federated-cleanup
edwardalee Dec 20, 2023
c1c755d
Fix possible segfault on tracing termination
edwardalee Dec 21, 2023
548e278
Remove one more deadlock risk
edwardalee Dec 21, 2023
444ebee
Fix compile error and bogus comparison
edwardalee Dec 21, 2023
ab7605e
Prevent sending redundant reply to stop request
edwardalee Dec 21, 2023
b9b17af
Fixed compile error
edwardalee Dec 21, 2023
d451273
Treat the stop request from the RTI as if a local stop request had be…
edwardalee Dec 21, 2023
5bad8b9
Adjust port binding retries to realistic times
edwardalee Dec 22, 2023
4875564
RTI sends RESIGN on abnormal termination
edwardalee Dec 22, 2023
59ab5d2
Free environment only after all logging and debug statements
edwardalee Dec 23, 2023
e043239
Better handling of socket shutdown.
edwardalee Dec 23, 2023
24dab5a
Major refactoring of network functions
edwardalee Dec 26, 2023
6a1e313
Send all messages to stdout, not stderr
edwardalee Dec 27, 2023
a31a5d4
Allow scheduling at current time before execution starts
edwardalee Dec 27, 2023
b849176
Better handling of startup
edwardalee Dec 27, 2023
535cacb
Made execution_started an environment flag
edwardalee Dec 27, 2023
e17ee9a
Prevent spurious error at start
edwardalee Dec 27, 2023
f406b26
Comment only
edwardalee Dec 27, 2023
b571826
Pop events after wait, not before
edwardalee Dec 27, 2023
dfde25f
Ensure dummy events before start of execution
edwardalee Dec 27, 2023
8898631
Typo in comment
edwardalee Dec 27, 2023
be4a03a
Fixed modal reactors
edwardalee Dec 27, 2023
95b7dd8
Reworked socket send functions
edwardalee Dec 28, 2023
3f8ee20
Typo
edwardalee Dec 28, 2023
9f6600f
Better timing
edwardalee Dec 28, 2023
6d77482
Fix bug that can lead to deadlock on STP violation
edwardalee Dec 28, 2023
68b8c27
Merge branch 'main' into federated-cleanup
edwardalee Dec 28, 2023
3feb677
Tuning macros
edwardalee Dec 28, 2023
47cce71
Comments only
edwardalee Dec 29, 2023
2b0796e
Comments only
edwardalee Dec 29, 2023
089489f
Last known status of a port doesn't lag current tag
edwardalee Dec 29, 2023
96adc51
Resurrected port search for RTI. Set maximum number of RTIs on a host…
edwardalee Dec 29, 2023
6261541
Fixed authenticated
edwardalee Dec 29, 2023
7ddc477
Fixed debug message
edwardalee Dec 30, 2023
191a261
Fixed tracing of stop request messages
edwardalee Dec 30, 2023
c042ddd
Comment out too-verbose debug message
edwardalee Dec 30, 2023
61c310f
Fixed deadlock with race in lf_request_stop
edwardalee Dec 30, 2023
283ff04
Impose a time out for response to stop requests
edwardalee Dec 30, 2023
1d3a2e1
Tolerate socket closing during reading physical connection
edwardalee Dec 30, 2023
99edcf9
Have _lf_schedule_at_tag return trigger_handle_t
edwardalee Dec 31, 2023
3c95c2f
Allow tardy messages to unblock reactions and clarify docs
edwardalee Dec 31, 2023
0d4d997
General cleanup
edwardalee Jan 1, 2024
8de37dd
Clean up doxygen docs
edwardalee Jan 1, 2024
b901418
Probably unnecessary precaution on connection failure
edwardalee Jan 1, 2024
a07f78b
Fix a bug in EIMT on microstep/after delay interaction
edwardalee Jan 1, 2024
9de19ab
Comments and formatting only
edwardalee Jan 2, 2024
2f1c9df
Removed message_record, replace with pqueue_tag
edwardalee Jan 2, 2024
0ba93da
Fixed RTI compile errors
edwardalee Jan 2, 2024
3c5a96b
Update test for void return value
edwardalee Jan 3, 2024
c374f31
Make resign messages backward compatible
edwardalee Jan 3, 2024
3763535
Fixed tracing for FAILED message
edwardalee Jan 3, 2024
67db29f
Fixed tracing of RESIGN
edwardalee Jan 3, 2024
3d0e8de
make clean removes executables
edwardalee Jan 3, 2024
a0e1c22
Tolerate incomplete message reads for decentralized
edwardalee Jan 6, 2024
78970c8
Formatting only
edwardalee Jan 7, 2024
8ba8a78
Removed noisy debug message
edwardalee Jan 7, 2024
48bccef
Exit RTI immediately if federate fails
edwardalee Jan 9, 2024
0106dfe
Improve the handling of tardy messages.
edwardalee Jan 10, 2024
6712510
Update last known status also for centralized coordination
edwardalee Jan 10, 2024
5de7c5e
Do not wait for tag to advance if MLAA is finite
edwardalee Jan 10, 2024
a8fa18f
Added includes (why weren't these needed before?)
edwardalee Jan 10, 2024
9da5094
Check that ports are in fact unknown before looping
edwardalee Jan 10, 2024
a961d9c
Fixed use of write_to_socket
edwardalee Jan 11, 2024
9a797bb
Fix deadlock caused by STP violation
petervdonovan Jan 13, 2024
08309a9
Merge branch 'main' into federated-cleanup
edwardalee Jan 13, 2024
5e1d9bb
Removed outdated comments
edwardalee Jan 13, 2024
f6e090d
Update core/federated/RTI/main.c
edwardalee Jan 14, 2024
f6e685e
Update core/federated/RTI/rti_common.c
edwardalee Jan 14, 2024
edfde09
Absorb delay functionality into lf_tag_add()
edwardalee Jan 14, 2024
161f00a
Clarify comments for eimt_strict()
edwardalee Jan 14, 2024
fd05ada
Print error on failure to write trace file
edwardalee Jan 14, 2024
f4ab3d8
Comment only
edwardalee Jan 14, 2024
e1783f1
Update core/federated/RTI/rti_remote.c
edwardalee Jan 15, 2024
4b7c940
Comment only
edwardalee Jan 14, 2024
5ff00a2
Comment only
edwardalee Jan 14, 2024
193bd66
Move freeing of local RTI to termination function
edwardalee Jan 15, 2024
7f84a33
Don't exit immediately on federate failure
edwardalee Jan 15, 2024
753d79c
Clean up error handling in receive_and_check_fed_id_message
edwardalee Jan 15, 2024
e44d284
Do not overwrite NET with message tag unless less
edwardalee Jan 15, 2024
c37968d
Comments only
edwardalee Jan 18, 2024
30601a8
Comments only
edwardalee Jan 20, 2024
ea398c7
Trace before write and after read
edwardalee Jan 21, 2024
6e4af8e
Do not acquire mutex during abnormal termination
edwardalee Jan 21, 2024
45 changes: 36 additions & 9 deletions core/federated/RTI/main.c
@@ -67,16 +67,26 @@ static rti_remote_t rti;
*/
const char *rti_trace_file_name = "rti.lft";

/** Indicator that normal termination has occurred. */
bool normal_termination = false;

/**
* @brief A clean termination of the RTI will write the trace file, if tracing is
* enabled, before exiting.
* @brief Function to run upon termination.
* This function will be invoked both after main() returns and when a signal
* that results in terminating the process, such as SIGINT, is received. In the former
* case, it should do nothing. In the latter case, it will attempt to write
* the trace file, but without acquiring a mutex lock, so the resulting files
* may be incomplete or even corrupted. But this is better than just failing
* to write the data we have collected so far.
*/
void termination() {
if (rti.base.tracing_enabled) {
stop_trace(rti.base.trace);
lf_print("RTI trace file saved.");
if (!normal_termination) {
if (rti.base.tracing_enabled) {
stop_trace_locked(rti.base.trace);
lf_print("RTI trace file saved.");
}
lf_print("RTI is exiting abnormally.");
}
lf_print("RTI is exiting.");
}

void usage(int argc, const char* argv[]) {
@@ -86,7 +96,7 @@ void usage(int argc, const char* argv[]) {
lf_print(" -n, --number_of_federates <n>");
lf_print(" The number of federates in the federation that this RTI will control.\n");
lf_print(" -p, --port <n>");
lf_print(" The port number to use for the RTI. Must be larger than 0 and smaller than %d. Default is %d.\n", UINT16_MAX, STARTING_PORT);
lf_print(" The port number to use for the RTI. Must be larger than 0 and smaller than %d. Default is %d.\n", UINT16_MAX, DEFAULT_PORT);
lf_print(" -c, --clock_sync [off|init|on] [period <n>] [exchanges-per-interval <n>]");
lf_print(" The status of clock synchronization for this federate.");
lf_print(" - off: Clock synchronization is off.");
@@ -254,6 +264,16 @@ int main(int argc, const char* argv[]) {

// Catch the Ctrl-C signal, for a clean exit that does not lose the trace information
signal(SIGINT, exit);
#ifdef SIGPIPE
// Ignore SIGPIPE errors, which terminate the entire application if
// socket write() fails because the reader has closed the socket.
// Instead, cause an EPIPE error to be set when write() fails.
// NOTE: The reason for a broken socket causing a SIGPIPE signal
// instead of just having write() return an error is to robustly handle
// a foo | bar pipeline where bar crashes. The default behavior
// is for foo to also exit.
signal(SIGPIPE, SIG_IGN);
#endif // SIGPIPE
if (atexit(termination) != 0) {
lf_print_warning("Failed to register termination function!");
}
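
The effect of ignoring SIGPIPE described in the comment above can be checked in isolation. The following is a minimal sketch, assuming a POSIX system; it uses a pipe rather than a socket, and the function name `sk_write_to_closed_pipe` is hypothetical and not part of this PR.

```c
#include <errno.h>
#include <signal.h>
#include <unistd.h>

/*
 * Write one byte to a pipe whose read end has been closed.
 * Returns the errno reported by write(), 0 if the write unexpectedly
 * succeeds, or -1 if the pipe could not be created.
 */
int sk_write_to_closed_pipe(void) {
    int fds[2];
    if (pipe(fds) != 0) return -1;
#ifdef SIGPIPE
    /* Without this, the write below would terminate the process. */
    signal(SIGPIPE, SIG_IGN);
#endif
    close(fds[0]); /* The reader goes away. */
    errno = 0;
    ssize_t written = write(fds[1], "x", 1);
    int saved_errno = (written < 0) ? errno : 0;
    close(fds[1]);
    return saved_errno;
}
```

With the signal ignored, the caller sees an ordinary error return and can decide how to react, which is what the RTI needs when a federate closes its socket.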
@@ -277,15 +297,22 @@ int main(int argc, const char* argv[]) {
// Allocate memory for the federates
rti.base.scheduling_nodes = (scheduling_node_t**)calloc(rti.base.number_of_scheduling_nodes, sizeof(scheduling_node_t*));
for (uint16_t i = 0; i < rti.base.number_of_scheduling_nodes; i++) {
federate_info_t *fed_info = (federate_info_t *) malloc(sizeof(federate_info_t));
federate_info_t *fed_info = (federate_info_t *) calloc(1, sizeof(federate_info_t));
initialize_federate(fed_info, i);
rti.base.scheduling_nodes[i] = (scheduling_node_t *) fed_info;
}

int socket_descriptor = start_rti_server(rti.user_specified_port);
wait_for_federates(socket_descriptor);
normal_termination = true;
if (rti.base.tracing_enabled) {
// No need for a mutex lock because all threads have exited.
stop_trace_locked(rti.base.trace);
lf_print("RTI trace file saved.");
}

lf_print("RTI is exiting."); // Do this before freeing scheduling nodes.
free_scheduling_nodes(rti.base.scheduling_nodes, rti.base.number_of_scheduling_nodes);
lf_print("RTI is exiting.");
return 0;
}
#endif // STANDALONE_RTI
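
The interplay between the `normal_termination` flag and the `atexit()` handler in this file can be sketched on its own. This is a simplified illustration, not the RTI code: all `sk_` names are hypothetical stand-ins, and the trace flush is reduced to a counter.

```c
#include <signal.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>

/* Hypothetical stand-ins for the RTI state; not the real rti_remote_t. */
bool sk_normal_termination = false;
bool sk_tracing_enabled = true;
int sk_trace_flush_count = 0; /* How many times the trace was flushed. */

/* Stand-in for stop_trace_locked(): flush without acquiring a mutex. */
void sk_flush_trace_unlocked(void) { sk_trace_flush_count++; }

/* Registered with atexit(); acts only if main() did not finish normally. */
void sk_termination(void) {
    if (!sk_normal_termination) {
        if (sk_tracing_enabled) sk_flush_trace_unlocked();
        puts("exiting abnormally");
    }
}

/* What the end of main() does on the normal path. */
void sk_finish_normally(void) {
    sk_normal_termination = true; /* Makes sk_termination() a no-op. */
    if (sk_tracing_enabled) sk_flush_trace_unlocked();
}

/* Route SIGINT through exit() so that atexit() handlers run. */
int sk_install_handlers(void) {
    signal(SIGINT, exit);
    return atexit(sk_termination);
}
```

The flag guarantees that the trace is flushed exactly once on either path. Note that calling `exit()` from a signal handler is not async-signal-safe in general; the sketch mirrors the pattern used in the diff rather than recommending it.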
59 changes: 48 additions & 11 deletions core/federated/RTI/rti_common.c
@@ -113,6 +113,40 @@ tag_t earliest_future_incoming_message_tag(scheduling_node_t* e) {
return t_d;
}

tag_t eimt_strict(scheduling_node_t* e) {
// Find the tag of the earliest possible incoming message from immediately upstream
// enclaves or federates that are not part of a zero-delay cycle.
// This will be the smallest upstream NET plus the least delay.
// This could be NEVER_TAG if the RTI has not seen a NET from some upstream node.
tag_t t_d = FOREVER_TAG;
for (int i = 0; i < e->num_upstream; i++) {
scheduling_node_t* upstream = rti_common->scheduling_nodes[e->upstream[i]];
// Skip this node if it is part of a zero-delay cycle.
if (is_in_zero_delay_cycle(upstream)) continue;
// If we haven't heard from the upstream node, then assume it can send an event at the start time.
if (lf_tag_compare(upstream->next_event, NEVER_TAG) == 0) {
tag_t start_tag = {.time = start_time, .microstep = 0};
upstream->next_event = start_tag;
}
// Need to consider nodes that are upstream of the upstream node because those
// nodes may send messages to the upstream node.
tag_t earliest = earliest_future_incoming_message_tag(upstream);
// If the next event of the upstream node is earlier, then use that.
if (lf_tag_compare(upstream->next_event, earliest) < 0) {
earliest = upstream->next_event;
}
tag_t earliest_tag_from_upstream = lf_delay_tag(earliest, e->upstream_delay[i]);
LF_PRINT_DEBUG("RTI: Strict EIMT of fed/encl %d at fed/encl %d has tag " PRINTF_TAG ".",
e->id,
upstream->id,
earliest_tag_from_upstream.time - start_time, earliest_tag_from_upstream.microstep);
if (lf_tag_compare(earliest_tag_from_upstream, t_d) < 0) {
t_d = earliest_tag_from_upstream;
}
}
return t_d;
}

tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e) {
tag_advance_grant_t result = {.tag = NEVER_TAG, .is_provisional = false};

@@ -152,24 +186,26 @@ tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e) {
// Find the tag of the earliest event that may be later received from an upstream enclave
// or federate (which includes any after delays on the connections).
tag_t t_d = earliest_future_incoming_message_tag(e);
// Strict version of the above. This is a tag that must be strictly greater than
// that of any granted PTAG.
tag_t t_d_strict = eimt_strict(e);

LF_PRINT_LOG("RTI: Earliest next event upstream of node %d has tag " PRINTF_TAG ".",
e->id, t_d.time - start_time, t_d.microstep);

// Given an EIMT (earliest incoming message tag) there are these possible scenarios:
// 1) The EIMT is greater than the NET we want to advance to. Grant a TAG.
// 2) The EIMT is equal to the NET and the federate is part of a zero-delay cycle (ZDC).
// 3) The EIMT is equal to the NET and the federate is not part of a ZDC.
// 4) The EIMT is less than the NET
// In (1) we can give a TAG to NET. In (2) we can give a PTAG.
// In (3) and (4), we wait for further updates from upstream federates.
// 2) The EIMT is equal to the NET and the strict EIMT is greater than the NET
// and the federate is part of a zero-delay cycle (ZDC). Grant a PTAG.
// 3) Otherwise, grant nothing and wait for further updates.

if ( // Scenario (1) above
lf_tag_compare(t_d, e->next_event) > 0 // EIMT greater than NET
&& lf_tag_compare(e->next_event, NEVER_TAG) > 0 // NET is not NEVER_TAG
&& lf_tag_compare(t_d, e->last_provisionally_granted) >= 0 // The grant is not redundant
// (equal is important to override any previous
// PTAGs).
&& lf_tag_compare(t_d, e->last_granted) > 0 // The grant is not redundant.
// (equal is important to override any previous
// PTAGs).
&& lf_tag_compare(t_d, e->last_granted) > 0 // The grant is not redundant.
) {
// No upstream node can send events that will be received with a tag less than or equal to
// e->next_event, so it is safe to send a TAG.
@@ -180,9 +216,10 @@ tag_advance_grant_t tag_advance_grant_if_safe(scheduling_node_t* e) {
e->next_event.time - lf_time_start(),
e->next_event.microstep);
result.tag = e->next_event;
} else if( // Scenario (2) or (3) above
} else if( // Scenario (2) above
lf_tag_compare(t_d, e->next_event) == 0 // EIMT equal to NET
&& is_in_zero_delay_cycle(e) // The node is part of a ZDC
&& lf_tag_compare(t_d_strict, e->next_event) > 0 // The strict EIMT is greater than the NET
&& lf_tag_compare(t_d, e->last_provisionally_granted) > 0 // The grant is not redundant
&& lf_tag_compare(t_d, e->last_granted) > 0 // The grant is not redundant.
) {
@@ -317,8 +354,8 @@ void update_min_delays_upstream(scheduling_node_t* node) {

// Put the results onto the node's struct.
node->num_min_delays = count;
node->min_delays = (minimum_delay_t*)malloc(count * sizeof(minimum_delay_t));
LF_PRINT_DEBUG("++++ Node %hu(is in ZDC: %d\n", node->id, node->flags & IS_IN_ZERO_DELAY_CYCLE);
node->min_delays = (minimum_delay_t*)calloc(count, sizeof(minimum_delay_t));
LF_PRINT_DEBUG("++++ Node %hu is in ZDC: %d", node->id, is_in_zero_delay_cycle(node));
int k = 0;
for (int i = 0; i < rti_common->number_of_scheduling_nodes; i++) {
if (lf_tag_compare(path_delays[i], FOREVER_TAG) < 0) {
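
The three grant scenarios listed in `tag_advance_grant_if_safe` can be condensed into a small decision function. This is a sketch under simplifying assumptions: `sk_tag_t`, `sk_tag_compare`, and `sk_decide_grant` are hypothetical names, and the checks against `NEVER_TAG`, `last_granted`, and `last_provisionally_granted` that the real function performs are omitted.

```c
#include <stdbool.h>
#include <stdint.h>

/* Simplified tag: a time plus a microstep, compared lexicographically. */
typedef struct {
    int64_t time;
    uint32_t microstep;
} sk_tag_t;

/* Returns -1, 0, or 1 if a is less than, equal to, or greater than b. */
int sk_tag_compare(sk_tag_t a, sk_tag_t b) {
    if (a.time < b.time) return -1;
    if (a.time > b.time) return 1;
    if (a.microstep < b.microstep) return -1;
    if (a.microstep > b.microstep) return 1;
    return 0;
}

typedef enum { SK_GRANT_NONE, SK_GRANT_TAG, SK_GRANT_PTAG } sk_grant_t;

/* Decide what to grant a node whose next event tag is `net`. */
sk_grant_t sk_decide_grant(sk_tag_t net, sk_tag_t eimt,
                           sk_tag_t eimt_strict, bool in_zdc) {
    /* Scenario 1: nothing can arrive at or before NET. */
    if (sk_tag_compare(eimt, net) > 0) return SK_GRANT_TAG;
    /* Scenario 2: a message at NET is possible, but only from within
     * a zero-delay cycle, so a provisional grant is needed. */
    if (sk_tag_compare(eimt, net) == 0 && in_zdc
            && sk_tag_compare(eimt_strict, net) > 0) {
        return SK_GRANT_PTAG;
    }
    /* Scenario 3: wait for further updates. */
    return SK_GRANT_NONE;
}
```

The added strict-EIMT condition is what separates the second and third scenarios: if a non-ZDC upstream node could still send at NET, the RTI can simply wait for it instead of issuing a PTAG.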
14 changes: 13 additions & 1 deletion core/federated/RTI/rti_common.h
@@ -230,14 +230,26 @@ void update_scheduling_node_next_event_tag_locked(scheduling_node_t* e, tag_t ne

/**
* Given a node (enclave or federate), find the tag of the earliest possible incoming
* message from upstream enclaves or federates, which will be the smallest upstream NET
* message (EIMT) from upstream enclaves or federates, which will be the smallest upstream NET
* plus the least delay. This could be NEVER_TAG if the RTI has not seen a NET from some
* upstream node.
* @param e The target node.
* @return The earliest possible incoming message tag.
*/
tag_t earliest_future_incoming_message_tag(scheduling_node_t* e);

/**
* Given a node (enclave or federate), find the earliest incoming message tag (EIMT) from
* any immediately upstream node that is not part of a zero-delay cycle (ZDC).
* These tags are treated strictly by the RTI when deciding whether to grant a PTAG.
* Since the upstream node is not part of a ZDC, there is no need to block on the input
* from that node since we can simply wait for it to complete its tag without chance of
* introducing a deadlock. This will return FOREVER_TAG if there are no non-ZDC upstream nodes.
* @param e The target node.
* @return The earliest possible incoming message tag from a non-ZDC upstream node.
*/
tag_t eimt_strict(scheduling_node_t* e);

/**
* Return true if the node is in a zero-delay cycle.
* @param node The node.
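
The core of what `eimt_strict` computes, a minimum over the non-ZDC upstream nodes of NET plus connection delay, can be sketched as follows. This is a deliberately reduced model: tags are time-only, delays are assumed non-negative, the transitive lookup through `earliest_future_incoming_message_tag(upstream)` is left out, and all `sk_`/`SK_` names are hypothetical.

```c
#include <stdbool.h>
#include <stdint.h>

#define SK_FOREVER INT64_MAX /* Stand-in for FOREVER_TAG. */

/* Simplified upstream node: a time-only NET and a ZDC flag. */
typedef struct {
    int64_t next_event;
    bool in_zdc;
} sk_node_t;

/*
 * Smallest NET plus delay over upstream nodes that are not in a
 * zero-delay cycle. Returns SK_FOREVER if there are no such nodes.
 */
int64_t sk_eimt_strict(const sk_node_t* upstream, const int64_t* delay, int n) {
    int64_t result = SK_FOREVER;
    for (int i = 0; i < n; i++) {
        if (upstream[i].in_zdc) continue; /* ZDC nodes are not treated strictly. */
        int64_t candidate;
        if (delay[i] > 0 && upstream[i].next_event > SK_FOREVER - delay[i]) {
            candidate = SK_FOREVER; /* Saturate instead of overflowing. */
        } else {
            candidate = upstream[i].next_event + delay[i];
        }
        if (candidate < result) result = candidate;
    }
    return result;
}
```

Returning the "forever" value when every upstream node is in a ZDC matches the documented behavior above: in that case the strict bound never blocks a PTAG.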
4 changes: 2 additions & 2 deletions core/federated/RTI/rti_local.c
@@ -35,7 +35,7 @@ static rti_local_t * rti_local;
lf_mutex_t rti_mutex;

void initialize_local_rti(environment_t *envs, int num_envs) {
rti_local = (rti_local_t*)malloc(sizeof(rti_local_t));
rti_local = (rti_local_t*)calloc(1, sizeof(rti_local_t));
LF_ASSERT(rti_local, "Out of memory");

initialize_rti_common(&rti_local->base);
@@ -47,7 +47,7 @@ void initialize_local_rti(environment_t *envs, int num_envs) {
// Allocate memory for the enclave_info objects
rti_local->base.scheduling_nodes = (scheduling_node_t**)calloc(num_envs, sizeof(scheduling_node_t*));
for (int i = 0; i < num_envs; i++) {
enclave_info_t *enclave_info = (enclave_info_t *) malloc(sizeof(enclave_info_t));
enclave_info_t *enclave_info = (enclave_info_t *) calloc(1, sizeof(enclave_info_t));
initialize_enclave_info(enclave_info, i, &envs[i]);
rti_local->base.scheduling_nodes[i] = (scheduling_node_t *) enclave_info;

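
Several allocations in this PR switch from `malloc` to `calloc`. A minimal sketch of why that matters, using a hypothetical struct `sk_info_t` rather than the real `enclave_info_t` or `federate_info_t`: any field that the initializer does not set explicitly starts out zeroed instead of indeterminate.

```c
#include <stdlib.h>

/* Hypothetical record; the real structs have many more fields. */
typedef struct {
    int id;
    int flags;
    void* pending; /* Not set by the initializer below. */
} sk_info_t;

/* Allocate a zeroed record and set only the id. Returns NULL on failure. */
sk_info_t* sk_new_info(int id) {
    sk_info_t* info = calloc(1, sizeof(sk_info_t));
    if (info == NULL) return NULL;
    info->id = id;
    return info;
}
```

Strictly speaking, `calloc` guarantees all-bits-zero, which the C standard does not equate with a null pointer, although the two coincide on mainstream platforms.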
50 changes: 23 additions & 27 deletions core/federated/RTI/rti_remote.c
@@ -68,7 +68,7 @@ int create_server(int32_t specified_port, uint16_t port, socket_type_t socket_ty
timeout_time = (struct timeval){.tv_sec = UDP_TIMEOUT_TIME / BILLION, .tv_usec = (UDP_TIMEOUT_TIME % BILLION) / 1000};
}
if (socket_descriptor < 0) {
lf_print_error_and_exit("Failed to create RTI socket.");
lf_print_error_system_failure("Failed to create RTI socket.");
}

// Set the option for this socket to reuse the same address
@@ -86,8 +86,8 @@ int create_server(int32_t specified_port, uint16_t port, socket_type_t socket_ty

/*
* The following used to permit reuse of a port that an RTI has previously
* used that has not been released. We no longer do this, but instead
* increment the port number until an available port is found.
* used that has not been released. We no longer do this, and instead retry
* some number of times after waiting.

// SO_REUSEPORT (since Linux 3.9)
// Permits multiple AF_INET or AF_INET6 sockets to be bound to an
@@ -127,28 +127,19 @@ int create_server(int32_t specified_port, uint16_t port, socket_type_t socket_ty
(struct sockaddr *) &server_fd,
sizeof(server_fd));

// If the binding fails with this port and no particular port was specified
// in the LF program, then try the next few ports in sequence.
while (result != 0
&& specified_port == 0
&& port >= STARTING_PORT
&& port <= STARTING_PORT + PORT_RANGE_LIMIT) {
lf_print("RTI failed to get port %d. Trying %d.", port, port + 1);
port++;
// Try repeatedly to bind to the specified port.
int count = 1;
while (result != 0 && count++ < PORT_BIND_RETRY_LIMIT) {
lf_print("RTI failed to get port %d. Will try again.", port);
lf_sleep(PORT_BIND_RETRY_INTERVAL);
server_fd.sin_port = htons(port);
result = bind(
socket_descriptor,
(struct sockaddr *) &server_fd,
sizeof(server_fd));
}
if (result != 0) {
if (specified_port == 0) {
lf_print_error_and_exit("Failed to bind the RTI socket. Cannot find a usable port. "
"Consider increasing PORT_RANGE_LIMIT in net_common.h.");
} else {
lf_print_error_and_exit("Failed to bind the RTI socket. Specified port is not available. "
"Consider leaving the port unspecified");
}
lf_print_error_and_exit("Failed to bind the RTI socket. Port %d is not available. ", port);
}
char* type = "TCP";
if (socket_type == UDP) {
@@ -251,9 +242,9 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) {
// a later or equal PTAG or TAG sent previously and if their transitive
// NET is greater than or equal to the tag.
// This is needed to stimulate absent messages from upstream and break deadlocks.
// NOTE: This could later be replaced with a TNET mechanism once
// we have an available encoding of causality interfaces.
// That might be more efficient.
// The scenario this deals with is illustrated in `test/C/src/federated/FeedbackDelay2.lf`
// and `test/C/src/federated/FeedbackDelay4.lf`.
// Note that this is transitive.
// NOTE: This is not needed for enclaves because zero-delay loops are prohibited.
// It's only needed for federates, which is why this is implemented here.
for (int j = 0; j < e->num_upstream; j++) {
@@ -263,10 +254,13 @@ void notify_provisional_tag_advance_grant(scheduling_node_t* e, tag_t tag) {
if (upstream->state == NOT_CONNECTED) continue;

tag_t earliest = earliest_future_incoming_message_tag(upstream);
tag_t strict_earliest = eimt_strict(upstream);

// If these tags are equal, then a TAG or PTAG should have already been granted,
// in which case, another will not be sent. But it may not have been already granted.
if (lf_tag_compare(earliest, tag) >= 0) {
if (lf_tag_compare(earliest, tag) > 0) {
notify_tag_advance_grant(upstream, tag);
} else if(lf_tag_compare(earliest, tag) == 0 && lf_tag_compare(strict_earliest, tag) > 0) {
notify_provisional_tag_advance_grant(upstream, tag);
}
}
@@ -1016,7 +1010,7 @@ void handle_federate_resign(federate_info_t *my_fed) {
// an orderly shutdown.
// close(my_fed->socket); // from unistd.h

lf_print("Federate %d has resigned.", my_fed->enclave.id);
lf_print("RTI: Federate %d has resigned.", my_fed->enclave.id);

// Check downstream federates to see whether they should now be granted a TAG.
// To handle cycles, need to create a boolean array to keep
@@ -1093,8 +1087,10 @@ void* federate_info_thread_TCP(void* fed) {
}

// Nothing more to do. Close the socket and exit.
// Prevent multiple threads from closing the same socket at the same time.
lf_mutex_lock(&rti_mutex);
close(my_fed->socket); // from unistd.h

lf_mutex_unlock(&rti_mutex);
return NULL;
}

@@ -1458,7 +1454,7 @@ void connect_to_federates(int socket_descriptor) {
// Got a socket
break;
} else if (socket_id < 0 && (errno != EAGAIN || errno != EWOULDBLOCK)) {
lf_print_error_and_exit("RTI failed to accept the socket. %s.", strerror(errno));
lf_print_error_system_failure("RTI failed to accept the socket.");
} else {
// Try again
lf_print_warning("RTI failed to accept the socket. %s. Trying again.", strerror(errno));
@@ -1558,8 +1554,8 @@ void initialize_federate(federate_info_t* fed, uint16_t id) {
int32_t start_rti_server(uint16_t port) {
int32_t specified_port = port;
if (port == 0) {
// Use the default starting port.
port = STARTING_PORT;
// Use the default port.
port = DEFAULT_PORT;
}
_lf_initialize_clock();
// Create the TCP socket server
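
The change to `create_server` replaces the search over a range of ports with repeated attempts on a single port. A minimal sketch of that retry pattern, assuming POSIX sockets: the name `sk_bind_with_retry` and the two `SK_` constants are hypothetical, their values are illustrative and not those of `PORT_BIND_RETRY_LIMIT` or `PORT_BIND_RETRY_INTERVAL`, and the sketch binds to the loopback address only.

```c
#include <arpa/inet.h>
#include <netinet/in.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <unistd.h>

#define SK_BIND_RETRY_LIMIT 3      /* Illustrative value only. */
#define SK_BIND_RETRY_INTERVAL_S 1 /* Illustrative value, in seconds. */

/*
 * Bind fd to the given port on the loopback interface, retrying the
 * same port after a pause. Returns 0 on success, -1 if every attempt fails.
 */
int sk_bind_with_retry(int fd, uint16_t port) {
    struct sockaddr_in addr;
    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
    addr.sin_port = htons(port);

    int result = bind(fd, (struct sockaddr*)&addr, sizeof(addr));
    int count = 1; /* The first attempt has already been made. */
    while (result != 0 && count++ < SK_BIND_RETRY_LIMIT) {
        printf("Failed to get port %d. Will try again.\n", port);
        sleep(SK_BIND_RETRY_INTERVAL_S);
        result = bind(fd, (struct sockaddr*)&addr, sizeof(addr));
    }
    return (result == 0) ? 0 : -1;
}
```

Retrying the same port keeps the RTI at a predictable address, at the cost of a bounded wait when a previous RTI has not yet released the port.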
4 changes: 1 addition & 3 deletions core/federated/RTI/rti_remote.h
@@ -187,9 +187,7 @@ extern int lf_critical_section_exit(environment_t* env);
/**
* Create a server and enable listening for socket connections.
*
* @note This function is similar to create_server(...) in
* federate.c. However, it contains logs that are specific
* to the RTI.
* @note This function is different from create_server(...) in federate.c.
*
* @param port The port number to use.
* @param socket_type The type of the socket for the server (TCP or UDP).
6 changes: 2 additions & 4 deletions core/federated/clock-sync.c
@@ -169,16 +169,14 @@ uint16_t setup_clock_synchronization_with_rti() {
_lf_rti_socket_UDP,
(struct sockaddr *) &federate_UDP_addr,
sizeof(federate_UDP_addr)) < 0) {
lf_print_error_and_exit("Failed to bind its UDP socket: %s.",
strerror(errno));
lf_print_error_system_failure("Failed to bind its UDP socket.");
}
// Retrieve the port number that was assigned by the operating system
socklen_t addr_length = sizeof(federate_UDP_addr);
if (getsockname(_lf_rti_socket_UDP, (struct sockaddr *)&federate_UDP_addr, &addr_length) == -1) {
// FIXME: Send 0 UDP_PORT message instead of exiting.
// That will disable clock synchronization.
lf_print_error_and_exit("Failed to retrieve UDP port: %s.",
strerror(errno));
lf_print_error_system_failure("Failed to retrieve UDP port.");
}
LF_PRINT_DEBUG("Assigned UDP port number %u to its socket.", ntohs(federate_UDP_addr.sin_port));

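
Throughout this PR, calls that formatted `strerror(errno)` by hand are replaced with `lf_print_error_system_failure`. The diff does not show that function's body, so the following is only a sketch of what such a helper might look like, under the assumption that it appends the system error description and exits. The `sk_` names and the exact message layout are hypothetical.

```c
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/*
 * Format "<message> (errno <n>: <description>)" into buf.
 * Returns the value of snprintf(), i.e. the length that was needed.
 */
int sk_format_system_failure(char* buf, size_t size,
                             const char* message, int error_code) {
    return snprintf(buf, size, "FATAL ERROR: %s (errno %d: %s)",
                    message, error_code, strerror(error_code));
}

/* Report a failed system call using the current errno, then exit. */
void sk_system_failure(const char* message) {
    int saved_errno = errno; /* Capture before any call can overwrite it. */
    char buf[256];
    sk_format_system_failure(buf, sizeof(buf), message, saved_errno);
    fprintf(stderr, "%s\n", buf);
    exit(EXIT_FAILURE);
}
```

Centralizing the errno handling keeps call sites such as the `bind` and `getsockname` checks above short and makes the error format uniform.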