diff --git a/src/core/ddsc/src/dds__entity.h b/src/core/ddsc/src/dds__entity.h index c94ff3e4cc..ef4b6dbb14 100644 --- a/src/core/ddsc/src/dds__entity.h +++ b/src/core/ddsc/src/dds__entity.h @@ -71,7 +71,7 @@ DDS_EXPORT inline bool dds_entity_is_enabled (const dds_entity *e) { return (e->m_flags & DDS_ENTITY_ENABLED) != 0; } -DDS_EXPORT void dds_entity_status_set (dds_entity *e, status_mask_t t); +DDS_EXPORT bool dds_entity_status_set (dds_entity *e, status_mask_t t) ddsrt_attribute_warn_unused_result; DDS_EXPORT void dds_entity_trigger_set (dds_entity *e, uint32_t t); DDS_EXPORT inline void dds_entity_status_reset (dds_entity *e, status_mask_t t) { @@ -82,6 +82,8 @@ DDS_EXPORT inline dds_entity_kind_t dds_entity_kind (const dds_entity *e) { return e->m_kind; } +DDS_EXPORT void dds_entity_observers_signal (dds_entity *observed, uint32_t status); + DDS_EXPORT void dds_entity_status_signal (dds_entity *e, uint32_t status); union dds_status_union { @@ -102,11 +104,12 @@ union dds_status_union { static void status_cb_##name_ (dds_##entity_kind_ * const e, const status_cb_data_t *data, bool enabled) \ { \ struct dds_listener const * const listener = &e->m_entity.m_listener; \ + const status_mask_t status_mask = (status_mask_t) (1u << DDS_##NAME_##_STATUS_ID); \ const bool invoke = (listener->on_##name_ != 0) && enabled; \ union dds_status_union lst; \ update_##name_ (&e->m_##name_##_status, invoke ? &lst.name_ : NULL, data); \ + bool signal = dds_entity_status_set (&e->m_entity, status_mask); \ if (invoke) { \ - dds_entity_status_reset (&e->m_entity, (status_mask_t) (1u << DDS_##NAME_##_STATUS_ID)); \ e->m_entity.m_cb_pending_count++; \ e->m_entity.m_cb_count++; \ ddsrt_mutex_unlock (&e->m_entity.m_observers_lock); \ @@ -114,8 +117,10 @@ union dds_status_union { ddsrt_mutex_lock (&e->m_entity.m_observers_lock); \ e->m_entity.m_cb_count--; \ e->m_entity.m_cb_pending_count--; \ - } else if (enabled) { \ - dds_entity_status_set (&e->m_entity, (status_mask_t) (1u << DDS_##NAME_##_STATUS_ID)); \ + signal = signal && (ddsrt_atomic_ld32 (&e->m_entity.m_status.m_status_and_mask) & status_mask); \ + } \ + if (signal) { \ + dds_entity_observers_signal (&e->m_entity, status_mask); \ } \ } diff --git a/src/core/ddsc/src/dds_entity.c b/src/core/ddsc/src/dds_entity.c index b8f973ba4b..569a9a9674 100644 --- a/src/core/ddsc/src/dds_entity.c +++ b/src/core/ddsc/src/dds_entity.c @@ -88,7 +88,6 @@ static int compare_instance_handle (const void *va, const void *vb) const ddsrt_avl_treedef_t dds_entity_children_td = DDSRT_AVL_TREEDEF_INITIALIZER (offsetof (struct dds_entity, m_avlnode_child), offsetof (struct dds_entity, m_iid), compare_instance_handle, 0); -static void dds_entity_observers_signal (dds_entity *observed, uint32_t status); static void dds_entity_observers_signal_delete (dds_entity *observed); static dds_return_t dds_delete_impl (dds_entity_t entity, enum delete_impl_state delstate); @@ -1398,7 +1397,7 @@ dds_return_t dds_entity_observer_unregister (dds_entity *observed, dds_waitset * return rc; } -static void dds_entity_observers_signal (dds_entity *observed, uint32_t status) +void dds_entity_observers_signal (dds_entity *observed, uint32_t status) { for (dds_entity_observer *idx = observed->m_observers; idx; idx = idx->m_next) idx->m_cb (idx->m_observer, observed->m_hdllink.hdl, status); @@ -1425,7 +1424,7 @@ void dds_entity_status_signal (dds_entity *e, uint32_t status) ddsrt_mutex_unlock (&e->m_observers_lock); } -void dds_entity_status_set (dds_entity *e, status_mask_t status) +bool dds_entity_status_set (dds_entity *e, status_mask_t status) { assert (entity_has_status (e)); uint32_t old, delta, new; @@ -1433,11 +1432,10 @@ void dds_entity_status_set (dds_entity *e, status_mask_t status) old = ddsrt_atomic_ld32 (&e->m_status.m_status_and_mask); delta = ((uint32_t) status & (old >> SAM_ENABLED_SHIFT)); if (delta == 0) - return; + return false; new = old | delta; } while (!ddsrt_atomic_cas32 (&e->m_status.m_status_and_mask, old, new)); - if (delta) - dds_entity_observers_signal (e, status); + return (delta != 0); } void dds_entity_trigger_set (dds_entity *e, uint32_t t) diff --git a/src/core/ddsc/src/dds_reader.c b/src/core/ddsc/src/dds_reader.c index def654c0b7..a96c4ea8a1 100644 --- a/src/core/ddsc/src/dds_reader.c +++ b/src/core/ddsc/src/dds_reader.c @@ -139,6 +139,10 @@ void dds_reader_data_available_cb (struct dds_reader *rd) struct dds_listener const * const lst = &rd->m_entity.m_listener; dds_entity * const sub = rd->m_entity.m_parent; + + bool rd_signal = dds_entity_status_set (&rd->m_entity, DDS_DATA_AVAILABLE_STATUS); + bool sub_signal = dds_entity_status_set (sub, DDS_DATA_ON_READERS_STATUS); + if (lst->on_data_on_readers) { ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); @@ -168,17 +172,23 @@ void dds_reader_data_available_cb (struct dds_reader *rd) lst->on_data_available (rd->m_entity.m_hdllink.hdl, lst->on_data_available_arg); ddsrt_mutex_lock (&rd->m_entity.m_observers_lock); } - else + + rd->m_entity.m_cb_count--; + rd->m_entity.m_cb_pending_count--; + + if (sub_signal) { - dds_entity_status_set (&rd->m_entity, DDS_DATA_AVAILABLE_STATUS); ddsrt_mutex_lock (&sub->m_observers_lock); - dds_entity_status_set (sub, DDS_DATA_ON_READERS_STATUS); + if (ddsrt_atomic_ld32 (&sub->m_status.m_status_and_mask) & DDS_DATA_ON_READERS_STATUS) + dds_entity_observers_signal (sub, DDS_DATA_ON_READERS_STATUS); ddsrt_mutex_unlock (&sub->m_observers_lock); } - rd->m_entity.m_cb_count--; - rd->m_entity.m_cb_pending_count--; - + if (rd_signal) + { + if (ddsrt_atomic_ld32 (&rd->m_entity.m_status.m_status_and_mask) & DDS_DATA_AVAILABLE_STATUS) + dds_entity_observers_signal (&rd->m_entity, DDS_DATA_AVAILABLE_STATUS); + } ddsrt_cond_broadcast (&rd->m_entity.m_observers_cond); ddsrt_mutex_unlock (&rd->m_entity.m_observers_lock); }