Skip to content

Commit

Permalink
Merge pull request #346 from lf-lang/clock-realtime
Browse files Browse the repository at this point in the history
Move to CLOCK_REALTIME introduce clock.h and lf_atomic.h
  • Loading branch information
erlingrj authored Feb 11, 2024
2 parents 85d54a4 + 884bc45 commit 26eb346
Show file tree
Hide file tree
Showing 43 changed files with 633 additions and 532 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ if(DEFINED LF_SINGLE_THREADED)
add_compile_definitions(LF_SINGLE_THREADED=1)
endif()

# Warnings as errors
add_compile_options(-Werror)

set(Test test)
set(Lib lib)
set(CoreLibPath core)
Expand Down
2 changes: 1 addition & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
set(CORE_ROOT ${CMAKE_CURRENT_SOURCE_DIR})

# Get the general common sources for reactor-c
list(APPEND GENERAL_SOURCES tag.c port.c mixed_radix.c reactor_common.c lf_token.c environment.c)
list(APPEND GENERAL_SOURCES tag.c clock.c port.c mixed_radix.c reactor_common.c lf_token.c environment.c)

# Add tracing support if requested
if (DEFINED LF_TRACE)
Expand Down
60 changes: 60 additions & 0 deletions core/clock.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* @file
* @author Erling Jellum
* @copyright (c) 2020-2024, The University of California at Berkeley.
* License: <a href="https://github.com/lf-lang/reactor-c/blob/main/LICENSE.md">BSD 2-clause</a>
* @brief Implementations of functions in clock.h.
*/
#include "clock.h"
#include "platform.h"

#if defined(_LF_CLOCK_SYNC_ON)
#include "clock-sync.h"
#endif

static instant_t last_read_physical_time = NEVER;

int lf_clock_gettime(instant_t *now) {
instant_t last_read_local;
int res = _lf_clock_gettime(now);
if (res != 0) {
return -1;
}
#if defined (_LF_CLOCK_SYNC_ON)
clock_sync_apply_offset(now);
#endif
do {
// Atomically fetch the last read value. This is done with
// atomics to guarantee that it works on 32bit platforms as well.
last_read_local = lf_atomic_fetch_add64(&last_read_physical_time, 0);

// Ensure monotonicity.
if (*now < last_read_local) {
*now = last_read_local+1;
}

// Update the last read value, atomically and also make sure that another
// thread has not been here in between and changed it. If so. We must redo
// the monotonicity calculation.
} while(!lf_atomic_bool_compare_and_swap64(&last_read_physical_time, last_read_local, *now));

return 0;
}

int lf_clock_interruptable_sleep_until_locked(environment_t *env, instant_t wakeup_time) {
#if defined (_LF_CLOCK_SYNC_ON)
// Remove any clock sync offset and call the Platform API.
clock_sync_remove_offset(&wakeup_time);
#endif
return _lf_interruptable_sleep_until_locked(env, wakeup_time);
}

