diff --git a/tools/rtpsrelay/Config.h b/tools/rtpsrelay/Config.h index 9150eb2eec..c304f62af3 100644 --- a/tools/rtpsrelay/Config.h +++ b/tools/rtpsrelay/Config.h @@ -36,6 +36,7 @@ class Config { , max_ips_per_client_(0) , admission_max_participants_high_water_(0) , admission_max_participants_low_water_(0) + , handler_threads_(1) {} void relay_id(const std::string& value) @@ -338,6 +339,16 @@ class Config { return admission_max_participants_low_water_; } + void handler_threads(size_t count) + { + handler_threads_ = count; + } + + size_t handler_threads() const + { + return handler_threads_; + } + private: std::string relay_id_; OpenDDS::DCPS::GUID_t application_participant_guid_; @@ -369,6 +380,7 @@ class Config { OpenDDS::DCPS::TimeDuration rejected_address_duration_; size_t admission_max_participants_high_water_; size_t admission_max_participants_low_water_; + size_t handler_threads_; }; } diff --git a/tools/rtpsrelay/RelayEventLoop.cpp b/tools/rtpsrelay/RelayEventLoop.cpp new file mode 100644 index 0000000000..266dd1f22b --- /dev/null +++ b/tools/rtpsrelay/RelayEventLoop.cpp @@ -0,0 +1,115 @@ +/* + * Distributed under the OpenDDS License. + * See: http://www.opendds.org/license.html + */ + +#include "RelayEventLoop.h" + +#include +#include +#include + +#include + +namespace RtpsRelay { + +struct ThreadPool : ACE_Task_Base { + + ThreadPool(const Config& config, ACE_Reactor& reactor, RelayThreadMonitor& monitor) + : config_(config) + , reactor_(reactor) + , monitor_(monitor) + {} + + int svc() override; + int run(); + + const Config& config_; + ACE_Reactor& reactor_; + RelayThreadMonitor& monitor_; + OpenDDS::DCPS::ThreadStatusManager& thread_status_manager_ = TheServiceParticipant->get_thread_status_manager(); +}; + +struct RunThreadMonitor { + + RunThreadMonitor(OpenDDS::DCPS::ThreadStatusManager& thread_status_manager, RelayThreadMonitor& monitor) + : should_run_(thread_status_manager.update_thread_status()) + , monitor_(monitor) + , status_(should_run_ ? monitor.start() : EXIT_SUCCESS) + {} + + ~RunThreadMonitor() + { + if (should_run_) { + monitor_.stop(); + } + } + + const bool should_run_; + RelayThreadMonitor& monitor_; + const int status_; +}; + +int ThreadPool::run() +{ + RunThreadMonitor rtm{thread_status_manager_, monitor_}; + if (rtm.status_ != EXIT_SUCCESS) { + ACE_ERROR((LM_ERROR, "(%P:%t) ERROR: RtpsRelay::ThreadPool::run - failed to start Relay Thread Monitor\n")); + return EXIT_FAILURE; + } + + const auto threads = config_.handler_threads(); + if (threads == 1) { + return svc(); + } + + const auto status = activate(THR_NEW_LWP | THR_JOINABLE | THR_INHERIT_SCHED, static_cast(threads)); + if (status != EXIT_SUCCESS) { + ACE_ERROR((LM_ERROR, "(%P:%t) ERROR: RtpsRelay::ThreadPool::run - failed to start thread pool: %m\n")); + return status; + } + + return wait(); +} + +int ThreadPool::svc() +{ + const auto has_run_time = !config_.run_time().is_zero(); + const auto end_time = OpenDDS::DCPS::MonotonicTimePoint::now() + config_.run_time(); + + if (thread_status_manager_.update_thread_status()) { + OpenDDS::DCPS::ThreadStatusManager::Start thread_status_monitoring_active(thread_status_manager_, "RtpsRelay Event Loop"); + + while (!has_run_time || OpenDDS::DCPS::MonotonicTimePoint::now() < end_time) { + auto t = thread_status_manager_.thread_status_interval().value(); + OpenDDS::DCPS::ThreadStatusManager::Sleeper s(thread_status_manager_); + if (reactor_.run_reactor_event_loop(t, 0) != 0) { + break; + } + } + + } else if (has_run_time) { + while (OpenDDS::DCPS::MonotonicTimePoint::now() < end_time) { + auto t = (end_time - OpenDDS::DCPS::MonotonicTimePoint::now()).value(); + if (reactor_.run_reactor_event_loop(t, 0) != 0) { + break; + } + } + + } else { + reactor_.run_reactor_event_loop(); + } + return EXIT_SUCCESS; +} + +ACE_Reactor_Impl* RelayEventLoop::make_reactor_impl(const Config& config) +{ + return config.handler_threads() == 1 ? new ACE_Select_Reactor : new ACE_TP_Reactor; +} + +int RelayEventLoop::run(const Config& config, ACE_Reactor& reactor, RelayThreadMonitor& monitor) +{ + return ThreadPool{config, reactor, monitor}.run(); +} + +} diff --git a/tools/rtpsrelay/RelayEventLoop.h b/tools/rtpsrelay/RelayEventLoop.h new file mode 100644 index 0000000000..25e11f273e --- /dev/null +++ b/tools/rtpsrelay/RelayEventLoop.h @@ -0,0 +1,23 @@ +/* + * Distributed under the OpenDDS License. + * See: http://www.opendds.org/license.html + */ +#ifndef RTPSRELAY_RELAY_EVENT_LOOP_H_ +#define RTPSRELAY_RELAY_EVENT_LOOP_H_ + +#include "Config.h" +#include "RelayThreadMonitor.h" + +#include + +namespace RtpsRelay { +namespace RelayEventLoop { + +ACE_Reactor_Impl* make_reactor_impl(const Config& config); + +int run(const Config& config, ACE_Reactor& reactor, RelayThreadMonitor& monitor); + +} +} + +#endif diff --git a/tools/rtpsrelay/RtpsRelay.cpp b/tools/rtpsrelay/RtpsRelay.cpp index f1946bb57c..245e129950 100644 --- a/tools/rtpsrelay/RtpsRelay.cpp +++ b/tools/rtpsrelay/RtpsRelay.cpp @@ -10,6 +10,7 @@ #include "ParticipantStatisticsReporter.h" #include "PublicationListener.h" #include "RelayAddressListener.h" +#include "RelayEventLoop.h" #include "RelayHandler.h" #include "RelayHttpMetaDiscovery.h" #include "RelayPartitionTable.h" @@ -38,7 +39,6 @@ #include #include #include -#include #include #include @@ -229,6 +229,9 @@ int run(int argc, ACE_TCHAR* argv[]) } else if ((arg = args.get_the_parameter("-RunTime"))) { config.run_time(OpenDDS::DCPS::TimeDuration(ACE_OS::atoi(arg))); args.consume_arg(); + } else if ((arg = args.get_the_parameter("-HandlerThreads"))) { + config.handler_threads(std::atoi(arg)); + args.consume_arg(); } else if ((arg = args.get_the_parameter("-MaxIpsPerClient"))) { config.max_ips_per_client(ACE_OS::atoi(arg)); args.consume_arg(); @@ -737,7 +740,7 @@ int run(int argc, ACE_TCHAR* argv[]) RelayParticipantStatusReporter relay_participant_status_reporter(config, relay_participant_status_writer, relay_statistics_reporter); RelayThreadMonitor* relay_thread_monitor = new RelayThreadMonitor(config); GuidAddrSet guid_addr_set(config, rtps_discovery, relay_participant_status_reporter, relay_statistics_reporter, *relay_thread_monitor); - ACE_Reactor reactor_(new ACE_Select_Reactor, true); + ACE_Reactor reactor_(RelayEventLoop::make_reactor_impl(config), true); const auto reactor = &reactor_; GuidPartitionTable guid_partition_table(config, spdp_horizontal_addr, relay_partitions_writer, spdp_replay_writer); RelayPartitionTable relay_partition_table; @@ -961,37 +964,9 @@ int run(int argc, ACE_TCHAR* argv[]) } ACE_DEBUG((LM_INFO, "(%P|%t) INFO: Meta Discovery listening on %C\n", OpenDDS::DCPS::LogAddr(meta_discovery_addr).c_str())); - const bool has_run_time = !config.run_time().is_zero(); - const OpenDDS::DCPS::MonotonicTimePoint end_time = OpenDDS::DCPS::MonotonicTimePoint::now() + config.run_time(); - - OpenDDS::DCPS::ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager(); - if (thread_status_manager.update_thread_status()) { - if (relay_thread_monitor->start() == -1) { - ACE_ERROR((LM_ERROR, "(%P:%t) ERROR: failed to start Relay Thread Monitor\n")); - return EXIT_FAILURE; - } - - OpenDDS::DCPS::ThreadStatusManager::Start thread_status_monitoring_active(thread_status_manager, "RtpsRelay Main"); - - while (!has_run_time || OpenDDS::DCPS::MonotonicTimePoint::now() < end_time) { - ACE_Time_Value t = thread_status_manager.thread_status_interval().value(); - OpenDDS::DCPS::ThreadStatusManager::Sleeper s(thread_status_manager); - if (reactor->run_reactor_event_loop(t, 0) != 0) { - break; - } - } - - relay_thread_monitor->stop(); - } else if (has_run_time) { - while (OpenDDS::DCPS::MonotonicTimePoint::now() < end_time) { - ACE_Time_Value t = (end_time - OpenDDS::DCPS::MonotonicTimePoint::now()).value(); - if (reactor->run_reactor_event_loop(t, 0) != 0) { - break; - } - } - - } else { - reactor->run_reactor_event_loop(); + const auto status = RelayEventLoop::run(config, *reactor, *relay_thread_monitor); + if (status != EXIT_SUCCESS) { + return status; } application_participant->delete_contained_entities();