From dff29b5b3b64dede337988b3a0b1aed1cf55a413 Mon Sep 17 00:00:00 2001 From: Adam Mitz Date: Thu, 19 Sep 2024 20:08:33 +0000 Subject: [PATCH] RtpsRelay cleanup and simplification Make use of C++11 features --- tools/dds/rtpsrelaylib/Name.cpp | 7 +- tools/dds/rtpsrelaylib/Name.h | 16 +-- tools/dds/rtpsrelaylib/PartitionIndex.h | 30 ++--- tools/dds/rtpsrelaylib/Utility.h | 2 +- tools/rtpsrelay/GuidAddrSet.cpp | 97 +++++++++++++++ tools/rtpsrelay/GuidAddrSet.h | 124 ++++--------------- tools/rtpsrelay/GuidPartitionTable.cpp | 127 +++++++++++++++++--- tools/rtpsrelay/GuidPartitionTable.h | 110 +++-------------- tools/rtpsrelay/HandlerStatisticsReporter.h | 2 +- tools/rtpsrelay/README.rst | 3 + tools/rtpsrelay/ReaderListenerBase.h | 2 - tools/rtpsrelay/RelayHandler.h | 10 +- tools/rtpsrelay/RelayHttpMetaDiscovery.cpp | 16 +-- tools/rtpsrelay/RelayHttpMetaDiscovery.h | 20 ++- tools/rtpsrelay/RelayPartitionTable.h | 14 +-- tools/rtpsrelay/RelayStatisticsReporter.h | 2 +- tools/rtpsrelay/RelayStatusReporter.cpp | 3 +- tools/rtpsrelay/RelayStatusReporter.h | 2 +- 18 files changed, 303 insertions(+), 284 deletions(-) diff --git a/tools/dds/rtpsrelaylib/Name.cpp b/tools/dds/rtpsrelaylib/Name.cpp index 4e2bee23ea6..ffdbe6ce024 100644 --- a/tools/dds/rtpsrelaylib/Name.cpp +++ b/tools/dds/rtpsrelaylib/Name.cpp @@ -72,7 +72,7 @@ void Name::parse(Name& name, const std::string& buffer, size_t& idx) char Name::parse_character(Name& name, const std::string& buffer, size_t& idx) { - char c = buffer[idx++]; + const auto c = buffer[idx++]; if (c == '\\') { if (idx == buffer.size()) { name.is_valid_ = false; @@ -123,8 +123,7 @@ void Name::parse_character_class_tail(Name& name, const std::string& buffer, siz return; } - char c = buffer[idx]; - if (c == ']') { + if (buffer[idx] == ']') { ++idx; return; } @@ -154,7 +153,7 @@ void Name::parse_character_or_range(Name& name, const std::string& buffer, size_ return; } - char last = parse_character(name, buffer, idx); + const auto last = parse_character(name, buffer, idx); if (first > last) { name.is_valid_ = false; return; diff --git a/tools/dds/rtpsrelaylib/Name.h b/tools/dds/rtpsrelaylib/Name.h index 3d0c616d81d..bb7e632b9ee 100644 --- a/tools/dds/rtpsrelaylib/Name.h +++ b/tools/dds/rtpsrelaylib/Name.h @@ -3,6 +3,8 @@ #include "export.h" +#include +#include #include #include #include @@ -73,7 +75,7 @@ class OpenDDS_RtpsRelayLib_Export Atom { }; struct AtomHash { - std::size_t operator() (const Atom& atom) const + std::size_t operator()(const Atom& atom) const { std::size_t result = atom.kind(); result ^= (atom.character() << 8); @@ -88,12 +90,12 @@ OpenDDS_RtpsRelayLib_Export std::ostream& operator<<(std::ostream& out, const At class OpenDDS_RtpsRelayLib_Export Name { public: - typedef std::vector Atoms; - typedef Atoms::const_iterator const_iterator; + using Atoms = std::vector; + using const_iterator = Atoms::const_iterator; - Name() : is_pattern_(false), is_valid_(true) {} + Name() = default; - explicit Name(const std::string& name) : is_pattern_(false), is_valid_(true) + explicit Name(const std::string& name) { size_t idx = 0; parse(*this, name, idx); @@ -129,8 +131,8 @@ class OpenDDS_RtpsRelayLib_Export Name { private: Atoms atoms_; - bool is_pattern_; - bool is_valid_; + bool is_pattern_ = false; + bool is_valid_ = true; static void parse(Name& name, const std::string& buffer, size_t& idx); static char parse_character(Name& name, const std::string& buffer, size_t& idx); diff --git a/tools/dds/rtpsrelaylib/PartitionIndex.h b/tools/dds/rtpsrelaylib/PartitionIndex.h index 2a4d577435a..57f8047c22c 100644 --- a/tools/dds/rtpsrelaylib/PartitionIndex.h +++ b/tools/dds/rtpsrelaylib/PartitionIndex.h @@ -26,14 +26,15 @@ struct Identity { template class TrieNode { public: - typedef std::shared_ptr NodePtr; + using NodePtr = std::shared_ptr; + using Value = typename T::value_type; - static void insert(NodePtr node, const Name& name, const typename T::value_type& guid) + static void insert(NodePtr node, const Name& name, const Value& guid) { for (const auto& atom : name) { const auto iter = node->children_.find(atom); if (iter == node->children_.end()) { - NodePtr child(new TrieNode()); + NodePtr child = std::make_shared(); node->children_[atom] = child; node = std::move(child); } else { @@ -44,7 +45,7 @@ class TrieNode { node->guids_.insert(guid); } - static void remove(NodePtr node, const Name& name, const typename T::value_type& guid) + static void remove(NodePtr node, const Name& name, const Value& guid) { remove(std::move(node), name.begin(), name.end(), guid); } @@ -64,12 +65,11 @@ class TrieNode { } private: - typedef std::unordered_map ChildrenType; + using ChildrenType = std::unordered_map; ChildrenType children_; T guids_; - static void insert_guids(NodePtr node, - T& guids) + static void insert_guids(NodePtr node, T& guids) { std::transform(node->guids_.begin(), node->guids_.end(), std::inserter(guids, guids.begin()), Transformer()); } @@ -120,8 +120,7 @@ class TrieNode { } } - static void lookup_globs(NodePtr node, - T& guids) + static void lookup_globs(NodePtr node, T& guids) { for (const auto& pos : node->children_) { if (pos.first.kind() == Atom::GLOB) { @@ -189,7 +188,7 @@ class TrieNode { static void remove(NodePtr node, Name::const_iterator begin, Name::const_iterator end, - const typename T::value_type& guid) + const Value& guid) { if (begin == end) { node->guids_.erase(guid); @@ -210,19 +209,20 @@ class TrieNode { template class PartitionIndex { public: - typedef TrieNode TrieNodeT; + using TrieNodeT = TrieNode; + using Value = typename T::value_type; PartitionIndex() - : root_(new TrieNodeT()) + : root_(std::make_shared()) {} - void insert(const std::string& name, const typename T::value_type& guid) + void insert(const std::string& name, const Value& guid) { TrieNodeT::insert(root_, Name(name), guid); cache_.clear(); } - void remove(const std::string& name, const typename T::value_type& guid) + void remove(const std::string& name, const Value& guid) { TrieNodeT::remove(root_, Name(name), guid); cache_.clear(); @@ -269,7 +269,7 @@ class PartitionIndex { private: typename TrieNodeT::NodePtr root_; - typedef std::unordered_map Cache; + using Cache = std::unordered_map; mutable Cache cache_; }; diff --git a/tools/dds/rtpsrelaylib/Utility.h b/tools/dds/rtpsrelaylib/Utility.h index d7fbd928eb5..4cc4d3aa0c6 100644 --- a/tools/dds/rtpsrelaylib/Utility.h +++ b/tools/dds/rtpsrelaylib/Utility.h @@ -16,7 +16,7 @@ namespace RtpsRelay { -typedef std::set StringSet; +using StringSet = std::set; inline std::string guid_to_string(const OpenDDS::DCPS::GUID_t& a_guid) { diff --git a/tools/rtpsrelay/GuidAddrSet.cpp b/tools/rtpsrelay/GuidAddrSet.cpp index d17e21d12dc..c8d76a5ec22 100644 --- a/tools/rtpsrelay/GuidAddrSet.cpp +++ b/tools/rtpsrelay/GuidAddrSet.cpp @@ -7,6 +7,103 @@ namespace RtpsRelay { +bool AddrSetStats::upsert_address(const AddrPort& remote_address, + const OpenDDS::DCPS::MonotonicTimePoint& now, + const OpenDDS::DCPS::MonotonicTimePoint& expiration, + size_t max_ip_addresses) +{ + ACE_INET_Addr addr_only(remote_address.addr); + addr_only.set_port_number(0); + auto iter = ip_to_ports.find(addr_only); + if (iter == ip_to_ports.end()) { + if (max_ip_addresses > 0 && ip_to_ports.size() == max_ip_addresses) { + return false; + } + iter = ip_to_ports.insert(std::make_pair(addr_only, PortSet())).first; + } + + relay_stats_reporter_.max_ips_per_client(static_cast(ip_to_ports.size()), now); + + std::map* port_map = nullptr; + switch (remote_address.port) { + case SPDP: + port_map = &iter->second.spdp_ports; + break; + case SEDP: + port_map = &iter->second.sedp_ports; + break; + case DATA: + port_map = &iter->second.data_ports; + break; + } + if (!port_map) { + return false; + } + const auto pair = port_map->insert(std::make_pair(remote_address.addr.get_port_number(), expiration)); + if (pair.second) { + return true; + } + pair.first->second = expiration; + return false; +} + +bool AddrSetStats::remove_if_expired(const AddrPort& remote_address, const OpenDDS::DCPS::MonotonicTimePoint& now, + bool& ip_now_unused, OpenDDS::DCPS::MonotonicTimePoint& updated_expiration) +{ + ACE_INET_Addr addr_only(remote_address.addr); + addr_only.set_port_number(0); + const auto iter = ip_to_ports.find(addr_only); + if (iter == ip_to_ports.end()) { + return false; + } + + std::map* port_map = nullptr; + switch (remote_address.port) { + case SPDP: + port_map = &iter->second.spdp_ports; + break; + case SEDP: + port_map = &iter->second.sedp_ports; + break; + case DATA: + port_map = &iter->second.data_ports; + break; + } + + if (!port_map) { + return false; + } + + const auto port_iter = port_map->find(remote_address.addr.get_port_number()); + if (port_iter == port_map->end()) { + return false; + } + + if (port_iter->second <= now) { + port_map->erase(port_iter); + if (iter->second.empty()) { + ip_to_ports.erase(addr_only); + ip_now_unused = true; + } + return true; + } + updated_expiration = port_iter->second; + return false; +} + +GuidAddrSet::CreatedAddrSetStats GuidAddrSet::find_or_create(const OpenDDS::DCPS::GUID_t& guid, + const OpenDDS::DCPS::MonotonicTimePoint& now) +{ + auto it = guid_addr_set_map_.find(guid); + const bool create = it == guid_addr_set_map_.end(); + if (create) { + const auto it_bool_pair = + guid_addr_set_map_.insert(std::make_pair(guid, AddrSetStats(guid, now, relay_stats_reporter_))); + it = it_bool_pair.first; + } + return {create, it->second}; +} + ParticipantStatisticsReporter& GuidAddrSet::record_activity(const AddrPort& remote_address, const OpenDDS::DCPS::MonotonicTimePoint& now, diff --git a/tools/rtpsrelay/GuidAddrSet.h b/tools/rtpsrelay/GuidAddrSet.h index c8c1d36e794..f0df65913e1 100644 --- a/tools/rtpsrelay/GuidAddrSet.h +++ b/tools/rtpsrelay/GuidAddrSet.h @@ -20,13 +20,10 @@ struct PortSet { }; struct InetAddrHash { - std::size_t operator()(const ACE_INET_Addr& addr) const noexcept - { - return addr.hash(); - } + std::size_t operator()(const ACE_INET_Addr& addr) const noexcept { return addr.hash(); } }; -typedef std::unordered_map IpToPorts; +using IpToPorts = std::unordered_map; struct AddrSetStats { bool allow_rtps; @@ -54,42 +51,7 @@ struct AddrSetStats { bool upsert_address(const AddrPort& remote_address, const OpenDDS::DCPS::MonotonicTimePoint& now, const OpenDDS::DCPS::MonotonicTimePoint& expiration, - size_t max_ip_addresses) - { - ACE_INET_Addr addr_only(remote_address.addr); - addr_only.set_port_number(0); - auto iter = ip_to_ports.find(addr_only); - if (iter == ip_to_ports.end()) { - if (max_ip_addresses > 0 && ip_to_ports.size() == max_ip_addresses) { - return false; - } - iter = ip_to_ports.insert(std::make_pair(addr_only, PortSet())).first; - } - - relay_stats_reporter_.max_ips_per_client(static_cast(ip_to_ports.size()), now); - - std::map* port_map = nullptr; - switch (remote_address.port) { - case SPDP: - port_map = &iter->second.spdp_ports; - break; - case SEDP: - port_map = &iter->second.sedp_ports; - break; - case DATA: - port_map = &iter->second.data_ports; - break; - } - if (!port_map) { - return false; - } - const auto pair = port_map->insert(std::make_pair(remote_address.addr.get_port_number(), expiration)); - if (pair.second) { - return true; - } - pair.first->second = expiration; - return false; - } + size_t max_ip_addresses); template void foreach_addr(Port port, const Function& func) const @@ -118,48 +80,7 @@ struct AddrSetStats { } bool remove_if_expired(const AddrPort& remote_address, const OpenDDS::DCPS::MonotonicTimePoint& now, - bool& ip_now_unused, OpenDDS::DCPS::MonotonicTimePoint& updated_expiration) - { - ACE_INET_Addr addr_only(remote_address.addr); - addr_only.set_port_number(0); - const auto iter = ip_to_ports.find(addr_only); - if (iter == ip_to_ports.end()) { - return false; - } - - std::map* port_map = nullptr; - switch (remote_address.port) { - case SPDP: - port_map = &iter->second.spdp_ports; - break; - case SEDP: - port_map = &iter->second.sedp_ports; - break; - case DATA: - port_map = &iter->second.data_ports; - break; - } - - if (!port_map) { - return false; - } - - const auto port_iter = port_map->find(remote_address.addr.get_port_number()); - if (port_iter == port_map->end()) { - return false; - } - - if (port_iter->second <= now) { - port_map->erase(port_iter); - if (iter->second.empty()) { - ip_to_ports.erase(addr_only); - ip_now_unused = true; - } - return true; - } - updated_expiration = port_iter->second; - return false; - } + bool& ip_now_unused, OpenDDS::DCPS::MonotonicTimePoint& updated_expiration); bool has_discovery_addrs() const { @@ -186,7 +107,7 @@ struct AddrSetStats { return &data_stats_reporter; } - return 0; + return nullptr; } OpenDDS::DCPS::TimeDuration get_session_time(const OpenDDS::DCPS::MonotonicTimePoint& now) const @@ -217,7 +138,7 @@ struct Remote { }; struct RemoteHash { - size_t operator() (const Remote& remote) const + size_t operator()(const Remote& remote) const { return remote.addr_.hash() ^ ((static_cast(remote.guid_prefix_prefix_[0]) << 0) | @@ -234,7 +155,7 @@ class RelayParticipantStatusReporter; class GuidAddrSet { public: - typedef std::unordered_map GuidAddrSetMap; + using GuidAddrSetMap = std::unordered_map; GuidAddrSet(const Config& config, OpenDDS::RTPS::RtpsDiscovery_rch rtps_discovery, @@ -362,17 +283,7 @@ class GuidAddrSet { private: CreatedAddrSetStats find_or_create(const OpenDDS::DCPS::GUID_t& guid, - const OpenDDS::DCPS::MonotonicTimePoint& now) - { - auto it = guid_addr_set_map_.find(guid); - const bool create = it == guid_addr_set_map_.end(); - if (create) { - const auto it_bool_pair = - guid_addr_set_map_.insert(std::make_pair(guid, AddrSetStats(guid, now, relay_stats_reporter_))); - it = it_bool_pair.first; - } - return CreatedAddrSetStats(create, it->second); - } + const OpenDDS::DCPS::MonotonicTimePoint& now); ParticipantStatisticsReporter& record_activity(const AddrPort& remote_address, @@ -427,7 +338,6 @@ class GuidAddrSet { OpenDDS::DCPS::GuidPrefix_t prefix_; OpenDDS::DCPS::MonotonicTimePoint admitted_; }; - typedef std::deque AdmissionControlQueue; const Config& config_; OpenDDS::RTPS::RtpsDiscovery_rch rtps_discovery_; @@ -438,17 +348,25 @@ class GuidAddrSet { RelayHandler* sedp_vertical_handler_; RelayHandler* data_vertical_handler_; GuidAddrSetMap guid_addr_set_map_; - typedef std::unordered_map RemoteMap; + + using RemoteMap = std::unordered_map; RemoteMap remote_map_; - typedef std::list > DeactivationGuidQueue; + + using DeactivationGuidQueue = std::list>; DeactivationGuidQueue deactivation_guid_queue_; - typedef std::list > ExpirationGuidAddrQueue; + + using ExpirationGuidAddrQueue = std::list>; ExpirationGuidAddrQueue expiration_guid_addr_queue_; + + using AdmissionControlQueue = std::deque; AdmissionControlQueue admission_control_queue_; - typedef std::unordered_map RejectedAddressMapType; + + using RejectedAddressMapType = std::unordered_map; RejectedAddressMapType rejected_address_map_; - typedef std::list RejectedAddressExpirationQueue; + + using RejectedAddressExpirationQueue = std::list; RejectedAddressExpirationQueue rejected_address_expiration_queue_; + mutable ACE_Thread_Mutex mutex_; bool participant_admission_limit_reached_; }; diff --git a/tools/rtpsrelay/GuidPartitionTable.cpp b/tools/rtpsrelay/GuidPartitionTable.cpp index e77afa137c3..6d325fecf88 100644 --- a/tools/rtpsrelay/GuidPartitionTable.cpp +++ b/tools/rtpsrelay/GuidPartitionTable.cpp @@ -36,7 +36,6 @@ GuidPartitionTable::Result GuidPartitionTable::insert(const OpenDDS::DCPS::GUID_ std::set_difference(x.begin(), x.end(), parts.begin(), parts.end(), std::back_inserter(to_remove)); if (to_add.empty() && to_remove.empty()) { - // No change. return NO_CHANGE; } @@ -45,28 +44,26 @@ GuidPartitionTable::Result GuidPartitionTable::insert(const OpenDDS::DCPS::GUID_ populate_replay(spdp_replay, guid, to_add); StringSet globally_new; - { - x.insert(to_add.begin(), to_add.end()); - for (const auto& part : to_add) { - const auto q = partition_to_guid_.insert(std::make_pair(part, OrderedGuidSet())); - q.first->second.insert(guid); - partition_index_.insert(part, guid); - if (q.second) { - globally_new.insert(part); - } - } - for (const auto& part : to_remove) { - x.erase(part); - partition_to_guid_[part].erase(guid); - partition_index_.remove(part, guid); - if (partition_to_guid_[part].empty()) { - partition_to_guid_.erase(part); - } + x.insert(to_add.begin(), to_add.end()); + for (const auto& part : to_add) { + const auto q = partition_to_guid_.insert(std::make_pair(part, OrderedGuidSet())); + q.first->second.insert(guid); + partition_index_.insert(part, guid); + if (q.second) { + globally_new.insert(part); } - if (x.empty()) { - guid_to_partitions_.erase(r.first); + } + for (const auto& part : to_remove) { + x.erase(part); + partition_to_guid_[part].erase(guid); + partition_index_.remove(part, guid); + if (partition_to_guid_[part].empty()) { + partition_to_guid_.erase(part); } } + if (x.empty()) { + guid_to_partitions_.erase(r.first); + } add_new(relay_partitions, globally_new); } @@ -92,4 +89,94 @@ GuidPartitionTable::Result GuidPartitionTable::insert(const OpenDDS::DCPS::GUID_ return result; } +void GuidPartitionTable::remove(const OpenDDS::DCPS::GUID_t& guid) +{ + std::vector relay_partitions; + { + ACE_GUARD(ACE_Thread_Mutex, g, mutex_); + + StringSet defunct; + + const auto pos = guid_to_partitions_.find(guid); + if (pos != guid_to_partitions_.end()) { + for (const auto& partition : pos->second) { + partition_index_.remove(partition, guid); + const auto pos2 = partition_to_guid_.find(partition); + if (pos2 != partition_to_guid_.end()) { + pos2->second.erase(guid); + if (pos2->second.empty()) { + defunct.insert(pos2->first); + partition_to_guid_.erase(pos2); + } + } + } + guid_to_partitions_.erase(pos); + remove_from_cache(guid); + } + + remove_defunct(relay_partitions, defunct); + } + + { + ACE_GUARD(ACE_Thread_Mutex, g, write_mutex_); + write_relay_partitions(relay_partitions); + } +} + +void GuidPartitionTable::lookup(StringSet& partitions, const OpenDDS::DCPS::GUID_t& from) const +{ + ACE_GUARD(ACE_Thread_Mutex, g, mutex_); + + // Match on the prefix. + const auto prefix = make_id(from, OpenDDS::DCPS::ENTITYID_UNKNOWN); + + const auto p = guid_to_partitions_cache_.find(prefix); + if (p != guid_to_partitions_cache_.end()) { + partitions.insert(p->second.begin(), p->second.end()); + return; + } + + auto& c = guid_to_partitions_cache_[prefix]; + + for (auto pos = guid_to_partitions_.lower_bound(prefix), limit = guid_to_partitions_.end(); + pos != limit && std::memcmp(pos->first.guidPrefix, prefix.guidPrefix, sizeof(prefix.guidPrefix)) == 0; ++pos) { + partitions.insert(pos->second.begin(), pos->second.end()); + c.insert(pos->second.begin(), pos->second.end()); + } + + if (!config_.allow_empty_partition()) { + partitions.erase(""); + c.erase(""); + } +} + +void GuidPartitionTable::populate_replay(SpdpReplay& spdp_replay, + const OpenDDS::DCPS::GUID_t& guid, + const std::vector& to_add) const +{ + // The partitions are new for this reader/writer. + // Check if they are new for the participant. + + const auto prefix = make_id(guid, OpenDDS::DCPS::ENTITYID_UNKNOWN); + + for (const auto& part : to_add) { + const auto pos1 = partition_to_guid_.find(part); + if (pos1 == partition_to_guid_.end()) { + if (config_.allow_empty_partition() || !part.empty()) { + spdp_replay.partitions().push_back(part); + } + continue; + } + + const auto pos2 = pos1->second.lower_bound(prefix); + + if (pos2 == pos1->second.end() || + std::memcmp(pos2->guidPrefix, prefix.guidPrefix, sizeof(prefix.guidPrefix)) != 0) { + if (config_.allow_empty_partition() || !part.empty()) { + spdp_replay.partitions().push_back(part); + } + } + } +} + } diff --git a/tools/rtpsrelay/GuidPartitionTable.h b/tools/rtpsrelay/GuidPartitionTable.h index 6befd323a45..1994591cb8f 100644 --- a/tools/rtpsrelay/GuidPartitionTable.h +++ b/tools/rtpsrelay/GuidPartitionTable.h @@ -41,67 +41,10 @@ class GuidPartitionTable { Result insert(const OpenDDS::DCPS::GUID_t& guid, const DDS::StringSeq& partitions); - void remove(const OpenDDS::DCPS::GUID_t& guid) - { - std::vector relay_partitions; - { - ACE_GUARD(ACE_Thread_Mutex, g, mutex_); - - StringSet defunct; - - const auto pos = guid_to_partitions_.find(guid); - if (pos != guid_to_partitions_.end()) { - for (const auto& partition : pos->second) { - partition_index_.remove(partition, guid); - const auto pos2 = partition_to_guid_.find(partition); - if (pos2 != partition_to_guid_.end()) { - pos2->second.erase(guid); - if (pos2->second.empty()) { - defunct.insert(pos2->first); - partition_to_guid_.erase(pos2); - } - } - } - guid_to_partitions_.erase(pos); - remove_from_cache(guid); - } - - remove_defunct(relay_partitions, defunct); - } - - { - ACE_GUARD(ACE_Thread_Mutex, g, write_mutex_); - write_relay_partitions(relay_partitions); - } - } - - // Look up the partitions for the participant from. - void lookup(StringSet& partitions, const OpenDDS::DCPS::GUID_t& from) const - { - ACE_GUARD(ACE_Thread_Mutex, g, mutex_); - - // Match on the prefix. - const OpenDDS::DCPS::GUID_t prefix = make_id(from, OpenDDS::DCPS::ENTITYID_UNKNOWN); - - const auto p = guid_to_partitions_cache_.find(prefix); - if (p != guid_to_partitions_cache_.end()) { - partitions.insert(p->second.begin(), p->second.end()); - return; - } - - auto& c = guid_to_partitions_cache_[prefix]; + void remove(const OpenDDS::DCPS::GUID_t& guid); - for (auto pos = guid_to_partitions_.lower_bound(prefix), limit = guid_to_partitions_.end(); - pos != limit && std::memcmp(pos->first.guidPrefix, prefix.guidPrefix, sizeof(prefix.guidPrefix)) == 0; ++pos) { - partitions.insert(pos->second.begin(), pos->second.end()); - c.insert(pos->second.begin(), pos->second.end()); - } - - if (!config_.allow_empty_partition()) { - partitions.erase(""); - c.erase(""); - } - } + // Look up the partitions for the participant "from". + void lookup(StringSet& partitions, const OpenDDS::DCPS::GUID_t& from) const; /// Add to 'guids' the GUIDs of participants that should receive messages based on 'partitions'. /// If 'allowed' is empty, it has no effect. Otherwise all entires added to 'guids' must be in 'allowed'. @@ -122,38 +65,12 @@ class GuidPartitionTable { void remove_from_cache(const OpenDDS::DCPS::GUID_t& guid) { // Invalidate the cache. - const OpenDDS::DCPS::GUID_t prefix = make_id(guid, OpenDDS::DCPS::ENTITYID_UNKNOWN); - guid_to_partitions_cache_.erase(prefix); + guid_to_partitions_cache_.erase(make_id(guid, OpenDDS::DCPS::ENTITYID_UNKNOWN)); } void populate_replay(SpdpReplay& spdp_replay, const OpenDDS::DCPS::GUID_t& guid, - const std::vector& to_add) const - { - // The partitions are new for this reader/writer. - // Check if they are new for the participant. - - const OpenDDS::DCPS::GUID_t prefix = make_id(guid, OpenDDS::DCPS::ENTITYID_UNKNOWN); - - for (const auto& part : to_add) { - const auto pos1 = partition_to_guid_.find(part); - if (pos1 == partition_to_guid_.end()) { - if (config_.allow_empty_partition() || !part.empty()) { - spdp_replay.partitions().push_back(part); - } - continue; - } - - const auto pos2 = pos1->second.lower_bound(prefix); - - if (pos2 == pos1->second.end() || - std::memcmp(pos2->guidPrefix, prefix.guidPrefix, sizeof(prefix.guidPrefix)) != 0) { - if (config_.allow_empty_partition() || !part.empty()) { - spdp_replay.partitions().push_back(part); - } - } - } - } + const std::vector& to_add) const; void add_new(std::vector& relay_partitions, const StringSet& partitions) { @@ -245,22 +162,25 @@ class GuidPartitionTable { const std::string address_; RelayPartitionsDataWriter_var relay_partitions_writer_; - typedef std::vector Slots; + using Slots = std::vector; Slots slots_; - typedef std::list FreeSlotList; + + using FreeSlotList = std::list; FreeSlotList free_slot_list_; - typedef std::unordered_map PartitionToSlot; + + using PartitionToSlot = std::unordered_map; PartitionToSlot partition_to_slot_; SpdpReplayDataWriter_var spdp_replay_writer_; - typedef std::map GuidToPartitions; + using GuidToPartitions = std::map; GuidToPartitions guid_to_partitions_; - typedef std::unordered_map GuidToPartitionsCache; + + using GuidToPartitionsCache = std::unordered_map; mutable GuidToPartitionsCache guid_to_partitions_cache_; - typedef std::set OrderedGuidSet; - typedef std::unordered_map PartitionToGuid; + using OrderedGuidSet = std::set; + using PartitionToGuid = std::unordered_map; PartitionToGuid partition_to_guid_; PartitionIndex partition_index_; diff --git a/tools/rtpsrelay/HandlerStatisticsReporter.h b/tools/rtpsrelay/HandlerStatisticsReporter.h index e784d6ad6d3..24e2b4cc00c 100644 --- a/tools/rtpsrelay/HandlerStatisticsReporter.h +++ b/tools/rtpsrelay/HandlerStatisticsReporter.h @@ -143,7 +143,7 @@ class HandlerStatisticsReporter { const Config& config_; - typedef CommonIoStatsReportHelper Helper; + using Helper = CommonIoStatsReportHelper; HandlerStatistics log_handler_statistics_; Helper log_helper_; diff --git a/tools/rtpsrelay/README.rst b/tools/rtpsrelay/README.rst index a98141b4cc2..2dab4160356 100644 --- a/tools/rtpsrelay/README.rst +++ b/tools/rtpsrelay/README.rst @@ -2,6 +2,9 @@ RTPS Relay ========== +Note: More up-to-date information about the RtpsRelay is in the OpenDDS Developer's Guide +see docs/devguide/internet_enabled_rtps.rst + Motivation ========== diff --git a/tools/rtpsrelay/ReaderListenerBase.h b/tools/rtpsrelay/ReaderListenerBase.h index 0b94672678e..4033be870a3 100644 --- a/tools/rtpsrelay/ReaderListenerBase.h +++ b/tools/rtpsrelay/ReaderListenerBase.h @@ -6,8 +6,6 @@ namespace RtpsRelay { class ReaderListenerBase : public DDS::DataReaderListener { -public: - private: void on_requested_deadline_missed(DDS::DataReader_ptr /*reader*/, const DDS::RequestedDeadlineMissedStatus & /*status*/) override {} diff --git a/tools/rtpsrelay/RelayHandler.h b/tools/rtpsrelay/RelayHandler.h index 8f6e283b002..d9a7ec83598 100644 --- a/tools/rtpsrelay/RelayHandler.h +++ b/tools/rtpsrelay/RelayHandler.h @@ -75,7 +75,7 @@ class RelayHandler : public ACE_Event_Handler { , type(type) {} }; - typedef std::queue OutgoingType; + using OutgoingType = std::queue; OutgoingType outgoing_; mutable ACE_Thread_Mutex outgoing_mutex_; @@ -106,16 +106,14 @@ class VerticalHandler : public RelayHandler { const ACE_INET_Addr& application_participant_addr, HandlerStatisticsReporter& stats_reporter, OpenDDS::DCPS::Lockable_Message_Block_Ptr::Lock_Policy message_block_locking = OpenDDS::DCPS::Lockable_Message_Block_Ptr::Lock_Policy::No_Lock); + void stop(); void horizontal_handler(HorizontalHandler* horizontal_handler) { horizontal_handler_ = horizontal_handler; } void spdp_handler(SpdpHandler* spdp_handler) { spdp_handler_ = spdp_handler; } - GuidAddrSet& guid_addr_set() - { - return guid_addr_set_; - } + GuidAddrSet& guid_addr_set() { return guid_addr_set_; } void venqueue_message(const ACE_INET_Addr& addr, ParticipantStatisticsReporter& stats_reporter, @@ -237,7 +235,7 @@ class SpdpHandler : public VerticalHandler { const OpenDDS::DCPS::MonotonicTimePoint& now); private: - typedef std::vector ReplayQueue; + using ReplayQueue = std::vector; ReplayQueue replay_queue_; ACE_Thread_Mutex replay_queue_mutex_; diff --git a/tools/rtpsrelay/RelayHttpMetaDiscovery.cpp b/tools/rtpsrelay/RelayHttpMetaDiscovery.cpp index 7077fa48b1c..752c3fdc7c2 100644 --- a/tools/rtpsrelay/RelayHttpMetaDiscovery.cpp +++ b/tools/rtpsrelay/RelayHttpMetaDiscovery.cpp @@ -17,7 +17,9 @@ const HttpStatus HTTP_OK(200, "OK"); const HttpStatus HTTP_NOT_FOUND(404, "Not Found"); const HttpStatus HTTP_SERVICE_UNAVAILABLE(503, "Service Unavailable"); -const int HANDLER_ERROR = -1, HANDLER_REMOVE = -1, HANDLER_OK = 0; +namespace { + const int HANDLER_ERROR = -1, HANDLER_REMOVE = -1, HANDLER_OK = 0; +} int HttpConnection::open(void* x) { @@ -117,9 +119,9 @@ bool RelayHttpMetaDiscovery::requestIsComplete(const std::string& request, void RelayHttpMetaDiscovery::respond(std::stringstream& response) const { response << "HTTP/1.1 200 OK\r\n" - << "Content-Type: " << meta_discovery_content_type_ << "\r\n" - << "Content-Length: " << meta_discovery_content_.size() << "\r\n" - << "\r\n" + "Content-Type: " << meta_discovery_content_type_ << "\r\n" + "Content-Length: " << meta_discovery_content_.size() << "\r\n" + "\r\n" << meta_discovery_content_; } @@ -133,9 +135,9 @@ void RelayHttpMetaDiscovery::respondStatus(std::stringstream& response, const HttpStatus& status) const { response << "HTTP/1.1 " << status.status() << " " << status.message() << "\r\n" - << "Content-Type: text/plain\r\n" - << "Content-Length: " << status.message().size() << "\r\n" - << "\r\n" + "Content-Type: text/plain\r\n" + "Content-Length: " << status.message().size() << "\r\n" + "\r\n" << status.message(); } diff --git a/tools/rtpsrelay/RelayHttpMetaDiscovery.h b/tools/rtpsrelay/RelayHttpMetaDiscovery.h index 46278300ae6..d8aa545cd26 100644 --- a/tools/rtpsrelay/RelayHttpMetaDiscovery.h +++ b/tools/rtpsrelay/RelayHttpMetaDiscovery.h @@ -24,8 +24,8 @@ class HttpStatus { const std::string& message() const { return message_; } private: - int status_; - std::string message_; + const int status_; + const std::string message_; }; class RelayHttpMetaDiscovery; @@ -33,18 +33,13 @@ class RelayHttpMetaDiscovery; // ACE_NULL_SYNCH is appropriate since the reactor is single threaded. class HttpConnection : public ACE_Svc_Handler { public: - HttpConnection() - : relay_http_meta_discovery_(0) - , buffer_(BUFFER_SIZE) - {} - - int open(void* acceptor); // called by ACE_Acceptor + int open(void* acceptor) override; // called by ACE_Acceptor private: - int handle_input(ACE_HANDLE h); + int handle_input(ACE_HANDLE h) override; - const RelayHttpMetaDiscovery* relay_http_meta_discovery_; - ACE_Message_Block buffer_; + const RelayHttpMetaDiscovery* relay_http_meta_discovery_{nullptr}; + ACE_Message_Block buffer_{BUFFER_SIZE}; }; class RelayHttpMetaDiscovery : public ACE_Acceptor { @@ -61,6 +56,8 @@ class RelayHttpMetaDiscovery : public ACE_Acceptor AddressSet; -typedef std::pair SlotKey; +using AddressSet = std::set; +using SlotKey = std::pair; struct SlotKeyHash { - std::size_t operator() (const SlotKey& slot_key) const + std::size_t operator()(const SlotKey& slot_key) const { return std::hash()(slot_key.first) ^ std::hash()(slot_key.second); } @@ -65,12 +65,12 @@ class RelayPartitionTable { } private: - typedef std::unordered_map NameToAddress; - typedef std::unordered_map RelayToAddress; + using NameToAddress = std::unordered_map; + using RelayToAddress = std::unordered_map; RelayToAddress relay_to_address_; struct Map { - Map(RelayToAddress& relay_to_address) + explicit Map(RelayToAddress& relay_to_address) : relay_to_address_(relay_to_address) {} @@ -129,7 +129,7 @@ class RelayPartitionTable { PartitionIndex partition_index_; - typedef std::unordered_map RelayToPartitions; + using RelayToPartitions = std::unordered_map; RelayToPartitions relay_to_partitions_; }; diff --git a/tools/rtpsrelay/RelayStatisticsReporter.h b/tools/rtpsrelay/RelayStatisticsReporter.h index d499ec0091d..c6ff19b0151 100644 --- a/tools/rtpsrelay/RelayStatisticsReporter.h +++ b/tools/rtpsrelay/RelayStatisticsReporter.h @@ -324,7 +324,7 @@ class RelayStatisticsReporter { mutable ACE_Thread_Mutex mutex_; const Config& config_; - typedef CommonIoStatsReportHelper Helper; + using Helper = CommonIoStatsReportHelper; RelayStatistics log_relay_statistics_; Helper log_helper_; diff --git a/tools/rtpsrelay/RelayStatusReporter.cpp b/tools/rtpsrelay/RelayStatusReporter.cpp index 6c405aff556..bef13206bd1 100644 --- a/tools/rtpsrelay/RelayStatusReporter.cpp +++ b/tools/rtpsrelay/RelayStatusReporter.cpp @@ -15,10 +15,9 @@ RelayStatusReporter::RelayStatusReporter(const Config& config, if (config.publish_relay_status() != OpenDDS::DCPS::TimeDuration::zero_value) { this->reactor()->schedule_timer(this, 0, ACE_Time_Value(), config.publish_relay_status().value()); } - } -int RelayStatusReporter::handle_timeout(const ACE_Time_Value& /*now*/, const void* /*token*/) +int RelayStatusReporter::handle_timeout(const ACE_Time_Value&, const void*) { OpenDDS::DCPS::ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager()); diff --git a/tools/rtpsrelay/RelayStatusReporter.h b/tools/rtpsrelay/RelayStatusReporter.h index 3d2ca943e0f..0c9b8adfbe6 100644 --- a/tools/rtpsrelay/RelayStatusReporter.h +++ b/tools/rtpsrelay/RelayStatusReporter.h @@ -15,9 +15,9 @@ class RelayStatusReporter : public ACE_Event_Handler { RelayStatusDataWriter_var writer, ACE_Reactor* reactor); +private: int handle_timeout(const ACE_Time_Value& now, const void* token) override; -private: GuidAddrSet& guid_addr_set_; RelayStatusDataWriter_var writer_; RelayStatus relay_status_;