Skip to content
This repository has been archived by the owner on Jun 21, 2023. It is now read-only.

Add implementation of matching publisher/subscriber counts #310

Merged
merged 4 commits into from
Nov 28, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,44 @@
#ifndef RMW_CONNEXT_CPP__CONNEXT_STATIC_PUBLISHER_INFO_HPP_
#define RMW_CONNEXT_CPP__CONNEXT_STATIC_PUBLISHER_INFO_HPP_

#include <atomic>

#include "rmw_connext_shared_cpp/types.hpp"

#include "rosidl_typesupport_connext_cpp/message_type_support.h"

class ConnextPublisherListener;

extern "C"
{
struct ConnextStaticPublisherInfo
{
DDSPublisher * dds_publisher_;
ConnextPublisherListener * listener_;
DDSDataWriter * topic_writer_;
const message_type_support_callbacks_t * callbacks_;
rmw_gid_t publisher_gid;
};
} // extern "C"

class ConnextPublisherListener : public DDSPublisherListener
{
public:
virtual void on_publication_matched(
DDSDataWriter * writer,
const DDS_PublicationMatchedStatus & status)
{
(void) writer;
current_count_ = status.current_count;
}

std::size_t current_count() const
{
return current_count_;
}

private:
std::atomic<std::size_t> current_count_;
};

#endif // RMW_CONNEXT_CPP__CONNEXT_STATIC_PUBLISHER_INFO_HPP_
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,45 @@
#ifndef RMW_CONNEXT_CPP__CONNEXT_STATIC_SUBSCRIBER_INFO_HPP_
#define RMW_CONNEXT_CPP__CONNEXT_STATIC_SUBSCRIBER_INFO_HPP_

#include <atomic>

#include "rmw_connext_shared_cpp/ndds_include.hpp"

#include "rosidl_typesupport_connext_cpp/message_type_support.h"

class ConnextSubscriberListener;

extern "C"
{
struct ConnextStaticSubscriberInfo
{
DDSSubscriber * dds_subscriber_;
ConnextSubscriberListener * listener_;
DDSDataReader * topic_reader_;
DDSReadCondition * read_condition_;
bool ignore_local_publications;
const message_type_support_callbacks_t * callbacks_;
};
} // extern "C"

class ConnextSubscriberListener : public DDSSubscriberListener
{
public:
virtual void on_subscription_matched(
DDSDataReader * reader,
const DDS_SubscriptionMatchedStatus & status)
{
(void) reader;
current_count_ = status.current_count;
}

std::size_t current_count() const
{
return current_count_;
}

private:
std::atomic<std::size_t> current_count_;
};

