Skip to content
This repository has been archived by the owner on Oct 7, 2021. It is now read-only.

Use matched count to determine server_is_available #185

Merged
merged 2 commits into from
Aug 24, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions rmw_opensplice_cpp/src/rmw_service_server_is_available.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ rmw_service_server_is_available(
return RMW_RET_ERROR;
}

const char * err = callbacks->server_is_available(
client_info->requester_, node, is_available, &rmw_count_publishers, &rmw_count_subscribers);
const char * err = callbacks->server_is_available(client_info->requester_, node, is_available);
if (err) {
RMW_SET_ERROR_MSG(err);
return RMW_RET_ERROR;
Expand Down
63 changes: 58 additions & 5 deletions rmw_opensplice_cpp/src/types.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
#include <map>
#include <set>
#include <string>
#include <vector>

#include "rosidl_typesupport_opensplice_cpp/misc.hpp"
#include "rmw/error_handling.h"

std::string
Expand All @@ -31,6 +33,31 @@ create_type_name(
"::" + sep + "::dds_::" + callbacks->message_name + "_";
}

/// Return the ROS specific prefix if it exists, otherwise "".
static inline
std::string
_get_ros_prefix_if_exists(const std::string & topic_name)
{
for (auto prefix : rosidl_typesupport_opensplice_cpp::get_ros_prefixes()) {
if (topic_name.rfind(std::string(prefix) + "/", 0) == 0) {
return prefix;
}
}
return "";
}

/// Return the demangle ROS topic or the original if not a ROS topic.
static inline
std::string
_demangle_if_ros_topic(const std::string & topic_name)
{
std::string prefix = _get_ros_prefix_if_exists(topic_name);
if (prefix.length()) {
return topic_name.substr(prefix.length());
}
return topic_name;
}

CustomDataReaderListener::CustomDataReaderListener()
: print_discovery_logging_(false)
{
Expand Down Expand Up @@ -65,11 +92,23 @@ size_t
CustomDataReaderListener::count_topic(const char * topic_name)
{
std::lock_guard<std::mutex> lock(mutex_);
auto it = topic_names_and_types_.find(topic_name);
auto it = std::find_if(
topic_names_and_types_.begin(),
topic_names_and_types_.end(),
[&](auto tnt) -> bool {
auto fqdn = _demangle_if_ros_topic(tnt.first);
if (fqdn == topic_name) {
return true;
}
return false;
});
size_t count;
if (it == topic_names_and_types_.end()) {
return 0;
count = 0;
} else {
count = it->second.size();
}
return it->second.size();
return count;
}

void
Expand Down Expand Up @@ -176,10 +215,17 @@ CustomPublisherListener::on_data_available(DDS::DataReader * reader)
}

for (DDS::ULong i = 0; i < data_seq.length(); ++i) {
std::string topic_name = "";
if (info_seq[i].valid_data) {
if (info_seq[i].instance_state == DDS::ALIVE_INSTANCE_STATE) {
for (DDS::ULong j = 0; j < data_seq[i].partition.name.length(); ++j) {
topic_name += data_seq[i].partition.name[j];
topic_name += "/";
}
topic_name += data_seq[i].topic_name.in();

add_information(
info_seq[i], data_seq[i].topic_name.in(), data_seq[i].type_name.in(), PublisherEP);
info_seq[i], topic_name, data_seq[i].type_name.in(), PublisherEP);
} else {
remove_information(info_seq[i], PublisherEP);
}
Expand Down Expand Up @@ -225,10 +271,17 @@ CustomSubscriberListener::on_data_available(DDS::DataReader * reader)
}

for (DDS::ULong i = 0; i < data_seq.length(); ++i) {
std::string topic_name("");
if (info_seq[i].valid_data) {
if (info_seq[i].instance_state == DDS::ALIVE_INSTANCE_STATE) {
for (DDS::ULong j = 0; j < data_seq[i].partition.name.length(); ++j) {
topic_name += data_seq[i].partition.name[j];
topic_name += "/";
}
topic_name += data_seq[i].topic_name.in();

add_information(
info_seq[i], data_seq[i].topic_name.in(), data_seq[i].type_name.in(), SubscriberEP);
info_seq[i], topic_name, data_seq[i].type_name.in(), SubscriberEP);
} else {
remove_information(info_seq[i], SubscriberEP);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,10 @@ destroy_responder__@(spec.srv_name)(void * untyped_responder, void (* deallocato

const char *
server_is_available__@(spec.srv_name)(
void * requester, const rmw_node_t * node, bool * is_available,
rmw_ret_t (* count_publishers)(const rmw_node_t *, const char *, size_t *),
rmw_ret_t (* count_subscribers)(const rmw_node_t *, const char *, size_t *))
void * requester, const rmw_node_t * node, bool * is_available)
{
return @(spec.pkg_name)::srv::typesupport_opensplice_cpp::server_is_available__@(spec.srv_name)(
requester, node, is_available, count_publishers, count_subscribers);
requester, node, is_available);
}

static service_type_support_callbacks_t __callbacks = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@

#include <rosidl_typesupport_opensplice_cpp/visibility_control.h>
#include <string>
#include <vector>

namespace rosidl_typesupport_opensplice_cpp
{

ROSIDL_TYPESUPPORT_OPENSPLICE_CPP_PUBLIC
const std::vector<std::string> & get_ros_prefixes();

ROSIDL_TYPESUPPORT_OPENSPLICE_CPP_PUBLIC
bool process_topic_name(
const char * topic_name,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,39 +321,32 @@ class Requester

/// Return NULL on success, otherwise an error string.
const char *
server_is_available(
const rmw_node_t * node, bool * is_available,
rmw_ret_t (* count_publishers)(const rmw_node_t *, const char *, size_t *),
rmw_ret_t (* count_subscribers)(const rmw_node_t *, const char *, size_t *)
) noexcept
server_is_available(const rmw_node_t * node, bool * is_available) noexcept
{
(void)node;
if (!is_available) {
return "argument is_available is null";
}
*is_available = false;
// In the OpenSplice RPC implementation, a server is ready when:
// - At least one server is subscribed to the request topic.
// - At least one server is publishing to the reponse topic.
size_t number_of_request_subscriptions = 0;
rmw_ret_t ret = count_subscribers(
node,
request_topic_->get_name(),
&number_of_request_subscriptions);
if (ret != RMW_RET_OK) {
return rmw_get_error_string_safe();
// - At least one reader is matched to the request writer.
// - At least one writer is matched to the reponse reader.
DDS::ReturnCode_t retcode;
DDS::PublicationMatchedStatus publication_status;
retcode = request_datawriter_->get_publication_matched_status(publication_status);
if (retcode != DDS::RETCODE_OK) {
return "DataWriter::get_publication_matched_status: failed";
}
if (number_of_request_subscriptions == 0) {
return nullptr;
}
size_t number_of_response_publishers = 0;
ret = count_publishers(
node,
response_topic_->get_name(),
&number_of_response_publishers);
if (ret != RMW_RET_OK) {
return rmw_get_error_string_safe();

DDS::SubscriptionMatchedStatus subscription_status;
retcode = response_datareader_->get_subscription_matched_status(subscription_status);
if (retcode != DDS::RETCODE_OK) {
return "DataReader::get_subscription_matched_status: failed";
}
if (number_of_response_publishers == 0) {

if (publication_status.current_count == 0 ||
subscription_status.current_count == 0)
{
return nullptr;
}
*is_available = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,7 @@ typedef struct service_type_support_callbacks_t
// Returns NULL if the check was successfully, otherwise an error string.
// If no server is available, NULL is returned but is_available will be set to false.
const char * (*server_is_available)(
void * requester, const rmw_node_t * node, bool * is_available,
rmw_ret_t (* count_publishers)(const rmw_node_t *, const char *, size_t *),
rmw_ret_t (* count_subscribers)(const rmw_node_t *, const char *, size_t *));
void * requester, const rmw_node_t * node, bool * is_available);
} service_type_support_callbacks_t;

#endif // ROSIDL_TYPESUPPORT_OPENSPLICE_CPP__SERVICE_TYPE_SUPPORT_H_
Original file line number Diff line number Diff line change
Expand Up @@ -560,9 +560,7 @@ destroy_responder__@(spec.srv_name)(void * untyped_responder, void (* deallocato

const char *
server_is_available__@(spec.srv_name)(
void * requester, const rmw_node_t * node, bool * is_available,
rmw_ret_t (* count_publishers)(const rmw_node_t *, const char *, size_t *),
rmw_ret_t (* count_subscribers)(const rmw_node_t *, const char *, size_t *))
void * requester, const rmw_node_t * node, bool * is_available)
{
using RequesterT = rosidl_typesupport_opensplice_cpp::Requester<
@(__dds_msg_type_prefix)_Request_,
Expand All @@ -571,8 +569,7 @@ server_is_available__@(spec.srv_name)(

auto typed_requester = reinterpret_cast<RequesterT *>(requester);

return typed_requester->server_is_available(
node, is_available, count_publishers, count_subscribers);
return typed_requester->server_is_available(node, is_available);
}

static service_type_support_callbacks_t callbacks = {
Expand Down
10 changes: 10 additions & 0 deletions rosidl_typesupport_opensplice_cpp/src/misc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <string>
#include <vector>
#include "rosidl_typesupport_opensplice_cpp/misc.hpp"

namespace rosidl_typesupport_opensplice_cpp
Expand All @@ -21,6 +22,15 @@ static const char * const ros_topic_prefix = "rt";
static const char * const ros_service_request_prefix = "rq";
static const char * const ros_service_response_prefix = "rr";

const std::vector<std::string> &
get_ros_prefixes()
{
static const std::vector<std::string> ros_prefixes =
{ros_topic_prefix, ros_service_request_prefix, ros_service_response_prefix};

return ros_prefixes;
}

bool
process_topic_name(
const char * topic_name,
Expand Down