#if !defined(LF_SINGLE_THREADED)
int lf_clock_cond_timedwait(lf_cond_t *cond, instant_t wakeup_time) {
#if defined (_LF_CLOCK_SYNC_ON)
// Remove any clock sync offset and call the Platform API.
clock_sync_remove_offset(&wakeup_time);
#endif
return _lf_cond_timedwait(cond, wakeup_time);
}
#endif
4 changes: 4 additions & 0 deletions core/federated/RTI/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ add_executable(
rti_remote.c
${CoreLib}/trace.c
${LF_PLATFORM_FILE}
${CoreLib}/platform/lf_atomic_gcc_clang.c
${CoreLib}/platform/lf_unix_clock_support.c
${CoreLib}/utils/util.c
${CoreLib}/tag.c
${CoreLib}/clock.c
${CoreLib}/federated/network/net_util.c
${CoreLib}/utils/pqueue_base.c
${CoreLib}/utils/pqueue_tag.c
Expand All @@ -92,6 +94,8 @@ target_compile_definitions(RTI PUBLIC PLATFORM_${CMAKE_SYSTEM_NAME})
# Set RTI Tracing
target_compile_definitions(RTI PUBLIC RTI_TRACE)

# Warnings as errors
target_compile_options(RTI PUBLIC -Werror)
# Find threads and link to it
find_package(Threads REQUIRED)
target_link_libraries(RTI Threads::Threads)
Expand Down
16 changes: 8 additions & 8 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,10 @@ void handle_port_absent_message(federate_info_t *sending_federate, unsigned char
LF_MUTEX_UNLOCK(&rti_mutex);
lf_print_warning("RTI: Destination federate %d is no longer connected. Dropping message.",
federate_id);
LF_PRINT_LOG("Fed status: next_event (" PRINTF_TIME ", %d), "
"completed (" PRINTF_TIME ", %d), "
"last_granted (" PRINTF_TIME ", %d), "
"last_provisionally_granted (" PRINTF_TIME ", %d).",
LF_PRINT_LOG("Fed status: next_event " PRINTF_TAG ", "
"completed " PRINTF_TAG ", "
"last_granted " PRINTF_TAG ", "
"last_provisionally_granted " PRINTF_TAG ".",
fed->enclave.next_event.time - start_time,
fed->enclave.next_event.microstep,
fed->enclave.completed.time - start_time,
Expand Down Expand Up @@ -432,10 +432,10 @@ void handle_timed_message(federate_info_t *sending_federate, unsigned char *buff
LF_MUTEX_UNLOCK(&rti_mutex);
lf_print_warning("RTI: Destination federate %d is no longer connected. Dropping message.",
federate_id);
LF_PRINT_LOG("Fed status: next_event (" PRINTF_TIME ", %d), "
"completed (" PRINTF_TIME ", %d), "
"last_granted (" PRINTF_TIME ", %d), "
"last_provisionally_granted (" PRINTF_TIME ", %d).",
LF_PRINT_LOG("Fed status: next_event " PRINTF_TAG ", "
"completed " PRINTF_TAG ", "
"last_granted " PRINTF_TAG ", "
"last_provisionally_granted " PRINTF_TAG ".",
fed->enclave.next_event.time - start_time,
fed->enclave.next_event.microstep,
fed->enclave.completed.time - start_time,
Expand Down
47 changes: 37 additions & 10 deletions core/federated/clock-sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "net_util.h"
#include "util.h"

// Global variables defined in tag.c:
extern interval_t _lf_time_physical_clock_offset;
extern interval_t _lf_time_test_physical_clock_offset;
/** Offset calculated by the clock synchronization algorithm. */
interval_t _lf_clock_sync_offset = NSEC(0);
/** Offset used to test clock synchronization (clock sync should largely remove this offset). */
interval_t _lf_clock_sync_constant_bias = NSEC(0);

/**
* Keep a record of connection statistics
Expand Down Expand Up @@ -77,6 +78,14 @@ instant_t _lf_last_clock_sync_instant = 0LL;
*/
int _lf_rti_socket_UDP = -1;

/**
* Atomically add an adjustment to the clock sync offset.
* This needs to be atomic to be thread safe, particularly on 32-bit platforms.
*/
static void adjust_lf_clock_sync_offset(interval_t adjustment) {
lf_atomic_fetch_add64(&_lf_clock_sync_offset, adjustment);
}

#ifdef _LF_CLOCK_SYNC_COLLECT_STATS
/**
* Update statistic on the socket based on the newly calculated network delay
Expand Down Expand Up @@ -380,7 +389,7 @@ void handle_T4_clock_sync_message(unsigned char* buffer, int socket, instant_t r
// Apply a jitter attenuator to the estimated clock error to prevent
// large jumps in the underlying clock.
// Note that estimated_clock_error is calculated using lf_time_physical() which includes
// the _lf_time_physical_clock_offset adjustment.
// the clock sync adjustment.
adjustment = estimated_clock_error / _LF_CLOCK_SYNC_ATTENUATION;
} else {
// Use of TCP socket means we are in the startup phase, so
Expand Down Expand Up @@ -420,21 +429,19 @@ void handle_T4_clock_sync_message(unsigned char* buffer, int socket, instant_t r
// which means we can now adjust the clock offset.
// For the AVG algorithm, history is a running average and can be directly
// applied
_lf_time_physical_clock_offset += _lf_rti_socket_stat.history;
adjust_lf_clock_sync_offset(_lf_rti_socket_stat.history);
// @note AVG and SD will be zero if collect-stats is set to false
LF_PRINT_LOG("Clock sync:"
" New offset: " PRINTF_TIME "."
" Round trip delay to RTI (now): " PRINTF_TIME "."
" (AVG): " PRINTF_TIME "."
" (SD): " PRINTF_TIME "."
" Local round trip delay: " PRINTF_TIME "."
" Test offset: " PRINTF_TIME ".",
_lf_time_physical_clock_offset,
" Local round trip delay: " PRINTF_TIME ".",
_lf_clock_sync_offset,
network_round_trip_delay,
stats.average,
stats.standard_deviation,
_lf_rti_socket_stat.local_delay,
_lf_time_test_physical_clock_offset);
_lf_rti_socket_stat.local_delay);
// Reset the stats
reset_socket_stat(&_lf_rti_socket_stat);
// Set the last instant at which the clocks were synchronized
Expand Down Expand Up @@ -538,6 +545,25 @@ void* listen_to_rti_UDP_thread(void* args) {
}


// If clock synchronization is enabled, provide implementations. If not
// just empty implementations that should be optimized away.
#if defined(FEDERATED) && defined(_LF_CLOCK_SYNC_ON)
void clock_sync_apply_offset(instant_t *t) {
*t += (_lf_clock_sync_offset + _lf_clock_sync_constant_bias);
}

void clock_sync_remove_offset(instant_t *t) {
*t -= (_lf_clock_sync_offset + _lf_clock_sync_constant_bias);
}

void clock_sync_set_constant_bias(interval_t offset) {
_lf_clock_sync_constant_bias = offset;
}
#else
void clock_sync_apply_offset(instant_t *t) { }
void clock_sync_remove_offset(instant_t *t) { }
void clock_sync_set_constant_bias(interval_t offset) { }
#endif

/**
* Create the thread responsible for handling clock synchronization
Expand All @@ -554,4 +580,5 @@ int create_clock_sync_thread(lf_thread_t* thread_id) {
#endif // _LF_CLOCK_SYNC_ON
return 0;
}

#endif
15 changes: 4 additions & 11 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
#endif

// Global variables defined in tag.c:
extern instant_t _lf_last_reported_unadjusted_physical_time_ns;
extern instant_t start_time;

// Global variable defined in reactor_common.c:
Expand Down Expand Up @@ -1841,7 +1840,7 @@ void lf_connect_to_federate(uint16_t remote_federate_id) {
remote_federate_id, CONNECT_MAX_RETRIES);
return;
}
lf_print_warning("Could not connect to federate %d. Will try again every %lld nanoseconds.\n",
lf_print_warning("Could not connect to federate %d. Will try again every" PRINTF_TIME "nanoseconds.\n",
remote_federate_id, ADDRESS_QUERY_RETRY_INTERVAL);

// Check whether the RTI is still there.
Expand Down Expand Up @@ -2144,7 +2143,7 @@ void lf_enqueue_port_absent_reactions(environment_t* env){
return;
}
#endif
LF_PRINT_DEBUG("Enqueueing port absent reactions at time %lld.", (long long) (env->current_tag.time - start_time));
LF_PRINT_DEBUG("Enqueueing port absent reactions at time " PRINTF_TIME ".", (env->current_tag.time - start_time));
if (num_port_absent_reactions == 0) {
LF_PRINT_DEBUG("No port absent reactions.");
return;
Expand Down Expand Up @@ -2509,21 +2508,15 @@ tag_t lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply)
// RTI. That amount of time will be no greater than ADVANCE_MESSAGE_INTERVAL in the future.
LF_PRINT_DEBUG("Waiting for physical time to elapse or an event on the event queue.");

// The above call to bounded_NET called lf_time_physical()
// set _lf_last_reported_unadjusted_physical_time_ns, the
// time obtained using CLOCK_REALTIME before adjustment for
// clock synchronization. Since that is the clock used by
// lf_cond_timedwait, this is the clock we want to use.
instant_t wait_until_time_ns =
_lf_last_reported_unadjusted_physical_time_ns + ADVANCE_MESSAGE_INTERVAL;
instant_t wait_until_time_ns = lf_time_physical() + ADVANCE_MESSAGE_INTERVAL;

// Regardless of the ADVANCE_MESSAGE_INTERVAL, do not let this
// wait exceed the time of the next tag.
if (wait_until_time_ns > original_tag.time) {
wait_until_time_ns = original_tag.time;
}

lf_cond_timedwait(&env->event_q_changed, wait_until_time_ns);
lf_clock_cond_timedwait(&env->event_q_changed, wait_until_time_ns);

LF_PRINT_DEBUG("Wait finished or interrupted.");

Expand Down
8 changes: 4 additions & 4 deletions core/platform/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ set(LF_PLATFORM_FILES
lf_zephyr_clock_counter.c
lf_zephyr_clock_kernel.c
lf_rp2040_support.c
lf_atomic_windows.c
lf_atomic_gcc_clang.c
lf_atomic_irq.c
)

if(${CMAKE_SYSTEM_NAME} STREQUAL "Windows")
set(CMAKE_SYSTEM_VERSION 10.0)
message("Using Windows SDK version ${CMAKE_VS_WINDOWS_TARGET_PLATFORM_VERSION}")
elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Nrf52")
if(${CMAKE_SYSTEM_NAME} STREQUAL "Nrf52")
list(APPEND REACTORC_COMPILE_DEFS PLATFORM_NRF52)
elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Zephyr")
list(APPEND REACTORC_COMPILE_DEFS PLATFORM_ZEPHYR)
Expand Down
4 changes: 2 additions & 2 deletions core/platform/arduino_mbed/ConditionWrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ void condition_delete(void* condition){
delete cv;
}

bool condition_wait_for(void* condition, int64_t absolute_time_ns){
bool condition_wait_for(void* condition, int64_t wakeup_time){
ConditionVariable* cv = (ConditionVariable*) condition;
return cv->wait_for(absolute_time_ns / 1000000LL);
return cv->wait_for(wakeup_time / 1000000LL);
}

int condition_wait(void* condition){
Expand Down
16 changes: 8 additions & 8 deletions core/platform/lf_C11_threads_support.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#if !defined(LF_SINGLE_THREADED) && !defined(PLATFORM_ARDUINO)
#include "platform.h"
#include "lf_C11_threads_support.h"

#include <threads.h>
#include <stdlib.h>
#include <stdint.h> // For fixed-width integral types
Expand Down Expand Up @@ -45,17 +44,18 @@ int lf_cond_wait(lf_cond_t* cond) {
return cnd_wait((cnd_t*)&cond->condition, (mtx_t*)cond->mutex);
}

int lf_cond_timedwait(lf_cond_t* cond, int64_t absolute_time_ns) {
// Convert the absolute time to a timespec.
// timespec is seconds and nanoseconds.
struct timespec timespec_absolute_time
= {(time_t)absolute_time_ns / 1000000000LL, (long)absolute_time_ns % 1000000000LL};
int return_value = 0;
return_value = cnd_timedwait(
int _lf_cond_timedwait(lf_cond_t* cond, instant_t wakeup_time) {
struct timespec timespec_absolute_time = {
.tv_sec = wakeup_time / BILLION,
.tv_nsec = wakeup_time % BILLION
};

int return_value = cnd_timedwait(
(cnd_t*)&cond->condition,
(mtx_t*)cond->mutex,
&timespec_absolute_time
);

switch (return_value) {
case thrd_timedout:
return_value = LF_TIMEOUT;
Expand Down
11 changes: 4 additions & 7 deletions core/platform/lf_POSIX_threads_support.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#if !defined(LF_SINGLE_THREADED) && !defined(PLATFORM_ARDUINO)
#include "platform.h"
#include "lf_POSIX_threads_support.h"
#include "lf_unix_clock_support.h"

#include <pthread.h>
#include <errno.h>
Expand Down Expand Up @@ -62,13 +63,9 @@ int lf_cond_wait(lf_cond_t* cond) {
return pthread_cond_wait((pthread_cond_t*)&cond->condition, (pthread_mutex_t*)cond->mutex);
}

int lf_cond_timedwait(lf_cond_t* cond, int64_t absolute_time_ns) {
// Convert the absolute time to a timespec.
// timespec is seconds and nanoseconds.
struct timespec timespec_absolute_time
= {(time_t)absolute_time_ns / 1000000000LL, (long)absolute_time_ns % 1000000000LL};
int return_value = 0;
return_value = pthread_cond_timedwait(
int _lf_cond_timedwait(lf_cond_t* cond, instant_t wakeup_time) {
struct timespec timespec_absolute_time = convert_ns_to_timespec(wakeup_time);
int return_value = pthread_cond_timedwait(
(pthread_cond_t*)&cond->condition,
(pthread_mutex_t*)cond->mutex,
&timespec_absolute_time
Expand Down
Loading

0 comments on commit 26eb346

Please sign in to comment.