diff --git a/src/cpp/rtps/transport/TCPTransportInterface.cpp b/src/cpp/rtps/transport/TCPTransportInterface.cpp index bf947089ed..d428bf4a53 100644 --- a/src/cpp/rtps/transport/TCPTransportInterface.cpp +++ b/src/cpp/rtps/transport/TCPTransportInterface.cpp @@ -1098,9 +1098,6 @@ void TCPTransportInterface::perform_listen_operation( { remote_locator = remote_endpoint_to_locator(channel); - uint32_t port = channel->local_endpoint().port(); - set_name_to_current_thread("dds.tcp.%u", port); - if (channel->tcp_connection_type() == TCPChannelResource::TCPConnectionType::TCP_CONNECT_TYPE) { rtcp_message_manager->sendConnectionRequest(channel); diff --git a/src/cpp/utils/threading.hpp b/src/cpp/utils/threading.hpp index db489490c4..83d181b775 100644 --- a/src/cpp/utils/threading.hpp +++ b/src/cpp/utils/threading.hpp @@ -15,6 +15,8 @@ #ifndef UTILS__THREADING_HPP_ #define UTILS__THREADING_HPP_ +#include + #include "./thread.hpp" namespace eprosima { @@ -29,48 +31,55 @@ struct ThreadSettings; /** * @brief Give a name to the thread calling this function. * - * @param[in] name A null-terminated string with the name to give to the calling thread. - * The implementation for certain platforms may truncate the final thread - * name if there is a limit on the length of the name of a thread. + * @param[in, out] thread_name_buffer Buffer to store the name of the thread. + * @param[in] name A null-terminated string with the name to give to the calling thread. + * The implementation for certain platforms may truncate the final thread + * name if there is a limit on the length of the name of a thread. */ void set_name_to_current_thread( + std::array& thread_name_buffer, const char* name); /** * @brief Give a name to the thread calling this function. * - * @param[in] fmt A null-terminated string to be used as the format argument of - * a `snprintf` like function, in order to accomodate the restrictions of - * the OS. Those restrictions may truncate the final thread name if there - * is a limit on the length of the name of a thread. - * @param[in] arg Single variadic argument passed to the formatting function. + * @param[in, out] thread_name_buffer Buffer to store the name of the thread. + * @param[in] fmt A null-terminated string to be used as the format argument of + * a `snprintf` like function, in order to accomodate the restrictions of + * the OS. Those restrictions may truncate the final thread name if there + * is a limit on the length of the name of a thread. + * @param[in] arg Single variadic argument passed to the formatting function. */ void set_name_to_current_thread( + std::array& thread_name_buffer, const char* fmt, uint32_t arg); /** * @brief Give a name to the thread calling this function. * - * @param[in] fmt A null-terminated string to be used as the format argument of - * a `snprintf` like function, in order to accomodate the restrictions of - * the OS. Those restrictions may truncate the final thread name if there - * is a limit on the length of the name of a thread. - * @param[in] arg1 First variadic argument passed to the formatting function. - * @param[in] arg2 Second variadic argument passed to the formatting function. + * @param[in, out] thread_name_buffer Buffer to store the name of the thread. + * @param[in] fmt A null-terminated string to be used as the format argument of + * a `snprintf` like function, in order to accomodate the restrictions of + * the OS. Those restrictions may truncate the final thread name if there + * is a limit on the length of the name of a thread. + * @param[in] arg1 First variadic argument passed to the formatting function. + * @param[in] arg2 Second variadic argument passed to the formatting function. */ void set_name_to_current_thread( + std::array& thread_name_buffer, const char* fmt, uint32_t arg1, uint32_t arg2); - /** * @brief Apply thread settings to the thread calling this function. * + * @param[in] thread_name Name of the thread. * @param[in] settings Thread settings to apply. */ void apply_thread_settings_to_current_thread( + const char* thread_name, const fastdds::rtps::ThreadSettings& settings); /** @@ -94,8 +103,9 @@ eprosima::thread create_thread( { return eprosima::thread(settings.stack_size, [=]() { - apply_thread_settings_to_current_thread(settings); - set_name_to_current_thread(name, args ...); + std::array thread_name_buffer; + set_name_to_current_thread(thread_name_buffer, name, args ...); + apply_thread_settings_to_current_thread(thread_name_buffer.data(), settings); func(); }); } diff --git a/src/cpp/utils/threading/threading_empty.ipp b/src/cpp/utils/threading/threading_empty.ipp index 684664fc29..fa4ca5afc0 100644 --- a/src/cpp/utils/threading/threading_empty.ipp +++ b/src/cpp/utils/threading/threading_empty.ipp @@ -12,20 +12,26 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include + namespace eprosima { void set_name_to_current_thread( + std::array& /* thread_name_buffer */, const char* /* name */) { } void set_name_to_current_thread( + std::array& /* thread_name_buffer */, const char* /* fmt */, uint32_t /* arg */) { } void set_name_to_current_thread( + std::array& /* thread_name_buffer */, const char* /* fmt */, uint32_t /* arg1 */, uint32_t /* arg2 */) @@ -33,6 +39,7 @@ void set_name_to_current_thread( } void apply_thread_settings_to_current_thread( + const char* /* thread_name */, const fastdds::rtps::ThreadSettings& /*settings*/) { } diff --git a/src/cpp/utils/threading/threading_osx.ipp b/src/cpp/utils/threading/threading_osx.ipp index 70c7cf817f..5fa21ee6dd 100644 --- a/src/cpp/utils/threading/threading_osx.ipp +++ b/src/cpp/utils/threading/threading_osx.ipp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -24,37 +25,42 @@ namespace eprosima { -template +template static void set_name_to_current_thread_impl( - const char* fmt, Args... args) + std::array& thread_name_buffer, + const char* fmt, + Args... args) { - char thread_name[16]{}; - snprintf(thread_name, 16, fmt, args...); - pthread_setname_np(thread_name); + snprintf(thread_name_buffer.data(), 16, fmt, args ...); + pthread_setname_np(thread_name_buffer.data()); } void set_name_to_current_thread( + std::array& thread_name_buffer, const char* name) { - set_name_to_current_thread_impl("%s", name); + set_name_to_current_thread_impl(thread_name_buffer, "%s", name); } void set_name_to_current_thread( + std::array& thread_name_buffer, const char* fmt, uint32_t arg) { - set_name_to_current_thread_impl(fmt, arg); + set_name_to_current_thread_impl(thread_name_buffer, fmt, arg); } void set_name_to_current_thread( + std::array& thread_name_buffer, const char* fmt, uint32_t arg1, uint32_t arg2) { - set_name_to_current_thread_impl(fmt, arg1, arg2); + set_name_to_current_thread_impl(thread_name_buffer, fmt, arg1, arg2); } static void configure_current_thread_scheduler( + const char* thread_name, int sched_class, int sched_priority) { @@ -64,67 +70,86 @@ static void configure_current_thread_scheduler( int current_class; int result = 0; bool change_priority = (std::numeric_limits::min() != sched_priority); - + // Get current scheduling parameters memset(¤t_param, 0, sizeof(current_param)); pthread_getschedparam(self_tid, ¤t_class, ¤t_param); - + memset(¶m, 0, sizeof(param)); param.sched_priority = 0; sched_class = (sched_class == -1) ? current_class : sched_class; - + // - // Set Scheduler Class and Priority + // Set Scheduler Class and Priority // - - if(sched_class == SCHED_OTHER) - { - + + if (sched_class == SCHED_OTHER) + { + // // Sched OTHER has a nice value, that we pull from the priority parameter. // - Requires priorty value to be zero (0). - // + // result = pthread_setschedparam(self_tid, sched_class, ¶m); - if(0 == result && change_priority) + if (0 == result && change_priority) { uint64_t tid; pthread_threadid_np(NULL, &tid); result = setpriority(PRIO_PROCESS, tid, sched_priority); - } + if (0 != result) + { + EPROSIMA_LOG_ERROR(SYSTEM, "Problem to set priority of thread with id [" << tid << "," << thread_name << "] to value " << sched_priority << ". Error '" << strerror( + result) << "'"); + } + } + else if (0 != result) + { + EPROSIMA_LOG_ERROR(SYSTEM, "Problem to set scheduler of thread with id [" << self_tid << "," << thread_name << "] to value " << sched_class << ". Error '" << strerror( + result) << "'"); + } } - else if((sched_class == SCHED_FIFO) || + else if ((sched_class == SCHED_FIFO) || (sched_class == SCHED_RR)) { // // RT Policies use a different priority numberspace. // - param.sched_priority = change_priority ? sched_priority : current_param.sched_priority; result = pthread_setschedparam(self_tid, sched_class, ¶m); - } - - if (0 != result) - { - EPROSIMA_LOG_ERROR(SYSTEM, "Error '" << strerror(result) << "' configuring scheduler for thread " << self_tid); + if (0 != result) + { + EPROSIMA_LOG_ERROR(SYSTEM, "Problem to set scheduler of thread with id [" << self_tid << "," << thread_name << "] to value " << sched_class << " with priority " << param.sched_priority << ". Error '" << strerror( + result) << "'"); + } } } static void configure_current_thread_affinity( + const char* thread_name, uint64_t affinity) { if (affinity <= static_cast(std::numeric_limits::max())) { + int result = 0; thread_affinity_policy_data_t policy = { static_cast(affinity) }; pthread_t self_tid = pthread_self(); - thread_policy_set(pthread_mach_thread_np(self_tid), THREAD_AFFINITY_POLICY, (thread_policy_t)&policy, 1); + result = + thread_policy_set(pthread_mach_thread_np(self_tid), THREAD_AFFINITY_POLICY, (thread_policy_t)&policy, + 1); + if (0 != result) + { + EPROSIMA_LOG_ERROR(SYSTEM, "Problem to set affinity of thread with id [" << self_tid << "," << thread_name << "] to value " << affinity << ". Error '" << strerror( + result) << "'"); + } } } void apply_thread_settings_to_current_thread( + const char* thread_name, const fastdds::rtps::ThreadSettings& settings) { - configure_current_thread_scheduler(settings.scheduling_policy, settings.priority); - configure_current_thread_affinity(settings.affinity); + configure_current_thread_scheduler(thread_name, settings.scheduling_policy, settings.priority); + configure_current_thread_affinity(thread_name, settings.affinity); } } // namespace eprosima diff --git a/src/cpp/utils/threading/threading_pthread.ipp b/src/cpp/utils/threading/threading_pthread.ipp index 75ad33f2d6..d9556fc886 100644 --- a/src/cpp/utils/threading/threading_pthread.ipp +++ b/src/cpp/utils/threading/threading_pthread.ipp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -27,38 +28,43 @@ namespace eprosima { -template +template static void set_name_to_current_thread_impl( - const char* fmt, Args... args) + std::array& thread_name_buffer, + const char* fmt, + Args... args) { - char thread_name[16]{}; - snprintf(thread_name, 16, fmt, args...); + snprintf(thread_name_buffer.data(), 16, fmt, args ...); auto id = pthread_self(); - pthread_setname_np(id, thread_name); + pthread_setname_np(id, thread_name_buffer.data()); } void set_name_to_current_thread( + std::array& thread_name_buffer, const char* name) { - set_name_to_current_thread_impl("%s", name); + set_name_to_current_thread_impl(thread_name_buffer, "%s", name); } void set_name_to_current_thread( + std::array& thread_name_buffer, const char* fmt, uint32_t arg) { - set_name_to_current_thread_impl(fmt, arg); + set_name_to_current_thread_impl(thread_name_buffer, fmt, arg); } void set_name_to_current_thread( + std::array& thread_name_buffer, const char* fmt, uint32_t arg1, uint32_t arg2) { - set_name_to_current_thread_impl(fmt, arg1, arg2); + set_name_to_current_thread_impl(thread_name_buffer, fmt, arg1, arg2); } static void configure_current_thread_scheduler( + const char* thread_name, int sched_class, int sched_priority) { @@ -81,9 +87,9 @@ static void configure_current_thread_scheduler( // Set Scheduler Class and Priority // - if((sched_class == SCHED_OTHER) || - (sched_class == SCHED_BATCH) || - (sched_class == SCHED_IDLE)) + if ((sched_class == SCHED_OTHER) || + (sched_class == SCHED_BATCH) || + (sched_class == SCHED_IDLE)) { // // BATCH and IDLE do not have explicit priority values. @@ -95,12 +101,22 @@ static void configure_current_thread_scheduler( // Sched OTHER has a nice value, that we pull from the priority parameter. // - if(0 == result && sched_class == SCHED_OTHER && change_priority) + if (0 == result && sched_class == SCHED_OTHER && change_priority) { result = setpriority(PRIO_PROCESS, gettid(), sched_priority); + if (0 != result) + { + EPROSIMA_LOG_ERROR(SYSTEM, "Problem to set priority of thread with id [" << self_tid << "," << thread_name << "] to value " << sched_priority << ". Error '" << strerror( + result) << "'"); + } + } + else if (0 != result) + { + EPROSIMA_LOG_ERROR(SYSTEM, "Problem to set scheduler of thread with id [" << self_tid << "," << thread_name << "] to value " << sched_class << ". Error '" << strerror( + result) << "'"); } } - else if((sched_class == SCHED_FIFO) || + else if ((sched_class == SCHED_FIFO) || (sched_class == SCHED_RR)) { // @@ -109,15 +125,16 @@ static void configure_current_thread_scheduler( param.sched_priority = change_priority ? sched_priority : current_param.sched_priority; result = pthread_setschedparam(self_tid, sched_class, ¶m); - } - - if (0 != result) - { - EPROSIMA_LOG_ERROR(SYSTEM, "Error '" << strerror(result) << "' configuring scheduler for thread " << self_tid); + if (0 != result) + { + EPROSIMA_LOG_ERROR(SYSTEM, "Problem to set scheduler of thread with id [" << self_tid << "," << thread_name << "] to value " << sched_class << " with priority " << param.sched_priority << ". Error '" << strerror( + result) << "'"); + } } } static void configure_current_thread_affinity( + const char* thread_name, uint64_t affinity_mask) { int a; @@ -141,9 +158,9 @@ static void configure_current_thread_affinity( // cpu_count = get_nprocs_conf(); - for(a = 0; a < cpu_count; a++) + for (a = 0; a < cpu_count; a++) { - if(0 != (affinity_mask & 1)) + if (0 != (affinity_mask & 1)) { CPU_SET(a, &cpu_set); result++; @@ -156,26 +173,28 @@ static void configure_current_thread_affinity( EPROSIMA_LOG_ERROR(SYSTEM, "Affinity mask has more processors than the ones present in the system"); } - if(result > 0) + if (result > 0) { #ifdef ANDROID result = sched_setaffinity(self_tid, sizeof(cpu_set_t), &cpu_set); #else result = pthread_setaffinity_np(self_tid, sizeof(cpu_set_t), &cpu_set); -#endif +#endif // ifdef ANDROID } if (0 != result) { - EPROSIMA_LOG_ERROR(SYSTEM, "Error '" << strerror(result) << "' configuring affinity for thread " << self_tid); + EPROSIMA_LOG_ERROR(SYSTEM, "Problem to set affinity of thread with id [" << self_tid << "," << thread_name << "] to value " << affinity_mask << ". Error '" << strerror( + result) << "'"); } } void apply_thread_settings_to_current_thread( + const char* thread_name, const fastdds::rtps::ThreadSettings& settings) { - configure_current_thread_scheduler(settings.scheduling_policy, settings.priority); - configure_current_thread_affinity(settings.affinity); + configure_current_thread_scheduler(thread_name, settings.scheduling_policy, settings.priority); + configure_current_thread_affinity(thread_name, settings.affinity); } } // namespace eprosima diff --git a/src/cpp/utils/threading/threading_win32.ipp b/src/cpp/utils/threading/threading_win32.ipp index 70db0740e5..60ea4acf7a 100644 --- a/src/cpp/utils/threading/threading_win32.ipp +++ b/src/cpp/utils/threading/threading_win32.ipp @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include #include #include #include @@ -23,41 +24,45 @@ namespace eprosima { template static void set_name_to_current_thread_impl( + std::array& thread_name_buffer, const char* fmt, Args... args) { - char thread_name[16]{}; - snprintf(thread_name, 16, fmt, args ...); + snprintf(thread_name_buffer.data(), 16, fmt, args ...); std::wstringstream stream; - stream << thread_name; + stream << thread_name_buffer.data(); std::wstring w_thread_name = stream.str(); SetThreadDescription(GetCurrentThread(), w_thread_name.c_str()); } void set_name_to_current_thread( + std::array& thread_name_buffer, const char* name) { - set_name_to_current_thread_impl("%s", name); + set_name_to_current_thread_impl(thread_name_buffer, "%s", name); } void set_name_to_current_thread( + std::array& thread_name_buffer, const char* fmt, uint32_t arg) { - set_name_to_current_thread_impl(fmt, arg); + set_name_to_current_thread_impl(thread_name_buffer, fmt, arg); } void set_name_to_current_thread( + std::array& thread_name_buffer, const char* fmt, uint32_t arg1, uint32_t arg2) { - set_name_to_current_thread_impl(fmt, arg1, arg2); + set_name_to_current_thread_impl(thread_name_buffer, fmt, arg1, arg2); } static void configure_current_thread_priority( + const char* thread_name, int32_t priority) { if (priority != std::numeric_limits::min()) @@ -65,12 +70,14 @@ static void configure_current_thread_priority( if (0 == SetThreadPriority(GetCurrentThread(), priority)) { EPROSIMA_LOG_ERROR(SYSTEM, - "Error '" << GetLastError() << "' configuring priority for thread " << GetCurrentThread()); + "Problem to set priority of thread with id [" << GetCurrentThreadId() << "," << thread_name << "] to value " << priority << + ". Error '" << GetLastError() << "'"); } } } static void configure_current_thread_affinity( + const char* thread_name, uint64_t affinity_mask) { if (affinity_mask != 0) @@ -78,16 +85,18 @@ static void configure_current_thread_affinity( if (0 == SetThreadAffinityMask(GetCurrentThread(), static_cast(affinity_mask))) { EPROSIMA_LOG_ERROR(SYSTEM, - "Error '" << GetLastError() << "' configuring affinity for thread " << GetCurrentThread()); + "Problem to set affinity of thread with id [" << GetCurrentThreadId() << "," << thread_name << "] to value " << affinity_mask << + ". Error '" << GetLastError() << "'"); } } } void apply_thread_settings_to_current_thread( + const char* thread_name, const fastdds::rtps::ThreadSettings& settings) { - configure_current_thread_priority(settings.priority); - configure_current_thread_affinity(settings.affinity); + configure_current_thread_priority(thread_name, settings.priority); + configure_current_thread_affinity(thread_name, settings.affinity); } } // namespace eprosima diff --git a/test/blackbox/api/dds-pim/PubSubWriter.hpp b/test/blackbox/api/dds-pim/PubSubWriter.hpp index 47072b0824..d4c51199c1 100644 --- a/test/blackbox/api/dds-pim/PubSubWriter.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriter.hpp @@ -1539,6 +1539,13 @@ class PubSubWriter return *this; } + PubSubWriter& set_events_thread_settings( + const eprosima::fastdds::rtps::ThreadSettings& settings) + { + participant_qos_.timed_events_thread(settings); + return *this; + } + const std::string& topic_name() const { return topic_name_; diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp index 25cfb9a1be..0dafd62432 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp @@ -1274,6 +1274,13 @@ class PubSubWriter return *this; } + PubSubWriter& set_events_thread_settings( + const eprosima::fastdds::rtps::ThreadSettings& settings) + { + participant_attr_.rtps.timed_events_thread = settings; + return *this; + } + const std::string& topic_name() const { return topic_name_; diff --git a/test/blackbox/common/DDSBlackboxTestsThreadSettingsQos.cpp b/test/blackbox/common/DDSBlackboxTestsThreadSettingsQos.cpp new file mode 100644 index 0000000000..d0b9d8072e --- /dev/null +++ b/test/blackbox/common/DDSBlackboxTestsThreadSettingsQos.cpp @@ -0,0 +1,101 @@ +// Copyright 2024 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include +#include + +#include "BlackboxTests.hpp" +#include "PubSubReader.hpp" +#include "PubSubWriter.hpp" + +using namespace eprosima::fastrtps; +using namespace eprosima::fastrtps::rtps; +using namespace eprosima::fastdds::dds; + +class CustomLogConsumer : public LogConsumer +{ + +public: + + CustomLogConsumer( + const char* regex_str) + : regex_(regex_str) + { + } + + void set_regex( + const char* regex_str) + { + std::lock_guard lock(mutex_); + regex_ = std::regex(regex_str); + } + + virtual void Consume( + const Log::Entry& entry) + { + std::stringstream stream; + print_timestamp(stream, entry, false); + print_header(stream, entry, false); + print_message(stream, entry, false); + print_context(stream, entry, false); + print_new_line(stream, false); + + { + std::lock_guard lock(mutex_); + EXPECT_TRUE(std::regex_match(stream.str(), regex_)); + } + } + +private: + + std::mutex mutex_; + std::regex regex_; +}; + +TEST(ThreadSettingsQoS, thread_settings_qos_outputs_proper_logging_on_failure) +{ + std::string expected_log = "^(.+)(SYSTEM)(.+)(Problem to set priority of thread with id)(.+)(\n)$"; + CustomLogConsumer* custom_consumer = new CustomLogConsumer(expected_log.c_str()); + + Log::RegisterConsumer(std::unique_ptr(custom_consumer)); + Log::SetVerbosity(Log::Error); + Log::SetCategoryFilter(std::regex("(SYSTEM)")); + + PubSubWriter writer(TEST_TOPIC_NAME); + + eprosima::fastdds::rtps::ThreadSettings th_settings; + th_settings.priority = -5; + + writer.set_events_thread_settings(th_settings) + .init(); + + Log::Flush(); + +#ifndef _WIN32 + custom_consumer->set_regex("^(.+)(SYSTEM)(.+)(Problem to set scheduler of thread with id)(.+)(\n)$"); + expected_log = "^(.+)(SYSTEM)(.+)(Problem to set scheduler of thread with id)(.+)(\n)$"; + writer.destroy(); + + th_settings = eprosima::fastdds::rtps::ThreadSettings(); + th_settings.scheduling_policy = 1; + + writer.set_events_thread_settings(th_settings) + .init(); + + Log::Flush(); +#endif // ifndef _WIN32 +}