Skip to content

Commit

Permalink
Add in implementation of inconsistent topic. (#103)
Browse files Browse the repository at this point in the history
* Add in implementation of inconsistent topic.

Signed-off-by: Chris Lalancette <clalancette@openrobotics.org>
  • Loading branch information
clalancette authored Mar 13, 2023
1 parent 035fbf1 commit ecf64a0
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 1 deletion.
29 changes: 28 additions & 1 deletion rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,8 @@ class RMW_Connext_StatusCondition : public RMW_Connext_Condition
public:
explicit RMW_Connext_StatusCondition(
DDS_Entity * const entity)
: scond(DDS_Entity_get_statuscondition(entity))
: scond(DDS_Entity_get_statuscondition(entity)),
status_inconsistent_topic(DDS_InconsistentTopicStatus_INITIALIZER)
{
this->scond = DDS_Entity_get_statuscondition(entity);
if (nullptr == this->scond) {
Expand Down Expand Up @@ -333,8 +334,34 @@ class RMW_Connext_StatusCondition : public RMW_Connext_Condition
virtual bool
has_status(const rmw_event_type_t event_type) = 0;

void
on_inconsistent_topic(const struct DDS_InconsistentTopicStatus * status);

void
update_status_inconsistent_topic(const struct DDS_InconsistentTopicStatus * status);

inline rmw_ret_t
get_incompatible_type_status(
rmw_incompatible_type_status_t * const status)
{
update_state(
[this, status]() {
status->total_count = this->status_inconsistent_topic.total_count;
status->total_count_change = this->status_inconsistent_topic.total_count_change;

this->triggered_inconsistent_topic = false;
this->status_inconsistent_topic.total_count_change = 0;
}, false /* notify */);

return RMW_RET_OK;
}

protected:
DDS_StatusCondition * scond;

bool triggered_inconsistent_topic{false};

struct DDS_InconsistentTopicStatus status_inconsistent_topic;
};

void
Expand Down
26 changes: 26 additions & 0 deletions rmw_connextdds_common/src/common/rmw_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3074,6 +3074,11 @@ ros_event_to_dds(const rmw_event_type_t ros, bool * const invalid)
{
return DDS_SAMPLE_LOST_STATUS;
}
case RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE:
case RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE:
{
return DDS_INCONSISTENT_TOPIC_STATUS;
}
default:
{
if (nullptr != invalid) {
Expand Down Expand Up @@ -3116,6 +3121,10 @@ dds_event_to_str(const DDS_StatusKind event)
{
return "SAMPLE_LOST";
}
case DDS_INCONSISTENT_TOPIC_STATUS:
{
return "INCONSISTENT_TOPIC";
}
default:
{
return "UNSUPPORTED";
Expand All @@ -3131,6 +3140,7 @@ ros_event_for_reader(const rmw_event_type_t ros)
case RMW_EVENT_REQUESTED_DEADLINE_MISSED:
case RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE:
case RMW_EVENT_MESSAGE_LOST:
case RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE:
{
return true;
}
Expand Down Expand Up @@ -3180,6 +3190,14 @@ RMW_Connext_SubscriberStatusCondition::get_status(
rc = this->get_message_lost_status(status);
break;
}
case RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE:
{
rmw_incompatible_type_status_t * const status =
reinterpret_cast<rmw_incompatible_type_status_t *>(event_info);

rc = this->get_incompatible_type_status(status);
break;
}
default:
{
RMW_CONNEXT_LOG_ERROR_A_SET(
Expand Down Expand Up @@ -3223,6 +3241,14 @@ RMW_Connext_PublisherStatusCondition::get_status(
rc = this->get_offered_qos_incompatible_status(status);
break;
}
case RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE:
{
rmw_incompatible_type_status_t * const status =
reinterpret_cast<rmw_incompatible_type_status_t *>(event_info);

rc = this->get_incompatible_type_status(status);
break;
}
default:
{
RMW_CONNEXT_LOG_ERROR_A_SET("unsupported publisher qos: %d", event_type)
Expand Down
68 changes: 68 additions & 0 deletions rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,20 @@ RMW_Connext_DataWriterListener_liveliness_lost(
self->on_liveliness_lost(status);
}

void
RMW_Connext_TopicListener_on_inconsistent_topic(
void * listener_data,
DDS_Topic * topic,
const struct DDS_InconsistentTopicStatus * status)
{
RMW_Connext_StatusCondition * const self =
reinterpret_cast<RMW_Connext_StatusCondition *>(listener_data);

UNUSED_ARG(topic);

self->on_inconsistent_topic(status);
}


bool
RMW_Connext_WaitSet::on_condition_active(
Expand Down Expand Up @@ -550,6 +564,24 @@ RMW_Connext_WaitSet::wait(
return RMW_RET_OK;
}

void
RMW_Connext_StatusCondition::on_inconsistent_topic(
const struct DDS_InconsistentTopicStatus * status)
{
update_state(
[this, status]() {
this->update_status_inconsistent_topic(status);
}, true /* notify */);
}

void
RMW_Connext_StatusCondition::update_status_inconsistent_topic(
const struct DDS_InconsistentTopicStatus * status)
{
this->status_inconsistent_topic = *status;
this->triggered_inconsistent_topic = true;
}

rmw_ret_t
RMW_Connext_SubscriberStatusCondition::install(
RMW_Connext_Subscriber * const sub)
Expand Down Expand Up @@ -586,6 +618,17 @@ RMW_Connext_SubscriberStatusCondition::install(
return RMW_RET_ERROR;
}

struct DDS_TopicListener topic_listener = DDS_TopicListener_INITIALIZER;
topic_listener.on_inconsistent_topic = RMW_Connext_TopicListener_on_inconsistent_topic;
topic_listener.as_listener.listener_data = this;

if (DDS_RETCODE_OK !=
DDS_Topic_set_listener(sub->topic(), &topic_listener, DDS_INCONSISTENT_TOPIC_STATUS))
{
RMW_CONNEXT_LOG_ERROR_SET("failed to set topic listener");
return RMW_RET_ERROR;
}

this->sub = sub;

return RMW_RET_OK;
Expand Down Expand Up @@ -649,6 +692,10 @@ RMW_Connext_SubscriberStatusCondition::has_status(
{
return this->triggered_sample_lost;
}
case RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE:
{
return this->triggered_inconsistent_topic;
}
default:
{
RMW_CONNEXT_ASSERT(0)
Expand Down Expand Up @@ -773,6 +820,23 @@ RMW_Connext_PublisherStatusCondition::install(
return RMW_RET_ERROR;
}

DDS_Topic * topic = DDS_DataWriter_get_topic(pub->writer());
if (topic == nullptr) {
RMW_CONNEXT_LOG_ERROR_SET("failed to get topic associated with data writer");
return RMW_RET_ERROR;
}

struct DDS_TopicListener topic_listener = DDS_TopicListener_INITIALIZER;
topic_listener.on_inconsistent_topic = RMW_Connext_TopicListener_on_inconsistent_topic;
topic_listener.as_listener.listener_data = this;

if (DDS_RETCODE_OK !=
DDS_Topic_set_listener(topic, &topic_listener, DDS_INCONSISTENT_TOPIC_STATUS))
{
RMW_CONNEXT_LOG_ERROR_SET("failed to set topic listener");
return RMW_RET_ERROR;
}

this->pub = pub;

return RMW_RET_OK;
Expand Down Expand Up @@ -806,6 +870,10 @@ RMW_Connext_PublisherStatusCondition::has_status(
{
return this->triggered_qos;
}
case RMW_EVENT_PUBLISHER_INCOMPATIBLE_TYPE:
{
return this->triggered_inconsistent_topic;
}
default:
RMW_CONNEXT_ASSERT(0)
return false;
Expand Down

0 comments on commit ecf64a0

Please sign in to comment.