Skip to content

Commit

Permalink
Implement matched event (#101)
Browse files Browse the repository at this point in the history
Signed-off-by: Barry Xu <barry.xu@sony.com>
  • Loading branch information
Barry-Xu-2018 authored Mar 22, 2023
1 parent ecf64a0 commit 364c44c
Show file tree
Hide file tree
Showing 3 changed files with 184 additions and 2 deletions.
66 changes: 66 additions & 0 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_waitset_std.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,12 @@ RMW_Connext_DataWriterListener_liveliness_lost(
DDS_DataWriter * writer,
const struct DDS_LivelinessLostStatus * status);

void
RMW_Connext_DataWriterListener_matched(
void * listener_data,
DDS_DataWriter * writer,
const struct DDS_PublicationMatchedStatus * status);

class RMW_Connext_PublisherStatusCondition : public RMW_Connext_StatusCondition
{
public:
Expand Down Expand Up @@ -429,6 +435,10 @@ class RMW_Connext_PublisherStatusCondition : public RMW_Connext_StatusCondition
on_liveliness_lost(
const DDS_LivelinessLostStatus * const status);

void
on_matched(
const DDS_PublicationMatchedStatus * const status);

// Helper functions to retrieve status information
inline rmw_ret_t
get_liveliness_lost_status(rmw_liveliness_lost_status_t * const status)
Expand Down Expand Up @@ -485,6 +495,27 @@ class RMW_Connext_PublisherStatusCondition : public RMW_Connext_StatusCondition
return RMW_RET_OK;
}

inline rmw_ret_t
get_matched_status(
rmw_matched_status_t * const status)
{
update_state(
[this, status]() {
this->triggered_matched = false;

status->total_count = this->status_matched.total_count;
status->total_count_change = this->status_matched.total_count_change;
status->current_count = this->status_matched.current_count;
status->current_count_change = this->status_matched.current_count_change;

this->status_matched.total_count_change = 0;
this->status_matched.current_count_change = 0;
this->status_matched_last = this->status_matched;
}, false /* notify */);

return RMW_RET_OK;
}

protected:
void update_status_deadline(
const DDS_OfferedDeadlineMissedStatus * const status);
Expand All @@ -495,17 +526,23 @@ class RMW_Connext_PublisherStatusCondition : public RMW_Connext_StatusCondition
void update_status_qos(
const DDS_OfferedIncompatibleQosStatus * const status);

void update_status_matched(
const DDS_PublicationMatchedStatus * const status);

bool triggered_deadline{false};
bool triggered_liveliness{false};
bool triggered_qos{false};
bool triggered_matched{false};

DDS_OfferedDeadlineMissedStatus status_deadline;
DDS_OfferedIncompatibleQosStatus status_qos;
DDS_LivelinessLostStatus status_liveliness;
DDS_PublicationMatchedStatus status_matched;

DDS_OfferedDeadlineMissedStatus status_deadline_last;
DDS_OfferedIncompatibleQosStatus status_qos_last;
DDS_LivelinessLostStatus status_liveliness_last;
DDS_PublicationMatchedStatus status_matched_last;

RMW_Connext_Publisher * pub;
};
Expand Down Expand Up @@ -606,6 +643,9 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
void
on_sample_lost(const DDS_SampleLostStatus * const status);

void
on_matched(const DDS_SubscriptionMatchedStatus * const status);

const bool ignore_local;
const DDS_InstanceHandle_t participant_handle;

Expand Down Expand Up @@ -739,6 +779,26 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
return RMW_RET_OK;
}

inline rmw_ret_t
get_matched_status(rmw_matched_status_t * const status)
{
update_state(
[this, status]() {
this->triggered_matched = false;

status->total_count = static_cast<size_t>(this->status_matched.total_count);
status->total_count_change = static_cast<size_t>(this->status_matched.total_count_change);
status->current_count = static_cast<size_t>(this->status_matched.current_count);
status->current_count_change = this->status_matched.current_count_change;

this->status_matched.total_count_change = 0;
this->status_matched.current_count_change = 0;
this->status_matched_last = this->status_matched;
}, false /* notify */);

return RMW_RET_OK;
}