#endif // RMW_CONNEXT_CPP__CONNEXT_STATIC_SUBSCRIBER_INFO_HPP_
85 changes: 76 additions & 9 deletions rmw_connext_cpp/src/rmw_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,9 @@ rmw_create_publisher(
DDSDataWriter * topic_writer = nullptr;
DDSTopic * topic = nullptr;
DDSTopicDescription * topic_description = nullptr;
void * buf = nullptr;
void * info_buf = nullptr;
void * listener_buf = nullptr;
ConnextPublisherListener * publisher_listener = nullptr;
ConnextStaticPublisherInfo * publisher_info = nullptr;
rmw_publisher_t * publisher = nullptr;
std::string mangled_name = "";
Expand Down Expand Up @@ -141,8 +143,18 @@ rmw_create_publisher(
goto fail;
}

// Allocate memory for the PublisherListener object.
listener_buf = rmw_allocate(sizeof(ConnextPublisherListener));
if (!listener_buf) {
RMW_SET_ERROR_MSG("failed to allocate memory for publisher listener");
goto fail;
}
// Use a placement new to construct the PublisherListener in the preallocated buffer.
RMW_TRY_PLACEMENT_NEW(publisher_listener, listener_buf, goto fail, ConnextPublisherListener, )
listener_buf = nullptr; // Only free the buffer pointer.

dds_publisher = participant->create_publisher(
publisher_qos, NULL, DDS_STATUS_MASK_NONE);
publisher_qos, publisher_listener, DDS_PUBLICATION_MATCHED_STATUS);
if (!dds_publisher) {
RMW_SET_ERROR_MSG("failed to create publisher");
goto fail;
Expand Down Expand Up @@ -188,18 +200,20 @@ rmw_create_publisher(
}

// Allocate memory for the ConnextStaticPublisherInfo object.
buf = rmw_allocate(sizeof(ConnextStaticPublisherInfo));
if (!buf) {
RMW_SET_ERROR_MSG("failed to allocate memory");
info_buf = rmw_allocate(sizeof(ConnextStaticPublisherInfo));
if (!info_buf) {
RMW_SET_ERROR_MSG("failed to allocate memory for publisher info");
goto fail;
}
// Use a placement new to construct the ConnextStaticPublisherInfo in the preallocated buffer.
RMW_TRY_PLACEMENT_NEW(publisher_info, buf, goto fail, ConnextStaticPublisherInfo, )
buf = nullptr; // Only free the publisher_info pointer; don't need the buf pointer anymore.
RMW_TRY_PLACEMENT_NEW(publisher_info, info_buf, goto fail, ConnextStaticPublisherInfo, )
info_buf = nullptr; // Only free the publisher_info pointer; don't need the buf pointer anymore.
publisher_info->dds_publisher_ = dds_publisher;
publisher_info->topic_writer_ = topic_writer;
publisher_info->callbacks_ = callbacks;
publisher_info->publisher_gid.implementation_identifier = rti_connext_identifier;
publisher_info->listener_ = publisher_listener;
publisher_listener = nullptr;
static_assert(
sizeof(ConnextPublisherGID) <= RMW_GID_STORAGE_SIZE,
"RMW_GID_STORAGE_SIZE insufficient to store the rmw_connext_cpp GID implemenation."
Expand Down Expand Up @@ -266,18 +280,61 @@ rmw_create_publisher(
(std::cerr << ss.str()).flush();
}
}
if (publisher_listener) {
RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(
publisher_listener->~ConnextPublisherListener(), ConnextPublisherListener)
rmw_free(publisher_listener);
}
if (publisher_info) {
if (publisher_info->listener_) {
RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(
publisher_info->listener_->~ConnextPublisherListener(), ConnextPublisherListener)
rmw_free(publisher_info->listener_);
}
RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(
publisher_info->~ConnextStaticPublisherInfo(), ConnextStaticPublisherInfo)
rmw_free(publisher_info);
}
if (buf) {
rmw_free(buf);
if (info_buf) {
rmw_free(info_buf);
}
if (listener_buf) {
rmw_free(listener_buf);
}

return NULL;
}

rmw_ret_t
rmw_publisher_count_matched_subscriptions(
const rmw_publisher_t * publisher,
size_t * subscription_count)
{
if (!publisher) {
RMW_SET_ERROR_MSG("publisher handle is null");
return RMW_RET_INVALID_ARGUMENT;
}

if (!subscription_count) {
RMW_SET_ERROR_MSG("subscription_count is null");
return RMW_RET_INVALID_ARGUMENT;
}

auto info = static_cast<ConnextStaticPublisherInfo *>(publisher->data);
if (info == nullptr) {
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved
RMW_SET_ERROR_MSG("publisher internal data is invalid");
return RMW_RET_ERROR;
}
if (info->listener_ == nullptr) {
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved
RMW_SET_ERROR_MSG("publisher internal listener is invalid");
return RMW_RET_ERROR;
}

*subscription_count = info->listener_->current_count();

return RMW_RET_OK;
}

rmw_ret_t
rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
{
Expand Down Expand Up @@ -317,6 +374,7 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
publisher_info->dds_publisher_->get_instance_handle(), EntityType::Publisher);
node_info->publisher_listener->trigger_graph_guard_condition();
DDSPublisher * dds_publisher = publisher_info->dds_publisher_;

if (dds_publisher) {
if (publisher_info->topic_writer_) {
if (dds_publisher->delete_datawriter(publisher_info->topic_writer_) != DDS_RETCODE_OK) {
Expand All @@ -334,6 +392,15 @@ rmw_destroy_publisher(rmw_node_t * node, rmw_publisher_t * publisher)
RMW_SET_ERROR_MSG("cannot delete datawriter because the publisher is null");
return RMW_RET_ERROR;
}

ConnextPublisherListener * pub_listener = publisher_info->listener_;
if (pub_listener) {
RMW_TRY_DESTRUCTOR(
pub_listener->~ConnextPublisherListener(),
ConnextPublisherListener, return RMW_RET_ERROR)
rmw_free(pub_listener);
}

RMW_TRY_DESTRUCTOR(
publisher_info->~ConnextStaticPublisherInfo(),
ConnextStaticPublisherInfo, return RMW_RET_ERROR)
Expand Down
73 changes: 65 additions & 8 deletions rmw_connext_cpp/src/rmw_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,9 @@ rmw_create_subscription(
DDSTopicDescription * topic_description = nullptr;
DDSDataReader * topic_reader = nullptr;
DDSReadCondition * read_condition = nullptr;
void * buf = nullptr;
void * info_buf = nullptr;
void * listener_buf = nullptr;
ConnextSubscriberListener * subscriber_listener = nullptr;
ConnextStaticSubscriberInfo * subscriber_info = nullptr;
rmw_subscription_t * subscription = nullptr;
std::string mangled_name;
Expand Down Expand Up @@ -137,8 +139,18 @@ rmw_create_subscription(
goto fail;
}

// Allocate memory for the SubscriberListener object.
listener_buf = rmw_allocate(sizeof(ConnextSubscriberListener));
if (!listener_buf) {
RMW_SET_ERROR_MSG("failed to allocate memory for subscriber listener");
goto fail;
}
// Use a placement new to construct the ConnextSubscriberListener in the preallocated buffer.
RMW_TRY_PLACEMENT_NEW(subscriber_listener, listener_buf, goto fail, ConnextSubscriberListener, )
listener_buf = nullptr; // Only free the buffer pointer.

dds_subscriber = participant->create_subscriber(
subscriber_qos, NULL, DDS_STATUS_MASK_NONE);
subscriber_qos, subscriber_listener, DDS_SUBSCRIPTION_MATCHED_STATUS);
if (!dds_subscriber) {
RMW_SET_ERROR_MSG("failed to create subscriber");
goto fail;
Expand Down Expand Up @@ -192,19 +204,21 @@ rmw_create_subscription(
}

// Allocate memory for the ConnextStaticSubscriberInfo object.
buf = rmw_allocate(sizeof(ConnextStaticSubscriberInfo));
if (!buf) {
info_buf = rmw_allocate(sizeof(ConnextStaticSubscriberInfo));
if (!info_buf) {
RMW_SET_ERROR_MSG("failed to allocate memory");
goto fail;
}
// Use a placement new to construct the ConnextStaticSubscriberInfo in the preallocated buffer.
RMW_TRY_PLACEMENT_NEW(subscriber_info, buf, goto fail, ConnextStaticSubscriberInfo, )
buf = nullptr; // Only free the subscriber_info pointer; don't need the buf pointer anymore.
RMW_TRY_PLACEMENT_NEW(subscriber_info, info_buf, goto fail, ConnextStaticSubscriberInfo, )
info_buf = nullptr; // Only free the subscriber_info pointer; don't need the buf pointer anymore.
subscriber_info->dds_subscriber_ = dds_subscriber;
subscriber_info->topic_reader_ = topic_reader;
subscriber_info->read_condition_ = read_condition;
subscriber_info->callbacks_ = callbacks;
subscriber_info->ignore_local_publications = ignore_local_publications;
subscriber_info->listener_ = subscriber_listener;
subscriber_listener = nullptr;

subscription->implementation_identifier = rti_connext_identifier;
subscription->data = subscriber_info;
Expand Down Expand Up @@ -272,18 +286,61 @@ rmw_create_subscription(
(std::cerr << ss.str()).flush();
}
}
if (subscriber_listener) {
RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(
subscriber_listener->~ConnextSubscriberListener(), ConnextSubscriberListener)
rmw_free(subscriber_listener);
}
if (subscriber_info) {
if (subscriber_info->listener_) {
RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(
subscriber_info->listener_->~ConnextSubscriberListener(), ConnextSubscriberListener)
rmw_free(subscriber_info->listener_);
}
RMW_TRY_DESTRUCTOR_FROM_WITHIN_FAILURE(
subscriber_info->~ConnextStaticSubscriberInfo(), ConnextStaticSubscriberInfo)
rmw_free(subscriber_info);
}
if (buf) {
rmw_free(buf);
if (info_buf) {
rmw_free(info_buf);
}
if (listener_buf) {
rmw_free(listener_buf);
}

return NULL;
}

rmw_ret_t
rmw_subscription_count_matched_publishers(
const rmw_subscription_t * subscription,
size_t * publisher_count)
{
if (!subscription) {
RMW_SET_ERROR_MSG("subscription handle is null");
return RMW_RET_INVALID_ARGUMENT;
}

if (!publisher_count) {
RMW_SET_ERROR_MSG("publisher_count is null");
return RMW_RET_INVALID_ARGUMENT;
}

auto info = static_cast<ConnextStaticSubscriberInfo *>(subscription->data);
if (info == nullptr) {
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved
RMW_SET_ERROR_MSG("subscriber internal data is invalid");
return RMW_RET_ERROR;
}
if (info->listener_ == nullptr) {
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved
RMW_SET_ERROR_MSG("subscriber internal listener is invalid");
return RMW_RET_ERROR;
}

*publisher_count = info->listener_->current_count();

return RMW_RET_OK;
}

rmw_ret_t
rmw_destroy_subscription(rmw_node_t * node, rmw_subscription_t * subscription)
{
Expand Down