Skip to content

Commit

Permalink
Implemented rmw_get_publishers_info_by_topic and
Browse files Browse the repository at this point in the history
rmw_get_subscriptions_info_by_topic

Signed-off-by: Jaison Titus <jaisontj92@gmail.com>
  • Loading branch information
jaisontj committed Nov 13, 2019
1 parent d0ac7e0 commit 58c1fe8
Show file tree
Hide file tree
Showing 4 changed files with 213 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,6 @@ class ParticipantListener : public eprosima::fastrtps::ParticipantListener
{
auto & topic_cache =
is_reader ? reader_topic_cache : writer_topic_cache;

bool trigger;
{
std::lock_guard<std::mutex> guard(topic_cache.getMutex());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class TopicCache
* Map of topic names to TopicData. Where topic data is vector of tuples containing
* participant GUID, topic type and the qos policy of the respective participant.
* Topics here are represented as one to many, DDS XTypes 1.2
* specifies application code 'generally' uses a 1-1 relationship.
* specifies application code that 'generally' uses a 1-1 relationship.
* However, generic services such as logger and monitor, can discover
* multiple types on the same topic.
*/
Expand Down Expand Up @@ -142,6 +142,7 @@ class TopicCache
* @param rtpsParticipantKey
* @param topic_name
* @param type_name
* @param dds_qos the dds qos policy of the participant
* @return true if a change has been recorded
*/
template<class T>
Expand Down
196 changes: 194 additions & 2 deletions rmw_fastrtps_shared_cpp/src/rmw_get_topic_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,188 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <algorithm>
#include <map>
#include <string>
#include <tuple>
#include <vector>

#include "rmw/rmw.h"
#include "rmw/types.h"
#include "rmw/topic_info_array.h"

#include "rmw_fastrtps_shared_cpp/custom_participant_info.hpp"
#include "rmw_fastrtps_shared_cpp/rmw_common.hpp"
#include "rmw_fastrtps_shared_cpp/namespace_prefix.hpp"

namespace rmw_fastrtps_shared_cpp
{

rmw_ret_t
_validate_params(
const char * identifier,
const rmw_node_t * node,
rcutils_allocator_t * allocator,
const char * topic_name,
rmw_topic_info_array_t * participants_info)
{
if (!identifier) {
RMW_SET_ERROR_MSG("null implementation identifier provided");
return RMW_RET_ERROR;
}

if (!topic_name) {
RMW_SET_ERROR_MSG("null topic_name provided");
return RMW_RET_ERROR;
}

if (!allocator) {
RMW_SET_ERROR_MSG("null allocator provided");
return RMW_RET_ERROR;
}

if (!node) {
RMW_SET_ERROR_MSG("null node handle");
return RMW_RET_ERROR;
}

// Get participant pointer from node
if (node->implementation_identifier != identifier) {
RMW_SET_ERROR_MSG("node handle not from this implementation");
return RMW_RET_ERROR;
}

if (!participants_info) {
RMW_SET_ERROR_MSG("null participants_info provided");
return RMW_RET_ERROR;
}

return RMW_RET_OK;
}

std::vector<std::string>
_get_topic_fqdns(const char * topic_name, bool no_mangle)
{
std::vector<std::string> topic_fqdns;
topic_fqdns.push_back(topic_name);
// if mangle
if (!no_mangle) {
auto ros_prefixes = _get_all_ros_prefixes();
// Build the list of all possible topic FQDN
std::for_each(ros_prefixes.begin(), ros_prefixes.end(),
[&topic_fqdns, &topic_name](const std::string & prefix) {
topic_fqdns.push_back(prefix + topic_name);
});
}
return topic_fqdns;
}

rmw_ret_t
_set_rmw_topic_info(
std::tuple<GUID_t, std::string, rmw_qos_profile_t> data,
::ParticipantListener * slave_target,
rmw_topic_info_t * topic_info)
{
const auto & gid = std::get<0>(data);
// convert gid to const char * and set it inside topic_info
std::ostringstream gid_stream;
gid_stream << gid;
rmw_ret_t ret = rmw_topic_info_set_gid(gid_stream.str().c_str(), topic_info);
if (ret != RMW_RET_OK) {
return ret;
}
// set topic type
ret = rmw_topic_info_set_topic_type(std::get<1>(data).c_str(), topic_info);
if (ret != RMW_RET_OK) {
return ret;
}
// set qos profile
ret = rmw_topic_info_set_qos_profile(&std::get<2>(data), topic_info);
if (ret != RMW_RET_OK) {
return ret;
}
// get node name
const auto & d_name_it = slave_target->discovered_names.find(gid);
if (d_name_it != slave_target->discovered_names.end()) {
ret = rmw_topic_info_set_node_name(d_name_it->second.c_str(), topic_info);
} else {
ret = rmw_topic_info_set_node_name("_NODE_NAME_UNKNOWN_", topic_info);
}
if (ret != RMW_RET_OK) {
return ret;
}
// get node namespace
const auto & d_namespace_it = slave_target->discovered_namespaces.find(gid);
if (d_namespace_it != slave_target->discovered_namespaces.end()) {
ret = rmw_topic_info_set_node_namespace(d_namespace_it->second.c_str(), topic_info);
} else {
ret = rmw_topic_info_set_node_namespace("_NODE_NAMESPACE_UNKNOWN_", topic_info);
}
return ret;
}


rmw_ret_t
_get_info_by_topic(
const char * identifier,
const rmw_node_t * node,
rcutils_allocator_t * allocator,
const char * topic_name,
bool no_mangle,
bool is_publisher,
rmw_topic_info_array_t * participants_info)
{
rmw_ret_t ret = _validate_params(
identifier,
node,
allocator,
topic_name,
participants_info);
if (ret != RMW_RET_OK) {
return ret;
}

const auto & topic_fqdns = _get_topic_fqdns(topic_name, no_mangle);

auto impl = static_cast<CustomParticipantInfo *>(node->data);
::ParticipantListener * slave_target = impl->listener;
auto & topic_cache =
is_publisher ? slave_target->writer_topic_cache : slave_target->reader_topic_cache;
{
std::lock_guard<std::mutex> guard(topic_cache.getMutex());
auto & topic_name_to_data = topic_cache().getTopicNameToTopicData();
std::vector<rmw_topic_info_t> topic_info_vector;
for (const auto & topic_name : topic_fqdns) {
const auto & it = topic_name_to_data.find(topic_name);
if (it != topic_name_to_data.end()) {
for (const auto & data : it->second) {
rmw_topic_info_t topic_info;
rmw_ret_t ret = _set_rmw_topic_info(data, slave_target, &topic_info);
if (ret != RMW_RET_OK) {
RMW_SET_ERROR_MSG("Failed to create set_rmw_topic_info.");
return ret;
}
// add rmw_topic_info_t to a vector
topic_info_vector.push_back(topic_info);
}
}
}

// add all the elements from the vector to rmw_topic_info_array_t
auto count = topic_info_vector.size();
ret = rmw_topic_info_array_init_with_size(allocator, count, participants_info);
if (ret != RMW_RET_OK) {
RMW_SET_ERROR_MSG("rmw_topic_info_array_init_with_size failed to allocate memory.");
return RMW_RET_BAD_ALLOC;
}
for (auto i = 0u; i < count; i++) {
participants_info->info_array[i] = topic_info_vector.at(i);
}
participants_info->count = count;
}
return RMW_RET_OK;
}

rmw_ret_t
__rmw_get_publishers_info_by_topic(
const char * identifier,
Expand All @@ -25,7 +203,14 @@ __rmw_get_publishers_info_by_topic(
bool no_mangle,
rmw_topic_info_array_t * publishers_info)
{
return RMW_RET_UNSUPPORTED;
return _get_info_by_topic(
identifier,
node,
allocator,
topic_name,
no_mangle,
true, /*is_publisher*/
publishers_info);
}

rmw_ret_t
Expand All @@ -37,6 +222,13 @@ __rmw_get_subscriptions_info_by_topic(
bool no_mangle,
rmw_topic_info_array_t * subscriptions_info)
{
return RMW_RET_UNSUPPORTED;
return _get_info_by_topic(
identifier,
node,
allocator,
topic_name,
no_mangle,
false, /*is_publisher*/
subscriptions_info);
}
} // namespace rmw_fastrtps_shared_cpp
28 changes: 17 additions & 11 deletions rmw_fastrtps_shared_cpp/test/test_topic_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,17 @@ class TopicCacheTestFixture : public ::testing::Test
GUID_t guid[2];
void SetUp()
{
// Create an instance handlers
// Create instance handlers
for (int i = 0; i < 2; i++) {
guid[i] = GUID_t(GuidPrefix_t(), i + 1);
instance_handler[i] = guid[i];
}

// Populating WriterQos -> which is from the DDS layer and
// rmw_qos_profile_t which is from rmw/types.h.
// This is done to test if topic_cache.getTopicNameToTopicData() returns
// the correct value in rmw_qos_profile_t for a given WriterQos

// DDS qos
qos[0].m_durability.kind = eprosima::fastrtps::TRANSIENT_DURABILITY_QOS;
qos[0].m_reliability.kind = eprosima::fastrtps::RELIABLE_RELIABILITY_QOS;
Expand Down Expand Up @@ -85,7 +90,7 @@ class TopicCacheTestFixture : public ::testing::Test
rmw_qos[1].lifespan.sec = 19u;
rmw_qos[1].lifespan.nsec = 5432u;

// Add some topics
// Add data to topic_cache
topic_cache.addTopic(instance_handler[0], "topic1", "type1", qos[0]);
topic_cache.addTopic(instance_handler[0], "topic2", "type2", qos[0]);
topic_cache.addTopic(instance_handler[1], "topic1", "type1", qos[1]);
Expand Down Expand Up @@ -124,7 +129,7 @@ TEST_F(TopicCacheTestFixture, test_topic_cache_get_participant_map)
const auto & it = participant_topic_map.find(this->guid[0]);
ASSERT_TRUE(it != participant_topic_map.end());
// Verify that the topic and respective types are present
auto & topic_type_map = it->second;
const auto & topic_type_map = it->second;

const auto & topic1it = topic_type_map.find("topic1");
ASSERT_TRUE(topic1it != topic_type_map.end());
Expand All @@ -140,7 +145,7 @@ TEST_F(TopicCacheTestFixture, test_topic_cache_get_participant_map)
const auto & it2 = participant_topic_map.find(this->guid[1]);
ASSERT_TRUE(it2 != participant_topic_map.end());
// Verify that the topic and respective types are present
auto & topic_type_map2 = it2->second;
const auto & topic_type_map2 = it2->second;

const auto & topic1it2 = topic_type_map2.find("topic1");
ASSERT_TRUE(topic1it2 != topic_type_map2.end());
Expand All @@ -163,21 +168,21 @@ TEST_F(TopicCacheTestFixture, test_topic_cache_get_topic_name_topic_data_map)
expected_results["topic2"].push_back(std::make_tuple(guid[0], "type2", rmw_qos[0]));
expected_results["topic2"].push_back(std::make_tuple(guid[1], "type1", rmw_qos[1]));
for (auto & result_it : expected_results) {
auto & topic_name = result_it.first;
auto & expected_topic_data = result_it.second;
const auto & topic_name = result_it.first;
const auto & expected_topic_data = result_it.second;

const auto & it = topic_data_map.find(topic_name);
ASSERT_TRUE(it != topic_data_map.end());
// Verify that the topic has all the associated data
auto & topic_data = it->second;
const auto & topic_data = it->second;
for (auto i = 0u; i < expected_topic_data.size(); i++) {
// GUID
EXPECT_EQ(std::get<0>(topic_data.at(i)), std::get<0>(expected_topic_data.at(i)));
// TYPE
EXPECT_EQ(std::get<1>(topic_data.at(i)), std::get<1>(expected_topic_data.at(i)));
// QOS
auto & qos = std::get<2>(topic_data.at(i));
auto & expected_qos = std::get<2>(expected_topic_data.at(i));
const auto & qos = std::get<2>(topic_data.at(i));
const auto & expected_qos = std::get<2>(expected_topic_data.at(i));
EXPECT_EQ(qos.durability, expected_qos.durability);
EXPECT_EQ(qos.reliability, expected_qos.reliability);
EXPECT_EQ(qos.liveliness, expected_qos.liveliness);
Expand All @@ -194,7 +199,8 @@ TEST_F(TopicCacheTestFixture, test_topic_cache_get_topic_name_topic_data_map)
TEST_F(TopicCacheTestFixture, test_topic_cache_add_topic)
{
// Add Topic
bool did_add = this->topic_cache.addTopic(this->instance_handler[1], "TestTopic", "TestType",
const bool did_add = this->topic_cache.addTopic(this->instance_handler[1], "TestTopic",
"TestType",
this->qos[1]);
// Verify that the returned value was true
EXPECT_TRUE(did_add);
Expand Down Expand Up @@ -242,7 +248,7 @@ TEST_F(TopicCacheTestFixture, test_topic_cache_remove_policy_element_does_not_ex
// add topic
this->topic_cache.addTopic(this->instance_handler[1], "TestTopic", "TestType", this->qos[1]);
// Assert that the return was false
auto const did_remove = this->topic_cache.removeTopic(this->instance_handler[1], "NewTestTopic",
const auto did_remove = this->topic_cache.removeTopic(this->instance_handler[1], "NewTestTopic",
"TestType");
ASSERT_FALSE(did_remove);
}

0 comments on commit 58c1fe8

Please sign in to comment.