Skip to content

Commit

Permalink
Adapting to eProsima/Fast-DDS#325
Browse files Browse the repository at this point in the history
* Topic info kept inside participant listener.
* Adapting to new listener interfaces.
  • Loading branch information
MiguelCompany committed Nov 27, 2018
1 parent 6ae6b04 commit 146d66e
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 364 deletions.
4 changes: 4 additions & 0 deletions rmw_fastrtps_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()

if(SECURITY)
find_package(OpenSSL REQUIRED)
endif()

find_package(ament_cmake_ros REQUIRED)

find_package(rcutils REQUIRED)
Expand Down
4 changes: 4 additions & 0 deletions rmw_fastrtps_dynamic_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()

if(SECURITY)
find_package(OpenSSL REQUIRED)
endif()

find_package(ament_cmake_ros REQUIRED)

find_package(rcutils REQUIRED)
Expand Down
4 changes: 4 additions & 0 deletions rmw_fastrtps_shared_cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()

if(SECURITY)
find_package(OpenSSL REQUIRED)
endif()

find_package(ament_cmake_ros REQUIRED)

find_package(rcutils REQUIRED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,21 @@
#define RMW_FASTRTPS_SHARED_CPP__CUSTOM_PARTICIPANT_INFO_HPP_

#include <map>
#include <mutex>
#include <string>
#include <vector>

#include "fastrtps/attributes/ParticipantAttributes.h"
#include "fastrtps/participant/Participant.h"
#include "fastrtps/participant/ParticipantListener.h"

#include "rcutils/logging_macros.h"

#include "rmw/impl/cpp/key_value.hpp"
#include "rmw/rmw.h"

#include "rmw_common.hpp"

class ParticipantListener;
class ReaderInfo;
class WriterInfo;
Expand All @@ -34,30 +39,32 @@ typedef struct CustomParticipantInfo
{
eprosima::fastrtps::Participant * participant;
::ParticipantListener * listener;
ReaderInfo * secondarySubListener;
WriterInfo * secondaryPubListener;
rmw_guard_condition_t * graph_guard_condition;
} CustomParticipantInfo;

class ParticipantListener : public eprosima::fastrtps::ParticipantListener
{
public:
explicit ParticipantListener(rmw_guard_condition_t * graph_guard_condition)
: graph_guard_condition_(graph_guard_condition)
{}

void onParticipantDiscovery(
eprosima::fastrtps::Participant *,
eprosima::fastrtps::ParticipantDiscoveryInfo info) override
eprosima::fastrtps::rtps::ParticipantDiscoveryInfo && info) override
{
if (
info.rtps.m_status != eprosima::fastrtps::rtps::DISCOVERED_RTPSPARTICIPANT &&
info.rtps.m_status != eprosima::fastrtps::rtps::REMOVED_RTPSPARTICIPANT &&
info.rtps.m_status != eprosima::fastrtps::rtps::DROPPED_RTPSPARTICIPANT)
info.status != eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT &&
info.status != eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::REMOVED_PARTICIPANT &&
info.status != eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DROPPED_PARTICIPANT)
{
return;
}

if (eprosima::fastrtps::rtps::DISCOVERED_RTPSPARTICIPANT == info.rtps.m_status) {
if (eprosima::fastrtps::rtps::ParticipantDiscoveryInfo::DISCOVERED_PARTICIPANT == info.status) {
// ignore already known GUIDs
if (discovered_names.find(info.rtps.m_guid) == discovered_names.end()) {
auto map = rmw::impl::cpp::parse_key_value(info.rtps.m_userData);
if (discovered_names.find(info.info.m_guid) == discovered_names.end()) {
auto map = rmw::impl::cpp::parse_key_value(info.info.m_userData);
auto name_found = map.find("name");
auto ns_found = map.find("namespace");

Expand All @@ -73,24 +80,24 @@ class ParticipantListener : public eprosima::fastrtps::ParticipantListener

if (name.empty()) {
// use participant name if no name was found in the user data
name = info.rtps.m_RTPSParticipantName;
name = info.info.m_participantName;
}
// ignore discovered participants without a name
if (!name.empty()) {
discovered_names[info.rtps.m_guid] = name;
discovered_namespaces[info.rtps.m_guid] = namespace_;
discovered_names[info.info.m_guid] = name;
discovered_namespaces[info.info.m_guid] = namespace_;
}
}
} else {
{
auto it = discovered_names.find(info.rtps.m_guid);
auto it = discovered_names.find(info.info.m_guid);
// only consider known GUIDs
if (it != discovered_names.end()) {
discovered_names.erase(it);
}
}
{
auto it = discovered_namespaces.find(info.rtps.m_guid);
auto it = discovered_namespaces.find(info.info.m_guid);
// only consider known GUIDs
if (it != discovered_namespaces.end()) {
discovered_namespaces.erase(it);
Expand Down Expand Up @@ -119,9 +126,71 @@ class ParticipantListener : public eprosima::fastrtps::ParticipantListener
return namespaces;
}

void onSubscriberDiscovery(
eprosima::fastrtps::Participant *,
eprosima::fastrtps::rtps::ReaderDiscoveryInfo && info) override
{
if (eprosima::fastrtps::rtps::ReaderDiscoveryInfo::CHANGED_QOS_READER != info.status) {
bool is_alive =
eprosima::fastrtps::rtps::ReaderDiscoveryInfo::DISCOVERED_READER == info.status;
process_discovery_info(info.info, is_alive, true);
}
}

void onPublisherDiscovery(
eprosima::fastrtps::Participant *,
eprosima::fastrtps::rtps::WriterDiscoveryInfo && info) override
{
if (eprosima::fastrtps::rtps::WriterDiscoveryInfo::CHANGED_QOS_WRITER != info.status) {
bool is_alive =
eprosima::fastrtps::rtps::WriterDiscoveryInfo::DISCOVERED_WRITER == info.status;
process_discovery_info(info.info, is_alive, false);
}
}

template<class T>
void process_discovery_info(T & proxyData, bool is_alive, bool is_reader)
{
std::map<std::string, std::vector<std::string>> & topicNtypes =
is_reader ? reader_topic_and_types : writer_topic_and_types;

auto fqdn = proxyData.topicName();
bool trigger = false;
mapmutex.lock();
if (is_alive) {
topicNtypes[fqdn].push_back(proxyData.typeName());
trigger = true;
} else {
auto it = topicNtypes.find(fqdn);
if (it != topicNtypes.end()) {
const auto & loc =
std::find(std::begin(it->second), std::end(it->second), proxyData.typeName());
if (loc != std::end(it->second)) {
topicNtypes[fqdn].erase(loc, loc + 1);
trigger = true;
} else {
RCUTILS_LOG_DEBUG_NAMED(
"rmw_fastrtps_shared_cpp",
"unexpected removal of subscription on topic '%s' with type '%s'",
fqdn.c_str(), proxyData.typeName().c_str());
}
}
}
mapmutex.unlock();

if (trigger) {
rmw_fastrtps_shared_cpp::__rmw_trigger_guard_condition(
graph_guard_condition_->implementation_identifier,
graph_guard_condition_);
}
}

std::map<eprosima::fastrtps::rtps::GUID_t, std::string> discovered_names;
std::map<eprosima::fastrtps::rtps::GUID_t, std::string> discovered_namespaces;
std::map<std::string, std::vector<std::string>> reader_topic_and_types;
std::map<std::string, std::vector<std::string>> writer_topic_and_types;
std::mutex mapmutex;
rmw_guard_condition_t * graph_guard_condition_;
};

#endif // RMW_FASTRTPS_SHARED_CPP__CUSTOM_PARTICIPANT_INFO_HPP_
111 changes: 0 additions & 111 deletions rmw_fastrtps_shared_cpp/src/reader_info.hpp

This file was deleted.

14 changes: 6 additions & 8 deletions rmw_fastrtps_shared_cpp/src/rmw_count.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@
#include "namespace_prefix.hpp"
#include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp"
#include "rmw_fastrtps_shared_cpp/rmw_common.hpp"
#include "reader_info.hpp"
#include "writer_info.hpp"

namespace rmw_fastrtps_shared_cpp
{
Expand Down Expand Up @@ -64,13 +62,13 @@ __rmw_count_publishers(
}

auto impl = static_cast<CustomParticipantInfo *>(node->data);
WriterInfo * slave_target = impl->secondaryPubListener;
::ParticipantListener * slave_target = impl->listener;

slave_target->mapmutex.lock();
// Search and sum up the publisher counts
for (const auto & topic_fqdn : topic_fqdns) {
const auto & it = slave_target->topicNtypes.find(topic_fqdn);
if (it != slave_target->topicNtypes.end()) {
const auto & it = slave_target->writer_topic_and_types.find(topic_fqdn);
if (it != slave_target->writer_topic_and_types.end()) {
*count += it->second.size();
}
}
Expand Down Expand Up @@ -117,13 +115,13 @@ __rmw_count_subscribers(
}

CustomParticipantInfo * impl = static_cast<CustomParticipantInfo *>(node->data);
ReaderInfo * slave_target = impl->secondarySubListener;
::ParticipantListener * slave_target = impl->listener;

slave_target->mapmutex.lock();
// Search and sum up the subscriber counts
for (const auto & topic_fqdn : topic_fqdns) {
const auto & it = slave_target->topicNtypes.find(topic_fqdn);
if (it != slave_target->topicNtypes.end()) {
const auto & it = slave_target->reader_topic_and_types.find(topic_fqdn);
if (it != slave_target->reader_topic_and_types.end()) {
*count += it->second.size();
}
}
Expand Down
Loading

0 comments on commit 146d66e

Please sign in to comment.