protected:
void update_status_deadline(
const DDS_RequestedDeadlineMissedStatus * const status);
Expand All @@ -752,23 +812,29 @@ class RMW_Connext_SubscriberStatusCondition : public RMW_Connext_StatusCondition
void update_status_sample_lost(
const DDS_SampleLostStatus * const status);

void update_status_matched(
const DDS_SubscriptionMatchedStatus * const status);

DDS_GuardCondition * const loan_guard_condition;

bool triggered_deadline{false};
bool triggered_liveliness{false};
bool triggered_qos{false};
bool triggered_sample_lost{false};
bool triggered_matched{false};
bool triggered_data{false};

DDS_RequestedDeadlineMissedStatus status_deadline;
DDS_RequestedIncompatibleQosStatus status_qos;
DDS_LivelinessChangedStatus status_liveliness;
DDS_SampleLostStatus status_sample_lost;
DDS_SubscriptionMatchedStatus status_matched;

DDS_RequestedDeadlineMissedStatus status_deadline_last;
DDS_RequestedIncompatibleQosStatus status_qos_last;
DDS_LivelinessChangedStatus status_liveliness_last;
DDS_SampleLostStatus status_sample_lost_last;
DDS_SubscriptionMatchedStatus status_matched_last;

RMW_Connext_Subscriber * sub;

