Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move to CLOCK_REALTIME introduce clock.h and lf_atomic.h #346

Merged
merged 61 commits into from
Feb 11, 2024
Merged
Show file tree
Hide file tree
Changes from 48 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
839ee39
Simplify tag.c by removing epoch offset and clock sync offset handling
erlingrj Feb 3, 2024
de31c3a
Thread-safe clock-sync offset application
erlingrj Feb 3, 2024
09d0ccc
Removes CLOCK_MONOTONIC. Applies and removes clock sync offset under …
erlingrj Feb 3, 2024
61f9fa1
Fix typo
erlingrj Feb 3, 2024
73ce8bb
Didnt fix the typos properly
erlingrj Feb 3, 2024
ff492b1
Try to fix macos compiler error
erlingrj Feb 3, 2024
08a09cb
FIx some more mistakes
erlingrj Feb 3, 2024
4460c1c
FIx up the atomics API. It was currently not safe for use with 64bit …
erlingrj Feb 4, 2024
1a1ec8f
Typo
erlingrj Feb 4, 2024
96de9e3
Fix up use of lf_bool_compare_and_swap
erlingrj Feb 4, 2024
f06a5f1
Fix up CMake for lf_atomic
erlingrj Feb 4, 2024
f2f2ed9
Fix up some naming issues
erlingrj Feb 4, 2024
c56524e
Put enable/disable interrupts as part of the Platform API
erlingrj Feb 4, 2024
c0f96e9
Remove #else in atomic64 since Arduino build
erlingrj Feb 4, 2024
09f0f83
Typo
erlingrj Feb 4, 2024
d9c6e58
Fix arduino-cli and cast to int32 when using lf_atomic_32 API
erlingrj Feb 4, 2024
b1608b1
Provide implementations of lf_disable_interupts for arduino and rpi …
erlingrj Feb 4, 2024
c748d12
Make lf_time_physical thread-safe and 32bit-safe
erlingrj Feb 4, 2024
e3338fb
INclude atomics implementation in RTI also
erlingrj Feb 4, 2024
7e81c2e
Do clock-sync application and removal lock-free
erlingrj Feb 4, 2024
8a797d3
Make sure that SEC(x) can be formatted without warning by PRINTF_TIME
erlingrj Feb 4, 2024
263b0c4
ADd atomics to RTI. Also add -Werror to RTI build
erlingrj Feb 4, 2024
ddb845f
Fix printf warning
erlingrj Feb 4, 2024
bcb6548
Typo
erlingrj Feb 4, 2024
7b8a898
Update core/federated/clock-sync.c
erlingrj Feb 5, 2024
5c87a28
Merge remote-tracking branch 'origin/main' into clock-realtime
erlingrj Feb 5, 2024
b9780e1
Apply suggestions from code review
erlingrj Feb 5, 2024
5022c41
Fix some print formatting
erlingrj Feb 5, 2024
337bbc6
Type
erlingrj Feb 5, 2024
697d823
Forgotten semicolon
erlingrj Feb 5, 2024
5b83456
Merge branch 'clock-realtime' of https://github.com/lf-lang/reactor-c…
erlingrj Feb 5, 2024
02e9b86
Add support for setting a constant bias for the clock readouts.
erlingrj Feb 5, 2024
ece395e
Update lf-ref
erlingrj Feb 5, 2024
798cf5b
Remove disable/enable interrupts from the common platform API
erlingrj Feb 5, 2024
f768dec
We need enable/disable interrupts in the common platform API behind i…
erlingrj Feb 5, 2024
d69348b
Update core/tag.c
erlingrj Feb 7, 2024
c8a609f
FIx up atomics AP.
erlingrj Feb 7, 2024
eedb91b
Merge branch 'clock-realtime' of https://github.com/lf-lang/reactor-c…
erlingrj Feb 7, 2024
d060511
Fix RTI build
erlingrj Feb 7, 2024
4e44c00
Remove some whitspace
erlingrj Feb 7, 2024
4a2c8e5
Remove redundant LF_MIN_SLEEP inside platform API
erlingrj Feb 7, 2024
b5d4613
Merge branch 'main' into clock-realtime
erlingrj Feb 8, 2024
70b8750
Include windows headers for lf_atomic_windows
erlingrj Feb 8, 2024
c3d61c4
Fix some warnings
erlingrj Feb 8, 2024
bc6f405
Fix warnings and mistake in monotonic physical time
erlingrj Feb 8, 2024
42eee2f
Do atomics on the clock-sync side, not the apply/remove side
erlingrj Feb 8, 2024
664fe9a
Do self-review. Fix several issues relating to adding/removing clock …
erlingrj Feb 8, 2024
4680498
ADd more docs on the clock-sync stuff
erlingrj Feb 8, 2024
3e9a325
Rethink platform API wrt clocks
erlingrj Feb 8, 2024
77631a6
Hide clock_sync application behind macros for now to make it compatib…
erlingrj Feb 8, 2024
7d4b379
Apply suggestions from code review
erlingrj Feb 8, 2024
850a05e
Comments only
erlingrj Feb 8, 2024
3ee3aaf
Merge branch 'clock-realtime' of https://github.com/lf-lang/reactor-c…
erlingrj Feb 8, 2024
2e9e6e4
Fix RTI build
erlingrj Feb 8, 2024
4a70c46
lf_cond_timedwait -> lf_clock_cond_timedwait
erlingrj Feb 9, 2024
62d364d
USe PRINTF_TAG
erlingrj Feb 11, 2024
cdd9588
Fix arduino sleep_until function
erlingrj Feb 11, 2024
e36f10f
Apply suggestions from code review
erlingrj Feb 11, 2024
73274fe
Remove some empty lines
erlingrj Feb 11, 2024
23ac71c
Merge branch 'clock-realtime' of https://github.com/lf-lang/reactor-c…
erlingrj Feb 11, 2024
884bc45
Fix comment
erlingrj Feb 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ if(DEFINED LF_SINGLE_THREADED)
add_compile_definitions(LF_SINGLE_THREADED=1)
endif()

