diff --git a/include/fastrtps/rtps/attributes/ReaderAttributes.h b/include/fastrtps/rtps/attributes/ReaderAttributes.h index 8201a76dea0..cb5d539e2a9 100644 --- a/include/fastrtps/rtps/attributes/ReaderAttributes.h +++ b/include/fastrtps/rtps/attributes/ReaderAttributes.h @@ -81,17 +81,22 @@ class ReaderAttributes class RemoteWriterAttributes { public: - RemoteWriterAttributes() + RemoteWriterAttributes() : livelinessLeaseDuration(c_TimeInfinite), ownershipStrength(0), + is_eprosima_endpoint(true) { endpoint.endpointKind = WRITER; - livelinessLeaseDuration = c_TimeInfinite; - ownershipStrength = 0; - }; + } + + RemoteWriterAttributes(const VendorId_t& vendor_id) : livelinessLeaseDuration(c_TimeInfinite), ownershipStrength(0), + is_eprosima_endpoint(vendor_id == c_VendorId_eProsima) + { + endpoint.endpointKind = WRITER; + } virtual ~RemoteWriterAttributes() { - }; + } //!Attributes of the associated endpoint. EndpointAttributes endpoint; @@ -104,6 +109,8 @@ class RemoteWriterAttributes //!Ownership Strength of the associated writer. uint16_t ownershipStrength; + + bool is_eprosima_endpoint; }; } } diff --git a/include/fastrtps/rtps/attributes/WriterAttributes.h b/include/fastrtps/rtps/attributes/WriterAttributes.h index 0e5a752feac..b92b44dbfe2 100644 --- a/include/fastrtps/rtps/attributes/WriterAttributes.h +++ b/include/fastrtps/rtps/attributes/WriterAttributes.h @@ -104,7 +104,14 @@ class RemoteReaderAttributes { public: - RemoteReaderAttributes() : expectsInlineQos(false) + RemoteReaderAttributes() : expectsInlineQos(false), + is_eprosima_endpoint(true) + { + endpoint.endpointKind = READER; + } + + RemoteReaderAttributes(const VendorId_t& vendor_id) : expectsInlineQos(false), + is_eprosima_endpoint(vendor_id == c_VendorId_eProsima) { endpoint.endpointKind = READER; } @@ -122,6 +129,8 @@ class RemoteReaderAttributes //!Expects inline QOS. bool expectsInlineQos; + + bool is_eprosima_endpoint; }; } } diff --git a/include/fastrtps/rtps/writer/ReaderProxy.h b/include/fastrtps/rtps/writer/ReaderProxy.h index 326cb2cbf36..1c0541f6851 100644 --- a/include/fastrtps/rtps/writer/ReaderProxy.h +++ b/include/fastrtps/rtps/writer/ReaderProxy.h @@ -41,7 +41,6 @@ namespace eprosima class StatefulWriter; class NackResponseDelay; class NackSupressionDuration; - class InitialHeartbeat; /** @@ -166,8 +165,6 @@ namespace eprosima NackResponseDelay* mp_nackResponse; //! Timed Event to manage the delay to mark a change as UNACKED after sending it. NackSupressionDuration* mp_nackSupression; - //! Timed Event to send initial heartbeat. - InitialHeartbeat* mp_initialHeartbeat; //! Last ack/nack count uint32_t m_lastAcknackCount; diff --git a/include/fastrtps/rtps/writer/StatefulWriter.h b/include/fastrtps/rtps/writer/StatefulWriter.h index 3317d36417b..2728d896c72 100644 --- a/include/fastrtps/rtps/writer/StatefulWriter.h +++ b/include/fastrtps/rtps/writer/StatefulWriter.h @@ -182,7 +182,8 @@ namespace eprosima * @brief Sends a heartbeat to a remote reader. * @remarks This function is non thread-safe. */ - void send_heartbeat_to_nts(ReaderProxy& remoteReaderProxy, bool final = false); + void send_heartbeat_to_nts(ReaderProxy& remoteReaderProxy, bool final = false, + bool send_empty_history_info = false); void process_acknack(const GUID_t reader_guid, uint32_t ack_count, const SequenceNumberSet_t& sn_set, bool final_flag); @@ -190,7 +191,7 @@ namespace eprosima private: void send_heartbeat_nts_(const std::vector& remote_readers, const LocatorList_t& locators, - RTPSMessageGroup& message_group, bool final = false); + RTPSMessageGroup& message_group, bool final = false, bool send_empty_history_info = false); void check_acked_status(); diff --git a/include/fastrtps/rtps/writer/timedevent/InitialHeartbeat.h b/include/fastrtps/rtps/writer/timedevent/InitialHeartbeat.h deleted file mode 100644 index 3f8ede74b54..00000000000 --- a/include/fastrtps/rtps/writer/timedevent/InitialHeartbeat.h +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file InitialHeartbeat.h - * - */ - -#ifndef INITIALHEARTBEAT_H_ -#define INITIALHEARTBEAT_H_ -#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC -#include -#include -#include -#include -#include - -namespace eprosima { -namespace fastrtps{ -namespace rtps{ - -class ReaderProxy; - - -/** - * InitialHeartbeat class, controls the initial send operation of HB. - * @ingroup WRITER_MODULE - */ -class InitialHeartbeat: public TimedEvent -{ - public: - /** - * - * @param p_RP - * @param interval - */ - InitialHeartbeat(ReaderProxy* rp, double interval); - virtual ~InitialHeartbeat(); - - /** - * Method invoked when the event occurs - * - * @param code Code representing the status of the event - * @param msg Message associated to the event - */ - void event(EventCode code, const char* msg= nullptr); - - //! - RTPSMessageGroup_t m_cdrmessages; - //! - ReaderProxy* rp_; -}; - -} -} -} /* namespace eprosima */ -#endif -#endif /* INITIALHEARTBEAT_H_ */ diff --git a/src/cpp/CMakeLists.txt b/src/cpp/CMakeLists.txt index a1e91fdf49f..a46b4ccf5a1 100644 --- a/src/cpp/CMakeLists.txt +++ b/src/cpp/CMakeLists.txt @@ -34,7 +34,6 @@ set(${PROJECT_NAME}_source_files rtps/writer/ReaderProxy.cpp rtps/writer/StatelessWriter.cpp rtps/writer/ReaderLocator.cpp - rtps/writer/timedevent/InitialHeartbeat.cpp rtps/writer/timedevent/PeriodicHeartbeat.cpp rtps/writer/timedevent/NackResponseDelay.cpp rtps/writer/timedevent/NackSupressionDuration.cpp diff --git a/src/cpp/rtps/builtin/discovery/endpoint/EDPSimple.cpp b/src/cpp/rtps/builtin/discovery/endpoint/EDPSimple.cpp index bf9e6ef4bf4..062baefdbdf 100644 --- a/src/cpp/rtps/builtin/discovery/endpoint/EDPSimple.cpp +++ b/src/cpp/rtps/builtin/discovery/endpoint/EDPSimple.cpp @@ -419,7 +419,7 @@ void EDPSimple::assignRemoteEndpoints(const ParticipantProxyData& pdata) if(auxendp!=0 && mp_PubReader.first!=nullptr) //Exist Pub Writer and i have pub reader { logInfo(RTPS_EDP,"Adding SEDP Pub Writer to my Pub Reader"); - RemoteWriterAttributes watt; + RemoteWriterAttributes watt(pdata.m_VendorId); watt.guid.guidPrefix = pdata.m_guid.guidPrefix; watt.guid.entityId = c_EntityId_SEDPPubWriter; watt.endpoint.unicastLocatorList = pdata.m_metatrafficUnicastLocatorList; @@ -435,7 +435,7 @@ void EDPSimple::assignRemoteEndpoints(const ParticipantProxyData& pdata) if(auxendp!=0 && mp_PubWriter.first!=nullptr) //Exist Pub Detector { logInfo(RTPS_EDP,"Adding SEDP Pub Reader to my Pub Writer"); - RemoteReaderAttributes ratt; + RemoteReaderAttributes ratt(pdata.m_VendorId); ratt.expectsInlineQos = false; ratt.guid.guidPrefix = pdata.m_guid.guidPrefix; ratt.guid.entityId = c_EntityId_SEDPPubReader; @@ -452,7 +452,7 @@ void EDPSimple::assignRemoteEndpoints(const ParticipantProxyData& pdata) if(auxendp!=0 && mp_SubReader.first!=nullptr) //Exist Pub Announcer { logInfo(RTPS_EDP,"Adding SEDP Sub Writer to my Sub Reader"); - RemoteWriterAttributes watt; + RemoteWriterAttributes watt(pdata.m_VendorId); watt.guid.guidPrefix = pdata.m_guid.guidPrefix; watt.guid.entityId = c_EntityId_SEDPSubWriter; watt.endpoint.unicastLocatorList = pdata.m_metatrafficUnicastLocatorList; @@ -468,7 +468,7 @@ void EDPSimple::assignRemoteEndpoints(const ParticipantProxyData& pdata) if(auxendp!=0 && mp_SubWriter.first!=nullptr) //Exist Pub Announcer { logInfo(RTPS_EDP,"Adding SEDP Sub Reader to my Sub Writer"); - RemoteReaderAttributes ratt; + RemoteReaderAttributes ratt(pdata.m_VendorId); ratt.expectsInlineQos = false; ratt.guid.guidPrefix = pdata.m_guid.guidPrefix; ratt.guid.entityId = c_EntityId_SEDPSubReader; diff --git a/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp b/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp index dbc21c51513..cfdbe4bf3d9 100644 --- a/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp +++ b/src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp @@ -575,7 +575,7 @@ void PDPSimple::assignRemoteEndpoints(ParticipantProxyData* pdata) auxendp &=DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER; if(auxendp!=0) { - RemoteWriterAttributes watt; + RemoteWriterAttributes watt(pdata->m_VendorId); watt.guid.guidPrefix = pdata->m_guid.guidPrefix; watt.guid.entityId = c_EntityId_SPDPWriter; watt.endpoint.unicastLocatorList = pdata->m_metatrafficUnicastLocatorList; @@ -588,7 +588,7 @@ void PDPSimple::assignRemoteEndpoints(ParticipantProxyData* pdata) auxendp &=DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR; if(auxendp!=0) { - RemoteReaderAttributes ratt; + RemoteReaderAttributes ratt(pdata->m_VendorId); ratt.expectsInlineQos = false; ratt.guid.guidPrefix = pdata->m_guid.guidPrefix; ratt.guid.entityId = c_EntityId_SPDPReader; diff --git a/src/cpp/rtps/builtin/liveliness/WLP.cpp b/src/cpp/rtps/builtin/liveliness/WLP.cpp index a54726f9b03..7b04f68734f 100644 --- a/src/cpp/rtps/builtin/liveliness/WLP.cpp +++ b/src/cpp/rtps/builtin/liveliness/WLP.cpp @@ -162,7 +162,7 @@ bool WLP::assignRemoteEndpoints(const ParticipantProxyData& pdata) if((auxendp!=0 || partdet!=0) && this->mp_builtinReader!=nullptr) { logInfo(RTPS_LIVELINESS,"Adding remote writer to my local Builtin Reader"); - RemoteWriterAttributes watt; + RemoteWriterAttributes watt(pdata.m_VendorId); watt.guid.guidPrefix = pdata.m_guid.guidPrefix; watt.guid.entityId = c_EntityId_WriterLiveliness; watt.endpoint.unicastLocatorList = pdata.m_metatrafficUnicastLocatorList; @@ -179,7 +179,7 @@ bool WLP::assignRemoteEndpoints(const ParticipantProxyData& pdata) if((auxendp!=0 || partdet!=0) && this->mp_builtinWriter!=nullptr) { logInfo(RTPS_LIVELINESS,"Adding remote reader to my local Builtin Writer"); - RemoteReaderAttributes ratt; + RemoteReaderAttributes ratt(pdata.m_VendorId); ratt.expectsInlineQos = false; ratt.guid.guidPrefix = pdata.m_guid.guidPrefix; ratt.guid.entityId = c_EntityId_ReaderLiveliness; diff --git a/src/cpp/rtps/messages/MessageReceiver.cpp b/src/cpp/rtps/messages/MessageReceiver.cpp index 6755e664ea5..ea7173b0f2d 100644 --- a/src/cpp/rtps/messages/MessageReceiver.cpp +++ b/src/cpp/rtps/messages/MessageReceiver.cpp @@ -825,7 +825,8 @@ bool MessageReceiver::proc_Submsg_Heartbeat(CDRMessage_t* msg,SubmessageHeader_t CDRMessage::readSequenceNumber(msg,&lastSN); if(lastSN < firstSN && lastSN != SequenceNumber_t(0, 0)) { - logInfo(RTPS_MSG_IN,IDSTRING"HB Received with lastSN < firstSN, ignoring"); + logWarning(RTPS_MSG_IN, IDSTRING"Invalid Heartbeat received (" << firstSN << ") - (" << + lastSN << "), ignoring"); return false; } uint32_t HBCount; diff --git a/src/cpp/rtps/security/SecurityManager.cpp b/src/cpp/rtps/security/SecurityManager.cpp index 3191ac18b21..904c129ae7c 100644 --- a/src/cpp/rtps/security/SecurityManager.cpp +++ b/src/cpp/rtps/security/SecurityManager.cpp @@ -1469,7 +1469,7 @@ void SecurityManager::match_builtin_endpoints(const ParticipantProxyData& partic if(participant_stateless_message_reader_ != nullptr && builtin_endpoints & BUILTIN_ENDPOINT_PARTICIPANT_STATELESS_MESSAGE_WRITER) { - RemoteWriterAttributes watt; + RemoteWriterAttributes watt(participant_data.m_VendorId); watt.guid.guidPrefix = participant_data.m_guid.guidPrefix; watt.guid.entityId = participant_stateless_message_writer_entity_id; watt.endpoint.unicastLocatorList = participant_data.m_metatrafficUnicastLocatorList; @@ -1480,7 +1480,7 @@ void SecurityManager::match_builtin_endpoints(const ParticipantProxyData& partic if(participant_stateless_message_writer_ != nullptr && builtin_endpoints & BUILTIN_ENDPOINT_PARTICIPANT_STATELESS_MESSAGE_READER) { - RemoteReaderAttributes ratt; + RemoteReaderAttributes ratt(participant_data.m_VendorId); ratt.expectsInlineQos = false; ratt.guid.guidPrefix = participant_data.m_guid.guidPrefix; ratt.guid.entityId = participant_stateless_message_reader_entity_id; @@ -1492,7 +1492,7 @@ void SecurityManager::match_builtin_endpoints(const ParticipantProxyData& partic if(participant_volatile_message_secure_reader_ != nullptr && builtin_endpoints & BUILTIN_ENDPOINT_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER) { - RemoteWriterAttributes watt; + RemoteWriterAttributes watt(participant_data.m_VendorId); watt.guid.guidPrefix = participant_data.m_guid.guidPrefix; watt.guid.entityId = participant_volatile_message_secure_writer_entity_id; watt.endpoint.unicastLocatorList = participant_data.m_metatrafficUnicastLocatorList; @@ -1504,7 +1504,7 @@ void SecurityManager::match_builtin_endpoints(const ParticipantProxyData& partic if(participant_volatile_message_secure_writer_ != nullptr && builtin_endpoints & BUILTIN_ENDPOINT_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER) { - RemoteReaderAttributes ratt; + RemoteReaderAttributes ratt(participant_data.m_VendorId); ratt.expectsInlineQos = false; ratt.guid.guidPrefix = participant_data.m_guid.guidPrefix; ratt.guid.entityId = participant_volatile_message_secure_reader_entity_id; diff --git a/src/cpp/rtps/writer/ReaderProxy.cpp b/src/cpp/rtps/writer/ReaderProxy.cpp index 6395e671de2..80b302b6926 100644 --- a/src/cpp/rtps/writer/ReaderProxy.cpp +++ b/src/cpp/rtps/writer/ReaderProxy.cpp @@ -23,7 +23,6 @@ #include #include #include -#include #include #include #include @@ -37,14 +36,13 @@ using namespace eprosima::fastrtps::rtps; ReaderProxy::ReaderProxy(const RemoteReaderAttributes& rdata,const WriterTimes& times,StatefulWriter* SW) : m_att(rdata), mp_SFW(SW), - mp_nackResponse(nullptr), mp_nackSupression(nullptr), mp_initialHeartbeat(nullptr), m_lastAcknackCount(0), + mp_nackResponse(nullptr), mp_nackSupression(nullptr), m_lastAcknackCount(0), mp_mutex(new std::recursive_mutex()), lastNackfragCount_(0) { if(rdata.endpoint.reliabilityKind == RELIABLE) { mp_nackResponse = new NackResponseDelay(this,TimeConv::Time_t2MilliSecondsDouble(times.nackResponseDelay)); mp_nackSupression = new NackSupressionDuration(this,TimeConv::Time_t2MilliSecondsDouble(times.nackSupressionDuration)); - mp_initialHeartbeat = new InitialHeartbeat(this, TimeConv::Time_t2MilliSecondsDouble(times.initialHeartbeatDelay)); } logInfo(RTPS_WRITER,"Reader Proxy created"); @@ -70,12 +68,6 @@ void ReaderProxy::destroy_timers() delete(mp_nackSupression); mp_nackSupression = nullptr; } - - if(mp_initialHeartbeat != nullptr) - { - delete(mp_initialHeartbeat); - mp_initialHeartbeat = nullptr; - } } void ReaderProxy::addChange(const ChangeForReader_t& change) diff --git a/src/cpp/rtps/writer/StatefulWriter.cpp b/src/cpp/rtps/writer/StatefulWriter.cpp index 6b037b31ee9..8d04d17c215 100644 --- a/src/cpp/rtps/writer/StatefulWriter.cpp +++ b/src/cpp/rtps/writer/StatefulWriter.cpp @@ -30,7 +30,6 @@ #include #include #include -#include #include @@ -420,16 +419,17 @@ bool StatefulWriter::matched_reader_add(const RemoteReaderAttributes& rdata) update_cached_info_nts(std::move(allRemoteReaders), allLocatorLists); - ReaderProxy* rp = new ReaderProxy(rdata,m_times,this); + ReaderProxy* rp = new ReaderProxy(rdata, m_times, this); std::set not_relevant_changes; SequenceNumber_t current_seq = get_seq_num_min(); - SequenceNumber_t max_seq = get_seq_num_max(); + SequenceNumber_t last_seq = get_seq_num_max(); if(current_seq != SequenceNumber_t::unknown()) { - (void)max_seq; - assert(max_seq != SequenceNumber_t::unknown()); + (void)last_seq; + assert(last_seq != SequenceNumber_t::unknown()); + assert(current_seq <= last_seq); for(std::vector::iterator cit = mp_history->changesBegin(); cit != mp_history->changesEnd(); ++cit) @@ -463,29 +463,35 @@ bool StatefulWriter::matched_reader_add(const RemoteReaderAttributes& rdata) ++current_seq; } - assert(max_seq + 1 == current_seq); - } + assert(last_seq + 1 == current_seq); + + RTPSMessageGroup group(mp_RTPSParticipant, this, RTPSMessageGroup::WRITER, m_cdrmessages); + LocatorList_t locatorsList(rp->m_att.endpoint.unicastLocatorList); + locatorsList.push_back(rp->m_att.endpoint.multicastLocatorList); + + // Send initial heartbeat + send_heartbeat_nts_({rp->m_att.guid}, locatorsList, group, false); - // Send a initial heartbeat - if(rp->mp_initialHeartbeat != nullptr) // It is reliable - rp->mp_initialHeartbeat->restart_timer(); + // Send Gap + if(!not_relevant_changes.empty()) + { + group.add_gap(not_relevant_changes, {rp->m_att.guid}, locatorsList); + } - // TODO(Ricardo) In the heartbeat event? - // Send Gap - if(!not_relevant_changes.empty()) + // Always activate heartbeat period. We need a confirmation of the reader. + // The state has to be updated. + this->mp_periodicHB->restart_timer(); + } + else if(rdata.is_eprosima_endpoint) { RTPSMessageGroup group(mp_RTPSParticipant, this, RTPSMessageGroup::WRITER, m_cdrmessages); - //TODO (Ricardo) Temporal LocatorList_t locatorsList(rp->m_att.endpoint.unicastLocatorList); locatorsList.push_back(rp->m_att.endpoint.multicastLocatorList); - group.add_gap(not_relevant_changes, {rp->m_att.guid}, locatorsList); + // Send initial heartbeat + send_heartbeat_nts_({rp->m_att.guid}, locatorsList, group, false, true); } - // Always activate heartbeat period. We need a confirmation of the reader. - // The state has to be updated. - this->mp_periodicHB->restart_timer(); - matched_readers.push_back(rp); logInfo(RTPS_WRITER, "Reader Proxy "<< rp->m_att.guid<< " added to " << this->m_guid.entityId << " with " @@ -799,30 +805,39 @@ SequenceNumber_t StatefulWriter::next_sequence_number() const return mp_history->next_sequence_number(); } -void StatefulWriter::send_heartbeat_to_nts(ReaderProxy& remoteReaderProxy, bool final) +void StatefulWriter::send_heartbeat_to_nts(ReaderProxy& remoteReaderProxy, bool final, + bool send_empty_history_info) { RTPSMessageGroup group(mp_RTPSParticipant, this, RTPSMessageGroup::WRITER, m_cdrmessages); LocatorList_t locators(remoteReaderProxy.m_att.endpoint.unicastLocatorList); locators.push_back(remoteReaderProxy.m_att.endpoint.multicastLocatorList); - send_heartbeat_nts_(std::vector{remoteReaderProxy.m_att.guid}, - locators, group, final); + send_heartbeat_nts_({remoteReaderProxy.m_att.guid}, + locators, group, final, send_empty_history_info); } void StatefulWriter::send_heartbeat_nts_(const std::vector& remote_readers, const LocatorList_t &locators, - RTPSMessageGroup& message_group, bool final) + RTPSMessageGroup& message_group, bool final, bool send_empty_history_info) { SequenceNumber_t firstSeq = get_seq_num_min(); SequenceNumber_t lastSeq = get_seq_num_max(); if (firstSeq == c_SequenceNumber_Unknown || lastSeq == c_SequenceNumber_Unknown) { - firstSeq = next_sequence_number(); - lastSeq = SequenceNumber_t(0, 0); + assert(firstSeq == c_SequenceNumber_Unknown && lastSeq == c_SequenceNumber_Unknown); + + if(send_empty_history_info && remote_readers.size() == 1) + { + firstSeq = next_sequence_number(); + lastSeq = {0, 0}; + } + else + { + return; + } } else { - (void)firstSeq; assert(firstSeq <= lastSeq); } @@ -830,8 +845,7 @@ void StatefulWriter::send_heartbeat_nts_(const std::vector& remote_reade // FinalFlag is always false because this class is used only by StatefulWriter in Reliable. message_group.add_heartbeat(remote_readers, - firstSeq, lastSeq, m_heartbeatCount, final, false, locators); - + firstSeq, lastSeq, m_heartbeatCount, final, false, locators); // Update calculate of heartbeat piggyback. currentUsageSendBufferSize_ = static_cast(sendBufferSize_); @@ -852,21 +866,23 @@ void StatefulWriter::process_acknack(const GUID_t reader_guid, uint32_t ack_coun if(remote_reader->m_lastAcknackCount < ack_count) { remote_reader->m_lastAcknackCount = ack_count; - // Sequence numbers before Base are set as Acknowledged. - remote_reader->acked_changes_set(sn_set.base); - std::vector set_vec = sn_set.get_set(); - if (remote_reader->requested_changes_set(set_vec) && remote_reader->mp_nackResponse != nullptr) - { - remote_reader->mp_nackResponse->restart_timer(); - } - else if(!final_flag) + if(sn_set.base != SequenceNumber_t(0, 0)) { - if(sn_set.base == SequenceNumber_t(0, 0) && sn_set.isSetEmpty()) + // Sequence numbers before Base are set as Acknowledged. + remote_reader->acked_changes_set(sn_set.base); + std::vector set_vec = sn_set.get_set(); + if (remote_reader->requested_changes_set(set_vec) && remote_reader->mp_nackResponse != nullptr) { - send_heartbeat_to_nts(*remote_reader, true); + remote_reader->mp_nackResponse->restart_timer(); } - - mp_periodicHB->restart_timer(); + else if(!final_flag) + { + mp_periodicHB->restart_timer(); + } + } + else if(sn_set.isSetEmpty() && !final_flag) + { + send_heartbeat_to_nts(*remote_reader, true, remote_reader->m_att.is_eprosima_endpoint); } // Check if all CacheChange are acknowledge, because a user could be waiting diff --git a/src/cpp/rtps/writer/timedevent/InitialHeartbeat.cpp b/src/cpp/rtps/writer/timedevent/InitialHeartbeat.cpp deleted file mode 100644 index 2c943d77c81..00000000000 --- a/src/cpp/rtps/writer/timedevent/InitialHeartbeat.cpp +++ /dev/null @@ -1,106 +0,0 @@ -// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima). -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -/** - * @file InitialHeartbeat.cpp - * - */ - -#include -#include - -#include - -#include -#include - -#include "../../participant/RTPSParticipantImpl.h" - -#include - -#include - -namespace eprosima { -namespace fastrtps{ -namespace rtps{ - - -InitialHeartbeat::~InitialHeartbeat() -{ - logInfo(RTPS_WRITER,"Destroying InitialHB"); - destroy(); -} - -InitialHeartbeat::InitialHeartbeat(ReaderProxy* rp, double interval) : - TimedEvent(rp->mp_SFW->getRTPSParticipant()->getEventResource().getIOService(), - rp->mp_SFW->getRTPSParticipant()->getEventResource().getThread(), interval), - m_cdrmessages(rp->mp_SFW->getRTPSParticipant()->getMaxMessageSize(), - rp->mp_SFW->getRTPSParticipant()->getGuid().guidPrefix), rp_(rp) -{ -} - -void InitialHeartbeat::event(EventCode code, const char* msg) -{ - - // Unused in release mode. - (void)msg; - - if(code == EVENT_SUCCESS) - { - SequenceNumber_t firstSeq, lastSeq; - Count_t heartbeatCount = 0; - - {//BEGIN PROTECTION - std::lock_guard guard_writer(*rp_->mp_SFW->getMutex()); - - firstSeq = rp_->mp_SFW->get_seq_num_min(); - lastSeq = rp_->mp_SFW->get_seq_num_max(); - - if(firstSeq == c_SequenceNumber_Unknown || lastSeq == c_SequenceNumber_Unknown) - { - firstSeq = rp_->mp_SFW->next_sequence_number(); - lastSeq = SequenceNumber_t(0, 0); - } - else - { - (void)firstSeq; - assert(firstSeq <= lastSeq); - } - - rp_->mp_SFW->incrementHBCount(); - heartbeatCount = rp_->mp_SFW->getHeartbeatCount(); - } - - RTPSMessageGroup group(rp_->mp_SFW->getRTPSParticipant(), rp_->mp_SFW, RTPSMessageGroup::WRITER, m_cdrmessages); - - LocatorList_t locators(rp_->m_att.endpoint.unicastLocatorList); - locators.push_back(rp_->m_att.endpoint.multicastLocatorList); - - // FinalFlag is always false because this is a StatefulWriter in Reliable. - group.add_heartbeat(std::vector{rp_->m_att.guid}, firstSeq, lastSeq, heartbeatCount, false, false, locators); - logInfo(RTPS_WRITER, rp_->mp_SFW->getGuid().entityId << " Sending Heartbeat (" << firstSeq << " - " << lastSeq << ")"); - } - else if(code == EVENT_ABORT) - { - logInfo(RTPS_WRITER,"Aborted"); - } - else - { - logInfo(RTPS_WRITER,"Event message: " <next_sequence_number(); - lastSeq = SequenceNumber_t(0, 0); + return; } else { diff --git a/test/mock/rtps/ParticipantProxyData/fastrtps/rtps/builtin/data/ParticipantProxyData.h b/test/mock/rtps/ParticipantProxyData/fastrtps/rtps/builtin/data/ParticipantProxyData.h index 6db1996f985..08f9776fc63 100644 --- a/test/mock/rtps/ParticipantProxyData/fastrtps/rtps/builtin/data/ParticipantProxyData.h +++ b/test/mock/rtps/ParticipantProxyData/fastrtps/rtps/builtin/data/ParticipantProxyData.h @@ -38,6 +38,7 @@ class ParticipantProxyData LocatorList_t m_metatrafficUnicastLocatorList; LocatorList_t m_metatrafficMulticastLocatorList; IdentityToken identity_token_; + VendorId_t m_VendorId; }; } // namespace rtps diff --git a/test/unittest/rtps/resources/timedevent/mock/MockEvent.cpp b/test/unittest/rtps/resources/timedevent/mock/MockEvent.cpp index d5c246089cf..fc76edb70ae 100644 --- a/test/unittest/rtps/resources/timedevent/mock/MockEvent.cpp +++ b/test/unittest/rtps/resources/timedevent/mock/MockEvent.cpp @@ -18,7 +18,7 @@ int MockEvent::destructed_ = 0; std::mutex MockEvent::destruction_mutex_; std::condition_variable MockEvent::destruction_cond_; -MockEvent::MockEvent(asio::io_service& service, const std::thread& event_thread, double milliseconds, bool autorestart, TimedEvent::AUTODESTRUCTION_MODE autodestruction) : +MockEvent::MockEvent(asio::io_service& service, const std::thread& event_thread, double milliseconds, bool autorestart, TimedEvent::AUTODESTRUCTION_MODE autodestruction) : TimedEvent(service, event_thread, milliseconds, autodestruction), successed_(0), cancelled_(0), sem_count_(0), autorestart_(autorestart) { } @@ -42,10 +42,14 @@ void MockEvent::event(EventCode code, const char* msg) successed_.fetch_add(1, std::memory_order_relaxed); if(autorestart_) + { restart_timer(); + } } else if(code == EventCode::EVENT_ABORT) + { cancelled_.fetch_add(1, std::memory_order_relaxed); + } sem_mutex_.lock(); ++sem_count_; @@ -59,8 +63,11 @@ bool MockEvent::wait(unsigned int milliseconds) if(sem_count_ == 0) { - if(sem_cond_.wait_for(lock, std::chrono::milliseconds(milliseconds)) != std::cv_status::no_timeout) + if(!sem_cond_.wait_for(lock, std::chrono::milliseconds(milliseconds), + [&]() -> bool { return sem_count_ != 0; } )) + { return false; + } } --sem_count_;