Expand Down
25 changes: 25 additions & 0 deletions rmw_connextdds_common/src/common/rmw_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3079,6 +3079,14 @@ ros_event_to_dds(const rmw_event_type_t ros, bool * const invalid)
{
return DDS_INCONSISTENT_TOPIC_STATUS;
}
case RMW_EVENT_PUBLICATION_MATCHED:
{
return DDS_PUBLICATION_MATCHED_STATUS;
}
case RMW_EVENT_SUBSCRIPTION_MATCHED:
{
return DDS_SUBSCRIPTION_MATCHED_STATUS;
}
default:
{
if (nullptr != invalid) {
Expand Down Expand Up @@ -3141,6 +3149,7 @@ ros_event_for_reader(const rmw_event_type_t ros)
case RMW_EVENT_REQUESTED_QOS_INCOMPATIBLE:
case RMW_EVENT_MESSAGE_LOST:
case RMW_EVENT_SUBSCRIPTION_INCOMPATIBLE_TYPE:
case RMW_EVENT_SUBSCRIPTION_MATCHED:
{
return true;
}
Expand Down Expand Up @@ -3198,6 +3207,14 @@ RMW_Connext_SubscriberStatusCondition::get_status(
rc = this->get_incompatible_type_status(status);
break;
}
case RMW_EVENT_SUBSCRIPTION_MATCHED:
{
rmw_matched_status_t * const status =
reinterpret_cast<rmw_matched_status_t *>(event_info);

rc = this->get_matched_status(status);
break;
}
default:
{
RMW_CONNEXT_LOG_ERROR_A_SET(
Expand Down Expand Up @@ -3249,6 +3266,14 @@ RMW_Connext_PublisherStatusCondition::get_status(
rc = this->get_incompatible_type_status(status);
break;
}
case RMW_EVENT_PUBLICATION_MATCHED:
{
rmw_matched_status_t * status =
reinterpret_cast<rmw_matched_status_t *>(event_info);

rc = this->get_matched_status(status);
break;
}
default:
{
RMW_CONNEXT_LOG_ERROR_A_SET("unsupported publisher qos: %d", event_type)
Expand Down
95 changes: 93 additions & 2 deletions rmw_connextdds_common/src/common/rmw_impl_waitset_std.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,20 @@ RMW_Connext_DataReaderListener_sample_lost(
self->on_sample_lost(status);
}

void
RMW_Connext_DataReaderListener_matched(
void * listener_data,
DDS_DataReader * reader,
const struct DDS_SubscriptionMatchedStatus * status)
{
RMW_Connext_SubscriberStatusCondition * const self =
reinterpret_cast<RMW_Connext_SubscriberStatusCondition *>(listener_data);

UNUSED_ARG(reader);

self->on_matched(status);
}

void
RMW_Connext_DataReaderListener_on_data_available(
void * listener_data,
Expand Down Expand Up @@ -180,6 +194,19 @@ RMW_Connext_TopicListener_on_inconsistent_topic(
self->on_inconsistent_topic(status);
}

void
RMW_Connext_DataWriterListener_matched(
void * listener_data,
DDS_DataWriter * writer,
const struct DDS_PublicationMatchedStatus * status)
{
RMW_Connext_PublisherStatusCondition * const self =
reinterpret_cast<RMW_Connext_PublisherStatusCondition *>(listener_data);

UNUSED_ARG(writer);

self->on_matched(status);
}

bool
RMW_Connext_WaitSet::on_condition_active(
Expand Down Expand Up @@ -599,13 +626,16 @@ RMW_Connext_SubscriberStatusCondition::install(
RMW_Connext_DataReaderListener_sample_lost;
listener.on_data_available =
RMW_Connext_DataReaderListener_on_data_available;
listener.on_subscription_matched =
RMW_Connext_DataReaderListener_matched;
listener.as_listener.listener_data = this;

listener_mask =
DDS_REQUESTED_DEADLINE_MISSED_STATUS |
DDS_REQUESTED_INCOMPATIBLE_QOS_STATUS |
DDS_LIVELINESS_CHANGED_STATUS |
DDS_SAMPLE_LOST_STATUS |
DDS_SUBSCRIPTION_MATCHED_STATUS |
DDS_DATA_AVAILABLE_STATUS;

rmw_connextdds_configure_subscriber_condition_listener(
Expand Down Expand Up @@ -650,10 +680,12 @@ RMW_Connext_SubscriberStatusCondition::RMW_Connext_SubscriberStatusCondition(
status_qos(DDS_RequestedIncompatibleQosStatus_INITIALIZER),
status_liveliness(DDS_LivelinessChangedStatus_INITIALIZER),
status_sample_lost(DDS_SampleLostStatus_INITIALIZER),
status_matched(DDS_SubscriptionMatchedStatus_INITIALIZER),
status_deadline_last(DDS_RequestedDeadlineMissedStatus_INITIALIZER),
status_qos_last(DDS_RequestedIncompatibleQosStatus_INITIALIZER),
status_liveliness_last(DDS_LivelinessChangedStatus_INITIALIZER),
status_sample_lost_last(DDS_SampleLostStatus_INITIALIZER),
status_matched_last(DDS_SubscriptionMatchedStatus_INITIALIZER),
sub(nullptr)
{
if (internal && nullptr == this->loan_guard_condition) {
Expand Down Expand Up @@ -696,6 +728,10 @@ RMW_Connext_SubscriberStatusCondition::has_status(
{
return this->triggered_inconsistent_topic;
}
case RMW_EVENT_SUBSCRIPTION_MATCHED:
{
return this->triggered_matched;
}
default:
{
RMW_CONNEXT_ASSERT(0)
Expand Down Expand Up @@ -744,6 +780,16 @@ RMW_Connext_SubscriberStatusCondition::on_sample_lost(
}, true /* notify */);
}

void
RMW_Connext_SubscriberStatusCondition::on_matched(
const DDS_SubscriptionMatchedStatus * const status)
{
update_state(
[this, status]() {
this->update_status_matched(status);
}, true /* notify */);
}

void
RMW_Connext_SubscriberStatusCondition::update_status_deadline(
const DDS_RequestedDeadlineMissedStatus * const status)
Expand Down Expand Up @@ -792,6 +838,19 @@ RMW_Connext_SubscriberStatusCondition::update_status_sample_lost(
this->status_sample_lost_last.total_count;
}

void
RMW_Connext_SubscriberStatusCondition::update_status_matched(
const DDS_SubscriptionMatchedStatus * const status)
{
this->status_matched = *status;
this->triggered_matched = true;

this->status_matched.total_count_change =
this->status_matched.total_count - this->status_matched_last.total_count;
this->status_matched.current_count_change =
this->status_matched.current_count - this->status_matched_last.current_count;
}

rmw_ret_t
RMW_Connext_PublisherStatusCondition::install(
RMW_Connext_Publisher * const pub)
Expand All @@ -805,12 +864,15 @@ RMW_Connext_PublisherStatusCondition::install(
RMW_Connext_DataWriterListener_offered_incompatible_qos;
listener.on_liveliness_lost =
RMW_Connext_DataWriterListener_liveliness_lost;
listener.on_publication_matched =
RMW_Connext_DataWriterListener_matched;
listener.as_listener.listener_data = this;

listener_mask =
DDS_OFFERED_DEADLINE_MISSED_STATUS |
DDS_OFFERED_INCOMPATIBLE_QOS_STATUS |
DDS_LIVELINESS_LOST_STATUS;
DDS_LIVELINESS_LOST_STATUS |
DDS_PUBLICATION_MATCHED_STATUS;

if (DDS_RETCODE_OK !=
DDS_DataWriter_set_listener(
Expand Down Expand Up @@ -848,9 +910,11 @@ RMW_Connext_PublisherStatusCondition::RMW_Connext_PublisherStatusCondition(
status_deadline(DDS_OfferedDeadlineMissedStatus_INITIALIZER),
status_qos(DDS_OfferedIncompatibleQosStatus_INITIALIZER),
status_liveliness(DDS_LivelinessLostStatus_INITIALIZER),
status_matched(DDS_PublicationMatchedStatus_INITIALIZER),
status_deadline_last(DDS_OfferedDeadlineMissedStatus_INITIALIZER),
status_qos_last(DDS_OfferedIncompatibleQosStatus_INITIALIZER),
status_liveliness_last(DDS_LivelinessLostStatus_INITIALIZER)
status_liveliness_last(DDS_LivelinessLostStatus_INITIALIZER),
status_matched_last(DDS_PublicationMatchedStatus_INITIALIZER)
{}

bool
Expand All @@ -874,6 +938,10 @@ RMW_Connext_PublisherStatusCondition::has_status(
{
return this->triggered_inconsistent_topic;
}
case RMW_EVENT_PUBLICATION_MATCHED:
{
return this->triggered_matched;
}
default:
RMW_CONNEXT_ASSERT(0)
return false;
Expand Down Expand Up @@ -911,6 +979,16 @@ RMW_Connext_PublisherStatusCondition::on_liveliness_lost(
}, true /* notify */);
}

void
RMW_Connext_PublisherStatusCondition::on_matched(
const DDS_PublicationMatchedStatus * const status)
{
update_state(
[this, status]() {
this->update_status_matched(status);
}, true /* notify */);
}

void
RMW_Connext_PublisherStatusCondition::update_status_deadline(
const DDS_OfferedDeadlineMissedStatus * const status)
Expand Down Expand Up @@ -943,3 +1021,16 @@ RMW_Connext_PublisherStatusCondition::update_status_qos(
this->status_qos.total_count_change = this->status_qos.total_count;
this->status_qos.total_count_change -= this->status_qos_last.total_count;
}

void
RMW_Connext_PublisherStatusCondition::update_status_matched(
const DDS_PublicationMatchedStatus * const status)
{
this->status_matched = *status;
this->triggered_matched = true;

this->status_matched.total_count_change =
this->status_matched.total_count - this->status_matched_last.total_count;
this->status_matched.current_count_change =
this->status_matched.current_count - this->status_matched_last.current_count;
}

0 comments on commit 364c44c

Please sign in to comment.