Skip to content

Commit

Permalink
Refs #2635. Removed invalid heartbeats.
Browse files Browse the repository at this point in the history
  • Loading branch information
richiware committed Mar 9, 2018
1 parent b87400e commit 3514b09
Show file tree
Hide file tree
Showing 17 changed files with 107 additions and 253 deletions.
17 changes: 12 additions & 5 deletions include/fastrtps/rtps/attributes/ReaderAttributes.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,22 @@ class ReaderAttributes
class RemoteWriterAttributes
{
public:
RemoteWriterAttributes()
RemoteWriterAttributes() : livelinessLeaseDuration(c_TimeInfinite), ownershipStrength(0),
is_eprosima_endpoint(true)
{
endpoint.endpointKind = WRITER;
livelinessLeaseDuration = c_TimeInfinite;
ownershipStrength = 0;
};
}

RemoteWriterAttributes(const VendorId_t& vendor_id) : livelinessLeaseDuration(c_TimeInfinite), ownershipStrength(0),
is_eprosima_endpoint(vendor_id == c_VendorId_eProsima)
{
endpoint.endpointKind = WRITER;
}

virtual ~RemoteWriterAttributes()
{

};
}

//!Attributes of the associated endpoint.
EndpointAttributes endpoint;
Expand All @@ -104,6 +109,8 @@ class RemoteWriterAttributes

//!Ownership Strength of the associated writer.
uint16_t ownershipStrength;

bool is_eprosima_endpoint;
};
}
}
Expand Down
11 changes: 10 additions & 1 deletion include/fastrtps/rtps/attributes/WriterAttributes.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,14 @@ class RemoteReaderAttributes
{
public:

RemoteReaderAttributes() : expectsInlineQos(false)
RemoteReaderAttributes() : expectsInlineQos(false),
is_eprosima_endpoint(true)
{
endpoint.endpointKind = READER;
}

RemoteReaderAttributes(const VendorId_t& vendor_id) : expectsInlineQos(false),
is_eprosima_endpoint(vendor_id == c_VendorId_eProsima)
{
endpoint.endpointKind = READER;
}
Expand All @@ -122,6 +129,8 @@ class RemoteReaderAttributes

//!Expects inline QOS.
bool expectsInlineQos;

bool is_eprosima_endpoint;
};
}
}
Expand Down
3 changes: 0 additions & 3 deletions include/fastrtps/rtps/writer/ReaderProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ namespace eprosima
class StatefulWriter;
class NackResponseDelay;
class NackSupressionDuration;
class InitialHeartbeat;


/**
Expand Down Expand Up @@ -166,8 +165,6 @@ namespace eprosima
NackResponseDelay* mp_nackResponse;
//! Timed Event to manage the delay to mark a change as UNACKED after sending it.
NackSupressionDuration* mp_nackSupression;
//! Timed Event to send initial heartbeat.
InitialHeartbeat* mp_initialHeartbeat;
//! Last ack/nack count
uint32_t m_lastAcknackCount;

Expand Down
5 changes: 3 additions & 2 deletions include/fastrtps/rtps/writer/StatefulWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,16 @@ namespace eprosima
* @brief Sends a heartbeat to a remote reader.
* @remarks This function is non thread-safe.
*/
void send_heartbeat_to_nts(ReaderProxy& remoteReaderProxy, bool final = false);
void send_heartbeat_to_nts(ReaderProxy& remoteReaderProxy, bool final = false,
bool send_empty_history_info = false);

void process_acknack(const GUID_t reader_guid, uint32_t ack_count,
const SequenceNumberSet_t& sn_set, bool final_flag);

private:

void send_heartbeat_nts_(const std::vector<GUID_t>& remote_readers, const LocatorList_t& locators,
RTPSMessageGroup& message_group, bool final = false);
RTPSMessageGroup& message_group, bool final = false, bool send_empty_history_info = false);

void check_acked_status();

Expand Down
69 changes: 0 additions & 69 deletions include/fastrtps/rtps/writer/timedevent/InitialHeartbeat.h

This file was deleted.