# Warnings as errors
add_compile_options(-Werror)

set(Test test)
set(Lib lib)
set(CoreLibPath core)
Expand Down
3 changes: 3 additions & 0 deletions core/federated/RTI/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ add_executable(
rti_remote.c
${CoreLib}/trace.c
${LF_PLATFORM_FILE}
${CoreLib}/platform/lf_atomic_gcc_clang.c
${CoreLib}/platform/lf_unix_clock_support.c
${CoreLib}/utils/util.c
${CoreLib}/tag.c
Expand All @@ -92,6 +93,8 @@ target_compile_definitions(RTI PUBLIC PLATFORM_${CMAKE_SYSTEM_NAME})
# Set RTI Tracing
target_compile_definitions(RTI PUBLIC RTI_TRACE)

# Warnings as errors
target_compile_options(RTI PUBLIC -Werror)
# Find threads and link to it
find_package(Threads REQUIRED)
target_link_libraries(RTI Threads::Threads)
Expand Down
38 changes: 28 additions & 10 deletions core/federated/clock-sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,8 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "net_util.h"
#include "util.h"

// Global variables defined in tag.c:
extern interval_t _lf_time_physical_clock_offset;
extern interval_t _lf_time_test_physical_clock_offset;
interval_t _lf_clock_sync_offset = NSEC(0);
interval_t _lf_clock_sync_constant_bias = NSEC(0);

/**
* Keep a record of connection statistics
Expand Down Expand Up @@ -77,6 +76,12 @@ instant_t _lf_last_clock_sync_instant = 0LL;
*/
int _lf_rti_socket_UDP = -1;

static void adjust_lf_clock_sync_offset(interval_t adjustment) {
// Do an atomic adjustment of the clock sync offset. This is needed
// to ensure thread-safety on 32bit platforms.
lf_atomic_fetch_add64(&_lf_clock_sync_offset, adjustment);
}

#ifdef _LF_CLOCK_SYNC_COLLECT_STATS
/**
* Update statistic on the socket based on the newly calculated network delay
Expand Down Expand Up @@ -380,7 +385,7 @@ void handle_T4_clock_sync_message(unsigned char* buffer, int socket, instant_t r
// Apply a jitter attenuator to the estimated clock error to prevent
// large jumps in the underlying clock.
// Note that estimated_clock_error is calculated using lf_time_physical() which includes
// the _lf_time_physical_clock_offset adjustment.
// the clock sync adjustment.
adjustment = estimated_clock_error / _LF_CLOCK_SYNC_ATTENUATION;
} else {
// Use of TCP socket means we are in the startup phase, so
Expand Down Expand Up @@ -420,21 +425,19 @@ void handle_T4_clock_sync_message(unsigned char* buffer, int socket, instant_t r
// which means we can now adjust the clock offset.
// For the AVG algorithm, history is a running average and can be directly
// applied
_lf_time_physical_clock_offset += _lf_rti_socket_stat.history;
adjust_lf_clock_sync_offset(_lf_rti_socket_stat.history);
// @note AVG and SD will be zero if collect-stats is set to false
LF_PRINT_LOG("Clock sync:"
" New offset: " PRINTF_TIME "."
" Round trip delay to RTI (now): " PRINTF_TIME "."
" (AVG): " PRINTF_TIME "."
" (SD): " PRINTF_TIME "."
" Local round trip delay: " PRINTF_TIME "."
" Test offset: " PRINTF_TIME ".",
_lf_time_physical_clock_offset,
" Local round trip delay: " PRINTF_TIME ".",
_lf_clock_sync_offset,
network_round_trip_delay,
stats.average,
stats.standard_deviation,
_lf_rti_socket_stat.local_delay,
_lf_time_test_physical_clock_offset);
_lf_rti_socket_stat.local_delay);
// Reset the stats
reset_socket_stat(&_lf_rti_socket_stat);
// Set the last instant at which the clocks were synchronized
Expand Down Expand Up @@ -554,4 +557,19 @@ int create_clock_sync_thread(lf_thread_t* thread_id) {
#endif // _LF_CLOCK_SYNC_ON
return 0;
}

#if defined (_LF_CLOCK_SYNC_ON)
void clock_sync_apply_offset(instant_t *t) {
*t += (_lf_clock_sync_offset + _lf_clock_sync_constant_bias);
}

void clock_sync_remove_offset(instant_t *t) {
*t -= (_lf_clock_sync_offset + _lf_clock_sync_constant_bias);
}

void clock_sync_set_constant_bias(interval_t offset) {
_lf_clock_sync_constant_bias = offset;
}
#endif

#endif
13 changes: 3 additions & 10 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
#endif

// Global variables defined in tag.c:
extern instant_t _lf_last_reported_unadjusted_physical_time_ns;
extern instant_t start_time;

// Global variable defined in reactor_common.c:
Expand Down Expand Up @@ -1841,7 +1840,7 @@ void lf_connect_to_federate(uint16_t remote_federate_id) {
remote_federate_id, CONNECT_MAX_RETRIES);
return;
}
lf_print_warning("Could not connect to federate %d. Will try again every %lld nanoseconds.\n",
lf_print_warning("Could not connect to federate %d. Will try again every" PRINTF_TIME "nanoseconds.\n",
remote_federate_id, ADDRESS_QUERY_RETRY_INTERVAL);

// Check whether the RTI is still there.
Expand Down Expand Up @@ -2144,7 +2143,7 @@ void lf_enqueue_port_absent_reactions(environment_t* env){
return;
}
#endif
LF_PRINT_DEBUG("Enqueueing port absent reactions at time %lld.", (long long) (env->current_tag.time - start_time));
LF_PRINT_DEBUG("Enqueueing port absent reactions at time " PRINTF_TIME ".", (env->current_tag.time - start_time));
if (num_port_absent_reactions == 0) {
LF_PRINT_DEBUG("No port absent reactions.");
return;
Expand Down Expand Up @@ -2509,13 +2508,7 @@ tag_t lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply)
// RTI. That amount of time will be no greater than ADVANCE_MESSAGE_INTERVAL in the future.
LF_PRINT_DEBUG("Waiting for physical time to elapse or an event on the event queue.");

// The above call to bounded_NET called lf_time_physical()
// set _lf_last_reported_unadjusted_physical_time_ns, the
// time obtained using CLOCK_REALTIME before adjustment for
// clock synchronization. Since that is the clock used by
// lf_cond_timedwait, this is the clock we want to use.
instant_t wait_until_time_ns =
_lf_last_reported_unadjusted_physical_time_ns + ADVANCE_MESSAGE_INTERVAL;
instant_t wait_until_time_ns = lf_time_physical() + ADVANCE_MESSAGE_INTERVAL;

// Regardless of the ADVANCE_MESSAGE_INTERVAL, do not let this
// wait exceed the time of the next tag.
Expand Down
8 changes: 4 additions & 4 deletions core/platform/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ set(LF_PLATFORM_FILES
lf_zephyr_clock_counter.c
lf_zephyr_clock_kernel.c
lf_rp2040_support.c
lf_atomic_windows.c
lf_atomic_gcc_clang.c
lf_atomic_irq.c
)

if(${CMAKE_SYSTEM_NAME} STREQUAL "Windows")
set(CMAKE_SYSTEM_VERSION 10.0)
message("Using Windows SDK version ${CMAKE_VS_WINDOWS_TARGET_PLATFORM_VERSION}")
elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Nrf52")
if(${CMAKE_SYSTEM_NAME} STREQUAL "Nrf52")
list(APPEND REACTORC_COMPILE_DEFS PLATFORM_NRF52)
elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Zephyr")
list(APPEND REACTORC_COMPILE_DEFS PLATFORM_ZEPHYR)
Expand Down
4 changes: 2 additions & 2 deletions core/platform/arduino_mbed/ConditionWrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ void condition_delete(void* condition){
delete cv;
}

bool condition_wait_for(void* condition, int64_t absolute_time_ns){
bool condition_wait_for(void* condition, int64_t wakeup_time){
ConditionVariable* cv = (ConditionVariable*) condition;
return cv->wait_for(absolute_time_ns / 1000000LL);
return cv->wait_for(wakeup_time / 1000000LL);
}

int condition_wait(void* condition){
Expand Down
17 changes: 9 additions & 8 deletions core/platform/lf_C11_threads_support.c
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#if !defined(LF_SINGLE_THREADED) && !defined(PLATFORM_ARDUINO)
#include "platform.h"
#include "lf_C11_threads_support.h"

#include <threads.h>
#include <stdlib.h>
#include <stdint.h> // For fixed-width integral types
Expand Down Expand Up @@ -45,17 +44,19 @@ int lf_cond_wait(lf_cond_t* cond) {
return cnd_wait((cnd_t*)&cond->condition, (mtx_t*)cond->mutex);
}

int lf_cond_timedwait(lf_cond_t* cond, int64_t absolute_time_ns) {
// Convert the absolute time to a timespec.
// timespec is seconds and nanoseconds.
struct timespec timespec_absolute_time
= {(time_t)absolute_time_ns / 1000000000LL, (long)absolute_time_ns % 1000000000LL};
int return_value = 0;
return_value = cnd_timedwait(
int lf_cond_timedwait(lf_cond_t* cond, instant_t wakeup_time) {
clock_sync_remove_offset(&wakeup_time);
struct timespec timespec_absolute_time = {
.tv_sec = wakeup_time / BILLION,
.tv_nsec = wakeup_time % BILLION
};

int return_value = cnd_timedwait(
(cnd_t*)&cond->condition,
(mtx_t*)cond->mutex,
&timespec_absolute_time
);

switch (return_value) {
case thrd_timedout:
return_value = LF_TIMEOUT;
Expand Down
13 changes: 6 additions & 7 deletions core/platform/lf_POSIX_threads_support.c
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#if !defined(LF_SINGLE_THREADED) && !defined(PLATFORM_ARDUINO)
#include "platform.h"
#include "lf_POSIX_threads_support.h"
#include "lf_unix_clock_support.h"

#include <pthread.h>
#include <errno.h>
Expand Down Expand Up @@ -62,13 +63,11 @@ int lf_cond_wait(lf_cond_t* cond) {
return pthread_cond_wait((pthread_cond_t*)&cond->condition, (pthread_mutex_t*)cond->mutex);
}

int lf_cond_timedwait(lf_cond_t* cond, int64_t absolute_time_ns) {
// Convert the absolute time to a timespec.
// timespec is seconds and nanoseconds.
struct timespec timespec_absolute_time
= {(time_t)absolute_time_ns / 1000000000LL, (long)absolute_time_ns % 1000000000LL};
int return_value = 0;
return_value = pthread_cond_timedwait(
int lf_cond_timedwait(lf_cond_t* cond, instant_t wakeup_time) {
// Remove the clock synchronization offset.
clock_sync_remove_offset(&wakeup_time);
struct timespec timespec_absolute_time = convert_ns_to_timespec(wakeup_time);
int return_value = pthread_cond_timedwait(
(pthread_cond_t*)&cond->condition,
(pthread_mutex_t*)cond->mutex,
&timespec_absolute_time
Expand Down
9 changes: 5 additions & 4 deletions core/platform/lf_arduino_support.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ static volatile uint32_t _lf_time_us_low_last = 0;
*/
int _lf_interruptable_sleep_until_locked(environment_t* env, instant_t wakeup) {
instant_t now;

_lf_async_event = false;
lf_disable_interrupts_nested();

Expand Down Expand Up @@ -122,11 +123,10 @@ int _lf_clock_now(instant_t* t) {
}

*t = COMBINE_HI_LO(_lf_time_us_high, now_us_low) * 1000ULL;
clock_sync_apply_offset(t);
return 0;
}

#if defined(LF_SINGLE_THREADED)

int lf_enable_interrupts_nested() {
if (_lf_num_nested_critical_sections++ == 0) {
// First nested entry into a critical section.
Expand All @@ -148,6 +148,7 @@ int lf_disable_interrupts_nested() {
return 0;
}

#if defined(LF_SINGLE_THREADED)
/**
* Handle notifications from the runtime of changes to the event queue.
* If a sleep is in progress, it should be interrupted.
Expand Down Expand Up @@ -219,10 +220,10 @@ int lf_cond_wait(lf_cond_t* cond) {
return 0;
}

int lf_cond_timedwait(lf_cond_t* cond, instant_t absolute_time_ns) {
int lf_cond_timedwait(lf_cond_t* cond, instant_t wakeup_time) {
instant_t now;
_lf_clock_now(&now);
interval_t sleep_duration_ns = absolute_time_ns - now;
interval_t sleep_duration_ns = wakeup_time - now;
bool res = condition_wait_for(*cond, sleep_duration_ns);
if (!res) {
return 0;
Expand Down
40 changes: 40 additions & 0 deletions core/platform/lf_atomic_gcc_clang.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
#if defined(PLATFORM_Linux) || defined(PLATFORM_Darwin)
#if defined(__GNUC__) || defined(__clang__)
/**
* @author Soroush Bateni
* @author Erling Rennemo Jellum
* @copyright (c) 2023
* License: <a href="https://github.com/lf-lang/reactor-c/blob/main/LICENSE.md">BSD 2-clause</a>
* @brief Implements the atomics API using GCC/Clang APIs.
*/

#include "lf_atomic.h"
#include "platform.h"

int32_t lf_atomic_fetch_add32(int32_t *ptr, int32_t value) {
return __sync_fetch_and_add(ptr, value);
}
int64_t lf_atomic_fetch_add64(int64_t *ptr, int64_t value) {
return __sync_fetch_and_add(ptr, value);
}
int32_t lf_atomic_add_fetch32(int32_t *ptr, int32_t value) {
return __sync_add_and_fetch(ptr, value);
}
int64_t lf_atomic_add_fetch64(int64_t *ptr, int64_t value) {
return __sync_add_and_fetch(ptr, value);
}
bool lf_atomic_bool_compare_and_swap32(int32_t *ptr, int32_t oldval, int32_t newval) {
return __sync_bool_compare_and_swap(ptr, oldval, newval);
}
bool lf_atomic_bool_compare_and_swap64(int64_t *ptr, int64_t oldval, int64_t newval) {
return __sync_bool_compare_and_swap(ptr, oldval, newval);
}
int32_t lf_atomic_val_compare_and_swap32(int32_t *ptr, int32_t oldval, int32_t newval) {
return __sync_val_compare_and_swap(ptr, oldval, newval);
}
int64_t lf_atomic_val_compare_and_swap64(int64_t *ptr, int64_t oldval, int64_t newval) {
return __sync_val_compare_and_swap(ptr, oldval, newval);
}

#endif
#endif
Loading
Loading