1 change: 0 additions & 1 deletion src/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ set(${PROJECT_NAME}_source_files
rtps/writer/ReaderProxy.cpp
rtps/writer/StatelessWriter.cpp
rtps/writer/ReaderLocator.cpp
rtps/writer/timedevent/InitialHeartbeat.cpp
rtps/writer/timedevent/PeriodicHeartbeat.cpp
rtps/writer/timedevent/NackResponseDelay.cpp
rtps/writer/timedevent/NackSupressionDuration.cpp
Expand Down
8 changes: 4 additions & 4 deletions src/cpp/rtps/builtin/discovery/endpoint/EDPSimple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,7 @@ void EDPSimple::assignRemoteEndpoints(const ParticipantProxyData& pdata)
if(auxendp!=0 && mp_PubReader.first!=nullptr) //Exist Pub Writer and i have pub reader
{
logInfo(RTPS_EDP,"Adding SEDP Pub Writer to my Pub Reader");
RemoteWriterAttributes watt;
RemoteWriterAttributes watt(pdata.m_VendorId);
watt.guid.guidPrefix = pdata.m_guid.guidPrefix;
watt.guid.entityId = c_EntityId_SEDPPubWriter;
watt.endpoint.unicastLocatorList = pdata.m_metatrafficUnicastLocatorList;
Expand All @@ -435,7 +435,7 @@ void EDPSimple::assignRemoteEndpoints(const ParticipantProxyData& pdata)
if(auxendp!=0 && mp_PubWriter.first!=nullptr) //Exist Pub Detector
{
logInfo(RTPS_EDP,"Adding SEDP Pub Reader to my Pub Writer");
RemoteReaderAttributes ratt;
RemoteReaderAttributes ratt(pdata.m_VendorId);
ratt.expectsInlineQos = false;
ratt.guid.guidPrefix = pdata.m_guid.guidPrefix;
ratt.guid.entityId = c_EntityId_SEDPPubReader;
Expand All @@ -452,7 +452,7 @@ void EDPSimple::assignRemoteEndpoints(const ParticipantProxyData& pdata)
if(auxendp!=0 && mp_SubReader.first!=nullptr) //Exist Pub Announcer
{
logInfo(RTPS_EDP,"Adding SEDP Sub Writer to my Sub Reader");
RemoteWriterAttributes watt;
RemoteWriterAttributes watt(pdata.m_VendorId);
watt.guid.guidPrefix = pdata.m_guid.guidPrefix;
watt.guid.entityId = c_EntityId_SEDPSubWriter;
watt.endpoint.unicastLocatorList = pdata.m_metatrafficUnicastLocatorList;
Expand All @@ -468,7 +468,7 @@ void EDPSimple::assignRemoteEndpoints(const ParticipantProxyData& pdata)
if(auxendp!=0 && mp_SubWriter.first!=nullptr) //Exist Pub Announcer
{
logInfo(RTPS_EDP,"Adding SEDP Sub Reader to my Sub Writer");
RemoteReaderAttributes ratt;
RemoteReaderAttributes ratt(pdata.m_VendorId);
ratt.expectsInlineQos = false;
ratt.guid.guidPrefix = pdata.m_guid.guidPrefix;
ratt.guid.entityId = c_EntityId_SEDPSubReader;
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/builtin/discovery/participant/PDPSimple.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ void PDPSimple::assignRemoteEndpoints(ParticipantProxyData* pdata)
auxendp &=DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER;
if(auxendp!=0)
{
RemoteWriterAttributes watt;
RemoteWriterAttributes watt(pdata->m_VendorId);
watt.guid.guidPrefix = pdata->m_guid.guidPrefix;
watt.guid.entityId = c_EntityId_SPDPWriter;
watt.endpoint.unicastLocatorList = pdata->m_metatrafficUnicastLocatorList;
Expand All @@ -588,7 +588,7 @@ void PDPSimple::assignRemoteEndpoints(ParticipantProxyData* pdata)
auxendp &=DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR;
if(auxendp!=0)
{
RemoteReaderAttributes ratt;
RemoteReaderAttributes ratt(pdata->m_VendorId);
ratt.expectsInlineQos = false;
ratt.guid.guidPrefix = pdata->m_guid.guidPrefix;
ratt.guid.entityId = c_EntityId_SPDPReader;
Expand Down
4 changes: 2 additions & 2 deletions src/cpp/rtps/builtin/liveliness/WLP.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ bool WLP::assignRemoteEndpoints(const ParticipantProxyData& pdata)
if((auxendp!=0 || partdet!=0) && this->mp_builtinReader!=nullptr)
{
logInfo(RTPS_LIVELINESS,"Adding remote writer to my local Builtin Reader");
RemoteWriterAttributes watt;
RemoteWriterAttributes watt(pdata.m_VendorId);
watt.guid.guidPrefix = pdata.m_guid.guidPrefix;
watt.guid.entityId = c_EntityId_WriterLiveliness;
watt.endpoint.unicastLocatorList = pdata.m_metatrafficUnicastLocatorList;
Expand All @@ -179,7 +179,7 @@ bool WLP::assignRemoteEndpoints(const ParticipantProxyData& pdata)
if((auxendp!=0 || partdet!=0) && this->mp_builtinWriter!=nullptr)
{
logInfo(RTPS_LIVELINESS,"Adding remote reader to my local Builtin Writer");
RemoteReaderAttributes ratt;
RemoteReaderAttributes ratt(pdata.m_VendorId);
ratt.expectsInlineQos = false;
ratt.guid.guidPrefix = pdata.m_guid.guidPrefix;
ratt.guid.entityId = c_EntityId_ReaderLiveliness;
Expand Down
3 changes: 2 additions & 1 deletion src/cpp/rtps/messages/MessageReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -825,7 +825,8 @@ bool MessageReceiver::proc_Submsg_Heartbeat(CDRMessage_t* msg,SubmessageHeader_t
CDRMessage::readSequenceNumber(msg,&lastSN);
if(lastSN < firstSN && lastSN != SequenceNumber_t(0, 0))
{
logInfo(RTPS_MSG_IN,IDSTRING"HB Received with lastSN < firstSN, ignoring");
logWarning(RTPS_MSG_IN, IDSTRING"Invalid Heartbeat received (" << firstSN << ") - (" <<
lastSN << "), ignoring");
return false;
}
uint32_t HBCount;
Expand Down
8 changes: 4 additions & 4 deletions src/cpp/rtps/security/SecurityManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1469,7 +1469,7 @@ void SecurityManager::match_builtin_endpoints(const ParticipantProxyData& partic
if(participant_stateless_message_reader_ != nullptr &&
builtin_endpoints & BUILTIN_ENDPOINT_PARTICIPANT_STATELESS_MESSAGE_WRITER)
{
RemoteWriterAttributes watt;
RemoteWriterAttributes watt(participant_data.m_VendorId);
watt.guid.guidPrefix = participant_data.m_guid.guidPrefix;
watt.guid.entityId = participant_stateless_message_writer_entity_id;
watt.endpoint.unicastLocatorList = participant_data.m_metatrafficUnicastLocatorList;
Expand All @@ -1480,7 +1480,7 @@ void SecurityManager::match_builtin_endpoints(const ParticipantProxyData& partic
if(participant_stateless_message_writer_ != nullptr &&
builtin_endpoints & BUILTIN_ENDPOINT_PARTICIPANT_STATELESS_MESSAGE_READER)
{
RemoteReaderAttributes ratt;
RemoteReaderAttributes ratt(participant_data.m_VendorId);
ratt.expectsInlineQos = false;
ratt.guid.guidPrefix = participant_data.m_guid.guidPrefix;
ratt.guid.entityId = participant_stateless_message_reader_entity_id;
Expand All @@ -1492,7 +1492,7 @@ void SecurityManager::match_builtin_endpoints(const ParticipantProxyData& partic
if(participant_volatile_message_secure_reader_ != nullptr &&
builtin_endpoints & BUILTIN_ENDPOINT_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER)
{
RemoteWriterAttributes watt;
RemoteWriterAttributes watt(participant_data.m_VendorId);
watt.guid.guidPrefix = participant_data.m_guid.guidPrefix;
watt.guid.entityId = participant_volatile_message_secure_writer_entity_id;
watt.endpoint.unicastLocatorList = participant_data.m_metatrafficUnicastLocatorList;
Expand All @@ -1504,7 +1504,7 @@ void SecurityManager::match_builtin_endpoints(const ParticipantProxyData& partic
if(participant_volatile_message_secure_writer_ != nullptr &&
builtin_endpoints & BUILTIN_ENDPOINT_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER)
{
RemoteReaderAttributes ratt;
RemoteReaderAttributes ratt(participant_data.m_VendorId);
ratt.expectsInlineQos = false;
ratt.guid.guidPrefix = participant_data.m_guid.guidPrefix;
ratt.guid.entityId = participant_volatile_message_secure_reader_entity_id;
Expand Down
10 changes: 1 addition & 9 deletions src/cpp/rtps/writer/ReaderProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#include <fastrtps/utils/TimeConversion.h>
#include <fastrtps/rtps/writer/timedevent/NackResponseDelay.h>
#include <fastrtps/rtps/writer/timedevent/NackSupressionDuration.h>
#include <fastrtps/rtps/writer/timedevent/InitialHeartbeat.h>
#include <fastrtps/log/Log.h>
#include <fastrtps/rtps/resources/AsyncWriterThread.h>
#include <fastrtps/rtps/history/WriterHistory.h>
Expand All @@ -37,14 +36,13 @@ using namespace eprosima::fastrtps::rtps;

ReaderProxy::ReaderProxy(const RemoteReaderAttributes& rdata,const WriterTimes& times,StatefulWriter* SW) :
m_att(rdata), mp_SFW(SW),
mp_nackResponse(nullptr), mp_nackSupression(nullptr), mp_initialHeartbeat(nullptr), m_lastAcknackCount(0),
mp_nackResponse(nullptr), mp_nackSupression(nullptr), m_lastAcknackCount(0),
mp_mutex(new std::recursive_mutex()), lastNackfragCount_(0)
{
if(rdata.endpoint.reliabilityKind == RELIABLE)
{
mp_nackResponse = new NackResponseDelay(this,TimeConv::Time_t2MilliSecondsDouble(times.nackResponseDelay));
mp_nackSupression = new NackSupressionDuration(this,TimeConv::Time_t2MilliSecondsDouble(times.nackSupressionDuration));
mp_initialHeartbeat = new InitialHeartbeat(this, TimeConv::Time_t2MilliSecondsDouble(times.initialHeartbeatDelay));
}

logInfo(RTPS_WRITER,"Reader Proxy created");
Expand All @@ -70,12 +68,6 @@ void ReaderProxy::destroy_timers()
delete(mp_nackSupression);
mp_nackSupression = nullptr;
}

if(mp_initialHeartbeat != nullptr)
{
delete(mp_initialHeartbeat);
mp_initialHeartbeat = nullptr;
}
}

void ReaderProxy::addChange(const ChangeForReader_t& change)
Expand Down
Loading

0 comments on commit 3514b09

Please sign in to comment.